Export the hypervisor.ValidateParameters over RPC
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement ExpandNames
52     - implement CheckPrereq
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements:
57         REQ_MASTER: the LU needs to run on the master node
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_MASTER = True
67   REQ_BGL = True
68
69   def __init__(self, processor, op, context, rpc):
70     """Constructor for LogicalUnit.
71
72     This needs to be overriden in derived classes in order to check op
73     validity.
74
75     """
76     self.proc = processor
77     self.op = op
78     self.cfg = context.cfg
79     self.context = context
80     self.rpc = rpc
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90
91     for attr_name in self._OP_REQP:
92       attr_val = getattr(op, attr_name, None)
93       if attr_val is None:
94         raise errors.OpPrereqError("Required parameter '%s' missing" %
95                                    attr_name)
96
97     if not self.cfg.IsCluster():
98       raise errors.OpPrereqError("Cluster not initialized yet,"
99                                  " use 'gnt-cluster init' first.")
100     if self.REQ_MASTER:
101       master = self.cfg.GetMasterNode()
102       if master != utils.HostInfo().name:
103         raise errors.OpPrereqError("Commands must be run on the master"
104                                    " node %s" % master)
105
106   def __GetSSH(self):
107     """Returns the SshRunner object
108
109     """
110     if not self.__ssh:
111       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
112     return self.__ssh
113
114   ssh = property(fget=__GetSSH)
115
116   def ExpandNames(self):
117     """Expand names for this LU.
118
119     This method is called before starting to execute the opcode, and it should
120     update all the parameters of the opcode to their canonical form (e.g. a
121     short node name must be fully expanded after this method has successfully
122     completed). This way locking, hooks, logging, ecc. can work correctly.
123
124     LUs which implement this method must also populate the self.needed_locks
125     member, as a dict with lock levels as keys, and a list of needed lock names
126     as values. Rules:
127       - Use an empty dict if you don't need any lock
128       - If you don't need any lock at a particular level omit that level
129       - Don't put anything for the BGL level
130       - If you want all locks at a level use locking.ALL_SET as a value
131
132     If you need to share locks (rather than acquire them exclusively) at one
133     level you can modify self.share_locks, setting a true value (usually 1) for
134     that level. By default locks are not shared.
135
136     Examples:
137     # Acquire all nodes and one instance
138     self.needed_locks = {
139       locking.LEVEL_NODE: locking.ALL_SET,
140       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
141     }
142     # Acquire just two nodes
143     self.needed_locks = {
144       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
145     }
146     # Acquire no locks
147     self.needed_locks = {} # No, you can't leave it to the default value None
148
149     """
150     # The implementation of this method is mandatory only if the new LU is
151     # concurrent, so that old LUs don't need to be changed all at the same
152     # time.
153     if self.REQ_BGL:
154       self.needed_locks = {} # Exclusive LUs don't need locks.
155     else:
156       raise NotImplementedError
157
158   def DeclareLocks(self, level):
159     """Declare LU locking needs for a level
160
161     While most LUs can just declare their locking needs at ExpandNames time,
162     sometimes there's the need to calculate some locks after having acquired
163     the ones before. This function is called just before acquiring locks at a
164     particular level, but after acquiring the ones at lower levels, and permits
165     such calculations. It can be used to modify self.needed_locks, and by
166     default it does nothing.
167
168     This function is only called if you have something already set in
169     self.needed_locks for the level.
170
171     @param level: Locking level which is going to be locked
172     @type level: member of ganeti.locking.LEVELS
173
174     """
175
176   def CheckPrereq(self):
177     """Check prerequisites for this LU.
178
179     This method should check that the prerequisites for the execution
180     of this LU are fulfilled. It can do internode communication, but
181     it should be idempotent - no cluster or system changes are
182     allowed.
183
184     The method should raise errors.OpPrereqError in case something is
185     not fulfilled. Its return value is ignored.
186
187     This method should also update all the parameters of the opcode to
188     their canonical form if it hasn't been done by ExpandNames before.
189
190     """
191     raise NotImplementedError
192
193   def Exec(self, feedback_fn):
194     """Execute the LU.
195
196     This method should implement the actual work. It should raise
197     errors.OpExecError for failures that are somewhat dealt with in
198     code, or expected.
199
200     """
201     raise NotImplementedError
202
203   def BuildHooksEnv(self):
204     """Build hooks environment for this LU.
205
206     This method should return a three-node tuple consisting of: a dict
207     containing the environment that will be used for running the
208     specific hook for this LU, a list of node names on which the hook
209     should run before the execution, and a list of node names on which
210     the hook should run after the execution.
211
212     The keys of the dict must not have 'GANETI_' prefixed as this will
213     be handled in the hooks runner. Also note additional keys will be
214     added by the hooks runner. If the LU doesn't define any
215     environment, an empty dict (and not None) should be returned.
216
217     No nodes should be returned as an empty list (and not None).
218
219     Note that if the HPATH for a LU class is None, this function will
220     not be called.
221
222     """
223     raise NotImplementedError
224
225   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
226     """Notify the LU about the results of its hooks.
227
228     This method is called every time a hooks phase is executed, and notifies
229     the Logical Unit about the hooks' result. The LU can then use it to alter
230     its result based on the hooks.  By default the method does nothing and the
231     previous result is passed back unchanged but any LU can define it if it
232     wants to use the local cluster hook-scripts somehow.
233
234     Args:
235       phase: the hooks phase that has just been run
236       hooks_results: the results of the multi-node hooks rpc call
237       feedback_fn: function to send feedback back to the caller
238       lu_result: the previous result this LU had, or None in the PRE phase.
239
240     """
241     return lu_result
242
243   def _ExpandAndLockInstance(self):
244     """Helper function to expand and lock an instance.
245
246     Many LUs that work on an instance take its name in self.op.instance_name
247     and need to expand it and then declare the expanded name for locking. This
248     function does it, and then updates self.op.instance_name to the expanded
249     name. It also initializes needed_locks as a dict, if this hasn't been done
250     before.
251
252     """
253     if self.needed_locks is None:
254       self.needed_locks = {}
255     else:
256       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
257         "_ExpandAndLockInstance called with instance-level locks set"
258     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
259     if expanded_name is None:
260       raise errors.OpPrereqError("Instance '%s' not known" %
261                                   self.op.instance_name)
262     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
263     self.op.instance_name = expanded_name
264
265   def _LockInstancesNodes(self, primary_only=False):
266     """Helper function to declare instances' nodes for locking.
267
268     This function should be called after locking one or more instances to lock
269     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
270     with all primary or secondary nodes for instances already locked and
271     present in self.needed_locks[locking.LEVEL_INSTANCE].
272
273     It should be called from DeclareLocks, and for safety only works if
274     self.recalculate_locks[locking.LEVEL_NODE] is set.
275
276     In the future it may grow parameters to just lock some instance's nodes, or
277     to just lock primaries or secondary nodes, if needed.
278
279     If should be called in DeclareLocks in a way similar to:
280
281     if level == locking.LEVEL_NODE:
282       self._LockInstancesNodes()
283
284     @type primary_only: boolean
285     @param primary_only: only lock primary nodes of locked instances
286
287     """
288     assert locking.LEVEL_NODE in self.recalculate_locks, \
289       "_LockInstancesNodes helper function called with no nodes to recalculate"
290
291     # TODO: check if we're really been called with the instance locks held
292
293     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
294     # future we might want to have different behaviors depending on the value
295     # of self.recalculate_locks[locking.LEVEL_NODE]
296     wanted_nodes = []
297     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
298       instance = self.context.cfg.GetInstanceInfo(instance_name)
299       wanted_nodes.append(instance.primary_node)
300       if not primary_only:
301         wanted_nodes.extend(instance.secondary_nodes)
302
303     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
304       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
305     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
306       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
307
308     del self.recalculate_locks[locking.LEVEL_NODE]
309
310
311 class NoHooksLU(LogicalUnit):
312   """Simple LU which runs no hooks.
313
314   This LU is intended as a parent for other LogicalUnits which will
315   run no hooks, in order to reduce duplicate code.
316
317   """
318   HPATH = None
319   HTYPE = None
320
321
322 def _GetWantedNodes(lu, nodes):
323   """Returns list of checked and expanded node names.
324
325   Args:
326     nodes: List of nodes (strings) or None for all
327
328   """
329   if not isinstance(nodes, list):
330     raise errors.OpPrereqError("Invalid argument type 'nodes'")
331
332   if not nodes:
333     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
334       " non-empty list of nodes whose name is to be expanded.")
335
336   wanted = []
337   for name in nodes:
338     node = lu.cfg.ExpandNodeName(name)
339     if node is None:
340       raise errors.OpPrereqError("No such node name '%s'" % name)
341     wanted.append(node)
342
343   return utils.NiceSort(wanted)
344
345
346 def _GetWantedInstances(lu, instances):
347   """Returns list of checked and expanded instance names.
348
349   Args:
350     instances: List of instances (strings) or None for all
351
352   """
353   if not isinstance(instances, list):
354     raise errors.OpPrereqError("Invalid argument type 'instances'")
355
356   if instances:
357     wanted = []
358
359     for name in instances:
360       instance = lu.cfg.ExpandInstanceName(name)
361       if instance is None:
362         raise errors.OpPrereqError("No such instance name '%s'" % name)
363       wanted.append(instance)
364
365   else:
366     wanted = lu.cfg.GetInstanceList()
367   return utils.NiceSort(wanted)
368
369
370 def _CheckOutputFields(static, dynamic, selected):
371   """Checks whether all selected fields are valid.
372
373   Args:
374     static: Static fields
375     dynamic: Dynamic fields
376
377   """
378   static_fields = frozenset(static)
379   dynamic_fields = frozenset(dynamic)
380
381   all_fields = static_fields | dynamic_fields
382
383   if not all_fields.issuperset(selected):
384     raise errors.OpPrereqError("Unknown output fields selected: %s"
385                                % ",".join(frozenset(selected).
386                                           difference(all_fields)))
387
388
389 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
390                           memory, vcpus, nics):
391   """Builds instance related env variables for hooks from single variables.
392
393   Args:
394     secondary_nodes: List of secondary nodes as strings
395   """
396   env = {
397     "OP_TARGET": name,
398     "INSTANCE_NAME": name,
399     "INSTANCE_PRIMARY": primary_node,
400     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
401     "INSTANCE_OS_TYPE": os_type,
402     "INSTANCE_STATUS": status,
403     "INSTANCE_MEMORY": memory,
404     "INSTANCE_VCPUS": vcpus,
405   }
406
407   if nics:
408     nic_count = len(nics)
409     for idx, (ip, bridge, mac) in enumerate(nics):
410       if ip is None:
411         ip = ""
412       env["INSTANCE_NIC%d_IP" % idx] = ip
413       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
414       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
415   else:
416     nic_count = 0
417
418   env["INSTANCE_NIC_COUNT"] = nic_count
419
420   return env
421
422
423 def _BuildInstanceHookEnvByObject(instance, override=None):
424   """Builds instance related env variables for hooks from an object.
425
426   Args:
427     instance: objects.Instance object of instance
428     override: dict of values to override
429   """
430   args = {
431     'name': instance.name,
432     'primary_node': instance.primary_node,
433     'secondary_nodes': instance.secondary_nodes,
434     'os_type': instance.os,
435     'status': instance.os,
436     'memory': instance.memory,
437     'vcpus': instance.vcpus,
438     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
439   }
440   if override:
441     args.update(override)
442   return _BuildInstanceHookEnv(**args)
443
444
445 def _CheckInstanceBridgesExist(lu, instance):
446   """Check that the brigdes needed by an instance exist.
447
448   """
449   # check bridges existance
450   brlist = [nic.bridge for nic in instance.nics]
451   if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
452     raise errors.OpPrereqError("one or more target bridges %s does not"
453                                " exist on destination node '%s'" %
454                                (brlist, instance.primary_node))
455
456
457 class LUDestroyCluster(NoHooksLU):
458   """Logical unit for destroying the cluster.
459
460   """
461   _OP_REQP = []
462
463   def CheckPrereq(self):
464     """Check prerequisites.
465
466     This checks whether the cluster is empty.
467
468     Any errors are signalled by raising errors.OpPrereqError.
469
470     """
471     master = self.cfg.GetMasterNode()
472
473     nodelist = self.cfg.GetNodeList()
474     if len(nodelist) != 1 or nodelist[0] != master:
475       raise errors.OpPrereqError("There are still %d node(s) in"
476                                  " this cluster." % (len(nodelist) - 1))
477     instancelist = self.cfg.GetInstanceList()
478     if instancelist:
479       raise errors.OpPrereqError("There are still %d instance(s) in"
480                                  " this cluster." % len(instancelist))
481
482   def Exec(self, feedback_fn):
483     """Destroys the cluster.
484
485     """
486     master = self.cfg.GetMasterNode()
487     if not self.rpc.call_node_stop_master(master, False):
488       raise errors.OpExecError("Could not disable the master role")
489     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
490     utils.CreateBackup(priv_key)
491     utils.CreateBackup(pub_key)
492     return master
493
494
495 class LUVerifyCluster(LogicalUnit):
496   """Verifies the cluster status.
497
498   """
499   HPATH = "cluster-verify"
500   HTYPE = constants.HTYPE_CLUSTER
501   _OP_REQP = ["skip_checks"]
502   REQ_BGL = False
503
504   def ExpandNames(self):
505     self.needed_locks = {
506       locking.LEVEL_NODE: locking.ALL_SET,
507       locking.LEVEL_INSTANCE: locking.ALL_SET,
508     }
509     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
510
511   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
512                   remote_version, feedback_fn):
513     """Run multiple tests against a node.
514
515     Test list:
516       - compares ganeti version
517       - checks vg existance and size > 20G
518       - checks config file checksum
519       - checks ssh to other nodes
520
521     Args:
522       node: name of the node to check
523       file_list: required list of files
524       local_cksum: dictionary of local files and their checksums
525
526     """
527     # compares ganeti version
528     local_version = constants.PROTOCOL_VERSION
529     if not remote_version:
530       feedback_fn("  - ERROR: connection to %s failed" % (node))
531       return True
532
533     if local_version != remote_version:
534       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
535                       (local_version, node, remote_version))
536       return True
537
538     # checks vg existance and size > 20G
539
540     bad = False
541     if not vglist:
542       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
543                       (node,))
544       bad = True
545     else:
546       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
547                                             constants.MIN_VG_SIZE)
548       if vgstatus:
549         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
550         bad = True
551
552     if not node_result:
553       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
554       return True
555
556     # checks config file checksum
557     # checks ssh to any
558
559     if 'filelist' not in node_result:
560       bad = True
561       feedback_fn("  - ERROR: node hasn't returned file checksum data")
562     else:
563       remote_cksum = node_result['filelist']
564       for file_name in file_list:
565         if file_name not in remote_cksum:
566           bad = True
567           feedback_fn("  - ERROR: file '%s' missing" % file_name)
568         elif remote_cksum[file_name] != local_cksum[file_name]:
569           bad = True
570           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
571
572     if 'nodelist' not in node_result:
573       bad = True
574       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
575     else:
576       if node_result['nodelist']:
577         bad = True
578         for node in node_result['nodelist']:
579           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
580                           (node, node_result['nodelist'][node]))
581     if 'node-net-test' not in node_result:
582       bad = True
583       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
584     else:
585       if node_result['node-net-test']:
586         bad = True
587         nlist = utils.NiceSort(node_result['node-net-test'].keys())
588         for node in nlist:
589           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
590                           (node, node_result['node-net-test'][node]))
591
592     hyp_result = node_result.get('hypervisor', None)
593     if isinstance(hyp_result, dict):
594       for hv_name, hv_result in hyp_result.iteritems():
595         if hv_result is not None:
596           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
597                       (hv_name, hv_result))
598     return bad
599
600   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
601                       node_instance, feedback_fn):
602     """Verify an instance.
603
604     This function checks to see if the required block devices are
605     available on the instance's node.
606
607     """
608     bad = False
609
610     node_current = instanceconfig.primary_node
611
612     node_vol_should = {}
613     instanceconfig.MapLVsByNode(node_vol_should)
614
615     for node in node_vol_should:
616       for volume in node_vol_should[node]:
617         if node not in node_vol_is or volume not in node_vol_is[node]:
618           feedback_fn("  - ERROR: volume %s missing on node %s" %
619                           (volume, node))
620           bad = True
621
622     if not instanceconfig.status == 'down':
623       if (node_current not in node_instance or
624           not instance in node_instance[node_current]):
625         feedback_fn("  - ERROR: instance %s not running on node %s" %
626                         (instance, node_current))
627         bad = True
628
629     for node in node_instance:
630       if (not node == node_current):
631         if instance in node_instance[node]:
632           feedback_fn("  - ERROR: instance %s should not run on node %s" %
633                           (instance, node))
634           bad = True
635
636     return bad
637
638   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
639     """Verify if there are any unknown volumes in the cluster.
640
641     The .os, .swap and backup volumes are ignored. All other volumes are
642     reported as unknown.
643
644     """
645     bad = False
646
647     for node in node_vol_is:
648       for volume in node_vol_is[node]:
649         if node not in node_vol_should or volume not in node_vol_should[node]:
650           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
651                       (volume, node))
652           bad = True
653     return bad
654
655   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
656     """Verify the list of running instances.
657
658     This checks what instances are running but unknown to the cluster.
659
660     """
661     bad = False
662     for node in node_instance:
663       for runninginstance in node_instance[node]:
664         if runninginstance not in instancelist:
665           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
666                           (runninginstance, node))
667           bad = True
668     return bad
669
670   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
671     """Verify N+1 Memory Resilience.
672
673     Check that if one single node dies we can still start all the instances it
674     was primary for.
675
676     """
677     bad = False
678
679     for node, nodeinfo in node_info.iteritems():
680       # This code checks that every node which is now listed as secondary has
681       # enough memory to host all instances it is supposed to should a single
682       # other node in the cluster fail.
683       # FIXME: not ready for failover to an arbitrary node
684       # FIXME: does not support file-backed instances
685       # WARNING: we currently take into account down instances as well as up
686       # ones, considering that even if they're down someone might want to start
687       # them even in the event of a node failure.
688       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
689         needed_mem = 0
690         for instance in instances:
691           needed_mem += instance_cfg[instance].memory
692         if nodeinfo['mfree'] < needed_mem:
693           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
694                       " failovers should node %s fail" % (node, prinode))
695           bad = True
696     return bad
697
698   def CheckPrereq(self):
699     """Check prerequisites.
700
701     Transform the list of checks we're going to skip into a set and check that
702     all its members are valid.
703
704     """
705     self.skip_set = frozenset(self.op.skip_checks)
706     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
707       raise errors.OpPrereqError("Invalid checks to be skipped specified")
708
709   def BuildHooksEnv(self):
710     """Build hooks env.
711
712     Cluster-Verify hooks just rone in the post phase and their failure makes
713     the output be logged in the verify output and the verification to fail.
714
715     """
716     all_nodes = self.cfg.GetNodeList()
717     # TODO: populate the environment with useful information for verify hooks
718     env = {}
719     return env, [], all_nodes
720
721   def Exec(self, feedback_fn):
722     """Verify integrity of cluster, performing various test on nodes.
723
724     """
725     bad = False
726     feedback_fn("* Verifying global settings")
727     for msg in self.cfg.VerifyConfig():
728       feedback_fn("  - ERROR: %s" % msg)
729
730     vg_name = self.cfg.GetVGName()
731     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
732     nodelist = utils.NiceSort(self.cfg.GetNodeList())
733     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
734     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
735     i_non_redundant = [] # Non redundant instances
736     node_volume = {}
737     node_instance = {}
738     node_info = {}
739     instance_cfg = {}
740
741     # FIXME: verify OS list
742     # do local checksums
743     file_names = []
744     file_names.append(constants.SSL_CERT_FILE)
745     file_names.append(constants.CLUSTER_CONF_FILE)
746     local_checksums = utils.FingerprintFiles(file_names)
747
748     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
749     all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
750     all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
751     all_vglist = self.rpc.call_vg_list(nodelist)
752     node_verify_param = {
753       'filelist': file_names,
754       'nodelist': nodelist,
755       'hypervisor': hypervisors,
756       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
757                         for node in nodeinfo]
758       }
759     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
760                                            self.cfg.GetClusterName())
761     all_rversion = self.rpc.call_version(nodelist)
762     all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
763                                         self.cfg.GetHypervisorType())
764
765     for node in nodelist:
766       feedback_fn("* Verifying node %s" % node)
767       result = self._VerifyNode(node, file_names, local_checksums,
768                                 all_vglist[node], all_nvinfo[node],
769                                 all_rversion[node], feedback_fn)
770       bad = bad or result
771
772       # node_volume
773       volumeinfo = all_volumeinfo[node]
774
775       if isinstance(volumeinfo, basestring):
776         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
777                     (node, volumeinfo[-400:].encode('string_escape')))
778         bad = True
779         node_volume[node] = {}
780       elif not isinstance(volumeinfo, dict):
781         feedback_fn("  - ERROR: connection to %s failed" % (node,))
782         bad = True
783         continue
784       else:
785         node_volume[node] = volumeinfo
786
787       # node_instance
788       nodeinstance = all_instanceinfo[node]
789       if type(nodeinstance) != list:
790         feedback_fn("  - ERROR: connection to %s failed" % (node,))
791         bad = True
792         continue
793
794       node_instance[node] = nodeinstance
795
796       # node_info
797       nodeinfo = all_ninfo[node]
798       if not isinstance(nodeinfo, dict):
799         feedback_fn("  - ERROR: connection to %s failed" % (node,))
800         bad = True
801         continue
802
803       try:
804         node_info[node] = {
805           "mfree": int(nodeinfo['memory_free']),
806           "dfree": int(nodeinfo['vg_free']),
807           "pinst": [],
808           "sinst": [],
809           # dictionary holding all instances this node is secondary for,
810           # grouped by their primary node. Each key is a cluster node, and each
811           # value is a list of instances which have the key as primary and the
812           # current node as secondary.  this is handy to calculate N+1 memory
813           # availability if you can only failover from a primary to its
814           # secondary.
815           "sinst-by-pnode": {},
816         }
817       except ValueError:
818         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
819         bad = True
820         continue
821
822     node_vol_should = {}
823
824     for instance in instancelist:
825       feedback_fn("* Verifying instance %s" % instance)
826       inst_config = self.cfg.GetInstanceInfo(instance)
827       result =  self._VerifyInstance(instance, inst_config, node_volume,
828                                      node_instance, feedback_fn)
829       bad = bad or result
830
831       inst_config.MapLVsByNode(node_vol_should)
832
833       instance_cfg[instance] = inst_config
834
835       pnode = inst_config.primary_node
836       if pnode in node_info:
837         node_info[pnode]['pinst'].append(instance)
838       else:
839         feedback_fn("  - ERROR: instance %s, connection to primary node"
840                     " %s failed" % (instance, pnode))
841         bad = True
842
843       # If the instance is non-redundant we cannot survive losing its primary
844       # node, so we are not N+1 compliant. On the other hand we have no disk
845       # templates with more than one secondary so that situation is not well
846       # supported either.
847       # FIXME: does not support file-backed instances
848       if len(inst_config.secondary_nodes) == 0:
849         i_non_redundant.append(instance)
850       elif len(inst_config.secondary_nodes) > 1:
851         feedback_fn("  - WARNING: multiple secondaries for instance %s"
852                     % instance)
853
854       for snode in inst_config.secondary_nodes:
855         if snode in node_info:
856           node_info[snode]['sinst'].append(instance)
857           if pnode not in node_info[snode]['sinst-by-pnode']:
858             node_info[snode]['sinst-by-pnode'][pnode] = []
859           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
860         else:
861           feedback_fn("  - ERROR: instance %s, connection to secondary node"
862                       " %s failed" % (instance, snode))
863
864     feedback_fn("* Verifying orphan volumes")
865     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
866                                        feedback_fn)
867     bad = bad or result
868
869     feedback_fn("* Verifying remaining instances")
870     result = self._VerifyOrphanInstances(instancelist, node_instance,
871                                          feedback_fn)
872     bad = bad or result
873
874     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
875       feedback_fn("* Verifying N+1 Memory redundancy")
876       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
877       bad = bad or result
878
879     feedback_fn("* Other Notes")
880     if i_non_redundant:
881       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
882                   % len(i_non_redundant))
883
884     return not bad
885
886   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
887     """Analize the post-hooks' result, handle it, and send some
888     nicely-formatted feedback back to the user.
889
890     Args:
891       phase: the hooks phase that has just been run
892       hooks_results: the results of the multi-node hooks rpc call
893       feedback_fn: function to send feedback back to the caller
894       lu_result: previous Exec result
895
896     """
897     # We only really run POST phase hooks, and are only interested in
898     # their results
899     if phase == constants.HOOKS_PHASE_POST:
900       # Used to change hooks' output to proper indentation
901       indent_re = re.compile('^', re.M)
902       feedback_fn("* Hooks Results")
903       if not hooks_results:
904         feedback_fn("  - ERROR: general communication failure")
905         lu_result = 1
906       else:
907         for node_name in hooks_results:
908           show_node_header = True
909           res = hooks_results[node_name]
910           if res is False or not isinstance(res, list):
911             feedback_fn("    Communication failure")
912             lu_result = 1
913             continue
914           for script, hkr, output in res:
915             if hkr == constants.HKR_FAIL:
916               # The node header is only shown once, if there are
917               # failing hooks on that node
918               if show_node_header:
919                 feedback_fn("  Node %s:" % node_name)
920                 show_node_header = False
921               feedback_fn("    ERROR: Script %s failed, output:" % script)
922               output = indent_re.sub('      ', output)
923               feedback_fn("%s" % output)
924               lu_result = 1
925
926       return lu_result
927
928
929 class LUVerifyDisks(NoHooksLU):
930   """Verifies the cluster disks status.
931
932   """
933   _OP_REQP = []
934   REQ_BGL = False
935
936   def ExpandNames(self):
937     self.needed_locks = {
938       locking.LEVEL_NODE: locking.ALL_SET,
939       locking.LEVEL_INSTANCE: locking.ALL_SET,
940     }
941     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
942
943   def CheckPrereq(self):
944     """Check prerequisites.
945
946     This has no prerequisites.
947
948     """
949     pass
950
951   def Exec(self, feedback_fn):
952     """Verify integrity of cluster disks.
953
954     """
955     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
956
957     vg_name = self.cfg.GetVGName()
958     nodes = utils.NiceSort(self.cfg.GetNodeList())
959     instances = [self.cfg.GetInstanceInfo(name)
960                  for name in self.cfg.GetInstanceList()]
961
962     nv_dict = {}
963     for inst in instances:
964       inst_lvs = {}
965       if (inst.status != "up" or
966           inst.disk_template not in constants.DTS_NET_MIRROR):
967         continue
968       inst.MapLVsByNode(inst_lvs)
969       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
970       for node, vol_list in inst_lvs.iteritems():
971         for vol in vol_list:
972           nv_dict[(node, vol)] = inst
973
974     if not nv_dict:
975       return result
976
977     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
978
979     to_act = set()
980     for node in nodes:
981       # node_volume
982       lvs = node_lvs[node]
983
984       if isinstance(lvs, basestring):
985         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
986         res_nlvm[node] = lvs
987       elif not isinstance(lvs, dict):
988         logger.Info("connection to node %s failed or invalid data returned" %
989                     (node,))
990         res_nodes.append(node)
991         continue
992
993       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
994         inst = nv_dict.pop((node, lv_name), None)
995         if (not lv_online and inst is not None
996             and inst.name not in res_instances):
997           res_instances.append(inst.name)
998
999     # any leftover items in nv_dict are missing LVs, let's arrange the
1000     # data better
1001     for key, inst in nv_dict.iteritems():
1002       if inst.name not in res_missing:
1003         res_missing[inst.name] = []
1004       res_missing[inst.name].append(key)
1005
1006     return result
1007
1008
1009 class LURenameCluster(LogicalUnit):
1010   """Rename the cluster.
1011
1012   """
1013   HPATH = "cluster-rename"
1014   HTYPE = constants.HTYPE_CLUSTER
1015   _OP_REQP = ["name"]
1016
1017   def BuildHooksEnv(self):
1018     """Build hooks env.
1019
1020     """
1021     env = {
1022       "OP_TARGET": self.cfg.GetClusterName(),
1023       "NEW_NAME": self.op.name,
1024       }
1025     mn = self.cfg.GetMasterNode()
1026     return env, [mn], [mn]
1027
1028   def CheckPrereq(self):
1029     """Verify that the passed name is a valid one.
1030
1031     """
1032     hostname = utils.HostInfo(self.op.name)
1033
1034     new_name = hostname.name
1035     self.ip = new_ip = hostname.ip
1036     old_name = self.cfg.GetClusterName()
1037     old_ip = self.cfg.GetMasterIP()
1038     if new_name == old_name and new_ip == old_ip:
1039       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1040                                  " cluster has changed")
1041     if new_ip != old_ip:
1042       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1043         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1044                                    " reachable on the network. Aborting." %
1045                                    new_ip)
1046
1047     self.op.name = new_name
1048
1049   def Exec(self, feedback_fn):
1050     """Rename the cluster.
1051
1052     """
1053     clustername = self.op.name
1054     ip = self.ip
1055
1056     # shutdown the master IP
1057     master = self.cfg.GetMasterNode()
1058     if not self.rpc.call_node_stop_master(master, False):
1059       raise errors.OpExecError("Could not disable the master role")
1060
1061     try:
1062       # modify the sstore
1063       # TODO: sstore
1064       ss.SetKey(ss.SS_MASTER_IP, ip)
1065       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1066
1067       # Distribute updated ss config to all nodes
1068       myself = self.cfg.GetNodeInfo(master)
1069       dist_nodes = self.cfg.GetNodeList()
1070       if myself.name in dist_nodes:
1071         dist_nodes.remove(myself.name)
1072
1073       logger.Debug("Copying updated ssconf data to all nodes")
1074       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1075         fname = ss.KeyToFilename(keyname)
1076         result = self.rpc.call_upload_file(dist_nodes, fname)
1077         for to_node in dist_nodes:
1078           if not result[to_node]:
1079             logger.Error("copy of file %s to node %s failed" %
1080                          (fname, to_node))
1081     finally:
1082       if not self.rpc.call_node_start_master(master, False):
1083         logger.Error("Could not re-enable the master role on the master,"
1084                      " please restart manually.")
1085
1086
1087 def _RecursiveCheckIfLVMBased(disk):
1088   """Check if the given disk or its children are lvm-based.
1089
1090   Args:
1091     disk: ganeti.objects.Disk object
1092
1093   Returns:
1094     boolean indicating whether a LD_LV dev_type was found or not
1095
1096   """
1097   if disk.children:
1098     for chdisk in disk.children:
1099       if _RecursiveCheckIfLVMBased(chdisk):
1100         return True
1101   return disk.dev_type == constants.LD_LV
1102
1103
1104 class LUSetClusterParams(LogicalUnit):
1105   """Change the parameters of the cluster.
1106
1107   """
1108   HPATH = "cluster-modify"
1109   HTYPE = constants.HTYPE_CLUSTER
1110   _OP_REQP = []
1111   REQ_BGL = False
1112
1113   def ExpandNames(self):
1114     # FIXME: in the future maybe other cluster params won't require checking on
1115     # all nodes to be modified.
1116     self.needed_locks = {
1117       locking.LEVEL_NODE: locking.ALL_SET,
1118     }
1119     self.share_locks[locking.LEVEL_NODE] = 1
1120
1121   def BuildHooksEnv(self):
1122     """Build hooks env.
1123
1124     """
1125     env = {
1126       "OP_TARGET": self.cfg.GetClusterName(),
1127       "NEW_VG_NAME": self.op.vg_name,
1128       }
1129     mn = self.cfg.GetMasterNode()
1130     return env, [mn], [mn]
1131
1132   def CheckPrereq(self):
1133     """Check prerequisites.
1134
1135     This checks whether the given params don't conflict and
1136     if the given volume group is valid.
1137
1138     """
1139     # FIXME: This only works because there is only one parameter that can be
1140     # changed or removed.
1141     if not self.op.vg_name:
1142       instances = self.cfg.GetAllInstancesInfo().values()
1143       for inst in instances:
1144         for disk in inst.disks:
1145           if _RecursiveCheckIfLVMBased(disk):
1146             raise errors.OpPrereqError("Cannot disable lvm storage while"
1147                                        " lvm-based instances exist")
1148
1149     # if vg_name not None, checks given volume group on all nodes
1150     if self.op.vg_name:
1151       node_list = self.acquired_locks[locking.LEVEL_NODE]
1152       vglist = self.rpc.call_vg_list(node_list)
1153       for node in node_list:
1154         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1155                                               constants.MIN_VG_SIZE)
1156         if vgstatus:
1157           raise errors.OpPrereqError("Error on node '%s': %s" %
1158                                      (node, vgstatus))
1159
1160   def Exec(self, feedback_fn):
1161     """Change the parameters of the cluster.
1162
1163     """
1164     if self.op.vg_name != self.cfg.GetVGName():
1165       self.cfg.SetVGName(self.op.vg_name)
1166     else:
1167       feedback_fn("Cluster LVM configuration already in desired"
1168                   " state, not changing")
1169
1170
1171 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1172   """Sleep and poll for an instance's disk to sync.
1173
1174   """
1175   if not instance.disks:
1176     return True
1177
1178   if not oneshot:
1179     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1180
1181   node = instance.primary_node
1182
1183   for dev in instance.disks:
1184     lu.cfg.SetDiskID(dev, node)
1185
1186   retries = 0
1187   while True:
1188     max_time = 0
1189     done = True
1190     cumul_degraded = False
1191     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1192     if not rstats:
1193       lu.proc.LogWarning("Can't get any data from node %s" % node)
1194       retries += 1
1195       if retries >= 10:
1196         raise errors.RemoteError("Can't contact node %s for mirror data,"
1197                                  " aborting." % node)
1198       time.sleep(6)
1199       continue
1200     retries = 0
1201     for i in range(len(rstats)):
1202       mstat = rstats[i]
1203       if mstat is None:
1204         lu.proc.LogWarning("Can't compute data for node %s/%s" %
1205                            (node, instance.disks[i].iv_name))
1206         continue
1207       # we ignore the ldisk parameter
1208       perc_done, est_time, is_degraded, _ = mstat
1209       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1210       if perc_done is not None:
1211         done = False
1212         if est_time is not None:
1213           rem_time = "%d estimated seconds remaining" % est_time
1214           max_time = est_time
1215         else:
1216           rem_time = "no time estimate"
1217         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1218                         (instance.disks[i].iv_name, perc_done, rem_time))
1219     if done or oneshot:
1220       break
1221
1222     time.sleep(min(60, max_time))
1223
1224   if done:
1225     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1226   return not cumul_degraded
1227
1228
1229 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1230   """Check that mirrors are not degraded.
1231
1232   The ldisk parameter, if True, will change the test from the
1233   is_degraded attribute (which represents overall non-ok status for
1234   the device(s)) to the ldisk (representing the local storage status).
1235
1236   """
1237   lu.cfg.SetDiskID(dev, node)
1238   if ldisk:
1239     idx = 6
1240   else:
1241     idx = 5
1242
1243   result = True
1244   if on_primary or dev.AssembleOnSecondary():
1245     rstats = lu.rpc.call_blockdev_find(node, dev)
1246     if not rstats:
1247       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1248       result = False
1249     else:
1250       result = result and (not rstats[idx])
1251   if dev.children:
1252     for child in dev.children:
1253       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1254
1255   return result
1256
1257
1258 class LUDiagnoseOS(NoHooksLU):
1259   """Logical unit for OS diagnose/query.
1260
1261   """
1262   _OP_REQP = ["output_fields", "names"]
1263   REQ_BGL = False
1264
1265   def ExpandNames(self):
1266     if self.op.names:
1267       raise errors.OpPrereqError("Selective OS query not supported")
1268
1269     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1270     _CheckOutputFields(static=[],
1271                        dynamic=self.dynamic_fields,
1272                        selected=self.op.output_fields)
1273
1274     # Lock all nodes, in shared mode
1275     self.needed_locks = {}
1276     self.share_locks[locking.LEVEL_NODE] = 1
1277     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1278
1279   def CheckPrereq(self):
1280     """Check prerequisites.
1281
1282     """
1283
1284   @staticmethod
1285   def _DiagnoseByOS(node_list, rlist):
1286     """Remaps a per-node return list into an a per-os per-node dictionary
1287
1288       Args:
1289         node_list: a list with the names of all nodes
1290         rlist: a map with node names as keys and OS objects as values
1291
1292       Returns:
1293         map: a map with osnames as keys and as value another map, with
1294              nodes as
1295              keys and list of OS objects as values
1296              e.g. {"debian-etch": {"node1": [<object>,...],
1297                                    "node2": [<object>,]}
1298                   }
1299
1300     """
1301     all_os = {}
1302     for node_name, nr in rlist.iteritems():
1303       if not nr:
1304         continue
1305       for os_obj in nr:
1306         if os_obj.name not in all_os:
1307           # build a list of nodes for this os containing empty lists
1308           # for each node in node_list
1309           all_os[os_obj.name] = {}
1310           for nname in node_list:
1311             all_os[os_obj.name][nname] = []
1312         all_os[os_obj.name][node_name].append(os_obj)
1313     return all_os
1314
1315   def Exec(self, feedback_fn):
1316     """Compute the list of OSes.
1317
1318     """
1319     node_list = self.acquired_locks[locking.LEVEL_NODE]
1320     node_data = self.rpc.call_os_diagnose(node_list)
1321     if node_data == False:
1322       raise errors.OpExecError("Can't gather the list of OSes")
1323     pol = self._DiagnoseByOS(node_list, node_data)
1324     output = []
1325     for os_name, os_data in pol.iteritems():
1326       row = []
1327       for field in self.op.output_fields:
1328         if field == "name":
1329           val = os_name
1330         elif field == "valid":
1331           val = utils.all([osl and osl[0] for osl in os_data.values()])
1332         elif field == "node_status":
1333           val = {}
1334           for node_name, nos_list in os_data.iteritems():
1335             val[node_name] = [(v.status, v.path) for v in nos_list]
1336         else:
1337           raise errors.ParameterError(field)
1338         row.append(val)
1339       output.append(row)
1340
1341     return output
1342
1343
1344 class LURemoveNode(LogicalUnit):
1345   """Logical unit for removing a node.
1346
1347   """
1348   HPATH = "node-remove"
1349   HTYPE = constants.HTYPE_NODE
1350   _OP_REQP = ["node_name"]
1351
1352   def BuildHooksEnv(self):
1353     """Build hooks env.
1354
1355     This doesn't run on the target node in the pre phase as a failed
1356     node would then be impossible to remove.
1357
1358     """
1359     env = {
1360       "OP_TARGET": self.op.node_name,
1361       "NODE_NAME": self.op.node_name,
1362       }
1363     all_nodes = self.cfg.GetNodeList()
1364     all_nodes.remove(self.op.node_name)
1365     return env, all_nodes, all_nodes
1366
1367   def CheckPrereq(self):
1368     """Check prerequisites.
1369
1370     This checks:
1371      - the node exists in the configuration
1372      - it does not have primary or secondary instances
1373      - it's not the master
1374
1375     Any errors are signalled by raising errors.OpPrereqError.
1376
1377     """
1378     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1379     if node is None:
1380       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1381
1382     instance_list = self.cfg.GetInstanceList()
1383
1384     masternode = self.cfg.GetMasterNode()
1385     if node.name == masternode:
1386       raise errors.OpPrereqError("Node is the master node,"
1387                                  " you need to failover first.")
1388
1389     for instance_name in instance_list:
1390       instance = self.cfg.GetInstanceInfo(instance_name)
1391       if node.name == instance.primary_node:
1392         raise errors.OpPrereqError("Instance %s still running on the node,"
1393                                    " please remove first." % instance_name)
1394       if node.name in instance.secondary_nodes:
1395         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1396                                    " please remove first." % instance_name)
1397     self.op.node_name = node.name
1398     self.node = node
1399
1400   def Exec(self, feedback_fn):
1401     """Removes the node from the cluster.
1402
1403     """
1404     node = self.node
1405     logger.Info("stopping the node daemon and removing configs from node %s" %
1406                 node.name)
1407
1408     self.context.RemoveNode(node.name)
1409
1410     self.rpc.call_node_leave_cluster(node.name)
1411
1412
1413 class LUQueryNodes(NoHooksLU):
1414   """Logical unit for querying nodes.
1415
1416   """
1417   _OP_REQP = ["output_fields", "names"]
1418   REQ_BGL = False
1419
1420   def ExpandNames(self):
1421     self.dynamic_fields = frozenset([
1422       "dtotal", "dfree",
1423       "mtotal", "mnode", "mfree",
1424       "bootid",
1425       "ctotal",
1426       ])
1427
1428     self.static_fields = frozenset([
1429       "name", "pinst_cnt", "sinst_cnt",
1430       "pinst_list", "sinst_list",
1431       "pip", "sip", "tags",
1432       "serial_no",
1433       ])
1434
1435     _CheckOutputFields(static=self.static_fields,
1436                        dynamic=self.dynamic_fields,
1437                        selected=self.op.output_fields)
1438
1439     self.needed_locks = {}
1440     self.share_locks[locking.LEVEL_NODE] = 1
1441
1442     if self.op.names:
1443       self.wanted = _GetWantedNodes(self, self.op.names)
1444     else:
1445       self.wanted = locking.ALL_SET
1446
1447     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1448     if self.do_locking:
1449       # if we don't request only static fields, we need to lock the nodes
1450       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1451
1452
1453   def CheckPrereq(self):
1454     """Check prerequisites.
1455
1456     """
1457     # The validation of the node list is done in the _GetWantedNodes,
1458     # if non empty, and if empty, there's no validation to do
1459     pass
1460
1461   def Exec(self, feedback_fn):
1462     """Computes the list of nodes and their attributes.
1463
1464     """
1465     all_info = self.cfg.GetAllNodesInfo()
1466     if self.do_locking:
1467       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1468     elif self.wanted != locking.ALL_SET:
1469       nodenames = self.wanted
1470       missing = set(nodenames).difference(all_info.keys())
1471       if missing:
1472         raise errors.OpExecError(
1473           "Some nodes were removed before retrieving their data: %s" % missing)
1474     else:
1475       nodenames = all_info.keys()
1476     nodelist = [all_info[name] for name in nodenames]
1477
1478     # begin data gathering
1479
1480     if self.dynamic_fields.intersection(self.op.output_fields):
1481       live_data = {}
1482       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1483                                           self.cfg.GetHypervisorType())
1484       for name in nodenames:
1485         nodeinfo = node_data.get(name, None)
1486         if nodeinfo:
1487           live_data[name] = {
1488             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1489             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1490             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1491             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1492             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1493             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1494             "bootid": nodeinfo['bootid'],
1495             }
1496         else:
1497           live_data[name] = {}
1498     else:
1499       live_data = dict.fromkeys(nodenames, {})
1500
1501     node_to_primary = dict([(name, set()) for name in nodenames])
1502     node_to_secondary = dict([(name, set()) for name in nodenames])
1503
1504     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1505                              "sinst_cnt", "sinst_list"))
1506     if inst_fields & frozenset(self.op.output_fields):
1507       instancelist = self.cfg.GetInstanceList()
1508
1509       for instance_name in instancelist:
1510         inst = self.cfg.GetInstanceInfo(instance_name)
1511         if inst.primary_node in node_to_primary:
1512           node_to_primary[inst.primary_node].add(inst.name)
1513         for secnode in inst.secondary_nodes:
1514           if secnode in node_to_secondary:
1515             node_to_secondary[secnode].add(inst.name)
1516
1517     # end data gathering
1518
1519     output = []
1520     for node in nodelist:
1521       node_output = []
1522       for field in self.op.output_fields:
1523         if field == "name":
1524           val = node.name
1525         elif field == "pinst_list":
1526           val = list(node_to_primary[node.name])
1527         elif field == "sinst_list":
1528           val = list(node_to_secondary[node.name])
1529         elif field == "pinst_cnt":
1530           val = len(node_to_primary[node.name])
1531         elif field == "sinst_cnt":
1532           val = len(node_to_secondary[node.name])
1533         elif field == "pip":
1534           val = node.primary_ip
1535         elif field == "sip":
1536           val = node.secondary_ip
1537         elif field == "tags":
1538           val = list(node.GetTags())
1539         elif field == "serial_no":
1540           val = node.serial_no
1541         elif field in self.dynamic_fields:
1542           val = live_data[node.name].get(field, None)
1543         else:
1544           raise errors.ParameterError(field)
1545         node_output.append(val)
1546       output.append(node_output)
1547
1548     return output
1549
1550
1551 class LUQueryNodeVolumes(NoHooksLU):
1552   """Logical unit for getting volumes on node(s).
1553
1554   """
1555   _OP_REQP = ["nodes", "output_fields"]
1556   REQ_BGL = False
1557
1558   def ExpandNames(self):
1559     _CheckOutputFields(static=["node"],
1560                        dynamic=["phys", "vg", "name", "size", "instance"],
1561                        selected=self.op.output_fields)
1562
1563     self.needed_locks = {}
1564     self.share_locks[locking.LEVEL_NODE] = 1
1565     if not self.op.nodes:
1566       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1567     else:
1568       self.needed_locks[locking.LEVEL_NODE] = \
1569         _GetWantedNodes(self, self.op.nodes)
1570
1571   def CheckPrereq(self):
1572     """Check prerequisites.
1573
1574     This checks that the fields required are valid output fields.
1575
1576     """
1577     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1578
1579   def Exec(self, feedback_fn):
1580     """Computes the list of nodes and their attributes.
1581
1582     """
1583     nodenames = self.nodes
1584     volumes = self.rpc.call_node_volumes(nodenames)
1585
1586     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1587              in self.cfg.GetInstanceList()]
1588
1589     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1590
1591     output = []
1592     for node in nodenames:
1593       if node not in volumes or not volumes[node]:
1594         continue
1595
1596       node_vols = volumes[node][:]
1597       node_vols.sort(key=lambda vol: vol['dev'])
1598
1599       for vol in node_vols:
1600         node_output = []
1601         for field in self.op.output_fields:
1602           if field == "node":
1603             val = node
1604           elif field == "phys":
1605             val = vol['dev']
1606           elif field == "vg":
1607             val = vol['vg']
1608           elif field == "name":
1609             val = vol['name']
1610           elif field == "size":
1611             val = int(float(vol['size']))
1612           elif field == "instance":
1613             for inst in ilist:
1614               if node not in lv_by_node[inst]:
1615                 continue
1616               if vol['name'] in lv_by_node[inst][node]:
1617                 val = inst.name
1618                 break
1619             else:
1620               val = '-'
1621           else:
1622             raise errors.ParameterError(field)
1623           node_output.append(str(val))
1624
1625         output.append(node_output)
1626
1627     return output
1628
1629
1630 class LUAddNode(LogicalUnit):
1631   """Logical unit for adding node to the cluster.
1632
1633   """
1634   HPATH = "node-add"
1635   HTYPE = constants.HTYPE_NODE
1636   _OP_REQP = ["node_name"]
1637
1638   def BuildHooksEnv(self):
1639     """Build hooks env.
1640
1641     This will run on all nodes before, and on all nodes + the new node after.
1642
1643     """
1644     env = {
1645       "OP_TARGET": self.op.node_name,
1646       "NODE_NAME": self.op.node_name,
1647       "NODE_PIP": self.op.primary_ip,
1648       "NODE_SIP": self.op.secondary_ip,
1649       }
1650     nodes_0 = self.cfg.GetNodeList()
1651     nodes_1 = nodes_0 + [self.op.node_name, ]
1652     return env, nodes_0, nodes_1
1653
1654   def CheckPrereq(self):
1655     """Check prerequisites.
1656
1657     This checks:
1658      - the new node is not already in the config
1659      - it is resolvable
1660      - its parameters (single/dual homed) matches the cluster
1661
1662     Any errors are signalled by raising errors.OpPrereqError.
1663
1664     """
1665     node_name = self.op.node_name
1666     cfg = self.cfg
1667
1668     dns_data = utils.HostInfo(node_name)
1669
1670     node = dns_data.name
1671     primary_ip = self.op.primary_ip = dns_data.ip
1672     secondary_ip = getattr(self.op, "secondary_ip", None)
1673     if secondary_ip is None:
1674       secondary_ip = primary_ip
1675     if not utils.IsValidIP(secondary_ip):
1676       raise errors.OpPrereqError("Invalid secondary IP given")
1677     self.op.secondary_ip = secondary_ip
1678
1679     node_list = cfg.GetNodeList()
1680     if not self.op.readd and node in node_list:
1681       raise errors.OpPrereqError("Node %s is already in the configuration" %
1682                                  node)
1683     elif self.op.readd and node not in node_list:
1684       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1685
1686     for existing_node_name in node_list:
1687       existing_node = cfg.GetNodeInfo(existing_node_name)
1688
1689       if self.op.readd and node == existing_node_name:
1690         if (existing_node.primary_ip != primary_ip or
1691             existing_node.secondary_ip != secondary_ip):
1692           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1693                                      " address configuration as before")
1694         continue
1695
1696       if (existing_node.primary_ip == primary_ip or
1697           existing_node.secondary_ip == primary_ip or
1698           existing_node.primary_ip == secondary_ip or
1699           existing_node.secondary_ip == secondary_ip):
1700         raise errors.OpPrereqError("New node ip address(es) conflict with"
1701                                    " existing node %s" % existing_node.name)
1702
1703     # check that the type of the node (single versus dual homed) is the
1704     # same as for the master
1705     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1706     master_singlehomed = myself.secondary_ip == myself.primary_ip
1707     newbie_singlehomed = secondary_ip == primary_ip
1708     if master_singlehomed != newbie_singlehomed:
1709       if master_singlehomed:
1710         raise errors.OpPrereqError("The master has no private ip but the"
1711                                    " new node has one")
1712       else:
1713         raise errors.OpPrereqError("The master has a private ip but the"
1714                                    " new node doesn't have one")
1715
1716     # checks reachablity
1717     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1718       raise errors.OpPrereqError("Node not reachable by ping")
1719
1720     if not newbie_singlehomed:
1721       # check reachability from my secondary ip to newbie's secondary ip
1722       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1723                            source=myself.secondary_ip):
1724         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1725                                    " based ping to noded port")
1726
1727     self.new_node = objects.Node(name=node,
1728                                  primary_ip=primary_ip,
1729                                  secondary_ip=secondary_ip)
1730
1731   def Exec(self, feedback_fn):
1732     """Adds the new node to the cluster.
1733
1734     """
1735     new_node = self.new_node
1736     node = new_node.name
1737
1738     # check connectivity
1739     result = self.rpc.call_version([node])[node]
1740     if result:
1741       if constants.PROTOCOL_VERSION == result:
1742         logger.Info("communication to node %s fine, sw version %s match" %
1743                     (node, result))
1744       else:
1745         raise errors.OpExecError("Version mismatch master version %s,"
1746                                  " node version %s" %
1747                                  (constants.PROTOCOL_VERSION, result))
1748     else:
1749       raise errors.OpExecError("Cannot get version from the new node")
1750
1751     # setup ssh on node
1752     logger.Info("copy ssh key to node %s" % node)
1753     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1754     keyarray = []
1755     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1756                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1757                 priv_key, pub_key]
1758
1759     for i in keyfiles:
1760       f = open(i, 'r')
1761       try:
1762         keyarray.append(f.read())
1763       finally:
1764         f.close()
1765
1766     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1767                                     keyarray[2],
1768                                     keyarray[3], keyarray[4], keyarray[5])
1769
1770     if not result:
1771       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1772
1773     # Add node to our /etc/hosts, and add key to known_hosts
1774     utils.AddHostToEtcHosts(new_node.name)
1775
1776     if new_node.secondary_ip != new_node.primary_ip:
1777       if not self.rpc.call_node_has_ip_address(new_node.name,
1778                                                new_node.secondary_ip):
1779         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1780                                  " you gave (%s). Please fix and re-run this"
1781                                  " command." % new_node.secondary_ip)
1782
1783     node_verify_list = [self.cfg.GetMasterNode()]
1784     node_verify_param = {
1785       'nodelist': [node],
1786       # TODO: do a node-net-test as well?
1787     }
1788
1789     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1790                                        self.cfg.GetClusterName())
1791     for verifier in node_verify_list:
1792       if not result[verifier]:
1793         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1794                                  " for remote verification" % verifier)
1795       if result[verifier]['nodelist']:
1796         for failed in result[verifier]['nodelist']:
1797           feedback_fn("ssh/hostname verification failed %s -> %s" %
1798                       (verifier, result[verifier]['nodelist'][failed]))
1799         raise errors.OpExecError("ssh/hostname verification failed.")
1800
1801     # Distribute updated /etc/hosts and known_hosts to all nodes,
1802     # including the node just added
1803     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1804     dist_nodes = self.cfg.GetNodeList()
1805     if not self.op.readd:
1806       dist_nodes.append(node)
1807     if myself.name in dist_nodes:
1808       dist_nodes.remove(myself.name)
1809
1810     logger.Debug("Copying hosts and known_hosts to all nodes")
1811     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1812       result = self.rpc.call_upload_file(dist_nodes, fname)
1813       for to_node in dist_nodes:
1814         if not result[to_node]:
1815           logger.Error("copy of file %s to node %s failed" %
1816                        (fname, to_node))
1817
1818     to_copy = []
1819     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1820       to_copy.append(constants.VNC_PASSWORD_FILE)
1821     for fname in to_copy:
1822       result = self.rpc.call_upload_file([node], fname)
1823       if not result[node]:
1824         logger.Error("could not copy file %s to node %s" % (fname, node))
1825
1826     if self.op.readd:
1827       self.context.ReaddNode(new_node)
1828     else:
1829       self.context.AddNode(new_node)
1830
1831
1832 class LUQueryClusterInfo(NoHooksLU):
1833   """Query cluster configuration.
1834
1835   """
1836   _OP_REQP = []
1837   REQ_MASTER = False
1838   REQ_BGL = False
1839
1840   def ExpandNames(self):
1841     self.needed_locks = {}
1842
1843   def CheckPrereq(self):
1844     """No prerequsites needed for this LU.
1845
1846     """
1847     pass
1848
1849   def Exec(self, feedback_fn):
1850     """Return cluster config.
1851
1852     """
1853     result = {
1854       "name": self.cfg.GetClusterName(),
1855       "software_version": constants.RELEASE_VERSION,
1856       "protocol_version": constants.PROTOCOL_VERSION,
1857       "config_version": constants.CONFIG_VERSION,
1858       "os_api_version": constants.OS_API_VERSION,
1859       "export_version": constants.EXPORT_VERSION,
1860       "master": self.cfg.GetMasterNode(),
1861       "architecture": (platform.architecture()[0], platform.machine()),
1862       "hypervisor_type": self.cfg.GetHypervisorType(),
1863       "enabled_hypervisors": self.cfg.GetClusterInfo().enabled_hypervisors,
1864       }
1865
1866     return result
1867
1868
1869 class LUQueryConfigValues(NoHooksLU):
1870   """Return configuration values.
1871
1872   """
1873   _OP_REQP = []
1874   REQ_BGL = False
1875
1876   def ExpandNames(self):
1877     self.needed_locks = {}
1878
1879     static_fields = ["cluster_name", "master_node"]
1880     _CheckOutputFields(static=static_fields,
1881                        dynamic=[],
1882                        selected=self.op.output_fields)
1883
1884   def CheckPrereq(self):
1885     """No prerequisites.
1886
1887     """
1888     pass
1889
1890   def Exec(self, feedback_fn):
1891     """Dump a representation of the cluster config to the standard output.
1892
1893     """
1894     values = []
1895     for field in self.op.output_fields:
1896       if field == "cluster_name":
1897         values.append(self.cfg.GetClusterName())
1898       elif field == "master_node":
1899         values.append(self.cfg.GetMasterNode())
1900       else:
1901         raise errors.ParameterError(field)
1902     return values
1903
1904
1905 class LUActivateInstanceDisks(NoHooksLU):
1906   """Bring up an instance's disks.
1907
1908   """
1909   _OP_REQP = ["instance_name"]
1910   REQ_BGL = False
1911
1912   def ExpandNames(self):
1913     self._ExpandAndLockInstance()
1914     self.needed_locks[locking.LEVEL_NODE] = []
1915     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1916
1917   def DeclareLocks(self, level):
1918     if level == locking.LEVEL_NODE:
1919       self._LockInstancesNodes()
1920
1921   def CheckPrereq(self):
1922     """Check prerequisites.
1923
1924     This checks that the instance is in the cluster.
1925
1926     """
1927     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1928     assert self.instance is not None, \
1929       "Cannot retrieve locked instance %s" % self.op.instance_name
1930
1931   def Exec(self, feedback_fn):
1932     """Activate the disks.
1933
1934     """
1935     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
1936     if not disks_ok:
1937       raise errors.OpExecError("Cannot activate block devices")
1938
1939     return disks_info
1940
1941
1942 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
1943   """Prepare the block devices for an instance.
1944
1945   This sets up the block devices on all nodes.
1946
1947   Args:
1948     instance: a ganeti.objects.Instance object
1949     ignore_secondaries: if true, errors on secondary nodes won't result
1950                         in an error return from the function
1951
1952   Returns:
1953     false if the operation failed
1954     list of (host, instance_visible_name, node_visible_name) if the operation
1955          suceeded with the mapping from node devices to instance devices
1956   """
1957   device_info = []
1958   disks_ok = True
1959   iname = instance.name
1960   # With the two passes mechanism we try to reduce the window of
1961   # opportunity for the race condition of switching DRBD to primary
1962   # before handshaking occured, but we do not eliminate it
1963
1964   # The proper fix would be to wait (with some limits) until the
1965   # connection has been made and drbd transitions from WFConnection
1966   # into any other network-connected state (Connected, SyncTarget,
1967   # SyncSource, etc.)
1968
1969   # 1st pass, assemble on all nodes in secondary mode
1970   for inst_disk in instance.disks:
1971     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1972       lu.cfg.SetDiskID(node_disk, node)
1973       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
1974       if not result:
1975         logger.Error("could not prepare block device %s on node %s"
1976                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1977         if not ignore_secondaries:
1978           disks_ok = False
1979
1980   # FIXME: race condition on drbd migration to primary
1981
1982   # 2nd pass, do only the primary node
1983   for inst_disk in instance.disks:
1984     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1985       if node != instance.primary_node:
1986         continue
1987       lu.cfg.SetDiskID(node_disk, node)
1988       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
1989       if not result:
1990         logger.Error("could not prepare block device %s on node %s"
1991                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1992         disks_ok = False
1993     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1994
1995   # leave the disks configured for the primary node
1996   # this is a workaround that would be fixed better by
1997   # improving the logical/physical id handling
1998   for disk in instance.disks:
1999     lu.cfg.SetDiskID(disk, instance.primary_node)
2000
2001   return disks_ok, device_info
2002
2003
2004 def _StartInstanceDisks(lu, instance, force):
2005   """Start the disks of an instance.
2006
2007   """
2008   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2009                                            ignore_secondaries=force)
2010   if not disks_ok:
2011     _ShutdownInstanceDisks(lu, instance)
2012     if force is not None and not force:
2013       logger.Error("If the message above refers to a secondary node,"
2014                    " you can retry the operation using '--force'.")
2015     raise errors.OpExecError("Disk consistency error")
2016
2017
2018 class LUDeactivateInstanceDisks(NoHooksLU):
2019   """Shutdown an instance's disks.
2020
2021   """
2022   _OP_REQP = ["instance_name"]
2023   REQ_BGL = False
2024
2025   def ExpandNames(self):
2026     self._ExpandAndLockInstance()
2027     self.needed_locks[locking.LEVEL_NODE] = []
2028     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2029
2030   def DeclareLocks(self, level):
2031     if level == locking.LEVEL_NODE:
2032       self._LockInstancesNodes()
2033
2034   def CheckPrereq(self):
2035     """Check prerequisites.
2036
2037     This checks that the instance is in the cluster.
2038
2039     """
2040     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2041     assert self.instance is not None, \
2042       "Cannot retrieve locked instance %s" % self.op.instance_name
2043
2044   def Exec(self, feedback_fn):
2045     """Deactivate the disks
2046
2047     """
2048     instance = self.instance
2049     _SafeShutdownInstanceDisks(self, instance)
2050
2051
2052 def _SafeShutdownInstanceDisks(lu, instance):
2053   """Shutdown block devices of an instance.
2054
2055   This function checks if an instance is running, before calling
2056   _ShutdownInstanceDisks.
2057
2058   """
2059   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2060                                       [instance.hypervisor])
2061   ins_l = ins_l[instance.primary_node]
2062   if not type(ins_l) is list:
2063     raise errors.OpExecError("Can't contact node '%s'" %
2064                              instance.primary_node)
2065
2066   if instance.name in ins_l:
2067     raise errors.OpExecError("Instance is running, can't shutdown"
2068                              " block devices.")
2069
2070   _ShutdownInstanceDisks(lu, instance)
2071
2072
2073 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2074   """Shutdown block devices of an instance.
2075
2076   This does the shutdown on all nodes of the instance.
2077
2078   If the ignore_primary is false, errors on the primary node are
2079   ignored.
2080
2081   """
2082   result = True
2083   for disk in instance.disks:
2084     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2085       lu.cfg.SetDiskID(top_disk, node)
2086       if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2087         logger.Error("could not shutdown block device %s on node %s" %
2088                      (disk.iv_name, node))
2089         if not ignore_primary or node != instance.primary_node:
2090           result = False
2091   return result
2092
2093
2094 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2095   """Checks if a node has enough free memory.
2096
2097   This function check if a given node has the needed amount of free
2098   memory. In case the node has less memory or we cannot get the
2099   information from the node, this function raise an OpPrereqError
2100   exception.
2101
2102   @type lu: C{LogicalUnit}
2103   @param lu: a logical unit from which we get configuration data
2104   @type node: C{str}
2105   @param node: the node to check
2106   @type reason: C{str}
2107   @param reason: string to use in the error message
2108   @type requested: C{int}
2109   @param requested: the amount of memory in MiB to check for
2110   @type hypervisor: C{str}
2111   @param hypervisor: the hypervisor to ask for memory stats
2112   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2113       we cannot check the node
2114
2115   """
2116   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2117   if not nodeinfo or not isinstance(nodeinfo, dict):
2118     raise errors.OpPrereqError("Could not contact node %s for resource"
2119                              " information" % (node,))
2120
2121   free_mem = nodeinfo[node].get('memory_free')
2122   if not isinstance(free_mem, int):
2123     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2124                              " was '%s'" % (node, free_mem))
2125   if requested > free_mem:
2126     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2127                              " needed %s MiB, available %s MiB" %
2128                              (node, reason, requested, free_mem))
2129
2130
2131 class LUStartupInstance(LogicalUnit):
2132   """Starts an instance.
2133
2134   """
2135   HPATH = "instance-start"
2136   HTYPE = constants.HTYPE_INSTANCE
2137   _OP_REQP = ["instance_name", "force"]
2138   REQ_BGL = False
2139
2140   def ExpandNames(self):
2141     self._ExpandAndLockInstance()
2142     self.needed_locks[locking.LEVEL_NODE] = []
2143     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2144
2145   def DeclareLocks(self, level):
2146     if level == locking.LEVEL_NODE:
2147       self._LockInstancesNodes()
2148
2149   def BuildHooksEnv(self):
2150     """Build hooks env.
2151
2152     This runs on master, primary and secondary nodes of the instance.
2153
2154     """
2155     env = {
2156       "FORCE": self.op.force,
2157       }
2158     env.update(_BuildInstanceHookEnvByObject(self.instance))
2159     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2160           list(self.instance.secondary_nodes))
2161     return env, nl, nl
2162
2163   def CheckPrereq(self):
2164     """Check prerequisites.
2165
2166     This checks that the instance is in the cluster.
2167
2168     """
2169     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2170     assert self.instance is not None, \
2171       "Cannot retrieve locked instance %s" % self.op.instance_name
2172
2173     # check bridges existance
2174     _CheckInstanceBridgesExist(self, instance)
2175
2176     _CheckNodeFreeMemory(self, instance.primary_node,
2177                          "starting instance %s" % instance.name,
2178                          instance.memory, instance.hypervisor)
2179
2180   def Exec(self, feedback_fn):
2181     """Start the instance.
2182
2183     """
2184     instance = self.instance
2185     force = self.op.force
2186     extra_args = getattr(self.op, "extra_args", "")
2187
2188     self.cfg.MarkInstanceUp(instance.name)
2189
2190     node_current = instance.primary_node
2191
2192     _StartInstanceDisks(self, instance, force)
2193
2194     if not self.rpc.call_instance_start(node_current, instance, extra_args):
2195       _ShutdownInstanceDisks(self, instance)
2196       raise errors.OpExecError("Could not start instance")
2197
2198
2199 class LURebootInstance(LogicalUnit):
2200   """Reboot an instance.
2201
2202   """
2203   HPATH = "instance-reboot"
2204   HTYPE = constants.HTYPE_INSTANCE
2205   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2206   REQ_BGL = False
2207
2208   def ExpandNames(self):
2209     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2210                                    constants.INSTANCE_REBOOT_HARD,
2211                                    constants.INSTANCE_REBOOT_FULL]:
2212       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2213                                   (constants.INSTANCE_REBOOT_SOFT,
2214                                    constants.INSTANCE_REBOOT_HARD,
2215                                    constants.INSTANCE_REBOOT_FULL))
2216     self._ExpandAndLockInstance()
2217     self.needed_locks[locking.LEVEL_NODE] = []
2218     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2219
2220   def DeclareLocks(self, level):
2221     if level == locking.LEVEL_NODE:
2222       primary_only = not constants.INSTANCE_REBOOT_FULL
2223       self._LockInstancesNodes(primary_only=primary_only)
2224
2225   def BuildHooksEnv(self):
2226     """Build hooks env.
2227
2228     This runs on master, primary and secondary nodes of the instance.
2229
2230     """
2231     env = {
2232       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2233       }
2234     env.update(_BuildInstanceHookEnvByObject(self.instance))
2235     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2236           list(self.instance.secondary_nodes))
2237     return env, nl, nl
2238
2239   def CheckPrereq(self):
2240     """Check prerequisites.
2241
2242     This checks that the instance is in the cluster.
2243
2244     """
2245     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2246     assert self.instance is not None, \
2247       "Cannot retrieve locked instance %s" % self.op.instance_name
2248
2249     # check bridges existance
2250     _CheckInstanceBridgesExist(self, instance)
2251
2252   def Exec(self, feedback_fn):
2253     """Reboot the instance.
2254
2255     """
2256     instance = self.instance
2257     ignore_secondaries = self.op.ignore_secondaries
2258     reboot_type = self.op.reboot_type
2259     extra_args = getattr(self.op, "extra_args", "")
2260
2261     node_current = instance.primary_node
2262
2263     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2264                        constants.INSTANCE_REBOOT_HARD]:
2265       if not self.rpc.call_instance_reboot(node_current, instance,
2266                                            reboot_type, extra_args):
2267         raise errors.OpExecError("Could not reboot instance")
2268     else:
2269       if not self.rpc.call_instance_shutdown(node_current, instance):
2270         raise errors.OpExecError("could not shutdown instance for full reboot")
2271       _ShutdownInstanceDisks(self, instance)
2272       _StartInstanceDisks(self, instance, ignore_secondaries)
2273       if not self.rpc.call_instance_start(node_current, instance, extra_args):
2274         _ShutdownInstanceDisks(self, instance)
2275         raise errors.OpExecError("Could not start instance for full reboot")
2276
2277     self.cfg.MarkInstanceUp(instance.name)
2278
2279
2280 class LUShutdownInstance(LogicalUnit):
2281   """Shutdown an instance.
2282
2283   """
2284   HPATH = "instance-stop"
2285   HTYPE = constants.HTYPE_INSTANCE
2286   _OP_REQP = ["instance_name"]
2287   REQ_BGL = False
2288
2289   def ExpandNames(self):
2290     self._ExpandAndLockInstance()
2291     self.needed_locks[locking.LEVEL_NODE] = []
2292     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2293
2294   def DeclareLocks(self, level):
2295     if level == locking.LEVEL_NODE:
2296       self._LockInstancesNodes()
2297
2298   def BuildHooksEnv(self):
2299     """Build hooks env.
2300
2301     This runs on master, primary and secondary nodes of the instance.
2302
2303     """
2304     env = _BuildInstanceHookEnvByObject(self.instance)
2305     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2306           list(self.instance.secondary_nodes))
2307     return env, nl, nl
2308
2309   def CheckPrereq(self):
2310     """Check prerequisites.
2311
2312     This checks that the instance is in the cluster.
2313
2314     """
2315     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2316     assert self.instance is not None, \
2317       "Cannot retrieve locked instance %s" % self.op.instance_name
2318
2319   def Exec(self, feedback_fn):
2320     """Shutdown the instance.
2321
2322     """
2323     instance = self.instance
2324     node_current = instance.primary_node
2325     self.cfg.MarkInstanceDown(instance.name)
2326     if not self.rpc.call_instance_shutdown(node_current, instance):
2327       logger.Error("could not shutdown instance")
2328
2329     _ShutdownInstanceDisks(self, instance)
2330
2331
2332 class LUReinstallInstance(LogicalUnit):
2333   """Reinstall an instance.
2334
2335   """
2336   HPATH = "instance-reinstall"
2337   HTYPE = constants.HTYPE_INSTANCE
2338   _OP_REQP = ["instance_name"]
2339   REQ_BGL = False
2340
2341   def ExpandNames(self):
2342     self._ExpandAndLockInstance()
2343     self.needed_locks[locking.LEVEL_NODE] = []
2344     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2345
2346   def DeclareLocks(self, level):
2347     if level == locking.LEVEL_NODE:
2348       self._LockInstancesNodes()
2349
2350   def BuildHooksEnv(self):
2351     """Build hooks env.
2352
2353     This runs on master, primary and secondary nodes of the instance.
2354
2355     """
2356     env = _BuildInstanceHookEnvByObject(self.instance)
2357     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2358           list(self.instance.secondary_nodes))
2359     return env, nl, nl
2360
2361   def CheckPrereq(self):
2362     """Check prerequisites.
2363
2364     This checks that the instance is in the cluster and is not running.
2365
2366     """
2367     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2368     assert instance is not None, \
2369       "Cannot retrieve locked instance %s" % self.op.instance_name
2370
2371     if instance.disk_template == constants.DT_DISKLESS:
2372       raise errors.OpPrereqError("Instance '%s' has no disks" %
2373                                  self.op.instance_name)
2374     if instance.status != "down":
2375       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2376                                  self.op.instance_name)
2377     remote_info = self.rpc.call_instance_info(instance.primary_node,
2378                                               instance.name,
2379                                               instance.hypervisor)
2380     if remote_info:
2381       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2382                                  (self.op.instance_name,
2383                                   instance.primary_node))
2384
2385     self.op.os_type = getattr(self.op, "os_type", None)
2386     if self.op.os_type is not None:
2387       # OS verification
2388       pnode = self.cfg.GetNodeInfo(
2389         self.cfg.ExpandNodeName(instance.primary_node))
2390       if pnode is None:
2391         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2392                                    self.op.pnode)
2393       os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2394       if not os_obj:
2395         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2396                                    " primary node"  % self.op.os_type)
2397
2398     self.instance = instance
2399
2400   def Exec(self, feedback_fn):
2401     """Reinstall the instance.
2402
2403     """
2404     inst = self.instance
2405
2406     if self.op.os_type is not None:
2407       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2408       inst.os = self.op.os_type
2409       self.cfg.Update(inst)
2410
2411     _StartInstanceDisks(self, inst, None)
2412     try:
2413       feedback_fn("Running the instance OS create scripts...")
2414       if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2415                                            "sda", "sdb"):
2416         raise errors.OpExecError("Could not install OS for instance %s"
2417                                  " on node %s" %
2418                                  (inst.name, inst.primary_node))
2419     finally:
2420       _ShutdownInstanceDisks(self, inst)
2421
2422
2423 class LURenameInstance(LogicalUnit):
2424   """Rename an instance.
2425
2426   """
2427   HPATH = "instance-rename"
2428   HTYPE = constants.HTYPE_INSTANCE
2429   _OP_REQP = ["instance_name", "new_name"]
2430
2431   def BuildHooksEnv(self):
2432     """Build hooks env.
2433
2434     This runs on master, primary and secondary nodes of the instance.
2435
2436     """
2437     env = _BuildInstanceHookEnvByObject(self.instance)
2438     env["INSTANCE_NEW_NAME"] = self.op.new_name
2439     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2440           list(self.instance.secondary_nodes))
2441     return env, nl, nl
2442
2443   def CheckPrereq(self):
2444     """Check prerequisites.
2445
2446     This checks that the instance is in the cluster and is not running.
2447
2448     """
2449     instance = self.cfg.GetInstanceInfo(
2450       self.cfg.ExpandInstanceName(self.op.instance_name))
2451     if instance is None:
2452       raise errors.OpPrereqError("Instance '%s' not known" %
2453                                  self.op.instance_name)
2454     if instance.status != "down":
2455       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2456                                  self.op.instance_name)
2457     remote_info = self.rpc.call_instance_info(instance.primary_node,
2458                                               instance.name,
2459                                               instance.hypervisor)
2460     if remote_info:
2461       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2462                                  (self.op.instance_name,
2463                                   instance.primary_node))
2464     self.instance = instance
2465
2466     # new name verification
2467     name_info = utils.HostInfo(self.op.new_name)
2468
2469     self.op.new_name = new_name = name_info.name
2470     instance_list = self.cfg.GetInstanceList()
2471     if new_name in instance_list:
2472       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2473                                  new_name)
2474
2475     if not getattr(self.op, "ignore_ip", False):
2476       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2477         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2478                                    (name_info.ip, new_name))
2479
2480
2481   def Exec(self, feedback_fn):
2482     """Reinstall the instance.
2483
2484     """
2485     inst = self.instance
2486     old_name = inst.name
2487
2488     if inst.disk_template == constants.DT_FILE:
2489       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2490
2491     self.cfg.RenameInstance(inst.name, self.op.new_name)
2492     # Change the instance lock. This is definitely safe while we hold the BGL
2493     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2494     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2495
2496     # re-read the instance from the configuration after rename
2497     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2498
2499     if inst.disk_template == constants.DT_FILE:
2500       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2501       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2502                                                      old_file_storage_dir,
2503                                                      new_file_storage_dir)
2504
2505       if not result:
2506         raise errors.OpExecError("Could not connect to node '%s' to rename"
2507                                  " directory '%s' to '%s' (but the instance"
2508                                  " has been renamed in Ganeti)" % (
2509                                  inst.primary_node, old_file_storage_dir,
2510                                  new_file_storage_dir))
2511
2512       if not result[0]:
2513         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2514                                  " (but the instance has been renamed in"
2515                                  " Ganeti)" % (old_file_storage_dir,
2516                                                new_file_storage_dir))
2517
2518     _StartInstanceDisks(self, inst, None)
2519     try:
2520       if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2521                                                old_name,
2522                                                "sda", "sdb"):
2523         msg = ("Could not run OS rename script for instance %s on node %s"
2524                " (but the instance has been renamed in Ganeti)" %
2525                (inst.name, inst.primary_node))
2526         logger.Error(msg)
2527     finally:
2528       _ShutdownInstanceDisks(self, inst)
2529
2530
2531 class LURemoveInstance(LogicalUnit):
2532   """Remove an instance.
2533
2534   """
2535   HPATH = "instance-remove"
2536   HTYPE = constants.HTYPE_INSTANCE
2537   _OP_REQP = ["instance_name", "ignore_failures"]
2538   REQ_BGL = False
2539
2540   def ExpandNames(self):
2541     self._ExpandAndLockInstance()
2542     self.needed_locks[locking.LEVEL_NODE] = []
2543     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2544
2545   def DeclareLocks(self, level):
2546     if level == locking.LEVEL_NODE:
2547       self._LockInstancesNodes()
2548
2549   def BuildHooksEnv(self):
2550     """Build hooks env.
2551
2552     This runs on master, primary and secondary nodes of the instance.
2553
2554     """
2555     env = _BuildInstanceHookEnvByObject(self.instance)
2556     nl = [self.cfg.GetMasterNode()]
2557     return env, nl, nl
2558
2559   def CheckPrereq(self):
2560     """Check prerequisites.
2561
2562     This checks that the instance is in the cluster.
2563
2564     """
2565     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2566     assert self.instance is not None, \
2567       "Cannot retrieve locked instance %s" % self.op.instance_name
2568
2569   def Exec(self, feedback_fn):
2570     """Remove the instance.
2571
2572     """
2573     instance = self.instance
2574     logger.Info("shutting down instance %s on node %s" %
2575                 (instance.name, instance.primary_node))
2576
2577     if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2578       if self.op.ignore_failures:
2579         feedback_fn("Warning: can't shutdown instance")
2580       else:
2581         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2582                                  (instance.name, instance.primary_node))
2583
2584     logger.Info("removing block devices for instance %s" % instance.name)
2585
2586     if not _RemoveDisks(self, instance):
2587       if self.op.ignore_failures:
2588         feedback_fn("Warning: can't remove instance's disks")
2589       else:
2590         raise errors.OpExecError("Can't remove instance's disks")
2591
2592     logger.Info("removing instance %s out of cluster config" % instance.name)
2593
2594     self.cfg.RemoveInstance(instance.name)
2595     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2596
2597
2598 class LUQueryInstances(NoHooksLU):
2599   """Logical unit for querying instances.
2600
2601   """
2602   _OP_REQP = ["output_fields", "names"]
2603   REQ_BGL = False
2604
2605   def ExpandNames(self):
2606     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2607     self.static_fields = frozenset([
2608       "name", "os", "pnode", "snodes",
2609       "admin_state", "admin_ram",
2610       "disk_template", "ip", "mac", "bridge",
2611       "sda_size", "sdb_size", "vcpus", "tags",
2612       "network_port", "kernel_path", "initrd_path",
2613       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2614       "hvm_cdrom_image_path", "hvm_nic_type",
2615       "hvm_disk_type", "vnc_bind_address",
2616       "serial_no", "hypervisor",
2617       ])
2618     _CheckOutputFields(static=self.static_fields,
2619                        dynamic=self.dynamic_fields,
2620                        selected=self.op.output_fields)
2621
2622     self.needed_locks = {}
2623     self.share_locks[locking.LEVEL_INSTANCE] = 1
2624     self.share_locks[locking.LEVEL_NODE] = 1
2625
2626     if self.op.names:
2627       self.wanted = _GetWantedInstances(self, self.op.names)
2628     else:
2629       self.wanted = locking.ALL_SET
2630
2631     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2632     if self.do_locking:
2633       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2634       self.needed_locks[locking.LEVEL_NODE] = []
2635       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2636
2637   def DeclareLocks(self, level):
2638     if level == locking.LEVEL_NODE and self.do_locking:
2639       self._LockInstancesNodes()
2640
2641   def CheckPrereq(self):
2642     """Check prerequisites.
2643
2644     """
2645     pass
2646
2647   def Exec(self, feedback_fn):
2648     """Computes the list of nodes and their attributes.
2649
2650     """
2651     all_info = self.cfg.GetAllInstancesInfo()
2652     if self.do_locking:
2653       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2654     elif self.wanted != locking.ALL_SET:
2655       instance_names = self.wanted
2656       missing = set(instance_names).difference(all_info.keys())
2657       if missing:
2658         raise errors.OpExecError(
2659           "Some instances were removed before retrieving their data: %s"
2660           % missing)
2661     else:
2662       instance_names = all_info.keys()
2663     instance_list = [all_info[iname] for iname in instance_names]
2664
2665     # begin data gathering
2666
2667     nodes = frozenset([inst.primary_node for inst in instance_list])
2668     hv_list = list(set([inst.hypervisor for inst in instance_list]))
2669
2670     bad_nodes = []
2671     if self.dynamic_fields.intersection(self.op.output_fields):
2672       live_data = {}
2673       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2674       for name in nodes:
2675         result = node_data[name]
2676         if result:
2677           live_data.update(result)
2678         elif result == False:
2679           bad_nodes.append(name)
2680         # else no instance is alive
2681     else:
2682       live_data = dict([(name, {}) for name in instance_names])
2683
2684     # end data gathering
2685
2686     output = []
2687     for instance in instance_list:
2688       iout = []
2689       for field in self.op.output_fields:
2690         if field == "name":
2691           val = instance.name
2692         elif field == "os":
2693           val = instance.os
2694         elif field == "pnode":
2695           val = instance.primary_node
2696         elif field == "snodes":
2697           val = list(instance.secondary_nodes)
2698         elif field == "admin_state":
2699           val = (instance.status != "down")
2700         elif field == "oper_state":
2701           if instance.primary_node in bad_nodes:
2702             val = None
2703           else:
2704             val = bool(live_data.get(instance.name))
2705         elif field == "status":
2706           if instance.primary_node in bad_nodes:
2707             val = "ERROR_nodedown"
2708           else:
2709             running = bool(live_data.get(instance.name))
2710             if running:
2711               if instance.status != "down":
2712                 val = "running"
2713               else:
2714                 val = "ERROR_up"
2715             else:
2716               if instance.status != "down":
2717                 val = "ERROR_down"
2718               else:
2719                 val = "ADMIN_down"
2720         elif field == "admin_ram":
2721           val = instance.memory
2722         elif field == "oper_ram":
2723           if instance.primary_node in bad_nodes:
2724             val = None
2725           elif instance.name in live_data:
2726             val = live_data[instance.name].get("memory", "?")
2727           else:
2728             val = "-"
2729         elif field == "disk_template":
2730           val = instance.disk_template
2731         elif field == "ip":
2732           val = instance.nics[0].ip
2733         elif field == "bridge":
2734           val = instance.nics[0].bridge
2735         elif field == "mac":
2736           val = instance.nics[0].mac
2737         elif field == "sda_size" or field == "sdb_size":
2738           disk = instance.FindDisk(field[:3])
2739           if disk is None:
2740             val = None
2741           else:
2742             val = disk.size
2743         elif field == "vcpus":
2744           val = instance.vcpus
2745         elif field == "tags":
2746           val = list(instance.GetTags())
2747         elif field == "serial_no":
2748           val = instance.serial_no
2749         elif field in ("network_port", "kernel_path", "initrd_path",
2750                        "hvm_boot_order", "hvm_acpi", "hvm_pae",
2751                        "hvm_cdrom_image_path", "hvm_nic_type",
2752                        "hvm_disk_type", "vnc_bind_address"):
2753           val = getattr(instance, field, None)
2754           if val is not None:
2755             pass
2756           elif field in ("hvm_nic_type", "hvm_disk_type",
2757                          "kernel_path", "initrd_path"):
2758             val = "default"
2759           else:
2760             val = "-"
2761         elif field == "hypervisor":
2762           val = instance.hypervisor
2763         else:
2764           raise errors.ParameterError(field)
2765         iout.append(val)
2766       output.append(iout)
2767
2768     return output
2769
2770
2771 class LUFailoverInstance(LogicalUnit):
2772   """Failover an instance.
2773
2774   """
2775   HPATH = "instance-failover"
2776   HTYPE = constants.HTYPE_INSTANCE
2777   _OP_REQP = ["instance_name", "ignore_consistency"]
2778   REQ_BGL = False
2779
2780   def ExpandNames(self):
2781     self._ExpandAndLockInstance()
2782     self.needed_locks[locking.LEVEL_NODE] = []
2783     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2784
2785   def DeclareLocks(self, level):
2786     if level == locking.LEVEL_NODE:
2787       self._LockInstancesNodes()
2788
2789   def BuildHooksEnv(self):
2790     """Build hooks env.
2791
2792     This runs on master, primary and secondary nodes of the instance.
2793
2794     """
2795     env = {
2796       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2797       }
2798     env.update(_BuildInstanceHookEnvByObject(self.instance))
2799     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2800     return env, nl, nl
2801
2802   def CheckPrereq(self):
2803     """Check prerequisites.
2804
2805     This checks that the instance is in the cluster.
2806
2807     """
2808     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2809     assert self.instance is not None, \
2810       "Cannot retrieve locked instance %s" % self.op.instance_name
2811
2812     if instance.disk_template not in constants.DTS_NET_MIRROR:
2813       raise errors.OpPrereqError("Instance's disk layout is not"
2814                                  " network mirrored, cannot failover.")
2815
2816     secondary_nodes = instance.secondary_nodes
2817     if not secondary_nodes:
2818       raise errors.ProgrammerError("no secondary node but using "
2819                                    "a mirrored disk template")
2820
2821     target_node = secondary_nodes[0]
2822     # check memory requirements on the secondary node
2823     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2824                          instance.name, instance.memory,
2825                          instance.hypervisor)
2826
2827     # check bridge existance
2828     brlist = [nic.bridge for nic in instance.nics]
2829     if not self.rpc.call_bridges_exist(target_node, brlist):
2830       raise errors.OpPrereqError("One or more target bridges %s does not"
2831                                  " exist on destination node '%s'" %
2832                                  (brlist, target_node))
2833
2834   def Exec(self, feedback_fn):
2835     """Failover an instance.
2836
2837     The failover is done by shutting it down on its present node and
2838     starting it on the secondary.
2839
2840     """
2841     instance = self.instance
2842
2843     source_node = instance.primary_node
2844     target_node = instance.secondary_nodes[0]
2845
2846     feedback_fn("* checking disk consistency between source and target")
2847     for dev in instance.disks:
2848       # for drbd, these are drbd over lvm
2849       if not _CheckDiskConsistency(self, dev, target_node, False):
2850         if instance.status == "up" and not self.op.ignore_consistency:
2851           raise errors.OpExecError("Disk %s is degraded on target node,"
2852                                    " aborting failover." % dev.iv_name)
2853
2854     feedback_fn("* shutting down instance on source node")
2855     logger.Info("Shutting down instance %s on node %s" %
2856                 (instance.name, source_node))
2857
2858     if not self.rpc.call_instance_shutdown(source_node, instance):
2859       if self.op.ignore_consistency:
2860         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2861                      " anyway. Please make sure node %s is down"  %
2862                      (instance.name, source_node, source_node))
2863       else:
2864         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2865                                  (instance.name, source_node))
2866
2867     feedback_fn("* deactivating the instance's disks on source node")
2868     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2869       raise errors.OpExecError("Can't shut down the instance's disks.")
2870
2871     instance.primary_node = target_node
2872     # distribute new instance config to the other nodes
2873     self.cfg.Update(instance)
2874
2875     # Only start the instance if it's marked as up
2876     if instance.status == "up":
2877       feedback_fn("* activating the instance's disks on target node")
2878       logger.Info("Starting instance %s on node %s" %
2879                   (instance.name, target_node))
2880
2881       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2882                                                ignore_secondaries=True)
2883       if not disks_ok:
2884         _ShutdownInstanceDisks(self, instance)
2885         raise errors.OpExecError("Can't activate the instance's disks")
2886
2887       feedback_fn("* starting the instance on the target node")
2888       if not self.rpc.call_instance_start(target_node, instance, None):
2889         _ShutdownInstanceDisks(self, instance)
2890         raise errors.OpExecError("Could not start instance %s on node %s." %
2891                                  (instance.name, target_node))
2892
2893
2894 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2895   """Create a tree of block devices on the primary node.
2896
2897   This always creates all devices.
2898
2899   """
2900   if device.children:
2901     for child in device.children:
2902       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2903         return False
2904
2905   lu.cfg.SetDiskID(device, node)
2906   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2907                                        instance.name, True, info)
2908   if not new_id:
2909     return False
2910   if device.physical_id is None:
2911     device.physical_id = new_id
2912   return True
2913
2914
2915 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2916   """Create a tree of block devices on a secondary node.
2917
2918   If this device type has to be created on secondaries, create it and
2919   all its children.
2920
2921   If not, just recurse to children keeping the same 'force' value.
2922
2923   """
2924   if device.CreateOnSecondary():
2925     force = True
2926   if device.children:
2927     for child in device.children:
2928       if not _CreateBlockDevOnSecondary(lu, node, instance,
2929                                         child, force, info):
2930         return False
2931
2932   if not force:
2933     return True
2934   lu.cfg.SetDiskID(device, node)
2935   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2936                                        instance.name, False, info)
2937   if not new_id:
2938     return False
2939   if device.physical_id is None:
2940     device.physical_id = new_id
2941   return True
2942
2943
2944 def _GenerateUniqueNames(lu, exts):
2945   """Generate a suitable LV name.
2946
2947   This will generate a logical volume name for the given instance.
2948
2949   """
2950   results = []
2951   for val in exts:
2952     new_id = lu.cfg.GenerateUniqueID()
2953     results.append("%s%s" % (new_id, val))
2954   return results
2955
2956
2957 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
2958                          p_minor, s_minor):
2959   """Generate a drbd8 device complete with its children.
2960
2961   """
2962   port = lu.cfg.AllocatePort()
2963   vgname = lu.cfg.GetVGName()
2964   shared_secret = lu.cfg.GenerateDRBDSecret()
2965   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2966                           logical_id=(vgname, names[0]))
2967   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2968                           logical_id=(vgname, names[1]))
2969   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2970                           logical_id=(primary, secondary, port,
2971                                       p_minor, s_minor,
2972                                       shared_secret),
2973                           children=[dev_data, dev_meta],
2974                           iv_name=iv_name)
2975   return drbd_dev
2976
2977
2978 def _GenerateDiskTemplate(lu, template_name,
2979                           instance_name, primary_node,
2980                           secondary_nodes, disk_sz, swap_sz,
2981                           file_storage_dir, file_driver):
2982   """Generate the entire disk layout for a given template type.
2983
2984   """
2985   #TODO: compute space requirements
2986
2987   vgname = lu.cfg.GetVGName()
2988   if template_name == constants.DT_DISKLESS:
2989     disks = []
2990   elif template_name == constants.DT_PLAIN:
2991     if len(secondary_nodes) != 0:
2992       raise errors.ProgrammerError("Wrong template configuration")
2993
2994     names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
2995     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2996                            logical_id=(vgname, names[0]),
2997                            iv_name = "sda")
2998     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2999                            logical_id=(vgname, names[1]),
3000                            iv_name = "sdb")
3001     disks = [sda_dev, sdb_dev]
3002   elif template_name == constants.DT_DRBD8:
3003     if len(secondary_nodes) != 1:
3004       raise errors.ProgrammerError("Wrong template configuration")
3005     remote_node = secondary_nodes[0]
3006     (minor_pa, minor_pb,
3007      minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3008       [primary_node, primary_node, remote_node, remote_node], instance_name)
3009
3010     names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3011                                       ".sdb_data", ".sdb_meta"])
3012     drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3013                                         disk_sz, names[0:2], "sda",
3014                                         minor_pa, minor_sa)
3015     drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3016                                         swap_sz, names[2:4], "sdb",
3017                                         minor_pb, minor_sb)
3018     disks = [drbd_sda_dev, drbd_sdb_dev]
3019   elif template_name == constants.DT_FILE:
3020     if len(secondary_nodes) != 0:
3021       raise errors.ProgrammerError("Wrong template configuration")
3022
3023     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3024                                 iv_name="sda", logical_id=(file_driver,
3025                                 "%s/sda" % file_storage_dir))
3026     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3027                                 iv_name="sdb", logical_id=(file_driver,
3028                                 "%s/sdb" % file_storage_dir))
3029     disks = [file_sda_dev, file_sdb_dev]
3030   else:
3031     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3032   return disks
3033
3034
3035 def _GetInstanceInfoText(instance):
3036   """Compute that text that should be added to the disk's metadata.
3037
3038   """
3039   return "originstname+%s" % instance.name
3040
3041
3042 def _CreateDisks(lu, instance):
3043   """Create all disks for an instance.
3044
3045   This abstracts away some work from AddInstance.
3046
3047   Args:
3048     instance: the instance object
3049
3050   Returns:
3051     True or False showing the success of the creation process
3052
3053   """
3054   info = _GetInstanceInfoText(instance)
3055
3056   if instance.disk_template == constants.DT_FILE:
3057     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3058     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3059                                                  file_storage_dir)
3060
3061     if not result:
3062       logger.Error("Could not connect to node '%s'" % instance.primary_node)
3063       return False
3064
3065     if not result[0]:
3066       logger.Error("failed to create directory '%s'" % file_storage_dir)
3067       return False
3068
3069   for device in instance.disks:
3070     logger.Info("creating volume %s for instance %s" %
3071                 (device.iv_name, instance.name))
3072     #HARDCODE
3073     for secondary_node in instance.secondary_nodes:
3074       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3075                                         device, False, info):
3076         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3077                      (device.iv_name, device, secondary_node))
3078         return False
3079     #HARDCODE
3080     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3081                                     instance, device, info):
3082       logger.Error("failed to create volume %s on primary!" %
3083                    device.iv_name)
3084       return False
3085
3086   return True
3087
3088
3089 def _RemoveDisks(lu, instance):
3090   """Remove all disks for an instance.
3091
3092   This abstracts away some work from `AddInstance()` and
3093   `RemoveInstance()`. Note that in case some of the devices couldn't
3094   be removed, the removal will continue with the other ones (compare
3095   with `_CreateDisks()`).
3096
3097   Args:
3098     instance: the instance object
3099
3100   Returns:
3101     True or False showing the success of the removal proces
3102
3103   """
3104   logger.Info("removing block devices for instance %s" % instance.name)
3105
3106   result = True
3107   for device in instance.disks:
3108     for node, disk in device.ComputeNodeTree(instance.primary_node):
3109       lu.cfg.SetDiskID(disk, node)
3110       if not lu.rpc.call_blockdev_remove(node, disk):
3111         logger.Error("could not remove block device %s on node %s,"
3112                      " continuing anyway" %
3113                      (device.iv_name, node))
3114         result = False
3115
3116   if instance.disk_template == constants.DT_FILE:
3117     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3118     if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3119                                                file_storage_dir):
3120       logger.Error("could not remove directory '%s'" % file_storage_dir)
3121       result = False
3122
3123   return result
3124
3125
3126 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3127   """Compute disk size requirements in the volume group
3128
3129   This is currently hard-coded for the two-drive layout.
3130
3131   """
3132   # Required free disk space as a function of disk and swap space
3133   req_size_dict = {
3134     constants.DT_DISKLESS: None,
3135     constants.DT_PLAIN: disk_size + swap_size,
3136     # 256 MB are added for drbd metadata, 128MB for each drbd device
3137     constants.DT_DRBD8: disk_size + swap_size + 256,
3138     constants.DT_FILE: None,
3139   }
3140
3141   if disk_template not in req_size_dict:
3142     raise errors.ProgrammerError("Disk template '%s' size requirement"
3143                                  " is unknown" %  disk_template)
3144
3145   return req_size_dict[disk_template]
3146
3147
3148 class LUCreateInstance(LogicalUnit):
3149   """Create an instance.
3150
3151   """
3152   HPATH = "instance-add"
3153   HTYPE = constants.HTYPE_INSTANCE
3154   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3155               "disk_template", "swap_size", "mode", "start", "vcpus",
3156               "wait_for_sync", "ip_check", "mac"]
3157   REQ_BGL = False
3158
3159   def _ExpandNode(self, node):
3160     """Expands and checks one node name.
3161
3162     """
3163     node_full = self.cfg.ExpandNodeName(node)
3164     if node_full is None:
3165       raise errors.OpPrereqError("Unknown node %s" % node)
3166     return node_full
3167
3168   def ExpandNames(self):
3169     """ExpandNames for CreateInstance.
3170
3171     Figure out the right locks for instance creation.
3172
3173     """
3174     self.needed_locks = {}
3175
3176     # set optional parameters to none if they don't exist
3177     for attr in ["kernel_path", "initrd_path", "pnode", "snode",
3178                  "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
3179                  "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
3180                  "vnc_bind_address", "hypervisor"]:
3181       if not hasattr(self.op, attr):
3182         setattr(self.op, attr, None)
3183
3184     # cheap checks, mostly valid constants given
3185
3186     # verify creation mode
3187     if self.op.mode not in (constants.INSTANCE_CREATE,
3188                             constants.INSTANCE_IMPORT):
3189       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3190                                  self.op.mode)
3191
3192     # disk template and mirror node verification
3193     if self.op.disk_template not in constants.DISK_TEMPLATES:
3194       raise errors.OpPrereqError("Invalid disk template name")
3195
3196     if self.op.hypervisor is None:
3197       self.op.hypervisor = self.cfg.GetHypervisorType()
3198
3199     enabled_hvs = self.cfg.GetClusterInfo().enabled_hypervisors
3200     if self.op.hypervisor not in enabled_hvs:
3201       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3202                                  " cluster (%s)" % (self.op.hypervisor,
3203                                   ",".join(enabled_hvs)))
3204
3205     #### instance parameters check
3206
3207     # instance name verification
3208     hostname1 = utils.HostInfo(self.op.instance_name)
3209     self.op.instance_name = instance_name = hostname1.name
3210
3211     # this is just a preventive check, but someone might still add this
3212     # instance in the meantime, and creation will fail at lock-add time
3213     if instance_name in self.cfg.GetInstanceList():
3214       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3215                                  instance_name)
3216
3217     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3218
3219     # ip validity checks
3220     ip = getattr(self.op, "ip", None)
3221     if ip is None or ip.lower() == "none":
3222       inst_ip = None
3223     elif ip.lower() == "auto":
3224       inst_ip = hostname1.ip
3225     else:
3226       if not utils.IsValidIP(ip):
3227         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3228                                    " like a valid IP" % ip)
3229       inst_ip = ip
3230     self.inst_ip = self.op.ip = inst_ip
3231     # used in CheckPrereq for ip ping check
3232     self.check_ip = hostname1.ip
3233
3234     # MAC address verification
3235     if self.op.mac != "auto":
3236       if not utils.IsValidMac(self.op.mac.lower()):
3237         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3238                                    self.op.mac)
3239
3240     # boot order verification
3241     if self.op.hvm_boot_order is not None:
3242       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3243         raise errors.OpPrereqError("invalid boot order specified,"
3244                                    " must be one or more of [acdn]")
3245     # file storage checks
3246     if (self.op.file_driver and
3247         not self.op.file_driver in constants.FILE_DRIVER):
3248       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3249                                  self.op.file_driver)
3250
3251     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3252       raise errors.OpPrereqError("File storage directory path not absolute")
3253
3254     ### Node/iallocator related checks
3255     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3256       raise errors.OpPrereqError("One and only one of iallocator and primary"
3257                                  " node must be given")
3258
3259     if self.op.iallocator:
3260       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3261     else:
3262       self.op.pnode = self._ExpandNode(self.op.pnode)
3263       nodelist = [self.op.pnode]
3264       if self.op.snode is not None:
3265         self.op.snode = self._ExpandNode(self.op.snode)
3266         nodelist.append(self.op.snode)
3267       self.needed_locks[locking.LEVEL_NODE] = nodelist
3268
3269     # in case of import lock the source node too
3270     if self.op.mode == constants.INSTANCE_IMPORT:
3271       src_node = getattr(self.op, "src_node", None)
3272       src_path = getattr(self.op, "src_path", None)
3273
3274       if src_node is None or src_path is None:
3275         raise errors.OpPrereqError("Importing an instance requires source"
3276                                    " node and path options")
3277
3278       if not os.path.isabs(src_path):
3279         raise errors.OpPrereqError("The source path must be absolute")
3280
3281       self.op.src_node = src_node = self._ExpandNode(src_node)
3282       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3283         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3284
3285     else: # INSTANCE_CREATE
3286       if getattr(self.op, "os_type", None) is None:
3287         raise errors.OpPrereqError("No guest OS specified")
3288
3289   def _RunAllocator(self):
3290     """Run the allocator based on input opcode.
3291
3292     """
3293     disks = [{"size": self.op.disk_size, "mode": "w"},
3294              {"size": self.op.swap_size, "mode": "w"}]
3295     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3296              "bridge": self.op.bridge}]
3297     ial = IAllocator(self,
3298                      mode=constants.IALLOCATOR_MODE_ALLOC,
3299                      name=self.op.instance_name,
3300                      disk_template=self.op.disk_template,
3301                      tags=[],
3302                      os=self.op.os_type,
3303                      vcpus=self.op.vcpus,
3304                      mem_size=self.op.mem_size,
3305                      disks=disks,
3306                      nics=nics,
3307                      )
3308
3309     ial.Run(self.op.iallocator)
3310
3311     if not ial.success:
3312       raise errors.OpPrereqError("Can't compute nodes using"
3313                                  " iallocator '%s': %s" % (self.op.iallocator,
3314                                                            ial.info))
3315     if len(ial.nodes) != ial.required_nodes:
3316       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3317                                  " of nodes (%s), required %s" %
3318                                  (self.op.iallocator, len(ial.nodes),
3319                                   ial.required_nodes))
3320     self.op.pnode = ial.nodes[0]
3321     logger.ToStdout("Selected nodes for the instance: %s" %
3322                     (", ".join(ial.nodes),))
3323     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3324                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3325     if ial.required_nodes == 2:
3326       self.op.snode = ial.nodes[1]
3327
3328   def BuildHooksEnv(self):
3329     """Build hooks env.
3330
3331     This runs on master, primary and secondary nodes of the instance.
3332
3333     """
3334     env = {
3335       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3336       "INSTANCE_DISK_SIZE": self.op.disk_size,
3337       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3338       "INSTANCE_ADD_MODE": self.op.mode,
3339       }
3340     if self.op.mode == constants.INSTANCE_IMPORT:
3341       env["INSTANCE_SRC_NODE"] = self.op.src_node
3342       env["INSTANCE_SRC_PATH"] = self.op.src_path
3343       env["INSTANCE_SRC_IMAGE"] = self.src_image
3344
3345     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3346       primary_node=self.op.pnode,
3347       secondary_nodes=self.secondaries,
3348       status=self.instance_status,
3349       os_type=self.op.os_type,
3350       memory=self.op.mem_size,
3351       vcpus=self.op.vcpus,
3352       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3353     ))
3354
3355     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3356           self.secondaries)
3357     return env, nl, nl
3358
3359
3360   def CheckPrereq(self):
3361     """Check prerequisites.
3362
3363     """
3364     if (not self.cfg.GetVGName() and
3365         self.op.disk_template not in constants.DTS_NOT_LVM):
3366       raise errors.OpPrereqError("Cluster does not support lvm-based"
3367                                  " instances")
3368
3369
3370     if self.op.mode == constants.INSTANCE_IMPORT:
3371       src_node = self.op.src_node
3372       src_path = self.op.src_path
3373
3374       export_info = self.rpc.call_export_info(src_node, src_path)
3375
3376       if not export_info:
3377         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3378
3379       if not export_info.has_section(constants.INISECT_EXP):
3380         raise errors.ProgrammerError("Corrupted export config")
3381
3382       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3383       if (int(ei_version) != constants.EXPORT_VERSION):
3384         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3385                                    (ei_version, constants.EXPORT_VERSION))
3386
3387       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3388         raise errors.OpPrereqError("Can't import instance with more than"
3389                                    " one data disk")
3390
3391       # FIXME: are the old os-es, disk sizes, etc. useful?
3392       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3393       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3394                                                          'disk0_dump'))
3395       self.src_image = diskimage
3396
3397     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3398
3399     if self.op.start and not self.op.ip_check:
3400       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3401                                  " adding an instance in start mode")
3402
3403     if self.op.ip_check:
3404       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3405         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3406                                    (self.check_ip, self.op.instance_name))
3407
3408     # bridge verification
3409     bridge = getattr(self.op, "bridge", None)
3410     if bridge is None:
3411       self.op.bridge = self.cfg.GetDefBridge()
3412     else:
3413       self.op.bridge = bridge
3414
3415     #### allocator run
3416
3417     if self.op.iallocator is not None:
3418       self._RunAllocator()
3419
3420     #### node related checks
3421
3422     # check primary node
3423     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3424     assert self.pnode is not None, \
3425       "Cannot retrieve locked node %s" % self.op.pnode
3426     self.secondaries = []
3427
3428     # mirror node verification
3429     if self.op.disk_template in constants.DTS_NET_MIRROR:
3430       if self.op.snode is None:
3431         raise errors.OpPrereqError("The networked disk templates need"
3432                                    " a mirror node")
3433       if self.op.snode == pnode.name:
3434         raise errors.OpPrereqError("The secondary node cannot be"
3435                                    " the primary node.")
3436       self.secondaries.append(self.op.snode)
3437
3438     req_size = _ComputeDiskSize(self.op.disk_template,
3439                                 self.op.disk_size, self.op.swap_size)
3440
3441     # Check lv size requirements
3442     if req_size is not None:
3443       nodenames = [pnode.name] + self.secondaries
3444       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3445                                          self.op.hypervisor)
3446       for node in nodenames:
3447         info = nodeinfo.get(node, None)
3448         if not info:
3449           raise errors.OpPrereqError("Cannot get current information"
3450                                      " from node '%s'" % node)
3451         vg_free = info.get('vg_free', None)
3452         if not isinstance(vg_free, int):
3453           raise errors.OpPrereqError("Can't compute free disk space on"
3454                                      " node %s" % node)
3455         if req_size > info['vg_free']:
3456           raise errors.OpPrereqError("Not enough disk space on target node %s."
3457                                      " %d MB available, %d MB required" %
3458                                      (node, info['vg_free'], req_size))
3459
3460     # os verification
3461     os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3462     if not os_obj:
3463       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3464                                  " primary node"  % self.op.os_type)
3465
3466     if self.op.kernel_path == constants.VALUE_NONE:
3467       raise errors.OpPrereqError("Can't set instance kernel to none")
3468
3469     # bridge check on primary node
3470     if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3471       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3472                                  " destination node '%s'" %
3473                                  (self.op.bridge, pnode.name))
3474
3475     # memory check on primary node
3476     if self.op.start:
3477       _CheckNodeFreeMemory(self, self.pnode.name,
3478                            "creating instance %s" % self.op.instance_name,
3479                            self.op.mem_size, self.op.hypervisor)
3480
3481     # hvm_cdrom_image_path verification
3482     if self.op.hvm_cdrom_image_path is not None:
3483       # FIXME (als): shouldn't these checks happen on the destination node?
3484       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3485         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3486                                    " be an absolute path or None, not %s" %
3487                                    self.op.hvm_cdrom_image_path)
3488       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3489         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3490                                    " regular file or a symlink pointing to"
3491                                    " an existing regular file, not %s" %
3492                                    self.op.hvm_cdrom_image_path)
3493
3494     # vnc_bind_address verification
3495     if self.op.vnc_bind_address is not None:
3496       if not utils.IsValidIP(self.op.vnc_bind_address):
3497         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3498                                    " like a valid IP address" %
3499                                    self.op.vnc_bind_address)
3500
3501     # Xen HVM device type checks
3502     if self.op.hypervisor == constants.HT_XEN_HVM:
3503       if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3504         raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3505                                    " hypervisor" % self.op.hvm_nic_type)
3506       if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3507         raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3508                                    " hypervisor" % self.op.hvm_disk_type)
3509
3510     if self.op.start:
3511       self.instance_status = 'up'
3512     else:
3513       self.instance_status = 'down'
3514
3515   def Exec(self, feedback_fn):
3516     """Create and add the instance to the cluster.
3517
3518     """
3519     instance = self.op.instance_name
3520     pnode_name = self.pnode.name
3521
3522     if self.op.mac == "auto":
3523       mac_address = self.cfg.GenerateMAC()
3524     else:
3525       mac_address = self.op.mac
3526
3527     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3528     if self.inst_ip is not None:
3529       nic.ip = self.inst_ip
3530
3531     ht_kind = self.op.hypervisor
3532     if ht_kind in constants.HTS_REQ_PORT:
3533       network_port = self.cfg.AllocatePort()
3534     else:
3535       network_port = None
3536
3537     if self.op.vnc_bind_address is None:
3538       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3539
3540     # this is needed because os.path.join does not accept None arguments
3541     if self.op.file_storage_dir is None:
3542       string_file_storage_dir = ""
3543     else:
3544       string_file_storage_dir = self.op.file_storage_dir
3545
3546     # build the full file storage dir path
3547     file_storage_dir = os.path.normpath(os.path.join(
3548                                         self.cfg.GetFileStorageDir(),
3549                                         string_file_storage_dir, instance))
3550
3551
3552     disks = _GenerateDiskTemplate(self,
3553                                   self.op.disk_template,
3554                                   instance, pnode_name,
3555                                   self.secondaries, self.op.disk_size,
3556                                   self.op.swap_size,
3557                                   file_storage_dir,
3558                                   self.op.file_driver)
3559
3560     iobj = objects.Instance(name=instance, os=self.op.os_type,
3561                             primary_node=pnode_name,
3562                             memory=self.op.mem_size,
3563                             vcpus=self.op.vcpus,
3564                             nics=[nic], disks=disks,
3565                             disk_template=self.op.disk_template,
3566                             status=self.instance_status,
3567                             network_port=network_port,
3568                             kernel_path=self.op.kernel_path,
3569                             initrd_path=self.op.initrd_path,
3570                             hvm_boot_order=self.op.hvm_boot_order,
3571                             hvm_acpi=self.op.hvm_acpi,
3572                             hvm_pae=self.op.hvm_pae,
3573                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3574                             vnc_bind_address=self.op.vnc_bind_address,
3575                             hvm_nic_type=self.op.hvm_nic_type,
3576                             hvm_disk_type=self.op.hvm_disk_type,
3577                             hypervisor=self.op.hypervisor,
3578                             )
3579
3580     feedback_fn("* creating instance disks...")
3581     if not _CreateDisks(self, iobj):
3582       _RemoveDisks(self, iobj)
3583       self.cfg.ReleaseDRBDMinors(instance)
3584       raise errors.OpExecError("Device creation failed, reverting...")
3585
3586     feedback_fn("adding instance %s to cluster config" % instance)
3587
3588     self.cfg.AddInstance(iobj)
3589     # Declare that we don't want to remove the instance lock anymore, as we've
3590     # added the instance to the config
3591     del self.remove_locks[locking.LEVEL_INSTANCE]
3592     # Remove the temp. assignements for the instance's drbds
3593     self.cfg.ReleaseDRBDMinors(instance)
3594
3595     if self.op.wait_for_sync:
3596       disk_abort = not _WaitForSync(self, iobj)
3597     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3598       # make sure the disks are not degraded (still sync-ing is ok)
3599       time.sleep(15)
3600       feedback_fn("* checking mirrors status")
3601       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3602     else:
3603       disk_abort = False
3604
3605     if disk_abort:
3606       _RemoveDisks(self, iobj)
3607       self.cfg.RemoveInstance(iobj.name)
3608       # Make sure the instance lock gets removed
3609       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3610       raise errors.OpExecError("There are some degraded disks for"
3611                                " this instance")
3612
3613     feedback_fn("creating os for instance %s on node %s" %
3614                 (instance, pnode_name))
3615
3616     if iobj.disk_template != constants.DT_DISKLESS:
3617       if self.op.mode == constants.INSTANCE_CREATE:
3618         feedback_fn("* running the instance OS create scripts...")
3619         if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3620           raise errors.OpExecError("could not add os for instance %s"
3621                                    " on node %s" %
3622                                    (instance, pnode_name))
3623
3624       elif self.op.mode == constants.INSTANCE_IMPORT:
3625         feedback_fn("* running the instance OS import scripts...")
3626         src_node = self.op.src_node
3627         src_image = self.src_image
3628         cluster_name = self.cfg.GetClusterName()
3629         if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3630                                                 src_node, src_image,
3631                                                 cluster_name):
3632           raise errors.OpExecError("Could not import os for instance"
3633                                    " %s on node %s" %
3634                                    (instance, pnode_name))
3635       else:
3636         # also checked in the prereq part
3637         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3638                                      % self.op.mode)
3639
3640     if self.op.start:
3641       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3642       feedback_fn("* starting instance...")
3643       if not self.rpc.call_instance_start(pnode_name, iobj, None):
3644         raise errors.OpExecError("Could not start instance")
3645
3646
3647 class LUConnectConsole(NoHooksLU):
3648   """Connect to an instance's console.
3649
3650   This is somewhat special in that it returns the command line that
3651   you need to run on the master node in order to connect to the
3652   console.
3653
3654   """
3655   _OP_REQP = ["instance_name"]
3656   REQ_BGL = False
3657
3658   def ExpandNames(self):
3659     self._ExpandAndLockInstance()
3660
3661   def CheckPrereq(self):
3662     """Check prerequisites.
3663
3664     This checks that the instance is in the cluster.
3665
3666     """
3667     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3668     assert self.instance is not None, \
3669       "Cannot retrieve locked instance %s" % self.op.instance_name
3670
3671   def Exec(self, feedback_fn):
3672     """Connect to the console of an instance
3673
3674     """
3675     instance = self.instance
3676     node = instance.primary_node
3677
3678     node_insts = self.rpc.call_instance_list([node],
3679                                              [instance.hypervisor])[node]
3680     if node_insts is False:
3681       raise errors.OpExecError("Can't connect to node %s." % node)
3682
3683     if instance.name not in node_insts:
3684       raise errors.OpExecError("Instance %s is not running." % instance.name)
3685
3686     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3687
3688     hyper = hypervisor.GetHypervisor(instance.hypervisor)
3689     console_cmd = hyper.GetShellCommandForConsole(instance)
3690
3691     # build ssh cmdline
3692     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3693
3694
3695 class LUReplaceDisks(LogicalUnit):
3696   """Replace the disks of an instance.
3697
3698   """
3699   HPATH = "mirrors-replace"
3700   HTYPE = constants.HTYPE_INSTANCE
3701   _OP_REQP = ["instance_name", "mode", "disks"]
3702   REQ_BGL = False
3703
3704   def ExpandNames(self):
3705     self._ExpandAndLockInstance()
3706
3707     if not hasattr(self.op, "remote_node"):
3708       self.op.remote_node = None
3709
3710     ia_name = getattr(self.op, "iallocator", None)
3711     if ia_name is not None:
3712       if self.op.remote_node is not None:
3713         raise errors.OpPrereqError("Give either the iallocator or the new"
3714                                    " secondary, not both")
3715       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3716     elif self.op.remote_node is not None:
3717       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3718       if remote_node is None:
3719         raise errors.OpPrereqError("Node '%s' not known" %
3720                                    self.op.remote_node)
3721       self.op.remote_node = remote_node
3722       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3723       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3724     else:
3725       self.needed_locks[locking.LEVEL_NODE] = []
3726       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3727
3728   def DeclareLocks(self, level):
3729     # If we're not already locking all nodes in the set we have to declare the
3730     # instance's primary/secondary nodes.
3731     if (level == locking.LEVEL_NODE and
3732         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3733       self._LockInstancesNodes()
3734
3735   def _RunAllocator(self):
3736     """Compute a new secondary node using an IAllocator.
3737
3738     """
3739     ial = IAllocator(self,
3740                      mode=constants.IALLOCATOR_MODE_RELOC,
3741                      name=self.op.instance_name,
3742                      relocate_from=[self.sec_node])
3743
3744     ial.Run(self.op.iallocator)
3745
3746     if not ial.success:
3747       raise errors.OpPrereqError("Can't compute nodes using"
3748                                  " iallocator '%s': %s" % (self.op.iallocator,
3749                                                            ial.info))
3750     if len(ial.nodes) != ial.required_nodes:
3751       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3752                                  " of nodes (%s), required %s" %
3753                                  (len(ial.nodes), ial.required_nodes))
3754     self.op.remote_node = ial.nodes[0]
3755     logger.ToStdout("Selected new secondary for the instance: %s" %
3756                     self.op.remote_node)
3757
3758   def BuildHooksEnv(self):
3759     """Build hooks env.
3760
3761     This runs on the master, the primary and all the secondaries.
3762
3763     """
3764     env = {
3765       "MODE": self.op.mode,
3766       "NEW_SECONDARY": self.op.remote_node,
3767       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3768       }
3769     env.update(_BuildInstanceHookEnvByObject(self.instance))
3770     nl = [
3771       self.cfg.GetMasterNode(),
3772       self.instance.primary_node,
3773       ]
3774     if self.op.remote_node is not None:
3775       nl.append(self.op.remote_node)
3776     return env, nl, nl
3777
3778   def CheckPrereq(self):
3779     """Check prerequisites.
3780
3781     This checks that the instance is in the cluster.
3782
3783     """
3784     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3785     assert instance is not None, \
3786       "Cannot retrieve locked instance %s" % self.op.instance_name
3787     self.instance = instance
3788
3789     if instance.disk_template not in constants.DTS_NET_MIRROR:
3790       raise errors.OpPrereqError("Instance's disk layout is not"
3791                                  " network mirrored.")
3792
3793     if len(instance.secondary_nodes) != 1:
3794       raise errors.OpPrereqError("The instance has a strange layout,"
3795                                  " expected one secondary but found %d" %
3796                                  len(instance.secondary_nodes))
3797
3798     self.sec_node = instance.secondary_nodes[0]
3799
3800     ia_name = getattr(self.op, "iallocator", None)
3801     if ia_name is not None:
3802       self._RunAllocator()
3803
3804     remote_node = self.op.remote_node
3805     if remote_node is not None:
3806       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3807       assert self.remote_node_info is not None, \
3808         "Cannot retrieve locked node %s" % remote_node
3809     else:
3810       self.remote_node_info = None
3811     if remote_node == instance.primary_node:
3812       raise errors.OpPrereqError("The specified node is the primary node of"
3813                                  " the instance.")
3814     elif remote_node == self.sec_node:
3815       if self.op.mode == constants.REPLACE_DISK_SEC:
3816         # this is for DRBD8, where we can't execute the same mode of
3817         # replacement as for drbd7 (no different port allocated)
3818         raise errors.OpPrereqError("Same secondary given, cannot execute"
3819                                    " replacement")
3820     if instance.disk_template == constants.DT_DRBD8:
3821       if (self.op.mode == constants.REPLACE_DISK_ALL and
3822           remote_node is not None):
3823         # switch to replace secondary mode
3824         self.op.mode = constants.REPLACE_DISK_SEC
3825
3826       if self.op.mode == constants.REPLACE_DISK_ALL:
3827         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3828                                    " secondary disk replacement, not"
3829                                    " both at once")
3830       elif self.op.mode == constants.REPLACE_DISK_PRI:
3831         if remote_node is not None:
3832           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3833                                      " the secondary while doing a primary"
3834                                      " node disk replacement")
3835         self.tgt_node = instance.primary_node
3836         self.oth_node = instance.secondary_nodes[0]
3837       elif self.op.mode == constants.REPLACE_DISK_SEC:
3838         self.new_node = remote_node # this can be None, in which case
3839                                     # we don't change the secondary
3840         self.tgt_node = instance.secondary_nodes[0]
3841         self.oth_node = instance.primary_node
3842       else:
3843         raise errors.ProgrammerError("Unhandled disk replace mode")
3844
3845     for name in self.op.disks:
3846       if instance.FindDisk(name) is None:
3847         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3848                                    (name, instance.name))
3849
3850   def _ExecD8DiskOnly(self, feedback_fn):
3851     """Replace a disk on the primary or secondary for dbrd8.
3852
3853     The algorithm for replace is quite complicated:
3854       - for each disk to be replaced:
3855         - create new LVs on the target node with unique names
3856         - detach old LVs from the drbd device
3857         - rename old LVs to name_replaced.<time_t>
3858         - rename new LVs to old LVs
3859         - attach the new LVs (with the old names now) to the drbd device
3860       - wait for sync across all devices
3861       - for each modified disk:
3862         - remove old LVs (which have the name name_replaces.<time_t>)
3863
3864     Failures are not very well handled.
3865
3866     """
3867     steps_total = 6
3868     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3869     instance = self.instance
3870     iv_names = {}
3871     vgname = self.cfg.GetVGName()
3872     # start of work
3873     cfg = self.cfg
3874     tgt_node = self.tgt_node
3875     oth_node = self.oth_node
3876
3877     # Step: check device activation
3878     self.proc.LogStep(1, steps_total, "check device existence")
3879     info("checking volume groups")
3880     my_vg = cfg.GetVGName()
3881     results = self.rpc.call_vg_list([oth_node, tgt_node])
3882     if not results:
3883       raise errors.OpExecError("Can't list volume groups on the nodes")
3884     for node in oth_node, tgt_node:
3885       res = results.get(node, False)
3886       if not res or my_vg not in res:
3887         raise errors.OpExecError("Volume group '%s' not found on %s" %
3888                                  (my_vg, node))
3889     for dev in instance.disks:
3890       if not dev.iv_name in self.op.disks:
3891         continue
3892       for node in tgt_node, oth_node:
3893         info("checking %s on %s" % (dev.iv_name, node))
3894         cfg.SetDiskID(dev, node)
3895         if not self.rpc.call_blockdev_find(node, dev):
3896           raise errors.OpExecError("Can't find device %s on node %s" %
3897                                    (dev.iv_name, node))
3898
3899     # Step: check other node consistency
3900     self.proc.LogStep(2, steps_total, "check peer consistency")
3901     for dev in instance.disks:
3902       if not dev.iv_name in self.op.disks:
3903         continue
3904       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3905       if not _CheckDiskConsistency(self, dev, oth_node,
3906                                    oth_node==instance.primary_node):
3907         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3908                                  " to replace disks on this node (%s)" %
3909                                  (oth_node, tgt_node))
3910
3911     # Step: create new storage
3912     self.proc.LogStep(3, steps_total, "allocate new storage")
3913     for dev in instance.disks:
3914       if not dev.iv_name in self.op.disks:
3915         continue
3916       size = dev.size
3917       cfg.SetDiskID(dev, tgt_node)
3918       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3919       names = _GenerateUniqueNames(self, lv_names)
3920       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3921                              logical_id=(vgname, names[0]))
3922       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3923                              logical_id=(vgname, names[1]))
3924       new_lvs = [lv_data, lv_meta]
3925       old_lvs = dev.children
3926       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3927       info("creating new local storage on %s for %s" %
3928            (tgt_node, dev.iv_name))
3929       # since we *always* want to create this LV, we use the
3930       # _Create...OnPrimary (which forces the creation), even if we
3931       # are talking about the secondary node
3932       for new_lv in new_lvs:
3933         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
3934                                         _GetInstanceInfoText(instance)):
3935           raise errors.OpExecError("Failed to create new LV named '%s' on"
3936                                    " node '%s'" %
3937                                    (new_lv.logical_id[1], tgt_node))
3938
3939     # Step: for each lv, detach+rename*2+attach
3940     self.proc.LogStep(4, steps_total, "change drbd configuration")
3941     for dev, old_lvs, new_lvs in iv_names.itervalues():
3942       info("detaching %s drbd from local storage" % dev.iv_name)
3943       if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3944         raise errors.OpExecError("Can't detach drbd from local storage on node"
3945                                  " %s for device %s" % (tgt_node, dev.iv_name))
3946       #dev.children = []
3947       #cfg.Update(instance)
3948
3949       # ok, we created the new LVs, so now we know we have the needed
3950       # storage; as such, we proceed on the target node to rename
3951       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3952       # using the assumption that logical_id == physical_id (which in
3953       # turn is the unique_id on that node)
3954
3955       # FIXME(iustin): use a better name for the replaced LVs
3956       temp_suffix = int(time.time())
3957       ren_fn = lambda d, suff: (d.physical_id[0],
3958                                 d.physical_id[1] + "_replaced-%s" % suff)
3959       # build the rename list based on what LVs exist on the node
3960       rlist = []
3961       for to_ren in old_lvs:
3962         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
3963         if find_res is not None: # device exists
3964           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3965
3966       info("renaming the old LVs on the target node")
3967       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3968         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3969       # now we rename the new LVs to the old LVs
3970       info("renaming the new LVs on the target node")
3971       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3972       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3973         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3974
3975       for old, new in zip(old_lvs, new_lvs):
3976         new.logical_id = old.logical_id
3977         cfg.SetDiskID(new, tgt_node)
3978
3979       for disk in old_lvs:
3980         disk.logical_id = ren_fn(disk, temp_suffix)
3981         cfg.SetDiskID(disk, tgt_node)
3982
3983       # now that the new lvs have the old name, we can add them to the device
3984       info("adding new mirror component on %s" % tgt_node)
3985       if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3986         for new_lv in new_lvs:
3987           if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
3988             warning("Can't rollback device %s", hint="manually cleanup unused"
3989                     " logical volumes")
3990         raise errors.OpExecError("Can't add local storage to drbd")
3991
3992       dev.children = new_lvs
3993       cfg.Update(instance)
3994
3995     # Step: wait for sync
3996
3997     # this can fail as the old devices are degraded and _WaitForSync
3998     # does a combined result over all disks, so we don't check its
3999     # return value
4000     self.proc.LogStep(5, steps_total, "sync devices")
4001     _WaitForSync(self, instance, unlock=True)
4002
4003     # so check manually all the devices
4004     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4005       cfg.SetDiskID(dev, instance.primary_node)
4006       is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4007       if is_degr:
4008         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4009
4010     # Step: remove old storage
4011     self.proc.LogStep(6, steps_total, "removing old storage")
4012     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4013       info("remove logical volumes for %s" % name)
4014       for lv in old_lvs:
4015         cfg.SetDiskID(lv, tgt_node)
4016         if not self.rpc.call_blockdev_remove(tgt_node, lv):
4017           warning("Can't remove old LV", hint="manually remove unused LVs")
4018           continue
4019
4020   def _ExecD8Secondary(self, feedback_fn):
4021     """Replace the secondary node for drbd8.
4022
4023     The algorithm for replace is quite complicated:
4024       - for all disks of the instance:
4025         - create new LVs on the new node with same names
4026         - shutdown the drbd device on the old secondary
4027         - disconnect the drbd network on the primary
4028         - create the drbd device on the new secondary
4029         - network attach the drbd on the primary, using an artifice:
4030           the drbd code for Attach() will connect to the network if it
4031           finds a device which is connected to the good local disks but
4032           not network enabled
4033       - wait for sync across all devices
4034       - remove all disks from the old secondary
4035
4036     Failures are not very well handled.
4037
4038     """
4039     steps_total = 6
4040     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4041     instance = self.instance
4042     iv_names = {}
4043     vgname = self.cfg.GetVGName()
4044     # start of work
4045     cfg = self.cfg
4046     old_node = self.tgt_node
4047     new_node = self.new_node
4048     pri_node = instance.primary_node
4049
4050     # Step: check device activation
4051     self.proc.LogStep(1, steps_total, "check device existence")
4052     info("checking volume groups")
4053     my_vg = cfg.GetVGName()
4054     results = self.rpc.call_vg_list([pri_node, new_node])
4055     if not results:
4056       raise errors.OpExecError("Can't list volume groups on the nodes")
4057     for node in pri_node, new_node:
4058       res = results.get(node, False)
4059       if not res or my_vg not in res:
4060         raise errors.OpExecError("Volume group '%s' not found on %s" %
4061                                  (my_vg, node))
4062     for dev in instance.disks:
4063       if not dev.iv_name in self.op.disks:
4064         continue
4065       info("checking %s on %s" % (dev.iv_name, pri_node))
4066       cfg.SetDiskID(dev, pri_node)
4067       if not self.rpc.call_blockdev_find(pri_node, dev):
4068         raise errors.OpExecError("Can't find device %s on node %s" %
4069                                  (dev.iv_name, pri_node))
4070
4071     # Step: check other node consistency
4072     self.proc.LogStep(2, steps_total, "check peer consistency")
4073     for dev in instance.disks:
4074       if not dev.iv_name in self.op.disks:
4075         continue
4076       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4077       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4078         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4079                                  " unsafe to replace the secondary" %
4080                                  pri_node)
4081
4082     # Step: create new storage
4083     self.proc.LogStep(3, steps_total, "allocate new storage")
4084     for dev in instance.disks:
4085       size = dev.size
4086       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4087       # since we *always* want to create this LV, we use the
4088       # _Create...OnPrimary (which forces the creation), even if we
4089       # are talking about the secondary node
4090       for new_lv in dev.children:
4091         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4092                                         _GetInstanceInfoText(instance)):
4093           raise errors.OpExecError("Failed to create new LV named '%s' on"
4094                                    " node '%s'" %
4095                                    (new_lv.logical_id[1], new_node))
4096
4097
4098     # Step 4: dbrd minors and drbd setups changes
4099     # after this, we must manually remove the drbd minors on both the
4100     # error and the success paths
4101     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4102                                    instance.name)
4103     logging.debug("Allocated minors %s" % (minors,))
4104     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4105     for dev, new_minor in zip(instance.disks, minors):
4106       size = dev.size
4107       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4108       # create new devices on new_node
4109       if pri_node == dev.logical_id[0]:
4110         new_logical_id = (pri_node, new_node,
4111                           dev.logical_id[2], dev.logical_id[3], new_minor,
4112                           dev.logical_id[5])
4113       else:
4114         new_logical_id = (new_node, pri_node,
4115                           dev.logical_id[2], new_minor, dev.logical_id[4],
4116                           dev.logical_id[5])
4117       iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4118       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4119                     new_logical_id)
4120       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4121                               logical_id=new_logical_id,
4122                               children=dev.children)
4123       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4124                                         new_drbd, False,
4125                                         _GetInstanceInfoText(instance)):
4126         self.cfg.ReleaseDRBDMinors(instance.name)
4127         raise errors.OpExecError("Failed to create new DRBD on"
4128                                  " node '%s'" % new_node)
4129
4130     for dev in instance.disks:
4131       # we have new devices, shutdown the drbd on the old secondary
4132       info("shutting down drbd for %s on old node" % dev.iv_name)
4133       cfg.SetDiskID(dev, old_node)
4134       if not self.rpc.call_blockdev_shutdown(old_node, dev):
4135         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4136                 hint="Please cleanup this device manually as soon as possible")
4137
4138     info("detaching primary drbds from the network (=> standalone)")
4139     done = 0
4140     for dev in instance.disks:
4141       cfg.SetDiskID(dev, pri_node)
4142       # set the network part of the physical (unique in bdev terms) id
4143       # to None, meaning detach from network
4144       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4145       # and 'find' the device, which will 'fix' it to match the
4146       # standalone state
4147       if self.rpc.call_blockdev_find(pri_node, dev):
4148         done += 1
4149       else:
4150         warning("Failed to detach drbd %s from network, unusual case" %
4151                 dev.iv_name)
4152
4153     if not done:
4154       # no detaches succeeded (very unlikely)
4155       self.cfg.ReleaseDRBDMinors(instance.name)
4156       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4157
4158     # if we managed to detach at least one, we update all the disks of
4159     # the instance to point to the new secondary
4160     info("updating instance configuration")
4161     for dev, _, new_logical_id in iv_names.itervalues():
4162       dev.logical_id = new_logical_id
4163       cfg.SetDiskID(dev, pri_node)
4164     cfg.Update(instance)
4165     # we can remove now the temp minors as now the new values are
4166     # written to the config file (and therefore stable)
4167     self.cfg.ReleaseDRBDMinors(instance.name)
4168
4169     # and now perform the drbd attach
4170     info("attaching primary drbds to new secondary (standalone => connected)")
4171     failures = []
4172     for dev in instance.disks:
4173       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4174       # since the attach is smart, it's enough to 'find' the device,
4175       # it will automatically activate the network, if the physical_id
4176       # is correct
4177       cfg.SetDiskID(dev, pri_node)
4178       logging.debug("Disk to attach: %s", dev)
4179       if not self.rpc.call_blockdev_find(pri_node, dev):
4180         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4181                 "please do a gnt-instance info to see the status of disks")
4182
4183     # this can fail as the old devices are degraded and _WaitForSync
4184     # does a combined result over all disks, so we don't check its
4185     # return value
4186     self.proc.LogStep(5, steps_total, "sync devices")
4187     _WaitForSync(self, instance, unlock=True)
4188
4189     # so check manually all the devices
4190     for name, (dev, old_lvs, _) in iv_names.iteritems():
4191       cfg.SetDiskID(dev, pri_node)
4192       is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4193       if is_degr:
4194         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4195
4196     self.proc.LogStep(6, steps_total, "removing old storage")
4197     for name, (dev, old_lvs, _) in iv_names.iteritems():
4198       info("remove logical volumes for %s" % name)
4199       for lv in old_lvs:
4200         cfg.SetDiskID(lv, old_node)
4201         if not self.rpc.call_blockdev_remove(old_node, lv):
4202           warning("Can't remove LV on old secondary",
4203                   hint="Cleanup stale volumes by hand")
4204
4205   def Exec(self, feedback_fn):
4206     """Execute disk replacement.
4207
4208     This dispatches the disk replacement to the appropriate handler.
4209
4210     """
4211     instance = self.instance
4212
4213     # Activate the instance disks if we're replacing them on a down instance
4214     if instance.status == "down":
4215       _StartInstanceDisks(self, instance, True)
4216
4217     if instance.disk_template == constants.DT_DRBD8:
4218       if self.op.remote_node is None:
4219         fn = self._ExecD8DiskOnly
4220       else:
4221         fn = self._ExecD8Secondary
4222     else:
4223       raise errors.ProgrammerError("Unhandled disk replacement case")
4224
4225     ret = fn(feedback_fn)
4226
4227     # Deactivate the instance disks if we're replacing them on a down instance
4228     if instance.status == "down":
4229       _SafeShutdownInstanceDisks(self, instance)
4230
4231     return ret
4232
4233
4234 class LUGrowDisk(LogicalUnit):
4235   """Grow a disk of an instance.
4236
4237   """
4238   HPATH = "disk-grow"
4239   HTYPE = constants.HTYPE_INSTANCE
4240   _OP_REQP = ["instance_name", "disk", "amount"]
4241   REQ_BGL = False
4242
4243   def ExpandNames(self):
4244     self._ExpandAndLockInstance()
4245     self.needed_locks[locking.LEVEL_NODE] = []
4246     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4247
4248   def DeclareLocks(self, level):
4249     if level == locking.LEVEL_NODE:
4250       self._LockInstancesNodes()
4251
4252   def BuildHooksEnv(self):
4253     """Build hooks env.
4254
4255     This runs on the master, the primary and all the secondaries.
4256
4257     """
4258     env = {
4259       "DISK": self.op.disk,
4260       "AMOUNT": self.op.amount,
4261       }
4262     env.update(_BuildInstanceHookEnvByObject(self.instance))
4263     nl = [
4264       self.cfg.GetMasterNode(),
4265       self.instance.primary_node,
4266       ]
4267     return env, nl, nl
4268
4269   def CheckPrereq(self):
4270     """Check prerequisites.
4271
4272     This checks that the instance is in the cluster.
4273
4274     """
4275     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4276     assert instance is not None, \
4277       "Cannot retrieve locked instance %s" % self.op.instance_name
4278
4279     self.instance = instance
4280
4281     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4282       raise errors.OpPrereqError("Instance's disk layout does not support"
4283                                  " growing.")
4284
4285     if instance.FindDisk(self.op.disk) is None:
4286       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4287                                  (self.op.disk, instance.name))
4288
4289     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4290     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4291                                        instance.hypervisor)
4292     for node in nodenames:
4293       info = nodeinfo.get(node, None)
4294       if not info:
4295         raise errors.OpPrereqError("Cannot get current information"
4296                                    " from node '%s'" % node)
4297       vg_free = info.get('vg_free', None)
4298       if not isinstance(vg_free, int):
4299         raise errors.OpPrereqError("Can't compute free disk space on"
4300                                    " node %s" % node)
4301       if self.op.amount > info['vg_free']:
4302         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4303                                    " %d MiB available, %d MiB required" %
4304                                    (node, info['vg_free'], self.op.amount))
4305
4306   def Exec(self, feedback_fn):
4307     """Execute disk grow.
4308
4309     """
4310     instance = self.instance
4311     disk = instance.FindDisk(self.op.disk)
4312     for node in (instance.secondary_nodes + (instance.primary_node,)):
4313       self.cfg.SetDiskID(disk, node)
4314       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4315       if (not result or not isinstance(result, (list, tuple)) or
4316           len(result) != 2):
4317         raise errors.OpExecError("grow request failed to node %s" % node)
4318       elif not result[0]:
4319         raise errors.OpExecError("grow request failed to node %s: %s" %
4320                                  (node, result[1]))
4321     disk.RecordGrow(self.op.amount)
4322     self.cfg.Update(instance)
4323     return
4324
4325
4326 class LUQueryInstanceData(NoHooksLU):
4327   """Query runtime instance data.
4328
4329   """
4330   _OP_REQP = ["instances"]
4331   REQ_BGL = False
4332
4333   def ExpandNames(self):
4334     self.needed_locks = {}
4335     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4336
4337     if not isinstance(self.op.instances, list):
4338       raise errors.OpPrereqError("Invalid argument type 'instances'")
4339
4340     if self.op.instances:
4341       self.wanted_names = []
4342       for name in self.op.instances:
4343         full_name = self.cfg.ExpandInstanceName(name)
4344         if full_name is None:
4345           raise errors.OpPrereqError("Instance '%s' not known" %
4346                                      self.op.instance_name)
4347         self.wanted_names.append(full_name)
4348       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4349     else:
4350       self.wanted_names = None
4351       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4352
4353     self.needed_locks[locking.LEVEL_NODE] = []
4354     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4355
4356   def DeclareLocks(self, level):
4357     if level == locking.LEVEL_NODE:
4358       self._LockInstancesNodes()
4359
4360   def CheckPrereq(self):
4361     """Check prerequisites.
4362
4363     This only checks the optional instance list against the existing names.
4364
4365     """
4366     if self.wanted_names is None:
4367       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4368
4369     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4370                              in self.wanted_names]
4371     return
4372
4373   def _ComputeDiskStatus(self, instance, snode, dev):
4374     """Compute block device status.
4375
4376     """
4377     self.cfg.SetDiskID(dev, instance.primary_node)
4378     dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4379     if dev.dev_type in constants.LDS_DRBD:
4380       # we change the snode then (otherwise we use the one passed in)
4381       if dev.logical_id[0] == instance.primary_node:
4382         snode = dev.logical_id[1]
4383       else:
4384         snode = dev.logical_id[0]
4385
4386     if snode:
4387       self.cfg.SetDiskID(dev, snode)
4388       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4389     else:
4390       dev_sstatus = None
4391
4392     if dev.children:
4393       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4394                       for child in dev.children]
4395     else:
4396       dev_children = []
4397
4398     data = {
4399       "iv_name": dev.iv_name,
4400       "dev_type": dev.dev_type,
4401       "logical_id": dev.logical_id,
4402       "physical_id": dev.physical_id,
4403       "pstatus": dev_pstatus,
4404       "sstatus": dev_sstatus,
4405       "children": dev_children,
4406       }
4407
4408     return data
4409
4410   def Exec(self, feedback_fn):
4411     """Gather and return data"""
4412     result = {}
4413     for instance in self.wanted_instances:
4414       remote_info = self.rpc.call_instance_info(instance.primary_node,
4415                                                 instance.name,
4416                                                 instance.hypervisor)
4417       if remote_info and "state" in remote_info:
4418         remote_state = "up"
4419       else:
4420         remote_state = "down"
4421       if instance.status == "down":
4422         config_state = "down"
4423       else:
4424         config_state = "up"
4425
4426       disks = [self._ComputeDiskStatus(instance, None, device)
4427                for device in instance.disks]
4428
4429       idict = {
4430         "name": instance.name,
4431         "config_state": config_state,
4432         "run_state": remote_state,
4433         "pnode": instance.primary_node,
4434         "snodes": instance.secondary_nodes,
4435         "os": instance.os,
4436         "memory": instance.memory,
4437         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4438         "disks": disks,
4439         "vcpus": instance.vcpus,
4440         "hypervisor": instance.hypervisor,
4441         }
4442
4443       htkind = instance.hypervisor
4444       if htkind == constants.HT_XEN_PVM:
4445         idict["kernel_path"] = instance.kernel_path
4446         idict["initrd_path"] = instance.initrd_path
4447
4448       if htkind == constants.HT_XEN_HVM:
4449         idict["hvm_boot_order"] = instance.hvm_boot_order
4450         idict["hvm_acpi"] = instance.hvm_acpi
4451         idict["hvm_pae"] = instance.hvm_pae
4452         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4453         idict["hvm_nic_type"] = instance.hvm_nic_type
4454         idict["hvm_disk_type"] = instance.hvm_disk_type
4455
4456       if htkind in constants.HTS_REQ_PORT:
4457         if instance.vnc_bind_address is None:
4458           vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4459         else:
4460           vnc_bind_address = instance.vnc_bind_address
4461         if instance.network_port is None:
4462           vnc_console_port = None
4463         elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4464           vnc_console_port = "%s:%s" % (instance.primary_node,
4465                                        instance.network_port)
4466         elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4467           vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4468                                                    instance.network_port,
4469                                                    instance.primary_node)
4470         else:
4471           vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4472                                         instance.network_port)
4473         idict["vnc_console_port"] = vnc_console_port
4474         idict["vnc_bind_address"] = vnc_bind_address
4475         idict["network_port"] = instance.network_port
4476
4477       result[instance.name] = idict
4478
4479     return result
4480
4481
4482 class LUSetInstanceParams(LogicalUnit):
4483   """Modifies an instances's parameters.
4484
4485   """
4486   HPATH = "instance-modify"
4487   HTYPE = constants.HTYPE_INSTANCE
4488   _OP_REQP = ["instance_name"]
4489   REQ_BGL = False
4490
4491   def ExpandNames(self):
4492     self._ExpandAndLockInstance()
4493
4494   def BuildHooksEnv(self):
4495     """Build hooks env.
4496
4497     This runs on the master, primary and secondaries.
4498
4499     """
4500     args = dict()
4501     if self.mem:
4502       args['memory'] = self.mem
4503     if self.vcpus:
4504       args['vcpus'] = self.vcpus
4505     if self.do_ip or self.do_bridge or self.mac:
4506       if self.do_ip:
4507         ip = self.ip
4508       else:
4509         ip = self.instance.nics[0].ip
4510       if self.bridge:
4511         bridge = self.bridge
4512       else:
4513         bridge = self.instance.nics[0].bridge
4514       if self.mac:
4515         mac = self.mac
4516       else:
4517         mac = self.instance.nics[0].mac
4518       args['nics'] = [(ip, bridge, mac)]
4519     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4520     nl = [self.cfg.GetMasterNode(),
4521           self.instance.primary_node] + list(self.instance.secondary_nodes)
4522     return env, nl, nl
4523
4524   def CheckPrereq(self):
4525     """Check prerequisites.
4526
4527     This only checks the instance list against the existing names.
4528
4529     """
4530     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4531     # a separate CheckArguments function, if we implement one, so the operation
4532     # can be aborted without waiting for any lock, should it have an error...
4533     self.mem = getattr(self.op, "mem", None)
4534     self.vcpus = getattr(self.op, "vcpus", None)
4535     self.ip = getattr(self.op, "ip", None)
4536     self.mac = getattr(self.op, "mac", None)
4537     self.bridge = getattr(self.op, "bridge", None)
4538     self.kernel_path = getattr(self.op, "kernel_path", None)
4539     self.initrd_path = getattr(self.op, "initrd_path", None)
4540     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4541     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4542     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4543     self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
4544     self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
4545     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4546     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4547     self.force = getattr(self.op, "force", None)
4548     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4549                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4550                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4551                  self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
4552     if all_parms.count(None) == len(all_parms):
4553       raise errors.OpPrereqError("No changes submitted")
4554     if self.mem is not None:
4555       try:
4556         self.mem = int(self.mem)
4557       except ValueError, err:
4558         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4559     if self.vcpus is not None:
4560       try:
4561         self.vcpus = int(self.vcpus)
4562       except ValueError, err:
4563         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4564     if self.ip is not None:
4565       self.do_ip = True
4566       if self.ip.lower() == "none":
4567         self.ip = None
4568       else:
4569         if not utils.IsValidIP(self.ip):
4570           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4571     else:
4572       self.do_ip = False
4573     self.do_bridge = (self.bridge is not None)
4574     if self.mac is not None:
4575       if self.cfg.IsMacInUse(self.mac):
4576         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4577                                    self.mac)
4578       if not utils.IsValidMac(self.mac):
4579         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4580
4581     if self.kernel_path is not None:
4582       self.do_kernel_path = True
4583       if self.kernel_path == constants.VALUE_NONE:
4584         raise errors.OpPrereqError("Can't set instance to no kernel")
4585
4586       if self.kernel_path != constants.VALUE_DEFAULT:
4587         if not os.path.isabs(self.kernel_path):
4588           raise errors.OpPrereqError("The kernel path must be an absolute"
4589                                     " filename")
4590     else:
4591       self.do_kernel_path = False
4592
4593     if self.initrd_path is not None:
4594       self.do_initrd_path = True
4595       if self.initrd_path not in (constants.VALUE_NONE,
4596                                   constants.VALUE_DEFAULT):
4597         if not os.path.isabs(self.initrd_path):
4598           raise errors.OpPrereqError("The initrd path must be an absolute"
4599                                     " filename")
4600     else:
4601       self.do_initrd_path = False
4602
4603     # boot order verification
4604     if self.hvm_boot_order is not None:
4605       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4606         if len(self.hvm_boot_order.strip("acdn")) != 0:
4607           raise errors.OpPrereqError("invalid boot order specified,"
4608                                      " must be one or more of [acdn]"
4609                                      " or 'default'")
4610
4611     # hvm_cdrom_image_path verification
4612     if self.op.hvm_cdrom_image_path is not None:
4613       if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4614               self.op.hvm_cdrom_image_path.lower() == "none"):
4615         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4616                                    " be an absolute path or None, not %s" %
4617                                    self.op.hvm_cdrom_image_path)
4618       if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4619               self.op.hvm_cdrom_image_path.lower() == "none"):
4620         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4621                                    " regular file or a symlink pointing to"
4622                                    " an existing regular file, not %s" %
4623                                    self.op.hvm_cdrom_image_path)
4624
4625     # vnc_bind_address verification
4626     if self.op.vnc_bind_address is not None:
4627       if not utils.IsValidIP(self.op.vnc_bind_address):
4628         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4629                                    " like a valid IP address" %
4630                                    self.op.vnc_bind_address)
4631
4632     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4633     assert self.instance is not None, \
4634       "Cannot retrieve locked instance %s" % self.op.instance_name
4635     self.warn = []
4636     if self.mem is not None and not self.force:
4637       pnode = self.instance.primary_node
4638       nodelist = [pnode]
4639       nodelist.extend(instance.secondary_nodes)
4640       instance_info = self.rpc.call_instance_info(pnode, instance.name,
4641                                                   instance.hypervisor)
4642       nodeinfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
4643                                          instance.hypervisor)
4644
4645       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4646         # Assume the primary node is unreachable and go ahead
4647         self.warn.append("Can't get info from primary node %s" % pnode)
4648       else:
4649         if instance_info:
4650           current_mem = instance_info['memory']
4651         else:
4652           # Assume instance not running
4653           # (there is a slight race condition here, but it's not very probable,
4654           # and we have no other way to check)
4655           current_mem = 0
4656         miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
4657         if miss_mem > 0:
4658           raise errors.OpPrereqError("This change will prevent the instance"
4659                                      " from starting, due to %d MB of memory"
4660                                      " missing on its primary node" % miss_mem)
4661
4662       for node in instance.secondary_nodes:
4663         if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4664           self.warn.append("Can't get info from secondary node %s" % node)
4665         elif self.mem > nodeinfo[node]['memory_free']:
4666           self.warn.append("Not enough memory to failover instance to secondary"
4667                            " node %s" % node)
4668
4669     # Xen HVM device type checks
4670     if instance.hypervisor == constants.HT_XEN_HVM:
4671       if self.op.hvm_nic_type is not None:
4672         if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
4673           raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
4674                                      " HVM  hypervisor" % self.op.hvm_nic_type)
4675       if self.op.hvm_disk_type is not None:
4676         if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
4677           raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
4678                                      " HVM hypervisor" % self.op.hvm_disk_type)
4679
4680     return
4681
4682   def Exec(self, feedback_fn):
4683     """Modifies an instance.
4684
4685     All parameters take effect only at the next restart of the instance.
4686     """
4687     # Process here the warnings from CheckPrereq, as we don't have a
4688     # feedback_fn there.
4689     for warn in self.warn:
4690       feedback_fn("WARNING: %s" % warn)
4691
4692     result = []
4693     instance = self.instance
4694     if self.mem:
4695       instance.memory = self.mem
4696       result.append(("mem", self.mem))
4697     if self.vcpus:
4698       instance.vcpus = self.vcpus
4699       result.append(("vcpus",  self.vcpus))
4700     if self.do_ip:
4701       instance.nics[0].ip = self.ip
4702       result.append(("ip", self.ip))
4703     if self.bridge:
4704       instance.nics[0].bridge = self.bridge
4705       result.append(("bridge", self.bridge))
4706     if self.mac:
4707       instance.nics[0].mac = self.mac
4708       result.append(("mac", self.mac))
4709     if self.do_kernel_path:
4710       instance.kernel_path = self.kernel_path
4711       result.append(("kernel_path", self.kernel_path))
4712     if self.do_initrd_path:
4713       instance.initrd_path = self.initrd_path
4714       result.append(("initrd_path", self.initrd_path))
4715     if self.hvm_boot_order:
4716       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4717         instance.hvm_boot_order = None
4718       else:
4719         instance.hvm_boot_order = self.hvm_boot_order
4720       result.append(("hvm_boot_order", self.hvm_boot_order))
4721     if self.hvm_acpi is not None:
4722       instance.hvm_acpi = self.hvm_acpi
4723       result.append(("hvm_acpi", self.hvm_acpi))
4724     if self.hvm_pae is not None:
4725       instance.hvm_pae = self.hvm_pae
4726       result.append(("hvm_pae", self.hvm_pae))
4727     if self.hvm_nic_type is not None:
4728       instance.hvm_nic_type = self.hvm_nic_type
4729       result.append(("hvm_nic_type", self.hvm_nic_type))
4730     if self.hvm_disk_type is not None:
4731       instance.hvm_disk_type = self.hvm_disk_type
4732       result.append(("hvm_disk_type", self.hvm_disk_type))
4733     if self.hvm_cdrom_image_path:
4734       if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4735         instance.hvm_cdrom_image_path = None
4736       else:
4737         instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4738       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4739     if self.vnc_bind_address:
4740       instance.vnc_bind_address = self.vnc_bind_address
4741       result.append(("vnc_bind_address", self.vnc_bind_address))
4742
4743     self.cfg.Update(instance)
4744
4745     return result
4746
4747
4748 class LUQueryExports(NoHooksLU):
4749   """Query the exports list
4750
4751   """
4752   _OP_REQP = ['nodes']
4753   REQ_BGL = False
4754
4755   def ExpandNames(self):
4756     self.needed_locks = {}
4757     self.share_locks[locking.LEVEL_NODE] = 1
4758     if not self.op.nodes:
4759       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4760     else:
4761       self.needed_locks[locking.LEVEL_NODE] = \
4762         _GetWantedNodes(self, self.op.nodes)
4763
4764   def CheckPrereq(self):
4765     """Check prerequisites.
4766
4767     """
4768     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4769
4770   def Exec(self, feedback_fn):
4771     """Compute the list of all the exported system images.
4772
4773     Returns:
4774       a dictionary with the structure node->(export-list)
4775       where export-list is a list of the instances exported on
4776       that node.
4777
4778     """
4779     return self.rpc.call_export_list(self.nodes)
4780
4781
4782 class LUExportInstance(LogicalUnit):
4783   """Export an instance to an image in the cluster.
4784
4785   """
4786   HPATH = "instance-export"
4787   HTYPE = constants.HTYPE_INSTANCE
4788   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4789   REQ_BGL = False
4790
4791   def ExpandNames(self):
4792     self._ExpandAndLockInstance()
4793     # FIXME: lock only instance primary and destination node
4794     #
4795     # Sad but true, for now we have do lock all nodes, as we don't know where
4796     # the previous export might be, and and in this LU we search for it and
4797     # remove it from its current node. In the future we could fix this by:
4798     #  - making a tasklet to search (share-lock all), then create the new one,
4799     #    then one to remove, after
4800     #  - removing the removal operation altoghether
4801     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4802
4803   def DeclareLocks(self, level):
4804     """Last minute lock declaration."""
4805     # All nodes are locked anyway, so nothing to do here.
4806
4807   def BuildHooksEnv(self):
4808     """Build hooks env.
4809
4810     This will run on the master, primary node and target node.
4811
4812     """
4813     env = {
4814       "EXPORT_NODE": self.op.target_node,
4815       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4816       }
4817     env.update(_BuildInstanceHookEnvByObject(self.instance))
4818     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4819           self.op.target_node]
4820     return env, nl, nl
4821
4822   def CheckPrereq(self):
4823     """Check prerequisites.
4824
4825     This checks that the instance and node names are valid.
4826
4827     """
4828     instance_name = self.op.instance_name
4829     self.instance = self.cfg.GetInstanceInfo(instance_name)
4830     assert self.instance is not None, \
4831           "Cannot retrieve locked instance %s" % self.op.instance_name
4832
4833     self.dst_node = self.cfg.GetNodeInfo(
4834       self.cfg.ExpandNodeName(self.op.target_node))
4835
4836     assert self.dst_node is not None, \
4837           "Cannot retrieve locked node %s" % self.op.target_node
4838
4839     # instance disk type verification
4840     for disk in self.instance.disks:
4841       if disk.dev_type == constants.LD_FILE:
4842         raise errors.OpPrereqError("Export not supported for instances with"
4843                                    " file-based disks")
4844
4845   def Exec(self, feedback_fn):
4846     """Export an instance to an image in the cluster.
4847
4848     """
4849     instance = self.instance
4850     dst_node = self.dst_node
4851     src_node = instance.primary_node
4852     if self.op.shutdown:
4853       # shutdown the instance, but not the disks
4854       if not self.rpc.call_instance_shutdown(src_node, instance):
4855         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4856                                  (instance.name, src_node))
4857
4858     vgname = self.cfg.GetVGName()
4859
4860     snap_disks = []
4861
4862     try:
4863       for disk in instance.disks:
4864         if disk.iv_name == "sda":
4865           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4866           new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4867
4868           if not new_dev_name:
4869             logger.Error("could not snapshot block device %s on node %s" %
4870                          (disk.logical_id[1], src_node))
4871           else:
4872             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4873                                       logical_id=(vgname, new_dev_name),
4874                                       physical_id=(vgname, new_dev_name),
4875                                       iv_name=disk.iv_name)
4876             snap_disks.append(new_dev)
4877
4878     finally:
4879       if self.op.shutdown and instance.status == "up":
4880         if not self.rpc.call_instance_start(src_node, instance, None):
4881           _ShutdownInstanceDisks(self, instance)
4882           raise errors.OpExecError("Could not start instance")
4883
4884     # TODO: check for size
4885
4886     cluster_name = self.cfg.GetClusterName()
4887     for dev in snap_disks:
4888       if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4889                                       instance, cluster_name):
4890         logger.Error("could not export block device %s from node %s to node %s"
4891                      % (dev.logical_id[1], src_node, dst_node.name))
4892       if not self.rpc.call_blockdev_remove(src_node, dev):
4893         logger.Error("could not remove snapshot block device %s from node %s" %
4894                      (dev.logical_id[1], src_node))
4895
4896     if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4897       logger.Error("could not finalize export for instance %s on node %s" %
4898                    (instance.name, dst_node.name))
4899
4900     nodelist = self.cfg.GetNodeList()
4901     nodelist.remove(dst_node.name)
4902
4903     # on one-node clusters nodelist will be empty after the removal
4904     # if we proceed the backup would be removed because OpQueryExports
4905     # substitutes an empty list with the full cluster node list.
4906     if nodelist:
4907       exportlist = self.rpc.call_export_list(nodelist)
4908       for node in exportlist:
4909         if instance.name in exportlist[node]:
4910           if not self.rpc.call_export_remove(node, instance.name):
4911             logger.Error("could not remove older export for instance %s"
4912                          " on node %s" % (instance.name, node))
4913
4914
4915 class LURemoveExport(NoHooksLU):
4916   """Remove exports related to the named instance.
4917
4918   """
4919   _OP_REQP = ["instance_name"]
4920   REQ_BGL = False
4921
4922   def ExpandNames(self):
4923     self.needed_locks = {}
4924     # We need all nodes to be locked in order for RemoveExport to work, but we
4925     # don't need to lock the instance itself, as nothing will happen to it (and
4926     # we can remove exports also for a removed instance)
4927     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4928
4929   def CheckPrereq(self):
4930     """Check prerequisites.
4931     """
4932     pass
4933
4934   def Exec(self, feedback_fn):
4935     """Remove any export.
4936
4937     """
4938     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4939     # If the instance was not found we'll try with the name that was passed in.
4940     # This will only work if it was an FQDN, though.
4941     fqdn_warn = False
4942     if not instance_name:
4943       fqdn_warn = True
4944       instance_name = self.op.instance_name
4945
4946     exportlist = self.rpc.call_export_list(self.acquired_locks[
4947       locking.LEVEL_NODE])
4948     found = False
4949     for node in exportlist:
4950       if instance_name in exportlist[node]:
4951         found = True
4952         if not self.rpc.call_export_remove(node, instance_name):
4953           logger.Error("could not remove export for instance %s"
4954                        " on node %s" % (instance_name, node))
4955
4956     if fqdn_warn and not found:
4957       feedback_fn("Export not found. If trying to remove an export belonging"
4958                   " to a deleted instance please use its Fully Qualified"
4959                   " Domain Name.")
4960
4961
4962 class TagsLU(NoHooksLU):
4963   """Generic tags LU.
4964
4965   This is an abstract class which is the parent of all the other tags LUs.
4966
4967   """
4968
4969   def ExpandNames(self):
4970     self.needed_locks = {}
4971     if self.op.kind == constants.TAG_NODE:
4972       name = self.cfg.ExpandNodeName(self.op.name)
4973       if name is None:
4974         raise errors.OpPrereqError("Invalid node name (%s)" %
4975                                    (self.op.name,))
4976       self.op.name = name
4977       self.needed_locks[locking.LEVEL_NODE] = name
4978     elif self.op.kind == constants.TAG_INSTANCE:
4979       name = self.cfg.ExpandInstanceName(self.op.name)
4980       if name is None:
4981         raise errors.OpPrereqError("Invalid instance name (%s)" %
4982                                    (self.op.name,))
4983       self.op.name = name
4984       self.needed_locks[locking.LEVEL_INSTANCE] = name
4985
4986   def CheckPrereq(self):
4987     """Check prerequisites.
4988
4989     """
4990     if self.op.kind == constants.TAG_CLUSTER:
4991       self.target = self.cfg.GetClusterInfo()
4992     elif self.op.kind == constants.TAG_NODE:
4993       self.target = self.cfg.GetNodeInfo(self.op.name)
4994     elif self.op.kind == constants.TAG_INSTANCE:
4995       self.target = self.cfg.GetInstanceInfo(self.op.name)
4996     else:
4997       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4998                                  str(self.op.kind))
4999
5000
5001 class LUGetTags(TagsLU):
5002   """Returns the tags of a given object.
5003
5004   """
5005   _OP_REQP = ["kind", "name"]
5006   REQ_BGL = False
5007
5008   def Exec(self, feedback_fn):
5009     """Returns the tag list.
5010
5011     """
5012     return list(self.target.GetTags())
5013
5014
5015 class LUSearchTags(NoHooksLU):
5016   """Searches the tags for a given pattern.
5017
5018   """
5019   _OP_REQP = ["pattern"]
5020   REQ_BGL = False
5021
5022   def ExpandNames(self):
5023     self.needed_locks = {}
5024
5025   def CheckPrereq(self):
5026     """Check prerequisites.
5027
5028     This checks the pattern passed for validity by compiling it.
5029
5030     """
5031     try:
5032       self.re = re.compile(self.op.pattern)
5033     except re.error, err:
5034       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5035                                  (self.op.pattern, err))
5036
5037   def Exec(self, feedback_fn):
5038     """Returns the tag list.
5039
5040     """
5041     cfg = self.cfg
5042     tgts = [("/cluster", cfg.GetClusterInfo())]
5043     ilist = cfg.GetAllInstancesInfo().values()
5044     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5045     nlist = cfg.GetAllNodesInfo().values()
5046     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5047     results = []
5048     for path, target in tgts:
5049       for tag in target.GetTags():
5050         if self.re.search(tag):
5051           results.append((path, tag))
5052     return results
5053
5054
5055 class LUAddTags(TagsLU):
5056   """Sets a tag on a given object.
5057
5058   """
5059   _OP_REQP = ["kind", "name", "tags"]
5060   REQ_BGL = False
5061
5062   def CheckPrereq(self):
5063     """Check prerequisites.
5064
5065     This checks the type and length of the tag name and value.
5066
5067     """
5068     TagsLU.CheckPrereq(self)
5069     for tag in self.op.tags:
5070       objects.TaggableObject.ValidateTag(tag)
5071
5072   def Exec(self, feedback_fn):
5073     """Sets the tag.
5074
5075     """
5076     try:
5077       for tag in self.op.tags:
5078         self.target.AddTag(tag)
5079     except errors.TagError, err:
5080       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5081     try:
5082       self.cfg.Update(self.target)
5083     except errors.ConfigurationError:
5084       raise errors.OpRetryError("There has been a modification to the"
5085                                 " config file and the operation has been"
5086                                 " aborted. Please retry.")
5087
5088
5089 class LUDelTags(TagsLU):
5090   """Delete a list of tags from a given object.
5091
5092   """
5093   _OP_REQP = ["kind", "name", "tags"]
5094   REQ_BGL = False
5095
5096   def CheckPrereq(self):
5097     """Check prerequisites.
5098
5099     This checks that we have the given tag.
5100
5101     """
5102     TagsLU.CheckPrereq(self)
5103     for tag in self.op.tags:
5104       objects.TaggableObject.ValidateTag(tag)
5105     del_tags = frozenset(self.op.tags)
5106     cur_tags = self.target.GetTags()
5107     if not del_tags <= cur_tags:
5108       diff_tags = del_tags - cur_tags
5109       diff_names = ["'%s'" % tag for tag in diff_tags]
5110       diff_names.sort()
5111       raise errors.OpPrereqError("Tag(s) %s not found" %
5112                                  (",".join(diff_names)))
5113
5114   def Exec(self, feedback_fn):
5115     """Remove the tag from the object.
5116
5117     """
5118     for tag in self.op.tags:
5119       self.target.RemoveTag(tag)
5120     try:
5121       self.cfg.Update(self.target)
5122     except errors.ConfigurationError:
5123       raise errors.OpRetryError("There has been a modification to the"
5124                                 " config file and the operation has been"
5125                                 " aborted. Please retry.")
5126
5127
5128 class LUTestDelay(NoHooksLU):
5129   """Sleep for a specified amount of time.
5130
5131   This LU sleeps on the master and/or nodes for a specified amount of
5132   time.
5133
5134   """
5135   _OP_REQP = ["duration", "on_master", "on_nodes"]
5136   REQ_BGL = False
5137
5138   def ExpandNames(self):
5139     """Expand names and set required locks.
5140
5141     This expands the node list, if any.
5142
5143     """
5144     self.needed_locks = {}
5145     if self.op.on_nodes:
5146       # _GetWantedNodes can be used here, but is not always appropriate to use
5147       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5148       # more information.
5149       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5150       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5151
5152   def CheckPrereq(self):
5153     """Check prerequisites.
5154
5155     """
5156
5157   def Exec(self, feedback_fn):
5158     """Do the actual sleep.
5159
5160     """
5161     if self.op.on_master:
5162       if not utils.TestDelay(self.op.duration):
5163         raise errors.OpExecError("Error during master delay test")
5164     if self.op.on_nodes:
5165       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5166       if not result:
5167         raise errors.OpExecError("Complete failure from rpc call")
5168       for node, node_result in result.items():
5169         if not node_result:
5170           raise errors.OpExecError("Failure during rpc call to node %s,"
5171                                    " result: %s" % (node, node_result))
5172
5173
5174 class IAllocator(object):
5175   """IAllocator framework.
5176
5177   An IAllocator instance has three sets of attributes:
5178     - cfg that is needed to query the cluster
5179     - input data (all members of the _KEYS class attribute are required)
5180     - four buffer attributes (in|out_data|text), that represent the
5181       input (to the external script) in text and data structure format,
5182       and the output from it, again in two formats
5183     - the result variables from the script (success, info, nodes) for
5184       easy usage
5185
5186   """
5187   _ALLO_KEYS = [
5188     "mem_size", "disks", "disk_template",
5189     "os", "tags", "nics", "vcpus",
5190     ]
5191   _RELO_KEYS = [
5192     "relocate_from",
5193     ]
5194
5195   def __init__(self, lu, mode, name, **kwargs):
5196     self.lu = lu
5197     # init buffer variables
5198     self.in_text = self.out_text = self.in_data = self.out_data = None
5199     # init all input fields so that pylint is happy
5200     self.mode = mode
5201     self.name = name
5202     self.mem_size = self.disks = self.disk_template = None
5203     self.os = self.tags = self.nics = self.vcpus = None
5204     self.relocate_from = None
5205     # computed fields
5206     self.required_nodes = None
5207     # init result fields
5208     self.success = self.info = self.nodes = None
5209     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5210       keyset = self._ALLO_KEYS
5211     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5212       keyset = self._RELO_KEYS
5213     else:
5214       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5215                                    " IAllocator" % self.mode)
5216     for key in kwargs:
5217       if key not in keyset:
5218         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5219                                      " IAllocator" % key)
5220       setattr(self, key, kwargs[key])
5221     for key in keyset:
5222       if key not in kwargs:
5223         raise errors.ProgrammerError("Missing input parameter '%s' to"
5224                                      " IAllocator" % key)
5225     self._BuildInputData()
5226
5227   def _ComputeClusterData(self):
5228     """Compute the generic allocator input data.
5229
5230     This is the data that is independent of the actual operation.
5231
5232     """
5233     cfg = self.lu.cfg
5234     cluster_info = cfg.GetClusterInfo()
5235     # cluster data
5236     data = {
5237       "version": 1,
5238       "cluster_name": cfg.GetClusterName(),
5239       "cluster_tags": list(cluster_info.GetTags()),
5240       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5241       # we don't have job IDs
5242       }
5243
5244     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5245
5246     # node data
5247     node_results = {}
5248     node_list = cfg.GetNodeList()
5249     # FIXME: here we have only one hypervisor information, but
5250     # instance can belong to different hypervisors
5251     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5252                                            cfg.GetHypervisorType())
5253     for nname in node_list:
5254       ninfo = cfg.GetNodeInfo(nname)
5255       if nname not in node_data or not isinstance(node_data[nname], dict):
5256         raise errors.OpExecError("Can't get data for node %s" % nname)
5257       remote_info = node_data[nname]
5258       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5259                    'vg_size', 'vg_free', 'cpu_total']:
5260         if attr not in remote_info:
5261           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5262                                    (nname, attr))
5263         try:
5264           remote_info[attr] = int(remote_info[attr])
5265         except ValueError, err:
5266           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5267                                    " %s" % (nname, attr, str(err)))
5268       # compute memory used by primary instances
5269       i_p_mem = i_p_up_mem = 0
5270       for iinfo in i_list:
5271         if iinfo.primary_node == nname:
5272           i_p_mem += iinfo.memory
5273           if iinfo.status == "up":
5274             i_p_up_mem += iinfo.memory
5275
5276       # compute memory used by instances
5277       pnr = {
5278         "tags": list(ninfo.GetTags()),
5279         "total_memory": remote_info['memory_total'],
5280         "reserved_memory": remote_info['memory_dom0'],
5281         "free_memory": remote_info['memory_free'],
5282         "i_pri_memory": i_p_mem,
5283         "i_pri_up_memory": i_p_up_mem,
5284         "total_disk": remote_info['vg_size'],
5285         "free_disk": remote_info['vg_free'],
5286         "primary_ip": ninfo.primary_ip,
5287         "secondary_ip": ninfo.secondary_ip,
5288         "total_cpus": remote_info['cpu_total'],
5289         }
5290       node_results[nname] = pnr
5291     data["nodes"] = node_results
5292
5293     # instance data
5294     instance_data = {}
5295     for iinfo in i_list:
5296       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5297                   for n in iinfo.nics]
5298       pir = {
5299         "tags": list(iinfo.GetTags()),
5300         "should_run": iinfo.status == "up",
5301         "vcpus": iinfo.vcpus,
5302         "memory": iinfo.memory,
5303         "os": iinfo.os,
5304         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5305         "nics": nic_data,
5306         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5307         "disk_template": iinfo.disk_template,
5308         "hypervisor": iinfo.hypervisor,
5309         }
5310       instance_data[iinfo.name] = pir
5311
5312     data["instances"] = instance_data
5313
5314     self.in_data = data
5315
5316   def _AddNewInstance(self):
5317     """Add new instance data to allocator structure.
5318
5319     This in combination with _AllocatorGetClusterData will create the
5320     correct structure needed as input for the allocator.
5321
5322     The checks for the completeness of the opcode must have already been
5323     done.
5324
5325     """
5326     data = self.in_data
5327     if len(self.disks) != 2:
5328       raise errors.OpExecError("Only two-disk configurations supported")
5329
5330     disk_space = _ComputeDiskSize(self.disk_template,
5331                                   self.disks[0]["size"], self.disks[1]["size"])
5332
5333     if self.disk_template in constants.DTS_NET_MIRROR:
5334       self.required_nodes = 2
5335     else:
5336       self.required_nodes = 1
5337     request = {
5338       "type": "allocate",
5339       "name": self.name,
5340       "disk_template": self.disk_template,
5341       "tags": self.tags,
5342       "os": self.os,
5343       "vcpus": self.vcpus,
5344       "memory": self.mem_size,
5345       "disks": self.disks,
5346       "disk_space_total": disk_space,
5347       "nics": self.nics,
5348       "required_nodes": self.required_nodes,
5349       }
5350     data["request"] = request
5351
5352   def _AddRelocateInstance(self):
5353     """Add relocate instance data to allocator structure.
5354
5355     This in combination with _IAllocatorGetClusterData will create the
5356     correct structure needed as input for the allocator.
5357
5358     The checks for the completeness of the opcode must have already been
5359     done.
5360
5361     """
5362     instance = self.lu.cfg.GetInstanceInfo(self.name)
5363     if instance is None:
5364       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5365                                    " IAllocator" % self.name)
5366
5367     if instance.disk_template not in constants.DTS_NET_MIRROR:
5368       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5369
5370     if len(instance.secondary_nodes) != 1:
5371       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5372
5373     self.required_nodes = 1
5374
5375     disk_space = _ComputeDiskSize(instance.disk_template,
5376                                   instance.disks[0].size,
5377                                   instance.disks[1].size)
5378
5379     request = {
5380       "type": "relocate",
5381       "name": self.name,
5382       "disk_space_total": disk_space,
5383       "required_nodes": self.required_nodes,
5384       "relocate_from": self.relocate_from,
5385       }
5386     self.in_data["request"] = request
5387
5388   def _BuildInputData(self):
5389     """Build input data structures.
5390
5391     """
5392     self._ComputeClusterData()
5393
5394     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5395       self._AddNewInstance()
5396     else:
5397       self._AddRelocateInstance()
5398
5399     self.in_text = serializer.Dump(self.in_data)
5400
5401   def Run(self, name, validate=True, call_fn=None):
5402     """Run an instance allocator and return the results.
5403
5404     """
5405     if call_fn is None:
5406       call_fn = self.lu.rpc.call_iallocator_runner
5407     data = self.in_text
5408
5409     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5410
5411     if not isinstance(result, (list, tuple)) or len(result) != 4:
5412       raise errors.OpExecError("Invalid result from master iallocator runner")
5413
5414     rcode, stdout, stderr, fail = result
5415
5416     if rcode == constants.IARUN_NOTFOUND:
5417       raise errors.OpExecError("Can't find allocator '%s'" % name)
5418     elif rcode == constants.IARUN_FAILURE:
5419       raise errors.OpExecError("Instance allocator call failed: %s,"
5420                                " output: %s" % (fail, stdout+stderr))
5421     self.out_text = stdout
5422     if validate:
5423       self._ValidateResult()
5424
5425   def _ValidateResult(self):
5426     """Process the allocator results.
5427
5428     This will process and if successful save the result in
5429     self.out_data and the other parameters.
5430
5431     """
5432     try:
5433       rdict = serializer.Load(self.out_text)
5434     except Exception, err:
5435       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5436
5437     if not isinstance(rdict, dict):
5438       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5439
5440     for key in "success", "info", "nodes":
5441       if key not in rdict:
5442         raise errors.OpExecError("Can't parse iallocator results:"
5443                                  " missing key '%s'" % key)
5444       setattr(self, key, rdict[key])
5445
5446     if not isinstance(rdict["nodes"], list):
5447       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5448                                " is not a list")
5449     self.out_data = rdict
5450
5451
5452 class LUTestAllocator(NoHooksLU):
5453   """Run allocator tests.
5454
5455   This LU runs the allocator tests
5456
5457   """
5458   _OP_REQP = ["direction", "mode", "name"]
5459
5460   def CheckPrereq(self):
5461     """Check prerequisites.
5462
5463     This checks the opcode parameters depending on the director and mode test.
5464
5465     """
5466     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5467       for attr in ["name", "mem_size", "disks", "disk_template",
5468                    "os", "tags", "nics", "vcpus"]:
5469         if not hasattr(self.op, attr):
5470           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5471                                      attr)
5472       iname = self.cfg.ExpandInstanceName(self.op.name)
5473       if iname is not None:
5474         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5475                                    iname)
5476       if not isinstance(self.op.nics, list):
5477         raise errors.OpPrereqError("Invalid parameter 'nics'")
5478       for row in self.op.nics:
5479         if (not isinstance(row, dict) or
5480             "mac" not in row or
5481             "ip" not in row or
5482             "bridge" not in row):
5483           raise errors.OpPrereqError("Invalid contents of the"
5484                                      " 'nics' parameter")
5485       if not isinstance(self.op.disks, list):
5486         raise errors.OpPrereqError("Invalid parameter 'disks'")
5487       if len(self.op.disks) != 2:
5488         raise errors.OpPrereqError("Only two-disk configurations supported")
5489       for row in self.op.disks:
5490         if (not isinstance(row, dict) or
5491             "size" not in row or
5492             not isinstance(row["size"], int) or
5493             "mode" not in row or
5494             row["mode"] not in ['r', 'w']):
5495           raise errors.OpPrereqError("Invalid contents of the"
5496                                      " 'disks' parameter")
5497     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5498       if not hasattr(self.op, "name"):
5499         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5500       fname = self.cfg.ExpandInstanceName(self.op.name)
5501       if fname is None:
5502         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5503                                    self.op.name)
5504       self.op.name = fname
5505       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5506     else:
5507       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5508                                  self.op.mode)
5509
5510     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5511       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5512         raise errors.OpPrereqError("Missing allocator name")
5513     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5514       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5515                                  self.op.direction)
5516
5517   def Exec(self, feedback_fn):
5518     """Run the allocator test.
5519
5520     """
5521     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5522       ial = IAllocator(self,
5523                        mode=self.op.mode,
5524                        name=self.op.name,
5525                        mem_size=self.op.mem_size,
5526                        disks=self.op.disks,
5527                        disk_template=self.op.disk_template,
5528                        os=self.op.os,
5529                        tags=self.op.tags,
5530                        nics=self.op.nics,
5531                        vcpus=self.op.vcpus,
5532                        )
5533     else:
5534       ial = IAllocator(self,
5535                        mode=self.op.mode,
5536                        name=self.op.name,
5537                        relocate_from=list(self.relocate_from),
5538                        )
5539
5540     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5541       result = ial.in_text
5542     else:
5543       ial.Run(self.op.allocator, validate=False)
5544       result = ial.out_text
5545     return result