backend.py change to get cluster name from master
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_MASTER: the LU needs to run on the master node
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_MASTER = True
68   REQ_BGL = True
69
70   def __init__(self, processor, op, context):
71     """Constructor for LogicalUnit.
72
73     This needs to be overriden in derived classes in order to check op
74     validity.
75
76     """
77     self.proc = processor
78     self.op = op
79     self.cfg = context.cfg
80     self.context = context
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90
91     for attr_name in self._OP_REQP:
92       attr_val = getattr(op, attr_name, None)
93       if attr_val is None:
94         raise errors.OpPrereqError("Required parameter '%s' missing" %
95                                    attr_name)
96
97     if not self.cfg.IsCluster():
98       raise errors.OpPrereqError("Cluster not initialized yet,"
99                                  " use 'gnt-cluster init' first.")
100     if self.REQ_MASTER:
101       master = self.cfg.GetMasterNode()
102       if master != utils.HostInfo().name:
103         raise errors.OpPrereqError("Commands must be run on the master"
104                                    " node %s" % master)
105
106   def __GetSSH(self):
107     """Returns the SshRunner object
108
109     """
110     if not self.__ssh:
111       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
112     return self.__ssh
113
114   ssh = property(fget=__GetSSH)
115
116   def ExpandNames(self):
117     """Expand names for this LU.
118
119     This method is called before starting to execute the opcode, and it should
120     update all the parameters of the opcode to their canonical form (e.g. a
121     short node name must be fully expanded after this method has successfully
122     completed). This way locking, hooks, logging, ecc. can work correctly.
123
124     LUs which implement this method must also populate the self.needed_locks
125     member, as a dict with lock levels as keys, and a list of needed lock names
126     as values. Rules:
127       - Use an empty dict if you don't need any lock
128       - If you don't need any lock at a particular level omit that level
129       - Don't put anything for the BGL level
130       - If you want all locks at a level use locking.ALL_SET as a value
131
132     If you need to share locks (rather than acquire them exclusively) at one
133     level you can modify self.share_locks, setting a true value (usually 1) for
134     that level. By default locks are not shared.
135
136     Examples:
137     # Acquire all nodes and one instance
138     self.needed_locks = {
139       locking.LEVEL_NODE: locking.ALL_SET,
140       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
141     }
142     # Acquire just two nodes
143     self.needed_locks = {
144       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
145     }
146     # Acquire no locks
147     self.needed_locks = {} # No, you can't leave it to the default value None
148
149     """
150     # The implementation of this method is mandatory only if the new LU is
151     # concurrent, so that old LUs don't need to be changed all at the same
152     # time.
153     if self.REQ_BGL:
154       self.needed_locks = {} # Exclusive LUs don't need locks.
155     else:
156       raise NotImplementedError
157
158   def DeclareLocks(self, level):
159     """Declare LU locking needs for a level
160
161     While most LUs can just declare their locking needs at ExpandNames time,
162     sometimes there's the need to calculate some locks after having acquired
163     the ones before. This function is called just before acquiring locks at a
164     particular level, but after acquiring the ones at lower levels, and permits
165     such calculations. It can be used to modify self.needed_locks, and by
166     default it does nothing.
167
168     This function is only called if you have something already set in
169     self.needed_locks for the level.
170
171     @param level: Locking level which is going to be locked
172     @type level: member of ganeti.locking.LEVELS
173
174     """
175
176   def CheckPrereq(self):
177     """Check prerequisites for this LU.
178
179     This method should check that the prerequisites for the execution
180     of this LU are fulfilled. It can do internode communication, but
181     it should be idempotent - no cluster or system changes are
182     allowed.
183
184     The method should raise errors.OpPrereqError in case something is
185     not fulfilled. Its return value is ignored.
186
187     This method should also update all the parameters of the opcode to
188     their canonical form if it hasn't been done by ExpandNames before.
189
190     """
191     raise NotImplementedError
192
193   def Exec(self, feedback_fn):
194     """Execute the LU.
195
196     This method should implement the actual work. It should raise
197     errors.OpExecError for failures that are somewhat dealt with in
198     code, or expected.
199
200     """
201     raise NotImplementedError
202
203   def BuildHooksEnv(self):
204     """Build hooks environment for this LU.
205
206     This method should return a three-node tuple consisting of: a dict
207     containing the environment that will be used for running the
208     specific hook for this LU, a list of node names on which the hook
209     should run before the execution, and a list of node names on which
210     the hook should run after the execution.
211
212     The keys of the dict must not have 'GANETI_' prefixed as this will
213     be handled in the hooks runner. Also note additional keys will be
214     added by the hooks runner. If the LU doesn't define any
215     environment, an empty dict (and not None) should be returned.
216
217     No nodes should be returned as an empty list (and not None).
218
219     Note that if the HPATH for a LU class is None, this function will
220     not be called.
221
222     """
223     raise NotImplementedError
224
225   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
226     """Notify the LU about the results of its hooks.
227
228     This method is called every time a hooks phase is executed, and notifies
229     the Logical Unit about the hooks' result. The LU can then use it to alter
230     its result based on the hooks.  By default the method does nothing and the
231     previous result is passed back unchanged but any LU can define it if it
232     wants to use the local cluster hook-scripts somehow.
233
234     Args:
235       phase: the hooks phase that has just been run
236       hooks_results: the results of the multi-node hooks rpc call
237       feedback_fn: function to send feedback back to the caller
238       lu_result: the previous result this LU had, or None in the PRE phase.
239
240     """
241     return lu_result
242
243   def _ExpandAndLockInstance(self):
244     """Helper function to expand and lock an instance.
245
246     Many LUs that work on an instance take its name in self.op.instance_name
247     and need to expand it and then declare the expanded name for locking. This
248     function does it, and then updates self.op.instance_name to the expanded
249     name. It also initializes needed_locks as a dict, if this hasn't been done
250     before.
251
252     """
253     if self.needed_locks is None:
254       self.needed_locks = {}
255     else:
256       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
257         "_ExpandAndLockInstance called with instance-level locks set"
258     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
259     if expanded_name is None:
260       raise errors.OpPrereqError("Instance '%s' not known" %
261                                   self.op.instance_name)
262     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
263     self.op.instance_name = expanded_name
264
265   def _LockInstancesNodes(self, primary_only=False):
266     """Helper function to declare instances' nodes for locking.
267
268     This function should be called after locking one or more instances to lock
269     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
270     with all primary or secondary nodes for instances already locked and
271     present in self.needed_locks[locking.LEVEL_INSTANCE].
272
273     It should be called from DeclareLocks, and for safety only works if
274     self.recalculate_locks[locking.LEVEL_NODE] is set.
275
276     In the future it may grow parameters to just lock some instance's nodes, or
277     to just lock primaries or secondary nodes, if needed.
278
279     If should be called in DeclareLocks in a way similar to:
280
281     if level == locking.LEVEL_NODE:
282       self._LockInstancesNodes()
283
284     @type primary_only: boolean
285     @param primary_only: only lock primary nodes of locked instances
286
287     """
288     assert locking.LEVEL_NODE in self.recalculate_locks, \
289       "_LockInstancesNodes helper function called with no nodes to recalculate"
290
291     # TODO: check if we're really been called with the instance locks held
292
293     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
294     # future we might want to have different behaviors depending on the value
295     # of self.recalculate_locks[locking.LEVEL_NODE]
296     wanted_nodes = []
297     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
298       instance = self.context.cfg.GetInstanceInfo(instance_name)
299       wanted_nodes.append(instance.primary_node)
300       if not primary_only:
301         wanted_nodes.extend(instance.secondary_nodes)
302
303     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
304       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
305     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
306       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
307
308     del self.recalculate_locks[locking.LEVEL_NODE]
309
310
311 class NoHooksLU(LogicalUnit):
312   """Simple LU which runs no hooks.
313
314   This LU is intended as a parent for other LogicalUnits which will
315   run no hooks, in order to reduce duplicate code.
316
317   """
318   HPATH = None
319   HTYPE = None
320
321
322 def _GetWantedNodes(lu, nodes):
323   """Returns list of checked and expanded node names.
324
325   Args:
326     nodes: List of nodes (strings) or None for all
327
328   """
329   if not isinstance(nodes, list):
330     raise errors.OpPrereqError("Invalid argument type 'nodes'")
331
332   if not nodes:
333     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
334       " non-empty list of nodes whose name is to be expanded.")
335
336   wanted = []
337   for name in nodes:
338     node = lu.cfg.ExpandNodeName(name)
339     if node is None:
340       raise errors.OpPrereqError("No such node name '%s'" % name)
341     wanted.append(node)
342
343   return utils.NiceSort(wanted)
344
345
346 def _GetWantedInstances(lu, instances):
347   """Returns list of checked and expanded instance names.
348
349   Args:
350     instances: List of instances (strings) or None for all
351
352   """
353   if not isinstance(instances, list):
354     raise errors.OpPrereqError("Invalid argument type 'instances'")
355
356   if instances:
357     wanted = []
358
359     for name in instances:
360       instance = lu.cfg.ExpandInstanceName(name)
361       if instance is None:
362         raise errors.OpPrereqError("No such instance name '%s'" % name)
363       wanted.append(instance)
364
365   else:
366     wanted = lu.cfg.GetInstanceList()
367   return utils.NiceSort(wanted)
368
369
370 def _CheckOutputFields(static, dynamic, selected):
371   """Checks whether all selected fields are valid.
372
373   Args:
374     static: Static fields
375     dynamic: Dynamic fields
376
377   """
378   static_fields = frozenset(static)
379   dynamic_fields = frozenset(dynamic)
380
381   all_fields = static_fields | dynamic_fields
382
383   if not all_fields.issuperset(selected):
384     raise errors.OpPrereqError("Unknown output fields selected: %s"
385                                % ",".join(frozenset(selected).
386                                           difference(all_fields)))
387
388
389 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
390                           memory, vcpus, nics):
391   """Builds instance related env variables for hooks from single variables.
392
393   Args:
394     secondary_nodes: List of secondary nodes as strings
395   """
396   env = {
397     "OP_TARGET": name,
398     "INSTANCE_NAME": name,
399     "INSTANCE_PRIMARY": primary_node,
400     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
401     "INSTANCE_OS_TYPE": os_type,
402     "INSTANCE_STATUS": status,
403     "INSTANCE_MEMORY": memory,
404     "INSTANCE_VCPUS": vcpus,
405   }
406
407   if nics:
408     nic_count = len(nics)
409     for idx, (ip, bridge, mac) in enumerate(nics):
410       if ip is None:
411         ip = ""
412       env["INSTANCE_NIC%d_IP" % idx] = ip
413       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
414       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
415   else:
416     nic_count = 0
417
418   env["INSTANCE_NIC_COUNT"] = nic_count
419
420   return env
421
422
423 def _BuildInstanceHookEnvByObject(instance, override=None):
424   """Builds instance related env variables for hooks from an object.
425
426   Args:
427     instance: objects.Instance object of instance
428     override: dict of values to override
429   """
430   args = {
431     'name': instance.name,
432     'primary_node': instance.primary_node,
433     'secondary_nodes': instance.secondary_nodes,
434     'os_type': instance.os,
435     'status': instance.os,
436     'memory': instance.memory,
437     'vcpus': instance.vcpus,
438     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
439   }
440   if override:
441     args.update(override)
442   return _BuildInstanceHookEnv(**args)
443
444
445 def _CheckInstanceBridgesExist(instance):
446   """Check that the brigdes needed by an instance exist.
447
448   """
449   # check bridges existance
450   brlist = [nic.bridge for nic in instance.nics]
451   if not rpc.call_bridges_exist(instance.primary_node, brlist):
452     raise errors.OpPrereqError("one or more target bridges %s does not"
453                                " exist on destination node '%s'" %
454                                (brlist, instance.primary_node))
455
456
457 class LUDestroyCluster(NoHooksLU):
458   """Logical unit for destroying the cluster.
459
460   """
461   _OP_REQP = []
462
463   def CheckPrereq(self):
464     """Check prerequisites.
465
466     This checks whether the cluster is empty.
467
468     Any errors are signalled by raising errors.OpPrereqError.
469
470     """
471     master = self.cfg.GetMasterNode()
472
473     nodelist = self.cfg.GetNodeList()
474     if len(nodelist) != 1 or nodelist[0] != master:
475       raise errors.OpPrereqError("There are still %d node(s) in"
476                                  " this cluster." % (len(nodelist) - 1))
477     instancelist = self.cfg.GetInstanceList()
478     if instancelist:
479       raise errors.OpPrereqError("There are still %d instance(s) in"
480                                  " this cluster." % len(instancelist))
481
482   def Exec(self, feedback_fn):
483     """Destroys the cluster.
484
485     """
486     master = self.cfg.GetMasterNode()
487     if not rpc.call_node_stop_master(master, False):
488       raise errors.OpExecError("Could not disable the master role")
489     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
490     utils.CreateBackup(priv_key)
491     utils.CreateBackup(pub_key)
492     return master
493
494
495 class LUVerifyCluster(LogicalUnit):
496   """Verifies the cluster status.
497
498   """
499   HPATH = "cluster-verify"
500   HTYPE = constants.HTYPE_CLUSTER
501   _OP_REQP = ["skip_checks"]
502   REQ_BGL = False
503
504   def ExpandNames(self):
505     self.needed_locks = {
506       locking.LEVEL_NODE: locking.ALL_SET,
507       locking.LEVEL_INSTANCE: locking.ALL_SET,
508     }
509     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
510
511   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
512                   remote_version, feedback_fn):
513     """Run multiple tests against a node.
514
515     Test list:
516       - compares ganeti version
517       - checks vg existance and size > 20G
518       - checks config file checksum
519       - checks ssh to other nodes
520
521     Args:
522       node: name of the node to check
523       file_list: required list of files
524       local_cksum: dictionary of local files and their checksums
525
526     """
527     # compares ganeti version
528     local_version = constants.PROTOCOL_VERSION
529     if not remote_version:
530       feedback_fn("  - ERROR: connection to %s failed" % (node))
531       return True
532
533     if local_version != remote_version:
534       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
535                       (local_version, node, remote_version))
536       return True
537
538     # checks vg existance and size > 20G
539
540     bad = False
541     if not vglist:
542       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
543                       (node,))
544       bad = True
545     else:
546       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
547                                             constants.MIN_VG_SIZE)
548       if vgstatus:
549         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
550         bad = True
551
552     # checks config file checksum
553     # checks ssh to any
554
555     if 'filelist' not in node_result:
556       bad = True
557       feedback_fn("  - ERROR: node hasn't returned file checksum data")
558     else:
559       remote_cksum = node_result['filelist']
560       for file_name in file_list:
561         if file_name not in remote_cksum:
562           bad = True
563           feedback_fn("  - ERROR: file '%s' missing" % file_name)
564         elif remote_cksum[file_name] != local_cksum[file_name]:
565           bad = True
566           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
567
568     if 'nodelist' not in node_result:
569       bad = True
570       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
571     else:
572       if node_result['nodelist']:
573         bad = True
574         for node in node_result['nodelist']:
575           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
576                           (node, node_result['nodelist'][node]))
577     if 'node-net-test' not in node_result:
578       bad = True
579       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
580     else:
581       if node_result['node-net-test']:
582         bad = True
583         nlist = utils.NiceSort(node_result['node-net-test'].keys())
584         for node in nlist:
585           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
586                           (node, node_result['node-net-test'][node]))
587
588     hyp_result = node_result.get('hypervisor', None)
589     if hyp_result is not None:
590       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
591     return bad
592
593   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
594                       node_instance, feedback_fn):
595     """Verify an instance.
596
597     This function checks to see if the required block devices are
598     available on the instance's node.
599
600     """
601     bad = False
602
603     node_current = instanceconfig.primary_node
604
605     node_vol_should = {}
606     instanceconfig.MapLVsByNode(node_vol_should)
607
608     for node in node_vol_should:
609       for volume in node_vol_should[node]:
610         if node not in node_vol_is or volume not in node_vol_is[node]:
611           feedback_fn("  - ERROR: volume %s missing on node %s" %
612                           (volume, node))
613           bad = True
614
615     if not instanceconfig.status == 'down':
616       if (node_current not in node_instance or
617           not instance in node_instance[node_current]):
618         feedback_fn("  - ERROR: instance %s not running on node %s" %
619                         (instance, node_current))
620         bad = True
621
622     for node in node_instance:
623       if (not node == node_current):
624         if instance in node_instance[node]:
625           feedback_fn("  - ERROR: instance %s should not run on node %s" %
626                           (instance, node))
627           bad = True
628
629     return bad
630
631   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
632     """Verify if there are any unknown volumes in the cluster.
633
634     The .os, .swap and backup volumes are ignored. All other volumes are
635     reported as unknown.
636
637     """
638     bad = False
639
640     for node in node_vol_is:
641       for volume in node_vol_is[node]:
642         if node not in node_vol_should or volume not in node_vol_should[node]:
643           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
644                       (volume, node))
645           bad = True
646     return bad
647
648   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
649     """Verify the list of running instances.
650
651     This checks what instances are running but unknown to the cluster.
652
653     """
654     bad = False
655     for node in node_instance:
656       for runninginstance in node_instance[node]:
657         if runninginstance not in instancelist:
658           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
659                           (runninginstance, node))
660           bad = True
661     return bad
662
663   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
664     """Verify N+1 Memory Resilience.
665
666     Check that if one single node dies we can still start all the instances it
667     was primary for.
668
669     """
670     bad = False
671
672     for node, nodeinfo in node_info.iteritems():
673       # This code checks that every node which is now listed as secondary has
674       # enough memory to host all instances it is supposed to should a single
675       # other node in the cluster fail.
676       # FIXME: not ready for failover to an arbitrary node
677       # FIXME: does not support file-backed instances
678       # WARNING: we currently take into account down instances as well as up
679       # ones, considering that even if they're down someone might want to start
680       # them even in the event of a node failure.
681       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
682         needed_mem = 0
683         for instance in instances:
684           needed_mem += instance_cfg[instance].memory
685         if nodeinfo['mfree'] < needed_mem:
686           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
687                       " failovers should node %s fail" % (node, prinode))
688           bad = True
689     return bad
690
691   def CheckPrereq(self):
692     """Check prerequisites.
693
694     Transform the list of checks we're going to skip into a set and check that
695     all its members are valid.
696
697     """
698     self.skip_set = frozenset(self.op.skip_checks)
699     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
700       raise errors.OpPrereqError("Invalid checks to be skipped specified")
701
702   def BuildHooksEnv(self):
703     """Build hooks env.
704
705     Cluster-Verify hooks just rone in the post phase and their failure makes
706     the output be logged in the verify output and the verification to fail.
707
708     """
709     all_nodes = self.cfg.GetNodeList()
710     # TODO: populate the environment with useful information for verify hooks
711     env = {}
712     return env, [], all_nodes
713
714   def Exec(self, feedback_fn):
715     """Verify integrity of cluster, performing various test on nodes.
716
717     """
718     bad = False
719     feedback_fn("* Verifying global settings")
720     for msg in self.cfg.VerifyConfig():
721       feedback_fn("  - ERROR: %s" % msg)
722
723     vg_name = self.cfg.GetVGName()
724     nodelist = utils.NiceSort(self.cfg.GetNodeList())
725     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
726     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
727     i_non_redundant = [] # Non redundant instances
728     node_volume = {}
729     node_instance = {}
730     node_info = {}
731     instance_cfg = {}
732
733     # FIXME: verify OS list
734     # do local checksums
735     file_names = []
736     file_names.append(constants.SSL_CERT_FILE)
737     file_names.append(constants.CLUSTER_CONF_FILE)
738     local_checksums = utils.FingerprintFiles(file_names)
739
740     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
741     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
742     all_instanceinfo = rpc.call_instance_list(nodelist)
743     all_vglist = rpc.call_vg_list(nodelist)
744     node_verify_param = {
745       'filelist': file_names,
746       'nodelist': nodelist,
747       'hypervisor': None,
748       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
749                         for node in nodeinfo]
750       }
751     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param,
752                                       self.cfg.GetClusterName())
753     all_rversion = rpc.call_version(nodelist)
754     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
755
756     for node in nodelist:
757       feedback_fn("* Verifying node %s" % node)
758       result = self._VerifyNode(node, file_names, local_checksums,
759                                 all_vglist[node], all_nvinfo[node],
760                                 all_rversion[node], feedback_fn)
761       bad = bad or result
762
763       # node_volume
764       volumeinfo = all_volumeinfo[node]
765
766       if isinstance(volumeinfo, basestring):
767         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
768                     (node, volumeinfo[-400:].encode('string_escape')))
769         bad = True
770         node_volume[node] = {}
771       elif not isinstance(volumeinfo, dict):
772         feedback_fn("  - ERROR: connection to %s failed" % (node,))
773         bad = True
774         continue
775       else:
776         node_volume[node] = volumeinfo
777
778       # node_instance
779       nodeinstance = all_instanceinfo[node]
780       if type(nodeinstance) != list:
781         feedback_fn("  - ERROR: connection to %s failed" % (node,))
782         bad = True
783         continue
784
785       node_instance[node] = nodeinstance
786
787       # node_info
788       nodeinfo = all_ninfo[node]
789       if not isinstance(nodeinfo, dict):
790         feedback_fn("  - ERROR: connection to %s failed" % (node,))
791         bad = True
792         continue
793
794       try:
795         node_info[node] = {
796           "mfree": int(nodeinfo['memory_free']),
797           "dfree": int(nodeinfo['vg_free']),
798           "pinst": [],
799           "sinst": [],
800           # dictionary holding all instances this node is secondary for,
801           # grouped by their primary node. Each key is a cluster node, and each
802           # value is a list of instances which have the key as primary and the
803           # current node as secondary.  this is handy to calculate N+1 memory
804           # availability if you can only failover from a primary to its
805           # secondary.
806           "sinst-by-pnode": {},
807         }
808       except ValueError:
809         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
810         bad = True
811         continue
812
813     node_vol_should = {}
814
815     for instance in instancelist:
816       feedback_fn("* Verifying instance %s" % instance)
817       inst_config = self.cfg.GetInstanceInfo(instance)
818       result =  self._VerifyInstance(instance, inst_config, node_volume,
819                                      node_instance, feedback_fn)
820       bad = bad or result
821
822       inst_config.MapLVsByNode(node_vol_should)
823
824       instance_cfg[instance] = inst_config
825
826       pnode = inst_config.primary_node
827       if pnode in node_info:
828         node_info[pnode]['pinst'].append(instance)
829       else:
830         feedback_fn("  - ERROR: instance %s, connection to primary node"
831                     " %s failed" % (instance, pnode))
832         bad = True
833
834       # If the instance is non-redundant we cannot survive losing its primary
835       # node, so we are not N+1 compliant. On the other hand we have no disk
836       # templates with more than one secondary so that situation is not well
837       # supported either.
838       # FIXME: does not support file-backed instances
839       if len(inst_config.secondary_nodes) == 0:
840         i_non_redundant.append(instance)
841       elif len(inst_config.secondary_nodes) > 1:
842         feedback_fn("  - WARNING: multiple secondaries for instance %s"
843                     % instance)
844
845       for snode in inst_config.secondary_nodes:
846         if snode in node_info:
847           node_info[snode]['sinst'].append(instance)
848           if pnode not in node_info[snode]['sinst-by-pnode']:
849             node_info[snode]['sinst-by-pnode'][pnode] = []
850           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
851         else:
852           feedback_fn("  - ERROR: instance %s, connection to secondary node"
853                       " %s failed" % (instance, snode))
854
855     feedback_fn("* Verifying orphan volumes")
856     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
857                                        feedback_fn)
858     bad = bad or result
859
860     feedback_fn("* Verifying remaining instances")
861     result = self._VerifyOrphanInstances(instancelist, node_instance,
862                                          feedback_fn)
863     bad = bad or result
864
865     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
866       feedback_fn("* Verifying N+1 Memory redundancy")
867       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
868       bad = bad or result
869
870     feedback_fn("* Other Notes")
871     if i_non_redundant:
872       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
873                   % len(i_non_redundant))
874
875     return not bad
876
877   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
878     """Analize the post-hooks' result, handle it, and send some
879     nicely-formatted feedback back to the user.
880
881     Args:
882       phase: the hooks phase that has just been run
883       hooks_results: the results of the multi-node hooks rpc call
884       feedback_fn: function to send feedback back to the caller
885       lu_result: previous Exec result
886
887     """
888     # We only really run POST phase hooks, and are only interested in
889     # their results
890     if phase == constants.HOOKS_PHASE_POST:
891       # Used to change hooks' output to proper indentation
892       indent_re = re.compile('^', re.M)
893       feedback_fn("* Hooks Results")
894       if not hooks_results:
895         feedback_fn("  - ERROR: general communication failure")
896         lu_result = 1
897       else:
898         for node_name in hooks_results:
899           show_node_header = True
900           res = hooks_results[node_name]
901           if res is False or not isinstance(res, list):
902             feedback_fn("    Communication failure")
903             lu_result = 1
904             continue
905           for script, hkr, output in res:
906             if hkr == constants.HKR_FAIL:
907               # The node header is only shown once, if there are
908               # failing hooks on that node
909               if show_node_header:
910                 feedback_fn("  Node %s:" % node_name)
911                 show_node_header = False
912               feedback_fn("    ERROR: Script %s failed, output:" % script)
913               output = indent_re.sub('      ', output)
914               feedback_fn("%s" % output)
915               lu_result = 1
916
917       return lu_result
918
919
920 class LUVerifyDisks(NoHooksLU):
921   """Verifies the cluster disks status.
922
923   """
924   _OP_REQP = []
925   REQ_BGL = False
926
927   def ExpandNames(self):
928     self.needed_locks = {
929       locking.LEVEL_NODE: locking.ALL_SET,
930       locking.LEVEL_INSTANCE: locking.ALL_SET,
931     }
932     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
933
934   def CheckPrereq(self):
935     """Check prerequisites.
936
937     This has no prerequisites.
938
939     """
940     pass
941
942   def Exec(self, feedback_fn):
943     """Verify integrity of cluster disks.
944
945     """
946     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
947
948     vg_name = self.cfg.GetVGName()
949     nodes = utils.NiceSort(self.cfg.GetNodeList())
950     instances = [self.cfg.GetInstanceInfo(name)
951                  for name in self.cfg.GetInstanceList()]
952
953     nv_dict = {}
954     for inst in instances:
955       inst_lvs = {}
956       if (inst.status != "up" or
957           inst.disk_template not in constants.DTS_NET_MIRROR):
958         continue
959       inst.MapLVsByNode(inst_lvs)
960       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
961       for node, vol_list in inst_lvs.iteritems():
962         for vol in vol_list:
963           nv_dict[(node, vol)] = inst
964
965     if not nv_dict:
966       return result
967
968     node_lvs = rpc.call_volume_list(nodes, vg_name)
969
970     to_act = set()
971     for node in nodes:
972       # node_volume
973       lvs = node_lvs[node]
974
975       if isinstance(lvs, basestring):
976         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
977         res_nlvm[node] = lvs
978       elif not isinstance(lvs, dict):
979         logger.Info("connection to node %s failed or invalid data returned" %
980                     (node,))
981         res_nodes.append(node)
982         continue
983
984       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
985         inst = nv_dict.pop((node, lv_name), None)
986         if (not lv_online and inst is not None
987             and inst.name not in res_instances):
988           res_instances.append(inst.name)
989
990     # any leftover items in nv_dict are missing LVs, let's arrange the
991     # data better
992     for key, inst in nv_dict.iteritems():
993       if inst.name not in res_missing:
994         res_missing[inst.name] = []
995       res_missing[inst.name].append(key)
996
997     return result
998
999
1000 class LURenameCluster(LogicalUnit):
1001   """Rename the cluster.
1002
1003   """
1004   HPATH = "cluster-rename"
1005   HTYPE = constants.HTYPE_CLUSTER
1006   _OP_REQP = ["name"]
1007
1008   def BuildHooksEnv(self):
1009     """Build hooks env.
1010
1011     """
1012     env = {
1013       "OP_TARGET": self.cfg.GetClusterName(),
1014       "NEW_NAME": self.op.name,
1015       }
1016     mn = self.cfg.GetMasterNode()
1017     return env, [mn], [mn]
1018
1019   def CheckPrereq(self):
1020     """Verify that the passed name is a valid one.
1021
1022     """
1023     hostname = utils.HostInfo(self.op.name)
1024
1025     new_name = hostname.name
1026     self.ip = new_ip = hostname.ip
1027     old_name = self.cfg.GetClusterName()
1028     old_ip = self.cfg.GetMasterIP()
1029     if new_name == old_name and new_ip == old_ip:
1030       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1031                                  " cluster has changed")
1032     if new_ip != old_ip:
1033       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1034         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1035                                    " reachable on the network. Aborting." %
1036                                    new_ip)
1037
1038     self.op.name = new_name
1039
1040   def Exec(self, feedback_fn):
1041     """Rename the cluster.
1042
1043     """
1044     clustername = self.op.name
1045     ip = self.ip
1046
1047     # shutdown the master IP
1048     master = self.cfg.GetMasterNode()
1049     if not rpc.call_node_stop_master(master, False):
1050       raise errors.OpExecError("Could not disable the master role")
1051
1052     try:
1053       # modify the sstore
1054       # TODO: sstore
1055       ss.SetKey(ss.SS_MASTER_IP, ip)
1056       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1057
1058       # Distribute updated ss config to all nodes
1059       myself = self.cfg.GetNodeInfo(master)
1060       dist_nodes = self.cfg.GetNodeList()
1061       if myself.name in dist_nodes:
1062         dist_nodes.remove(myself.name)
1063
1064       logger.Debug("Copying updated ssconf data to all nodes")
1065       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1066         fname = ss.KeyToFilename(keyname)
1067         result = rpc.call_upload_file(dist_nodes, fname)
1068         for to_node in dist_nodes:
1069           if not result[to_node]:
1070             logger.Error("copy of file %s to node %s failed" %
1071                          (fname, to_node))
1072     finally:
1073       if not rpc.call_node_start_master(master, False):
1074         logger.Error("Could not re-enable the master role on the master,"
1075                      " please restart manually.")
1076
1077
1078 def _RecursiveCheckIfLVMBased(disk):
1079   """Check if the given disk or its children are lvm-based.
1080
1081   Args:
1082     disk: ganeti.objects.Disk object
1083
1084   Returns:
1085     boolean indicating whether a LD_LV dev_type was found or not
1086
1087   """
1088   if disk.children:
1089     for chdisk in disk.children:
1090       if _RecursiveCheckIfLVMBased(chdisk):
1091         return True
1092   return disk.dev_type == constants.LD_LV
1093
1094
1095 class LUSetClusterParams(LogicalUnit):
1096   """Change the parameters of the cluster.
1097
1098   """
1099   HPATH = "cluster-modify"
1100   HTYPE = constants.HTYPE_CLUSTER
1101   _OP_REQP = []
1102   REQ_BGL = False
1103
1104   def ExpandNames(self):
1105     # FIXME: in the future maybe other cluster params won't require checking on
1106     # all nodes to be modified.
1107     self.needed_locks = {
1108       locking.LEVEL_NODE: locking.ALL_SET,
1109     }
1110     self.share_locks[locking.LEVEL_NODE] = 1
1111
1112   def BuildHooksEnv(self):
1113     """Build hooks env.
1114
1115     """
1116     env = {
1117       "OP_TARGET": self.cfg.GetClusterName(),
1118       "NEW_VG_NAME": self.op.vg_name,
1119       }
1120     mn = self.cfg.GetMasterNode()
1121     return env, [mn], [mn]
1122
1123   def CheckPrereq(self):
1124     """Check prerequisites.
1125
1126     This checks whether the given params don't conflict and
1127     if the given volume group is valid.
1128
1129     """
1130     # FIXME: This only works because there is only one parameter that can be
1131     # changed or removed.
1132     if not self.op.vg_name:
1133       instances = self.cfg.GetAllInstancesInfo().values()
1134       for inst in instances:
1135         for disk in inst.disks:
1136           if _RecursiveCheckIfLVMBased(disk):
1137             raise errors.OpPrereqError("Cannot disable lvm storage while"
1138                                        " lvm-based instances exist")
1139
1140     # if vg_name not None, checks given volume group on all nodes
1141     if self.op.vg_name:
1142       node_list = self.acquired_locks[locking.LEVEL_NODE]
1143       vglist = rpc.call_vg_list(node_list)
1144       for node in node_list:
1145         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1146                                               constants.MIN_VG_SIZE)
1147         if vgstatus:
1148           raise errors.OpPrereqError("Error on node '%s': %s" %
1149                                      (node, vgstatus))
1150
1151   def Exec(self, feedback_fn):
1152     """Change the parameters of the cluster.
1153
1154     """
1155     if self.op.vg_name != self.cfg.GetVGName():
1156       self.cfg.SetVGName(self.op.vg_name)
1157     else:
1158       feedback_fn("Cluster LVM configuration already in desired"
1159                   " state, not changing")
1160
1161
1162 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1163   """Sleep and poll for an instance's disk to sync.
1164
1165   """
1166   if not instance.disks:
1167     return True
1168
1169   if not oneshot:
1170     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1171
1172   node = instance.primary_node
1173
1174   for dev in instance.disks:
1175     cfgw.SetDiskID(dev, node)
1176
1177   retries = 0
1178   while True:
1179     max_time = 0
1180     done = True
1181     cumul_degraded = False
1182     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1183     if not rstats:
1184       proc.LogWarning("Can't get any data from node %s" % node)
1185       retries += 1
1186       if retries >= 10:
1187         raise errors.RemoteError("Can't contact node %s for mirror data,"
1188                                  " aborting." % node)
1189       time.sleep(6)
1190       continue
1191     retries = 0
1192     for i in range(len(rstats)):
1193       mstat = rstats[i]
1194       if mstat is None:
1195         proc.LogWarning("Can't compute data for node %s/%s" %
1196                         (node, instance.disks[i].iv_name))
1197         continue
1198       # we ignore the ldisk parameter
1199       perc_done, est_time, is_degraded, _ = mstat
1200       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1201       if perc_done is not None:
1202         done = False
1203         if est_time is not None:
1204           rem_time = "%d estimated seconds remaining" % est_time
1205           max_time = est_time
1206         else:
1207           rem_time = "no time estimate"
1208         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1209                      (instance.disks[i].iv_name, perc_done, rem_time))
1210     if done or oneshot:
1211       break
1212
1213     time.sleep(min(60, max_time))
1214
1215   if done:
1216     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1217   return not cumul_degraded
1218
1219
1220 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1221   """Check that mirrors are not degraded.
1222
1223   The ldisk parameter, if True, will change the test from the
1224   is_degraded attribute (which represents overall non-ok status for
1225   the device(s)) to the ldisk (representing the local storage status).
1226
1227   """
1228   cfgw.SetDiskID(dev, node)
1229   if ldisk:
1230     idx = 6
1231   else:
1232     idx = 5
1233
1234   result = True
1235   if on_primary or dev.AssembleOnSecondary():
1236     rstats = rpc.call_blockdev_find(node, dev)
1237     if not rstats:
1238       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1239       result = False
1240     else:
1241       result = result and (not rstats[idx])
1242   if dev.children:
1243     for child in dev.children:
1244       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1245
1246   return result
1247
1248
1249 class LUDiagnoseOS(NoHooksLU):
1250   """Logical unit for OS diagnose/query.
1251
1252   """
1253   _OP_REQP = ["output_fields", "names"]
1254   REQ_BGL = False
1255
1256   def ExpandNames(self):
1257     if self.op.names:
1258       raise errors.OpPrereqError("Selective OS query not supported")
1259
1260     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1261     _CheckOutputFields(static=[],
1262                        dynamic=self.dynamic_fields,
1263                        selected=self.op.output_fields)
1264
1265     # Lock all nodes, in shared mode
1266     self.needed_locks = {}
1267     self.share_locks[locking.LEVEL_NODE] = 1
1268     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1269
1270   def CheckPrereq(self):
1271     """Check prerequisites.
1272
1273     """
1274
1275   @staticmethod
1276   def _DiagnoseByOS(node_list, rlist):
1277     """Remaps a per-node return list into an a per-os per-node dictionary
1278
1279       Args:
1280         node_list: a list with the names of all nodes
1281         rlist: a map with node names as keys and OS objects as values
1282
1283       Returns:
1284         map: a map with osnames as keys and as value another map, with
1285              nodes as
1286              keys and list of OS objects as values
1287              e.g. {"debian-etch": {"node1": [<object>,...],
1288                                    "node2": [<object>,]}
1289                   }
1290
1291     """
1292     all_os = {}
1293     for node_name, nr in rlist.iteritems():
1294       if not nr:
1295         continue
1296       for os_obj in nr:
1297         if os_obj.name not in all_os:
1298           # build a list of nodes for this os containing empty lists
1299           # for each node in node_list
1300           all_os[os_obj.name] = {}
1301           for nname in node_list:
1302             all_os[os_obj.name][nname] = []
1303         all_os[os_obj.name][node_name].append(os_obj)
1304     return all_os
1305
1306   def Exec(self, feedback_fn):
1307     """Compute the list of OSes.
1308
1309     """
1310     node_list = self.acquired_locks[locking.LEVEL_NODE]
1311     node_data = rpc.call_os_diagnose(node_list)
1312     if node_data == False:
1313       raise errors.OpExecError("Can't gather the list of OSes")
1314     pol = self._DiagnoseByOS(node_list, node_data)
1315     output = []
1316     for os_name, os_data in pol.iteritems():
1317       row = []
1318       for field in self.op.output_fields:
1319         if field == "name":
1320           val = os_name
1321         elif field == "valid":
1322           val = utils.all([osl and osl[0] for osl in os_data.values()])
1323         elif field == "node_status":
1324           val = {}
1325           for node_name, nos_list in os_data.iteritems():
1326             val[node_name] = [(v.status, v.path) for v in nos_list]
1327         else:
1328           raise errors.ParameterError(field)
1329         row.append(val)
1330       output.append(row)
1331
1332     return output
1333
1334
1335 class LURemoveNode(LogicalUnit):
1336   """Logical unit for removing a node.
1337
1338   """
1339   HPATH = "node-remove"
1340   HTYPE = constants.HTYPE_NODE
1341   _OP_REQP = ["node_name"]
1342
1343   def BuildHooksEnv(self):
1344     """Build hooks env.
1345
1346     This doesn't run on the target node in the pre phase as a failed
1347     node would then be impossible to remove.
1348
1349     """
1350     env = {
1351       "OP_TARGET": self.op.node_name,
1352       "NODE_NAME": self.op.node_name,
1353       }
1354     all_nodes = self.cfg.GetNodeList()
1355     all_nodes.remove(self.op.node_name)
1356     return env, all_nodes, all_nodes
1357
1358   def CheckPrereq(self):
1359     """Check prerequisites.
1360
1361     This checks:
1362      - the node exists in the configuration
1363      - it does not have primary or secondary instances
1364      - it's not the master
1365
1366     Any errors are signalled by raising errors.OpPrereqError.
1367
1368     """
1369     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1370     if node is None:
1371       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1372
1373     instance_list = self.cfg.GetInstanceList()
1374
1375     masternode = self.cfg.GetMasterNode()
1376     if node.name == masternode:
1377       raise errors.OpPrereqError("Node is the master node,"
1378                                  " you need to failover first.")
1379
1380     for instance_name in instance_list:
1381       instance = self.cfg.GetInstanceInfo(instance_name)
1382       if node.name == instance.primary_node:
1383         raise errors.OpPrereqError("Instance %s still running on the node,"
1384                                    " please remove first." % instance_name)
1385       if node.name in instance.secondary_nodes:
1386         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1387                                    " please remove first." % instance_name)
1388     self.op.node_name = node.name
1389     self.node = node
1390
1391   def Exec(self, feedback_fn):
1392     """Removes the node from the cluster.
1393
1394     """
1395     node = self.node
1396     logger.Info("stopping the node daemon and removing configs from node %s" %
1397                 node.name)
1398
1399     self.context.RemoveNode(node.name)
1400
1401     rpc.call_node_leave_cluster(node.name)
1402
1403
1404 class LUQueryNodes(NoHooksLU):
1405   """Logical unit for querying nodes.
1406
1407   """
1408   _OP_REQP = ["output_fields", "names"]
1409   REQ_BGL = False
1410
1411   def ExpandNames(self):
1412     self.dynamic_fields = frozenset([
1413       "dtotal", "dfree",
1414       "mtotal", "mnode", "mfree",
1415       "bootid",
1416       "ctotal",
1417       ])
1418
1419     self.static_fields = frozenset([
1420       "name", "pinst_cnt", "sinst_cnt",
1421       "pinst_list", "sinst_list",
1422       "pip", "sip", "tags",
1423       "serial_no",
1424       ])
1425
1426     _CheckOutputFields(static=self.static_fields,
1427                        dynamic=self.dynamic_fields,
1428                        selected=self.op.output_fields)
1429
1430     self.needed_locks = {}
1431     self.share_locks[locking.LEVEL_NODE] = 1
1432
1433     if self.op.names:
1434       self.wanted = _GetWantedNodes(self, self.op.names)
1435     else:
1436       self.wanted = locking.ALL_SET
1437
1438     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1439     if self.do_locking:
1440       # if we don't request only static fields, we need to lock the nodes
1441       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1442
1443
1444   def CheckPrereq(self):
1445     """Check prerequisites.
1446
1447     """
1448     # The validation of the node list is done in the _GetWantedNodes,
1449     # if non empty, and if empty, there's no validation to do
1450     pass
1451
1452   def Exec(self, feedback_fn):
1453     """Computes the list of nodes and their attributes.
1454
1455     """
1456     all_info = self.cfg.GetAllNodesInfo()
1457     if self.do_locking:
1458       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1459     elif self.wanted != locking.ALL_SET:
1460       nodenames = self.wanted
1461       missing = set(nodenames).difference(all_info.keys())
1462       if missing:
1463         raise self.OpExecError(
1464           "Some nodes were removed before retrieving their data: %s" % missing)
1465     else:
1466       nodenames = all_info.keys()
1467     nodelist = [all_info[name] for name in nodenames]
1468
1469     # begin data gathering
1470
1471     if self.dynamic_fields.intersection(self.op.output_fields):
1472       live_data = {}
1473       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1474       for name in nodenames:
1475         nodeinfo = node_data.get(name, None)
1476         if nodeinfo:
1477           live_data[name] = {
1478             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1479             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1480             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1481             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1482             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1483             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1484             "bootid": nodeinfo['bootid'],
1485             }
1486         else:
1487           live_data[name] = {}
1488     else:
1489       live_data = dict.fromkeys(nodenames, {})
1490
1491     node_to_primary = dict([(name, set()) for name in nodenames])
1492     node_to_secondary = dict([(name, set()) for name in nodenames])
1493
1494     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1495                              "sinst_cnt", "sinst_list"))
1496     if inst_fields & frozenset(self.op.output_fields):
1497       instancelist = self.cfg.GetInstanceList()
1498
1499       for instance_name in instancelist:
1500         inst = self.cfg.GetInstanceInfo(instance_name)
1501         if inst.primary_node in node_to_primary:
1502           node_to_primary[inst.primary_node].add(inst.name)
1503         for secnode in inst.secondary_nodes:
1504           if secnode in node_to_secondary:
1505             node_to_secondary[secnode].add(inst.name)
1506
1507     # end data gathering
1508
1509     output = []
1510     for node in nodelist:
1511       node_output = []
1512       for field in self.op.output_fields:
1513         if field == "name":
1514           val = node.name
1515         elif field == "pinst_list":
1516           val = list(node_to_primary[node.name])
1517         elif field == "sinst_list":
1518           val = list(node_to_secondary[node.name])
1519         elif field == "pinst_cnt":
1520           val = len(node_to_primary[node.name])
1521         elif field == "sinst_cnt":
1522           val = len(node_to_secondary[node.name])
1523         elif field == "pip":
1524           val = node.primary_ip
1525         elif field == "sip":
1526           val = node.secondary_ip
1527         elif field == "tags":
1528           val = list(node.GetTags())
1529         elif field == "serial_no":
1530           val = node.serial_no
1531         elif field in self.dynamic_fields:
1532           val = live_data[node.name].get(field, None)
1533         else:
1534           raise errors.ParameterError(field)
1535         node_output.append(val)
1536       output.append(node_output)
1537
1538     return output
1539
1540
1541 class LUQueryNodeVolumes(NoHooksLU):
1542   """Logical unit for getting volumes on node(s).
1543
1544   """
1545   _OP_REQP = ["nodes", "output_fields"]
1546   REQ_BGL = False
1547
1548   def ExpandNames(self):
1549     _CheckOutputFields(static=["node"],
1550                        dynamic=["phys", "vg", "name", "size", "instance"],
1551                        selected=self.op.output_fields)
1552
1553     self.needed_locks = {}
1554     self.share_locks[locking.LEVEL_NODE] = 1
1555     if not self.op.nodes:
1556       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1557     else:
1558       self.needed_locks[locking.LEVEL_NODE] = \
1559         _GetWantedNodes(self, self.op.nodes)
1560
1561   def CheckPrereq(self):
1562     """Check prerequisites.
1563
1564     This checks that the fields required are valid output fields.
1565
1566     """
1567     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1568
1569   def Exec(self, feedback_fn):
1570     """Computes the list of nodes and their attributes.
1571
1572     """
1573     nodenames = self.nodes
1574     volumes = rpc.call_node_volumes(nodenames)
1575
1576     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1577              in self.cfg.GetInstanceList()]
1578
1579     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1580
1581     output = []
1582     for node in nodenames:
1583       if node not in volumes or not volumes[node]:
1584         continue
1585
1586       node_vols = volumes[node][:]
1587       node_vols.sort(key=lambda vol: vol['dev'])
1588
1589       for vol in node_vols:
1590         node_output = []
1591         for field in self.op.output_fields:
1592           if field == "node":
1593             val = node
1594           elif field == "phys":
1595             val = vol['dev']
1596           elif field == "vg":
1597             val = vol['vg']
1598           elif field == "name":
1599             val = vol['name']
1600           elif field == "size":
1601             val = int(float(vol['size']))
1602           elif field == "instance":
1603             for inst in ilist:
1604               if node not in lv_by_node[inst]:
1605                 continue
1606               if vol['name'] in lv_by_node[inst][node]:
1607                 val = inst.name
1608                 break
1609             else:
1610               val = '-'
1611           else:
1612             raise errors.ParameterError(field)
1613           node_output.append(str(val))
1614
1615         output.append(node_output)
1616
1617     return output
1618
1619
1620 class LUAddNode(LogicalUnit):
1621   """Logical unit for adding node to the cluster.
1622
1623   """
1624   HPATH = "node-add"
1625   HTYPE = constants.HTYPE_NODE
1626   _OP_REQP = ["node_name"]
1627
1628   def BuildHooksEnv(self):
1629     """Build hooks env.
1630
1631     This will run on all nodes before, and on all nodes + the new node after.
1632
1633     """
1634     env = {
1635       "OP_TARGET": self.op.node_name,
1636       "NODE_NAME": self.op.node_name,
1637       "NODE_PIP": self.op.primary_ip,
1638       "NODE_SIP": self.op.secondary_ip,
1639       }
1640     nodes_0 = self.cfg.GetNodeList()
1641     nodes_1 = nodes_0 + [self.op.node_name, ]
1642     return env, nodes_0, nodes_1
1643
1644   def CheckPrereq(self):
1645     """Check prerequisites.
1646
1647     This checks:
1648      - the new node is not already in the config
1649      - it is resolvable
1650      - its parameters (single/dual homed) matches the cluster
1651
1652     Any errors are signalled by raising errors.OpPrereqError.
1653
1654     """
1655     node_name = self.op.node_name
1656     cfg = self.cfg
1657
1658     dns_data = utils.HostInfo(node_name)
1659
1660     node = dns_data.name
1661     primary_ip = self.op.primary_ip = dns_data.ip
1662     secondary_ip = getattr(self.op, "secondary_ip", None)
1663     if secondary_ip is None:
1664       secondary_ip = primary_ip
1665     if not utils.IsValidIP(secondary_ip):
1666       raise errors.OpPrereqError("Invalid secondary IP given")
1667     self.op.secondary_ip = secondary_ip
1668
1669     node_list = cfg.GetNodeList()
1670     if not self.op.readd and node in node_list:
1671       raise errors.OpPrereqError("Node %s is already in the configuration" %
1672                                  node)
1673     elif self.op.readd and node not in node_list:
1674       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1675
1676     for existing_node_name in node_list:
1677       existing_node = cfg.GetNodeInfo(existing_node_name)
1678
1679       if self.op.readd and node == existing_node_name:
1680         if (existing_node.primary_ip != primary_ip or
1681             existing_node.secondary_ip != secondary_ip):
1682           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1683                                      " address configuration as before")
1684         continue
1685
1686       if (existing_node.primary_ip == primary_ip or
1687           existing_node.secondary_ip == primary_ip or
1688           existing_node.primary_ip == secondary_ip or
1689           existing_node.secondary_ip == secondary_ip):
1690         raise errors.OpPrereqError("New node ip address(es) conflict with"
1691                                    " existing node %s" % existing_node.name)
1692
1693     # check that the type of the node (single versus dual homed) is the
1694     # same as for the master
1695     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1696     master_singlehomed = myself.secondary_ip == myself.primary_ip
1697     newbie_singlehomed = secondary_ip == primary_ip
1698     if master_singlehomed != newbie_singlehomed:
1699       if master_singlehomed:
1700         raise errors.OpPrereqError("The master has no private ip but the"
1701                                    " new node has one")
1702       else:
1703         raise errors.OpPrereqError("The master has a private ip but the"
1704                                    " new node doesn't have one")
1705
1706     # checks reachablity
1707     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1708       raise errors.OpPrereqError("Node not reachable by ping")
1709
1710     if not newbie_singlehomed:
1711       # check reachability from my secondary ip to newbie's secondary ip
1712       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1713                            source=myself.secondary_ip):
1714         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1715                                    " based ping to noded port")
1716
1717     self.new_node = objects.Node(name=node,
1718                                  primary_ip=primary_ip,
1719                                  secondary_ip=secondary_ip)
1720
1721   def Exec(self, feedback_fn):
1722     """Adds the new node to the cluster.
1723
1724     """
1725     new_node = self.new_node
1726     node = new_node.name
1727
1728     # check connectivity
1729     result = rpc.call_version([node])[node]
1730     if result:
1731       if constants.PROTOCOL_VERSION == result:
1732         logger.Info("communication to node %s fine, sw version %s match" %
1733                     (node, result))
1734       else:
1735         raise errors.OpExecError("Version mismatch master version %s,"
1736                                  " node version %s" %
1737                                  (constants.PROTOCOL_VERSION, result))
1738     else:
1739       raise errors.OpExecError("Cannot get version from the new node")
1740
1741     # setup ssh on node
1742     logger.Info("copy ssh key to node %s" % node)
1743     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1744     keyarray = []
1745     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1746                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1747                 priv_key, pub_key]
1748
1749     for i in keyfiles:
1750       f = open(i, 'r')
1751       try:
1752         keyarray.append(f.read())
1753       finally:
1754         f.close()
1755
1756     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1757                                keyarray[3], keyarray[4], keyarray[5])
1758
1759     if not result:
1760       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1761
1762     # Add node to our /etc/hosts, and add key to known_hosts
1763     utils.AddHostToEtcHosts(new_node.name)
1764
1765     if new_node.secondary_ip != new_node.primary_ip:
1766       if not rpc.call_node_tcp_ping(new_node.name,
1767                                     constants.LOCALHOST_IP_ADDRESS,
1768                                     new_node.secondary_ip,
1769                                     constants.DEFAULT_NODED_PORT,
1770                                     10, False):
1771         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1772                                  " you gave (%s). Please fix and re-run this"
1773                                  " command." % new_node.secondary_ip)
1774
1775     node_verify_list = [self.cfg.GetMasterNode()]
1776     node_verify_param = {
1777       'nodelist': [node],
1778       # TODO: do a node-net-test as well?
1779     }
1780
1781     result = rpc.call_node_verify(node_verify_list, node_verify_param,
1782                                   self.cfg.GetClusterName())
1783     for verifier in node_verify_list:
1784       if not result[verifier]:
1785         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1786                                  " for remote verification" % verifier)
1787       if result[verifier]['nodelist']:
1788         for failed in result[verifier]['nodelist']:
1789           feedback_fn("ssh/hostname verification failed %s -> %s" %
1790                       (verifier, result[verifier]['nodelist'][failed]))
1791         raise errors.OpExecError("ssh/hostname verification failed.")
1792
1793     # Distribute updated /etc/hosts and known_hosts to all nodes,
1794     # including the node just added
1795     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1796     dist_nodes = self.cfg.GetNodeList()
1797     if not self.op.readd:
1798       dist_nodes.append(node)
1799     if myself.name in dist_nodes:
1800       dist_nodes.remove(myself.name)
1801
1802     logger.Debug("Copying hosts and known_hosts to all nodes")
1803     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1804       result = rpc.call_upload_file(dist_nodes, fname)
1805       for to_node in dist_nodes:
1806         if not result[to_node]:
1807           logger.Error("copy of file %s to node %s failed" %
1808                        (fname, to_node))
1809
1810     to_copy = []
1811     if self.cfg.GetHypervisorType() == constants.HT_XEN_HVM31:
1812       to_copy.append(constants.VNC_PASSWORD_FILE)
1813     for fname in to_copy:
1814       result = rpc.call_upload_file([node], fname)
1815       if not result[node]:
1816         logger.Error("could not copy file %s to node %s" % (fname, node))
1817
1818     if self.op.readd:
1819       self.context.ReaddNode(new_node)
1820     else:
1821       self.context.AddNode(new_node)
1822
1823
1824 class LUQueryClusterInfo(NoHooksLU):
1825   """Query cluster configuration.
1826
1827   """
1828   _OP_REQP = []
1829   REQ_MASTER = False
1830   REQ_BGL = False
1831
1832   def ExpandNames(self):
1833     self.needed_locks = {}
1834
1835   def CheckPrereq(self):
1836     """No prerequsites needed for this LU.
1837
1838     """
1839     pass
1840
1841   def Exec(self, feedback_fn):
1842     """Return cluster config.
1843
1844     """
1845     result = {
1846       "name": self.cfg.GetClusterName(),
1847       "software_version": constants.RELEASE_VERSION,
1848       "protocol_version": constants.PROTOCOL_VERSION,
1849       "config_version": constants.CONFIG_VERSION,
1850       "os_api_version": constants.OS_API_VERSION,
1851       "export_version": constants.EXPORT_VERSION,
1852       "master": self.cfg.GetMasterNode(),
1853       "architecture": (platform.architecture()[0], platform.machine()),
1854       "hypervisor_type": self.cfg.GetHypervisorType(),
1855       }
1856
1857     return result
1858
1859
1860 class LUQueryConfigValues(NoHooksLU):
1861   """Return configuration values.
1862
1863   """
1864   _OP_REQP = []
1865   REQ_BGL = False
1866
1867   def ExpandNames(self):
1868     self.needed_locks = {}
1869
1870     static_fields = ["cluster_name", "master_node"]
1871     _CheckOutputFields(static=static_fields,
1872                        dynamic=[],
1873                        selected=self.op.output_fields)
1874
1875   def CheckPrereq(self):
1876     """No prerequisites.
1877
1878     """
1879     pass
1880
1881   def Exec(self, feedback_fn):
1882     """Dump a representation of the cluster config to the standard output.
1883
1884     """
1885     values = []
1886     for field in self.op.output_fields:
1887       if field == "cluster_name":
1888         values.append(self.cfg.GetClusterName())
1889       elif field == "master_node":
1890         values.append(self.cfg.GetMasterNode())
1891       else:
1892         raise errors.ParameterError(field)
1893     return values
1894
1895
1896 class LUActivateInstanceDisks(NoHooksLU):
1897   """Bring up an instance's disks.
1898
1899   """
1900   _OP_REQP = ["instance_name"]
1901   REQ_BGL = False
1902
1903   def ExpandNames(self):
1904     self._ExpandAndLockInstance()
1905     self.needed_locks[locking.LEVEL_NODE] = []
1906     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1907
1908   def DeclareLocks(self, level):
1909     if level == locking.LEVEL_NODE:
1910       self._LockInstancesNodes()
1911
1912   def CheckPrereq(self):
1913     """Check prerequisites.
1914
1915     This checks that the instance is in the cluster.
1916
1917     """
1918     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1919     assert self.instance is not None, \
1920       "Cannot retrieve locked instance %s" % self.op.instance_name
1921
1922   def Exec(self, feedback_fn):
1923     """Activate the disks.
1924
1925     """
1926     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1927     if not disks_ok:
1928       raise errors.OpExecError("Cannot activate block devices")
1929
1930     return disks_info
1931
1932
1933 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1934   """Prepare the block devices for an instance.
1935
1936   This sets up the block devices on all nodes.
1937
1938   Args:
1939     instance: a ganeti.objects.Instance object
1940     ignore_secondaries: if true, errors on secondary nodes won't result
1941                         in an error return from the function
1942
1943   Returns:
1944     false if the operation failed
1945     list of (host, instance_visible_name, node_visible_name) if the operation
1946          suceeded with the mapping from node devices to instance devices
1947   """
1948   device_info = []
1949   disks_ok = True
1950   iname = instance.name
1951   # With the two passes mechanism we try to reduce the window of
1952   # opportunity for the race condition of switching DRBD to primary
1953   # before handshaking occured, but we do not eliminate it
1954
1955   # The proper fix would be to wait (with some limits) until the
1956   # connection has been made and drbd transitions from WFConnection
1957   # into any other network-connected state (Connected, SyncTarget,
1958   # SyncSource, etc.)
1959
1960   # 1st pass, assemble on all nodes in secondary mode
1961   for inst_disk in instance.disks:
1962     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1963       cfg.SetDiskID(node_disk, node)
1964       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1965       if not result:
1966         logger.Error("could not prepare block device %s on node %s"
1967                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1968         if not ignore_secondaries:
1969           disks_ok = False
1970
1971   # FIXME: race condition on drbd migration to primary
1972
1973   # 2nd pass, do only the primary node
1974   for inst_disk in instance.disks:
1975     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1976       if node != instance.primary_node:
1977         continue
1978       cfg.SetDiskID(node_disk, node)
1979       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1980       if not result:
1981         logger.Error("could not prepare block device %s on node %s"
1982                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1983         disks_ok = False
1984     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1985
1986   # leave the disks configured for the primary node
1987   # this is a workaround that would be fixed better by
1988   # improving the logical/physical id handling
1989   for disk in instance.disks:
1990     cfg.SetDiskID(disk, instance.primary_node)
1991
1992   return disks_ok, device_info
1993
1994
1995 def _StartInstanceDisks(cfg, instance, force):
1996   """Start the disks of an instance.
1997
1998   """
1999   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2000                                            ignore_secondaries=force)
2001   if not disks_ok:
2002     _ShutdownInstanceDisks(instance, cfg)
2003     if force is not None and not force:
2004       logger.Error("If the message above refers to a secondary node,"
2005                    " you can retry the operation using '--force'.")
2006     raise errors.OpExecError("Disk consistency error")
2007
2008
2009 class LUDeactivateInstanceDisks(NoHooksLU):
2010   """Shutdown an instance's disks.
2011
2012   """
2013   _OP_REQP = ["instance_name"]
2014   REQ_BGL = False
2015
2016   def ExpandNames(self):
2017     self._ExpandAndLockInstance()
2018     self.needed_locks[locking.LEVEL_NODE] = []
2019     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2020
2021   def DeclareLocks(self, level):
2022     if level == locking.LEVEL_NODE:
2023       self._LockInstancesNodes()
2024
2025   def CheckPrereq(self):
2026     """Check prerequisites.
2027
2028     This checks that the instance is in the cluster.
2029
2030     """
2031     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2032     assert self.instance is not None, \
2033       "Cannot retrieve locked instance %s" % self.op.instance_name
2034
2035   def Exec(self, feedback_fn):
2036     """Deactivate the disks
2037
2038     """
2039     instance = self.instance
2040     _SafeShutdownInstanceDisks(instance, self.cfg)
2041
2042
2043 def _SafeShutdownInstanceDisks(instance, cfg):
2044   """Shutdown block devices of an instance.
2045
2046   This function checks if an instance is running, before calling
2047   _ShutdownInstanceDisks.
2048
2049   """
2050   ins_l = rpc.call_instance_list([instance.primary_node])
2051   ins_l = ins_l[instance.primary_node]
2052   if not type(ins_l) is list:
2053     raise errors.OpExecError("Can't contact node '%s'" %
2054                              instance.primary_node)
2055
2056   if instance.name in ins_l:
2057     raise errors.OpExecError("Instance is running, can't shutdown"
2058                              " block devices.")
2059
2060   _ShutdownInstanceDisks(instance, cfg)
2061
2062
2063 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2064   """Shutdown block devices of an instance.
2065
2066   This does the shutdown on all nodes of the instance.
2067
2068   If the ignore_primary is false, errors on the primary node are
2069   ignored.
2070
2071   """
2072   result = True
2073   for disk in instance.disks:
2074     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2075       cfg.SetDiskID(top_disk, node)
2076       if not rpc.call_blockdev_shutdown(node, top_disk):
2077         logger.Error("could not shutdown block device %s on node %s" %
2078                      (disk.iv_name, node))
2079         if not ignore_primary or node != instance.primary_node:
2080           result = False
2081   return result
2082
2083
2084 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2085   """Checks if a node has enough free memory.
2086
2087   This function check if a given node has the needed amount of free
2088   memory. In case the node has less memory or we cannot get the
2089   information from the node, this function raise an OpPrereqError
2090   exception.
2091
2092   Args:
2093     - cfg: a ConfigWriter instance
2094     - node: the node name
2095     - reason: string to use in the error message
2096     - requested: the amount of memory in MiB
2097
2098   """
2099   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2100   if not nodeinfo or not isinstance(nodeinfo, dict):
2101     raise errors.OpPrereqError("Could not contact node %s for resource"
2102                              " information" % (node,))
2103
2104   free_mem = nodeinfo[node].get('memory_free')
2105   if not isinstance(free_mem, int):
2106     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2107                              " was '%s'" % (node, free_mem))
2108   if requested > free_mem:
2109     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2110                              " needed %s MiB, available %s MiB" %
2111                              (node, reason, requested, free_mem))
2112
2113
2114 class LUStartupInstance(LogicalUnit):
2115   """Starts an instance.
2116
2117   """
2118   HPATH = "instance-start"
2119   HTYPE = constants.HTYPE_INSTANCE
2120   _OP_REQP = ["instance_name", "force"]
2121   REQ_BGL = False
2122
2123   def ExpandNames(self):
2124     self._ExpandAndLockInstance()
2125     self.needed_locks[locking.LEVEL_NODE] = []
2126     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2127
2128   def DeclareLocks(self, level):
2129     if level == locking.LEVEL_NODE:
2130       self._LockInstancesNodes()
2131
2132   def BuildHooksEnv(self):
2133     """Build hooks env.
2134
2135     This runs on master, primary and secondary nodes of the instance.
2136
2137     """
2138     env = {
2139       "FORCE": self.op.force,
2140       }
2141     env.update(_BuildInstanceHookEnvByObject(self.instance))
2142     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2143           list(self.instance.secondary_nodes))
2144     return env, nl, nl
2145
2146   def CheckPrereq(self):
2147     """Check prerequisites.
2148
2149     This checks that the instance is in the cluster.
2150
2151     """
2152     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2153     assert self.instance is not None, \
2154       "Cannot retrieve locked instance %s" % self.op.instance_name
2155
2156     # check bridges existance
2157     _CheckInstanceBridgesExist(instance)
2158
2159     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2160                          "starting instance %s" % instance.name,
2161                          instance.memory)
2162
2163   def Exec(self, feedback_fn):
2164     """Start the instance.
2165
2166     """
2167     instance = self.instance
2168     force = self.op.force
2169     extra_args = getattr(self.op, "extra_args", "")
2170
2171     self.cfg.MarkInstanceUp(instance.name)
2172
2173     node_current = instance.primary_node
2174
2175     _StartInstanceDisks(self.cfg, instance, force)
2176
2177     if not rpc.call_instance_start(node_current, instance, extra_args):
2178       _ShutdownInstanceDisks(instance, self.cfg)
2179       raise errors.OpExecError("Could not start instance")
2180
2181
2182 class LURebootInstance(LogicalUnit):
2183   """Reboot an instance.
2184
2185   """
2186   HPATH = "instance-reboot"
2187   HTYPE = constants.HTYPE_INSTANCE
2188   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2189   REQ_BGL = False
2190
2191   def ExpandNames(self):
2192     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2193                                    constants.INSTANCE_REBOOT_HARD,
2194                                    constants.INSTANCE_REBOOT_FULL]:
2195       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2196                                   (constants.INSTANCE_REBOOT_SOFT,
2197                                    constants.INSTANCE_REBOOT_HARD,
2198                                    constants.INSTANCE_REBOOT_FULL))
2199     self._ExpandAndLockInstance()
2200     self.needed_locks[locking.LEVEL_NODE] = []
2201     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2202
2203   def DeclareLocks(self, level):
2204     if level == locking.LEVEL_NODE:
2205       primary_only = not constants.INSTANCE_REBOOT_FULL
2206       self._LockInstancesNodes(primary_only=primary_only)
2207
2208   def BuildHooksEnv(self):
2209     """Build hooks env.
2210
2211     This runs on master, primary and secondary nodes of the instance.
2212
2213     """
2214     env = {
2215       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2216       }
2217     env.update(_BuildInstanceHookEnvByObject(self.instance))
2218     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2219           list(self.instance.secondary_nodes))
2220     return env, nl, nl
2221
2222   def CheckPrereq(self):
2223     """Check prerequisites.
2224
2225     This checks that the instance is in the cluster.
2226
2227     """
2228     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2229     assert self.instance is not None, \
2230       "Cannot retrieve locked instance %s" % self.op.instance_name
2231
2232     # check bridges existance
2233     _CheckInstanceBridgesExist(instance)
2234
2235   def Exec(self, feedback_fn):
2236     """Reboot the instance.
2237
2238     """
2239     instance = self.instance
2240     ignore_secondaries = self.op.ignore_secondaries
2241     reboot_type = self.op.reboot_type
2242     extra_args = getattr(self.op, "extra_args", "")
2243
2244     node_current = instance.primary_node
2245
2246     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2247                        constants.INSTANCE_REBOOT_HARD]:
2248       if not rpc.call_instance_reboot(node_current, instance,
2249                                       reboot_type, extra_args):
2250         raise errors.OpExecError("Could not reboot instance")
2251     else:
2252       if not rpc.call_instance_shutdown(node_current, instance):
2253         raise errors.OpExecError("could not shutdown instance for full reboot")
2254       _ShutdownInstanceDisks(instance, self.cfg)
2255       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2256       if not rpc.call_instance_start(node_current, instance, extra_args):
2257         _ShutdownInstanceDisks(instance, self.cfg)
2258         raise errors.OpExecError("Could not start instance for full reboot")
2259
2260     self.cfg.MarkInstanceUp(instance.name)
2261
2262
2263 class LUShutdownInstance(LogicalUnit):
2264   """Shutdown an instance.
2265
2266   """
2267   HPATH = "instance-stop"
2268   HTYPE = constants.HTYPE_INSTANCE
2269   _OP_REQP = ["instance_name"]
2270   REQ_BGL = False
2271
2272   def ExpandNames(self):
2273     self._ExpandAndLockInstance()
2274     self.needed_locks[locking.LEVEL_NODE] = []
2275     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2276
2277   def DeclareLocks(self, level):
2278     if level == locking.LEVEL_NODE:
2279       self._LockInstancesNodes()
2280
2281   def BuildHooksEnv(self):
2282     """Build hooks env.
2283
2284     This runs on master, primary and secondary nodes of the instance.
2285
2286     """
2287     env = _BuildInstanceHookEnvByObject(self.instance)
2288     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2289           list(self.instance.secondary_nodes))
2290     return env, nl, nl
2291
2292   def CheckPrereq(self):
2293     """Check prerequisites.
2294
2295     This checks that the instance is in the cluster.
2296
2297     """
2298     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2299     assert self.instance is not None, \
2300       "Cannot retrieve locked instance %s" % self.op.instance_name
2301
2302   def Exec(self, feedback_fn):
2303     """Shutdown the instance.
2304
2305     """
2306     instance = self.instance
2307     node_current = instance.primary_node
2308     self.cfg.MarkInstanceDown(instance.name)
2309     if not rpc.call_instance_shutdown(node_current, instance):
2310       logger.Error("could not shutdown instance")
2311
2312     _ShutdownInstanceDisks(instance, self.cfg)
2313
2314
2315 class LUReinstallInstance(LogicalUnit):
2316   """Reinstall an instance.
2317
2318   """
2319   HPATH = "instance-reinstall"
2320   HTYPE = constants.HTYPE_INSTANCE
2321   _OP_REQP = ["instance_name"]
2322   REQ_BGL = False
2323
2324   def ExpandNames(self):
2325     self._ExpandAndLockInstance()
2326     self.needed_locks[locking.LEVEL_NODE] = []
2327     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2328
2329   def DeclareLocks(self, level):
2330     if level == locking.LEVEL_NODE:
2331       self._LockInstancesNodes()
2332
2333   def BuildHooksEnv(self):
2334     """Build hooks env.
2335
2336     This runs on master, primary and secondary nodes of the instance.
2337
2338     """
2339     env = _BuildInstanceHookEnvByObject(self.instance)
2340     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2341           list(self.instance.secondary_nodes))
2342     return env, nl, nl
2343
2344   def CheckPrereq(self):
2345     """Check prerequisites.
2346
2347     This checks that the instance is in the cluster and is not running.
2348
2349     """
2350     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2351     assert instance is not None, \
2352       "Cannot retrieve locked instance %s" % self.op.instance_name
2353
2354     if instance.disk_template == constants.DT_DISKLESS:
2355       raise errors.OpPrereqError("Instance '%s' has no disks" %
2356                                  self.op.instance_name)
2357     if instance.status != "down":
2358       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2359                                  self.op.instance_name)
2360     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2361     if remote_info:
2362       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2363                                  (self.op.instance_name,
2364                                   instance.primary_node))
2365
2366     self.op.os_type = getattr(self.op, "os_type", None)
2367     if self.op.os_type is not None:
2368       # OS verification
2369       pnode = self.cfg.GetNodeInfo(
2370         self.cfg.ExpandNodeName(instance.primary_node))
2371       if pnode is None:
2372         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2373                                    self.op.pnode)
2374       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2375       if not os_obj:
2376         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2377                                    " primary node"  % self.op.os_type)
2378
2379     self.instance = instance
2380
2381   def Exec(self, feedback_fn):
2382     """Reinstall the instance.
2383
2384     """
2385     inst = self.instance
2386
2387     if self.op.os_type is not None:
2388       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2389       inst.os = self.op.os_type
2390       self.cfg.Update(inst)
2391
2392     _StartInstanceDisks(self.cfg, inst, None)
2393     try:
2394       feedback_fn("Running the instance OS create scripts...")
2395       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2396         raise errors.OpExecError("Could not install OS for instance %s"
2397                                  " on node %s" %
2398                                  (inst.name, inst.primary_node))
2399     finally:
2400       _ShutdownInstanceDisks(inst, self.cfg)
2401
2402
2403 class LURenameInstance(LogicalUnit):
2404   """Rename an instance.
2405
2406   """
2407   HPATH = "instance-rename"
2408   HTYPE = constants.HTYPE_INSTANCE
2409   _OP_REQP = ["instance_name", "new_name"]
2410
2411   def BuildHooksEnv(self):
2412     """Build hooks env.
2413
2414     This runs on master, primary and secondary nodes of the instance.
2415
2416     """
2417     env = _BuildInstanceHookEnvByObject(self.instance)
2418     env["INSTANCE_NEW_NAME"] = self.op.new_name
2419     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2420           list(self.instance.secondary_nodes))
2421     return env, nl, nl
2422
2423   def CheckPrereq(self):
2424     """Check prerequisites.
2425
2426     This checks that the instance is in the cluster and is not running.
2427
2428     """
2429     instance = self.cfg.GetInstanceInfo(
2430       self.cfg.ExpandInstanceName(self.op.instance_name))
2431     if instance is None:
2432       raise errors.OpPrereqError("Instance '%s' not known" %
2433                                  self.op.instance_name)
2434     if instance.status != "down":
2435       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2436                                  self.op.instance_name)
2437     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2438     if remote_info:
2439       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2440                                  (self.op.instance_name,
2441                                   instance.primary_node))
2442     self.instance = instance
2443
2444     # new name verification
2445     name_info = utils.HostInfo(self.op.new_name)
2446
2447     self.op.new_name = new_name = name_info.name
2448     instance_list = self.cfg.GetInstanceList()
2449     if new_name in instance_list:
2450       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2451                                  new_name)
2452
2453     if not getattr(self.op, "ignore_ip", False):
2454       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2455         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2456                                    (name_info.ip, new_name))
2457
2458
2459   def Exec(self, feedback_fn):
2460     """Reinstall the instance.
2461
2462     """
2463     inst = self.instance
2464     old_name = inst.name
2465
2466     if inst.disk_template == constants.DT_FILE:
2467       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2468
2469     self.cfg.RenameInstance(inst.name, self.op.new_name)
2470     # Change the instance lock. This is definitely safe while we hold the BGL
2471     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2472     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2473
2474     # re-read the instance from the configuration after rename
2475     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2476
2477     if inst.disk_template == constants.DT_FILE:
2478       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2479       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2480                                                 old_file_storage_dir,
2481                                                 new_file_storage_dir)
2482
2483       if not result:
2484         raise errors.OpExecError("Could not connect to node '%s' to rename"
2485                                  " directory '%s' to '%s' (but the instance"
2486                                  " has been renamed in Ganeti)" % (
2487                                  inst.primary_node, old_file_storage_dir,
2488                                  new_file_storage_dir))
2489
2490       if not result[0]:
2491         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2492                                  " (but the instance has been renamed in"
2493                                  " Ganeti)" % (old_file_storage_dir,
2494                                                new_file_storage_dir))
2495
2496     _StartInstanceDisks(self.cfg, inst, None)
2497     try:
2498       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2499                                           "sda", "sdb"):
2500         msg = ("Could not run OS rename script for instance %s on node %s"
2501                " (but the instance has been renamed in Ganeti)" %
2502                (inst.name, inst.primary_node))
2503         logger.Error(msg)
2504     finally:
2505       _ShutdownInstanceDisks(inst, self.cfg)
2506
2507
2508 class LURemoveInstance(LogicalUnit):
2509   """Remove an instance.
2510
2511   """
2512   HPATH = "instance-remove"
2513   HTYPE = constants.HTYPE_INSTANCE
2514   _OP_REQP = ["instance_name", "ignore_failures"]
2515   REQ_BGL = False
2516
2517   def ExpandNames(self):
2518     self._ExpandAndLockInstance()
2519     self.needed_locks[locking.LEVEL_NODE] = []
2520     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2521
2522   def DeclareLocks(self, level):
2523     if level == locking.LEVEL_NODE:
2524       self._LockInstancesNodes()
2525
2526   def BuildHooksEnv(self):
2527     """Build hooks env.
2528
2529     This runs on master, primary and secondary nodes of the instance.
2530
2531     """
2532     env = _BuildInstanceHookEnvByObject(self.instance)
2533     nl = [self.cfg.GetMasterNode()]
2534     return env, nl, nl
2535
2536   def CheckPrereq(self):
2537     """Check prerequisites.
2538
2539     This checks that the instance is in the cluster.
2540
2541     """
2542     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2543     assert self.instance is not None, \
2544       "Cannot retrieve locked instance %s" % self.op.instance_name
2545
2546   def Exec(self, feedback_fn):
2547     """Remove the instance.
2548
2549     """
2550     instance = self.instance
2551     logger.Info("shutting down instance %s on node %s" %
2552                 (instance.name, instance.primary_node))
2553
2554     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2555       if self.op.ignore_failures:
2556         feedback_fn("Warning: can't shutdown instance")
2557       else:
2558         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2559                                  (instance.name, instance.primary_node))
2560
2561     logger.Info("removing block devices for instance %s" % instance.name)
2562
2563     if not _RemoveDisks(instance, self.cfg):
2564       if self.op.ignore_failures:
2565         feedback_fn("Warning: can't remove instance's disks")
2566       else:
2567         raise errors.OpExecError("Can't remove instance's disks")
2568
2569     logger.Info("removing instance %s out of cluster config" % instance.name)
2570
2571     self.cfg.RemoveInstance(instance.name)
2572     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2573
2574
2575 class LUQueryInstances(NoHooksLU):
2576   """Logical unit for querying instances.
2577
2578   """
2579   _OP_REQP = ["output_fields", "names"]
2580   REQ_BGL = False
2581
2582   def ExpandNames(self):
2583     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2584     self.static_fields = frozenset([
2585       "name", "os", "pnode", "snodes",
2586       "admin_state", "admin_ram",
2587       "disk_template", "ip", "mac", "bridge",
2588       "sda_size", "sdb_size", "vcpus", "tags",
2589       "network_port", "kernel_path", "initrd_path",
2590       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2591       "hvm_cdrom_image_path", "hvm_nic_type",
2592       "hvm_disk_type", "vnc_bind_address",
2593       "serial_no",
2594       ])
2595     _CheckOutputFields(static=self.static_fields,
2596                        dynamic=self.dynamic_fields,
2597                        selected=self.op.output_fields)
2598
2599     self.needed_locks = {}
2600     self.share_locks[locking.LEVEL_INSTANCE] = 1
2601     self.share_locks[locking.LEVEL_NODE] = 1
2602
2603     if self.op.names:
2604       self.wanted = _GetWantedInstances(self, self.op.names)
2605     else:
2606       self.wanted = locking.ALL_SET
2607
2608     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2609     if self.do_locking:
2610       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2611       self.needed_locks[locking.LEVEL_NODE] = []
2612       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2613
2614   def DeclareLocks(self, level):
2615     if level == locking.LEVEL_NODE and self.do_locking:
2616       self._LockInstancesNodes()
2617
2618   def CheckPrereq(self):
2619     """Check prerequisites.
2620
2621     """
2622     pass
2623
2624   def Exec(self, feedback_fn):
2625     """Computes the list of nodes and their attributes.
2626
2627     """
2628     all_info = self.cfg.GetAllInstancesInfo()
2629     if self.do_locking:
2630       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2631     elif self.wanted != locking.ALL_SET:
2632       instance_names = self.wanted
2633       missing = set(instance_names).difference(all_info.keys())
2634       if missing:
2635         raise self.OpExecError(
2636           "Some instances were removed before retrieving their data: %s"
2637           % missing)
2638     else:
2639       instance_names = all_info.keys()
2640     instance_list = [all_info[iname] for iname in instance_names]
2641
2642     # begin data gathering
2643
2644     nodes = frozenset([inst.primary_node for inst in instance_list])
2645
2646     bad_nodes = []
2647     if self.dynamic_fields.intersection(self.op.output_fields):
2648       live_data = {}
2649       node_data = rpc.call_all_instances_info(nodes)
2650       for name in nodes:
2651         result = node_data[name]
2652         if result:
2653           live_data.update(result)
2654         elif result == False:
2655           bad_nodes.append(name)
2656         # else no instance is alive
2657     else:
2658       live_data = dict([(name, {}) for name in instance_names])
2659
2660     # end data gathering
2661
2662     output = []
2663     for instance in instance_list:
2664       iout = []
2665       for field in self.op.output_fields:
2666         if field == "name":
2667           val = instance.name
2668         elif field == "os":
2669           val = instance.os
2670         elif field == "pnode":
2671           val = instance.primary_node
2672         elif field == "snodes":
2673           val = list(instance.secondary_nodes)
2674         elif field == "admin_state":
2675           val = (instance.status != "down")
2676         elif field == "oper_state":
2677           if instance.primary_node in bad_nodes:
2678             val = None
2679           else:
2680             val = bool(live_data.get(instance.name))
2681         elif field == "status":
2682           if instance.primary_node in bad_nodes:
2683             val = "ERROR_nodedown"
2684           else:
2685             running = bool(live_data.get(instance.name))
2686             if running:
2687               if instance.status != "down":
2688                 val = "running"
2689               else:
2690                 val = "ERROR_up"
2691             else:
2692               if instance.status != "down":
2693                 val = "ERROR_down"
2694               else:
2695                 val = "ADMIN_down"
2696         elif field == "admin_ram":
2697           val = instance.memory
2698         elif field == "oper_ram":
2699           if instance.primary_node in bad_nodes:
2700             val = None
2701           elif instance.name in live_data:
2702             val = live_data[instance.name].get("memory", "?")
2703           else:
2704             val = "-"
2705         elif field == "disk_template":
2706           val = instance.disk_template
2707         elif field == "ip":
2708           val = instance.nics[0].ip
2709         elif field == "bridge":
2710           val = instance.nics[0].bridge
2711         elif field == "mac":
2712           val = instance.nics[0].mac
2713         elif field == "sda_size" or field == "sdb_size":
2714           disk = instance.FindDisk(field[:3])
2715           if disk is None:
2716             val = None
2717           else:
2718             val = disk.size
2719         elif field == "vcpus":
2720           val = instance.vcpus
2721         elif field == "tags":
2722           val = list(instance.GetTags())
2723         elif field == "serial_no":
2724           val = instance.serial_no
2725         elif field in ("network_port", "kernel_path", "initrd_path",
2726                        "hvm_boot_order", "hvm_acpi", "hvm_pae",
2727                        "hvm_cdrom_image_path", "hvm_nic_type",
2728                        "hvm_disk_type", "vnc_bind_address"):
2729           val = getattr(instance, field, None)
2730           if val is not None:
2731             pass
2732           elif field in ("hvm_nic_type", "hvm_disk_type",
2733                          "kernel_path", "initrd_path"):
2734             val = "default"
2735           else:
2736             val = "-"
2737         else:
2738           raise errors.ParameterError(field)
2739         iout.append(val)
2740       output.append(iout)
2741
2742     return output
2743
2744
2745 class LUFailoverInstance(LogicalUnit):
2746   """Failover an instance.
2747
2748   """
2749   HPATH = "instance-failover"
2750   HTYPE = constants.HTYPE_INSTANCE
2751   _OP_REQP = ["instance_name", "ignore_consistency"]
2752   REQ_BGL = False
2753
2754   def ExpandNames(self):
2755     self._ExpandAndLockInstance()
2756     self.needed_locks[locking.LEVEL_NODE] = []
2757     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2758
2759   def DeclareLocks(self, level):
2760     if level == locking.LEVEL_NODE:
2761       self._LockInstancesNodes()
2762
2763   def BuildHooksEnv(self):
2764     """Build hooks env.
2765
2766     This runs on master, primary and secondary nodes of the instance.
2767
2768     """
2769     env = {
2770       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2771       }
2772     env.update(_BuildInstanceHookEnvByObject(self.instance))
2773     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2774     return env, nl, nl
2775
2776   def CheckPrereq(self):
2777     """Check prerequisites.
2778
2779     This checks that the instance is in the cluster.
2780
2781     """
2782     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2783     assert self.instance is not None, \
2784       "Cannot retrieve locked instance %s" % self.op.instance_name
2785
2786     if instance.disk_template not in constants.DTS_NET_MIRROR:
2787       raise errors.OpPrereqError("Instance's disk layout is not"
2788                                  " network mirrored, cannot failover.")
2789
2790     secondary_nodes = instance.secondary_nodes
2791     if not secondary_nodes:
2792       raise errors.ProgrammerError("no secondary node but using "
2793                                    "a mirrored disk template")
2794
2795     target_node = secondary_nodes[0]
2796     # check memory requirements on the secondary node
2797     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2798                          instance.name, instance.memory)
2799
2800     # check bridge existance
2801     brlist = [nic.bridge for nic in instance.nics]
2802     if not rpc.call_bridges_exist(target_node, brlist):
2803       raise errors.OpPrereqError("One or more target bridges %s does not"
2804                                  " exist on destination node '%s'" %
2805                                  (brlist, target_node))
2806
2807   def Exec(self, feedback_fn):
2808     """Failover an instance.
2809
2810     The failover is done by shutting it down on its present node and
2811     starting it on the secondary.
2812
2813     """
2814     instance = self.instance
2815
2816     source_node = instance.primary_node
2817     target_node = instance.secondary_nodes[0]
2818
2819     feedback_fn("* checking disk consistency between source and target")
2820     for dev in instance.disks:
2821       # for drbd, these are drbd over lvm
2822       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2823         if instance.status == "up" and not self.op.ignore_consistency:
2824           raise errors.OpExecError("Disk %s is degraded on target node,"
2825                                    " aborting failover." % dev.iv_name)
2826
2827     feedback_fn("* shutting down instance on source node")
2828     logger.Info("Shutting down instance %s on node %s" %
2829                 (instance.name, source_node))
2830
2831     if not rpc.call_instance_shutdown(source_node, instance):
2832       if self.op.ignore_consistency:
2833         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2834                      " anyway. Please make sure node %s is down"  %
2835                      (instance.name, source_node, source_node))
2836       else:
2837         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2838                                  (instance.name, source_node))
2839
2840     feedback_fn("* deactivating the instance's disks on source node")
2841     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2842       raise errors.OpExecError("Can't shut down the instance's disks.")
2843
2844     instance.primary_node = target_node
2845     # distribute new instance config to the other nodes
2846     self.cfg.Update(instance)
2847
2848     # Only start the instance if it's marked as up
2849     if instance.status == "up":
2850       feedback_fn("* activating the instance's disks on target node")
2851       logger.Info("Starting instance %s on node %s" %
2852                   (instance.name, target_node))
2853
2854       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2855                                                ignore_secondaries=True)
2856       if not disks_ok:
2857         _ShutdownInstanceDisks(instance, self.cfg)
2858         raise errors.OpExecError("Can't activate the instance's disks")
2859
2860       feedback_fn("* starting the instance on the target node")
2861       if not rpc.call_instance_start(target_node, instance, None):
2862         _ShutdownInstanceDisks(instance, self.cfg)
2863         raise errors.OpExecError("Could not start instance %s on node %s." %
2864                                  (instance.name, target_node))
2865
2866
2867 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2868   """Create a tree of block devices on the primary node.
2869
2870   This always creates all devices.
2871
2872   """
2873   if device.children:
2874     for child in device.children:
2875       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2876         return False
2877
2878   cfg.SetDiskID(device, node)
2879   new_id = rpc.call_blockdev_create(node, device, device.size,
2880                                     instance.name, True, info)
2881   if not new_id:
2882     return False
2883   if device.physical_id is None:
2884     device.physical_id = new_id
2885   return True
2886
2887
2888 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2889   """Create a tree of block devices on a secondary node.
2890
2891   If this device type has to be created on secondaries, create it and
2892   all its children.
2893
2894   If not, just recurse to children keeping the same 'force' value.
2895
2896   """
2897   if device.CreateOnSecondary():
2898     force = True
2899   if device.children:
2900     for child in device.children:
2901       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2902                                         child, force, info):
2903         return False
2904
2905   if not force:
2906     return True
2907   cfg.SetDiskID(device, node)
2908   new_id = rpc.call_blockdev_create(node, device, device.size,
2909                                     instance.name, False, info)
2910   if not new_id:
2911     return False
2912   if device.physical_id is None:
2913     device.physical_id = new_id
2914   return True
2915
2916
2917 def _GenerateUniqueNames(cfg, exts):
2918   """Generate a suitable LV name.
2919
2920   This will generate a logical volume name for the given instance.
2921
2922   """
2923   results = []
2924   for val in exts:
2925     new_id = cfg.GenerateUniqueID()
2926     results.append("%s%s" % (new_id, val))
2927   return results
2928
2929
2930 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name,
2931                          p_minor, s_minor):
2932   """Generate a drbd8 device complete with its children.
2933
2934   """
2935   port = cfg.AllocatePort()
2936   vgname = cfg.GetVGName()
2937   shared_secret = cfg.GenerateDRBDSecret()
2938   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2939                           logical_id=(vgname, names[0]))
2940   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2941                           logical_id=(vgname, names[1]))
2942   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2943                           logical_id=(primary, secondary, port,
2944                                       p_minor, s_minor,
2945                                       shared_secret),
2946                           children=[dev_data, dev_meta],
2947                           iv_name=iv_name)
2948   return drbd_dev
2949
2950
2951 def _GenerateDiskTemplate(cfg, template_name,
2952                           instance_name, primary_node,
2953                           secondary_nodes, disk_sz, swap_sz,
2954                           file_storage_dir, file_driver):
2955   """Generate the entire disk layout for a given template type.
2956
2957   """
2958   #TODO: compute space requirements
2959
2960   vgname = cfg.GetVGName()
2961   if template_name == constants.DT_DISKLESS:
2962     disks = []
2963   elif template_name == constants.DT_PLAIN:
2964     if len(secondary_nodes) != 0:
2965       raise errors.ProgrammerError("Wrong template configuration")
2966
2967     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2968     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2969                            logical_id=(vgname, names[0]),
2970                            iv_name = "sda")
2971     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2972                            logical_id=(vgname, names[1]),
2973                            iv_name = "sdb")
2974     disks = [sda_dev, sdb_dev]
2975   elif template_name == constants.DT_DRBD8:
2976     if len(secondary_nodes) != 1:
2977       raise errors.ProgrammerError("Wrong template configuration")
2978     remote_node = secondary_nodes[0]
2979     (minor_pa, minor_pb,
2980      minor_sa, minor_sb) = cfg.AllocateDRBDMinor(
2981       [primary_node, primary_node, remote_node, remote_node], instance_name)
2982
2983     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2984                                        ".sdb_data", ".sdb_meta"])
2985     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2986                                         disk_sz, names[0:2], "sda",
2987                                         minor_pa, minor_sa)
2988     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2989                                         swap_sz, names[2:4], "sdb",
2990                                         minor_pb, minor_sb)
2991     disks = [drbd_sda_dev, drbd_sdb_dev]
2992   elif template_name == constants.DT_FILE:
2993     if len(secondary_nodes) != 0:
2994       raise errors.ProgrammerError("Wrong template configuration")
2995
2996     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2997                                 iv_name="sda", logical_id=(file_driver,
2998                                 "%s/sda" % file_storage_dir))
2999     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3000                                 iv_name="sdb", logical_id=(file_driver,
3001                                 "%s/sdb" % file_storage_dir))
3002     disks = [file_sda_dev, file_sdb_dev]
3003   else:
3004     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3005   return disks
3006
3007
3008 def _GetInstanceInfoText(instance):
3009   """Compute that text that should be added to the disk's metadata.
3010
3011   """
3012   return "originstname+%s" % instance.name
3013
3014
3015 def _CreateDisks(cfg, instance):
3016   """Create all disks for an instance.
3017
3018   This abstracts away some work from AddInstance.
3019
3020   Args:
3021     instance: the instance object
3022
3023   Returns:
3024     True or False showing the success of the creation process
3025
3026   """
3027   info = _GetInstanceInfoText(instance)
3028
3029   if instance.disk_template == constants.DT_FILE:
3030     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3031     result = rpc.call_file_storage_dir_create(instance.primary_node,
3032                                               file_storage_dir)
3033
3034     if not result:
3035       logger.Error("Could not connect to node '%s'" % instance.primary_node)
3036       return False
3037
3038     if not result[0]:
3039       logger.Error("failed to create directory '%s'" % file_storage_dir)
3040       return False
3041
3042   for device in instance.disks:
3043     logger.Info("creating volume %s for instance %s" %
3044                 (device.iv_name, instance.name))
3045     #HARDCODE
3046     for secondary_node in instance.secondary_nodes:
3047       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3048                                         device, False, info):
3049         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3050                      (device.iv_name, device, secondary_node))
3051         return False
3052     #HARDCODE
3053     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3054                                     instance, device, info):
3055       logger.Error("failed to create volume %s on primary!" %
3056                    device.iv_name)
3057       return False
3058
3059   return True
3060
3061
3062 def _RemoveDisks(instance, cfg):
3063   """Remove all disks for an instance.
3064
3065   This abstracts away some work from `AddInstance()` and
3066   `RemoveInstance()`. Note that in case some of the devices couldn't
3067   be removed, the removal will continue with the other ones (compare
3068   with `_CreateDisks()`).
3069
3070   Args:
3071     instance: the instance object
3072
3073   Returns:
3074     True or False showing the success of the removal proces
3075
3076   """
3077   logger.Info("removing block devices for instance %s" % instance.name)
3078
3079   result = True
3080   for device in instance.disks:
3081     for node, disk in device.ComputeNodeTree(instance.primary_node):
3082       cfg.SetDiskID(disk, node)
3083       if not rpc.call_blockdev_remove(node, disk):
3084         logger.Error("could not remove block device %s on node %s,"
3085                      " continuing anyway" %
3086                      (device.iv_name, node))
3087         result = False
3088
3089   if instance.disk_template == constants.DT_FILE:
3090     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3091     if not rpc.call_file_storage_dir_remove(instance.primary_node,
3092                                             file_storage_dir):
3093       logger.Error("could not remove directory '%s'" % file_storage_dir)
3094       result = False
3095
3096   return result
3097
3098
3099 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3100   """Compute disk size requirements in the volume group
3101
3102   This is currently hard-coded for the two-drive layout.
3103
3104   """
3105   # Required free disk space as a function of disk and swap space
3106   req_size_dict = {
3107     constants.DT_DISKLESS: None,
3108     constants.DT_PLAIN: disk_size + swap_size,
3109     # 256 MB are added for drbd metadata, 128MB for each drbd device
3110     constants.DT_DRBD8: disk_size + swap_size + 256,
3111     constants.DT_FILE: None,
3112   }
3113
3114   if disk_template not in req_size_dict:
3115     raise errors.ProgrammerError("Disk template '%s' size requirement"
3116                                  " is unknown" %  disk_template)
3117
3118   return req_size_dict[disk_template]
3119
3120
3121 class LUCreateInstance(LogicalUnit):
3122   """Create an instance.
3123
3124   """
3125   HPATH = "instance-add"
3126   HTYPE = constants.HTYPE_INSTANCE
3127   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3128               "disk_template", "swap_size", "mode", "start", "vcpus",
3129               "wait_for_sync", "ip_check", "mac"]
3130   REQ_BGL = False
3131
3132   def _ExpandNode(self, node):
3133     """Expands and checks one node name.
3134
3135     """
3136     node_full = self.cfg.ExpandNodeName(node)
3137     if node_full is None:
3138       raise errors.OpPrereqError("Unknown node %s" % node)
3139     return node_full
3140
3141   def ExpandNames(self):
3142     """ExpandNames for CreateInstance.
3143
3144     Figure out the right locks for instance creation.
3145
3146     """
3147     self.needed_locks = {}
3148
3149     # set optional parameters to none if they don't exist
3150     for attr in ["kernel_path", "initrd_path", "pnode", "snode",
3151                  "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
3152                  "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
3153                  "vnc_bind_address"]:
3154       if not hasattr(self.op, attr):
3155         setattr(self.op, attr, None)
3156
3157     # verify creation mode
3158     if self.op.mode not in (constants.INSTANCE_CREATE,
3159                             constants.INSTANCE_IMPORT):
3160       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3161                                  self.op.mode)
3162     # disk template and mirror node verification
3163     if self.op.disk_template not in constants.DISK_TEMPLATES:
3164       raise errors.OpPrereqError("Invalid disk template name")
3165
3166     #### instance parameters check
3167
3168     # instance name verification
3169     hostname1 = utils.HostInfo(self.op.instance_name)
3170     self.op.instance_name = instance_name = hostname1.name
3171
3172     # this is just a preventive check, but someone might still add this
3173     # instance in the meantime, and creation will fail at lock-add time
3174     if instance_name in self.cfg.GetInstanceList():
3175       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3176                                  instance_name)
3177
3178     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3179
3180     # ip validity checks
3181     ip = getattr(self.op, "ip", None)
3182     if ip is None or ip.lower() == "none":
3183       inst_ip = None
3184     elif ip.lower() == "auto":
3185       inst_ip = hostname1.ip
3186     else:
3187       if not utils.IsValidIP(ip):
3188         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3189                                    " like a valid IP" % ip)
3190       inst_ip = ip
3191     self.inst_ip = self.op.ip = inst_ip
3192     # used in CheckPrereq for ip ping check
3193     self.check_ip = hostname1.ip
3194
3195     # MAC address verification
3196     if self.op.mac != "auto":
3197       if not utils.IsValidMac(self.op.mac.lower()):
3198         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3199                                    self.op.mac)
3200
3201     # boot order verification
3202     if self.op.hvm_boot_order is not None:
3203       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3204         raise errors.OpPrereqError("invalid boot order specified,"
3205                                    " must be one or more of [acdn]")
3206     # file storage checks
3207     if (self.op.file_driver and
3208         not self.op.file_driver in constants.FILE_DRIVER):
3209       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3210                                  self.op.file_driver)
3211
3212     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3213       raise errors.OpPrereqError("File storage directory path not absolute")
3214
3215     ### Node/iallocator related checks
3216     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3217       raise errors.OpPrereqError("One and only one of iallocator and primary"
3218                                  " node must be given")
3219
3220     if self.op.iallocator:
3221       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3222     else:
3223       self.op.pnode = self._ExpandNode(self.op.pnode)
3224       nodelist = [self.op.pnode]
3225       if self.op.snode is not None:
3226         self.op.snode = self._ExpandNode(self.op.snode)
3227         nodelist.append(self.op.snode)
3228       self.needed_locks[locking.LEVEL_NODE] = nodelist
3229
3230     # in case of import lock the source node too
3231     if self.op.mode == constants.INSTANCE_IMPORT:
3232       src_node = getattr(self.op, "src_node", None)
3233       src_path = getattr(self.op, "src_path", None)
3234
3235       if src_node is None or src_path is None:
3236         raise errors.OpPrereqError("Importing an instance requires source"
3237                                    " node and path options")
3238
3239       if not os.path.isabs(src_path):
3240         raise errors.OpPrereqError("The source path must be absolute")
3241
3242       self.op.src_node = src_node = self._ExpandNode(src_node)
3243       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3244         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3245
3246     else: # INSTANCE_CREATE
3247       if getattr(self.op, "os_type", None) is None:
3248         raise errors.OpPrereqError("No guest OS specified")
3249
3250   def _RunAllocator(self):
3251     """Run the allocator based on input opcode.
3252
3253     """
3254     disks = [{"size": self.op.disk_size, "mode": "w"},
3255              {"size": self.op.swap_size, "mode": "w"}]
3256     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3257              "bridge": self.op.bridge}]
3258     ial = IAllocator(self.cfg,
3259                      mode=constants.IALLOCATOR_MODE_ALLOC,
3260                      name=self.op.instance_name,
3261                      disk_template=self.op.disk_template,
3262                      tags=[],
3263                      os=self.op.os_type,
3264                      vcpus=self.op.vcpus,
3265                      mem_size=self.op.mem_size,
3266                      disks=disks,
3267                      nics=nics,
3268                      )
3269
3270     ial.Run(self.op.iallocator)
3271
3272     if not ial.success:
3273       raise errors.OpPrereqError("Can't compute nodes using"
3274                                  " iallocator '%s': %s" % (self.op.iallocator,
3275                                                            ial.info))
3276     if len(ial.nodes) != ial.required_nodes:
3277       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3278                                  " of nodes (%s), required %s" %
3279                                  (self.op.iallocator, len(ial.nodes),
3280                                   ial.required_nodes))
3281     self.op.pnode = ial.nodes[0]
3282     logger.ToStdout("Selected nodes for the instance: %s" %
3283                     (", ".join(ial.nodes),))
3284     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3285                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3286     if ial.required_nodes == 2:
3287       self.op.snode = ial.nodes[1]
3288
3289   def BuildHooksEnv(self):
3290     """Build hooks env.
3291
3292     This runs on master, primary and secondary nodes of the instance.
3293
3294     """
3295     env = {
3296       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3297       "INSTANCE_DISK_SIZE": self.op.disk_size,
3298       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3299       "INSTANCE_ADD_MODE": self.op.mode,
3300       }
3301     if self.op.mode == constants.INSTANCE_IMPORT:
3302       env["INSTANCE_SRC_NODE"] = self.op.src_node
3303       env["INSTANCE_SRC_PATH"] = self.op.src_path
3304       env["INSTANCE_SRC_IMAGE"] = self.src_image
3305
3306     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3307       primary_node=self.op.pnode,
3308       secondary_nodes=self.secondaries,
3309       status=self.instance_status,
3310       os_type=self.op.os_type,
3311       memory=self.op.mem_size,
3312       vcpus=self.op.vcpus,
3313       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3314     ))
3315
3316     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3317           self.secondaries)
3318     return env, nl, nl
3319
3320
3321   def CheckPrereq(self):
3322     """Check prerequisites.
3323
3324     """
3325     if (not self.cfg.GetVGName() and
3326         self.op.disk_template not in constants.DTS_NOT_LVM):
3327       raise errors.OpPrereqError("Cluster does not support lvm-based"
3328                                  " instances")
3329
3330     if self.op.mode == constants.INSTANCE_IMPORT:
3331       src_node = self.op.src_node
3332       src_path = self.op.src_path
3333
3334       export_info = rpc.call_export_info(src_node, src_path)
3335
3336       if not export_info:
3337         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3338
3339       if not export_info.has_section(constants.INISECT_EXP):
3340         raise errors.ProgrammerError("Corrupted export config")
3341
3342       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3343       if (int(ei_version) != constants.EXPORT_VERSION):
3344         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3345                                    (ei_version, constants.EXPORT_VERSION))
3346
3347       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3348         raise errors.OpPrereqError("Can't import instance with more than"
3349                                    " one data disk")
3350
3351       # FIXME: are the old os-es, disk sizes, etc. useful?
3352       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3353       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3354                                                          'disk0_dump'))
3355       self.src_image = diskimage
3356
3357     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3358
3359     if self.op.start and not self.op.ip_check:
3360       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3361                                  " adding an instance in start mode")
3362
3363     if self.op.ip_check:
3364       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3365         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3366                                    (self.check_ip, instance_name))
3367
3368     # bridge verification
3369     bridge = getattr(self.op, "bridge", None)
3370     if bridge is None:
3371       self.op.bridge = self.cfg.GetDefBridge()
3372     else:
3373       self.op.bridge = bridge
3374
3375     #### allocator run
3376
3377     if self.op.iallocator is not None:
3378       self._RunAllocator()
3379
3380     #### node related checks
3381
3382     # check primary node
3383     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3384     assert self.pnode is not None, \
3385       "Cannot retrieve locked node %s" % self.op.pnode
3386     self.secondaries = []
3387
3388     # mirror node verification
3389     if self.op.disk_template in constants.DTS_NET_MIRROR:
3390       if self.op.snode is None:
3391         raise errors.OpPrereqError("The networked disk templates need"
3392                                    " a mirror node")
3393       if self.op.snode == pnode.name:
3394         raise errors.OpPrereqError("The secondary node cannot be"
3395                                    " the primary node.")
3396       self.secondaries.append(self.op.snode)
3397
3398     req_size = _ComputeDiskSize(self.op.disk_template,
3399                                 self.op.disk_size, self.op.swap_size)
3400
3401     # Check lv size requirements
3402     if req_size is not None:
3403       nodenames = [pnode.name] + self.secondaries
3404       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3405       for node in nodenames:
3406         info = nodeinfo.get(node, None)
3407         if not info:
3408           raise errors.OpPrereqError("Cannot get current information"
3409                                      " from node '%s'" % node)
3410         vg_free = info.get('vg_free', None)
3411         if not isinstance(vg_free, int):
3412           raise errors.OpPrereqError("Can't compute free disk space on"
3413                                      " node %s" % node)
3414         if req_size > info['vg_free']:
3415           raise errors.OpPrereqError("Not enough disk space on target node %s."
3416                                      " %d MB available, %d MB required" %
3417                                      (node, info['vg_free'], req_size))
3418
3419     # os verification
3420     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3421     if not os_obj:
3422       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3423                                  " primary node"  % self.op.os_type)
3424
3425     if self.op.kernel_path == constants.VALUE_NONE:
3426       raise errors.OpPrereqError("Can't set instance kernel to none")
3427
3428     # bridge check on primary node
3429     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3430       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3431                                  " destination node '%s'" %
3432                                  (self.op.bridge, pnode.name))
3433
3434     # memory check on primary node
3435     if self.op.start:
3436       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3437                            "creating instance %s" % self.op.instance_name,
3438                            self.op.mem_size)
3439
3440     # hvm_cdrom_image_path verification
3441     if self.op.hvm_cdrom_image_path is not None:
3442       # FIXME (als): shouldn't these checks happen on the destination node?
3443       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3444         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3445                                    " be an absolute path or None, not %s" %
3446                                    self.op.hvm_cdrom_image_path)
3447       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3448         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3449                                    " regular file or a symlink pointing to"
3450                                    " an existing regular file, not %s" %
3451                                    self.op.hvm_cdrom_image_path)
3452
3453     # vnc_bind_address verification
3454     if self.op.vnc_bind_address is not None:
3455       if not utils.IsValidIP(self.op.vnc_bind_address):
3456         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3457                                    " like a valid IP address" %
3458                                    self.op.vnc_bind_address)
3459
3460     # Xen HVM device type checks
3461     if self.cfg.GetHypervisorType() == constants.HT_XEN_HVM31:
3462       if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3463         raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3464                                    " hypervisor" % self.op.hvm_nic_type)
3465       if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3466         raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3467                                    " hypervisor" % self.op.hvm_disk_type)
3468
3469     if self.op.start:
3470       self.instance_status = 'up'
3471     else:
3472       self.instance_status = 'down'
3473
3474   def Exec(self, feedback_fn):
3475     """Create and add the instance to the cluster.
3476
3477     """
3478     instance = self.op.instance_name
3479     pnode_name = self.pnode.name
3480
3481     if self.op.mac == "auto":
3482       mac_address = self.cfg.GenerateMAC()
3483     else:
3484       mac_address = self.op.mac
3485
3486     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3487     if self.inst_ip is not None:
3488       nic.ip = self.inst_ip
3489
3490     ht_kind = self.cfg.GetHypervisorType()
3491     if ht_kind in constants.HTS_REQ_PORT:
3492       network_port = self.cfg.AllocatePort()
3493     else:
3494       network_port = None
3495
3496     if self.op.vnc_bind_address is None:
3497       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3498
3499     # this is needed because os.path.join does not accept None arguments
3500     if self.op.file_storage_dir is None:
3501       string_file_storage_dir = ""
3502     else:
3503       string_file_storage_dir = self.op.file_storage_dir
3504
3505     # build the full file storage dir path
3506     file_storage_dir = os.path.normpath(os.path.join(
3507                                         self.cfg.GetFileStorageDir(),
3508                                         string_file_storage_dir, instance))
3509
3510
3511     disks = _GenerateDiskTemplate(self.cfg,
3512                                   self.op.disk_template,
3513                                   instance, pnode_name,
3514                                   self.secondaries, self.op.disk_size,
3515                                   self.op.swap_size,
3516                                   file_storage_dir,
3517                                   self.op.file_driver)
3518
3519     iobj = objects.Instance(name=instance, os=self.op.os_type,
3520                             primary_node=pnode_name,
3521                             memory=self.op.mem_size,
3522                             vcpus=self.op.vcpus,
3523                             nics=[nic], disks=disks,
3524                             disk_template=self.op.disk_template,
3525                             status=self.instance_status,
3526                             network_port=network_port,
3527                             kernel_path=self.op.kernel_path,
3528                             initrd_path=self.op.initrd_path,
3529                             hvm_boot_order=self.op.hvm_boot_order,
3530                             hvm_acpi=self.op.hvm_acpi,
3531                             hvm_pae=self.op.hvm_pae,
3532                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3533                             vnc_bind_address=self.op.vnc_bind_address,
3534                             hvm_nic_type=self.op.hvm_nic_type,
3535                             hvm_disk_type=self.op.hvm_disk_type,
3536                             )
3537
3538     feedback_fn("* creating instance disks...")
3539     if not _CreateDisks(self.cfg, iobj):
3540       _RemoveDisks(iobj, self.cfg)
3541       self.cfg.ReleaseDRBDMinors(instance)
3542       raise errors.OpExecError("Device creation failed, reverting...")
3543
3544     feedback_fn("adding instance %s to cluster config" % instance)
3545
3546     self.cfg.AddInstance(iobj)
3547     # Declare that we don't want to remove the instance lock anymore, as we've
3548     # added the instance to the config
3549     del self.remove_locks[locking.LEVEL_INSTANCE]
3550     # Remove the temp. assignements for the instance's drbds
3551     self.cfg.ReleaseDRBDMinors(instance)
3552
3553     if self.op.wait_for_sync:
3554       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3555     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3556       # make sure the disks are not degraded (still sync-ing is ok)
3557       time.sleep(15)
3558       feedback_fn("* checking mirrors status")
3559       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3560     else:
3561       disk_abort = False
3562
3563     if disk_abort:
3564       _RemoveDisks(iobj, self.cfg)
3565       self.cfg.RemoveInstance(iobj.name)
3566       # Make sure the instance lock gets removed
3567       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3568       raise errors.OpExecError("There are some degraded disks for"
3569                                " this instance")
3570
3571     feedback_fn("creating os for instance %s on node %s" %
3572                 (instance, pnode_name))
3573
3574     if iobj.disk_template != constants.DT_DISKLESS:
3575       if self.op.mode == constants.INSTANCE_CREATE:
3576         feedback_fn("* running the instance OS create scripts...")
3577         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3578           raise errors.OpExecError("could not add os for instance %s"
3579                                    " on node %s" %
3580                                    (instance, pnode_name))
3581
3582       elif self.op.mode == constants.INSTANCE_IMPORT:
3583         feedback_fn("* running the instance OS import scripts...")
3584         src_node = self.op.src_node
3585         src_image = self.src_image
3586         cluster_name = self.cfg.GetClusterName()
3587         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3588                                            src_node, src_image, cluster_name):
3589           raise errors.OpExecError("Could not import os for instance"
3590                                    " %s on node %s" %
3591                                    (instance, pnode_name))
3592       else:
3593         # also checked in the prereq part
3594         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3595                                      % self.op.mode)
3596
3597     if self.op.start:
3598       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3599       feedback_fn("* starting instance...")
3600       if not rpc.call_instance_start(pnode_name, iobj, None):
3601         raise errors.OpExecError("Could not start instance")
3602
3603
3604 class LUConnectConsole(NoHooksLU):
3605   """Connect to an instance's console.
3606
3607   This is somewhat special in that it returns the command line that
3608   you need to run on the master node in order to connect to the
3609   console.
3610
3611   """
3612   _OP_REQP = ["instance_name"]
3613   REQ_BGL = False
3614
3615   def ExpandNames(self):
3616     self._ExpandAndLockInstance()
3617
3618   def CheckPrereq(self):
3619     """Check prerequisites.
3620
3621     This checks that the instance is in the cluster.
3622
3623     """
3624     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3625     assert self.instance is not None, \
3626       "Cannot retrieve locked instance %s" % self.op.instance_name
3627
3628   def Exec(self, feedback_fn):
3629     """Connect to the console of an instance
3630
3631     """
3632     instance = self.instance
3633     node = instance.primary_node
3634
3635     node_insts = rpc.call_instance_list([node])[node]
3636     if node_insts is False:
3637       raise errors.OpExecError("Can't connect to node %s." % node)
3638
3639     if instance.name not in node_insts:
3640       raise errors.OpExecError("Instance %s is not running." % instance.name)
3641
3642     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3643
3644     hyper = hypervisor.GetHypervisor(self.cfg)
3645     console_cmd = hyper.GetShellCommandForConsole(instance)
3646
3647     # build ssh cmdline
3648     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3649
3650
3651 class LUReplaceDisks(LogicalUnit):
3652   """Replace the disks of an instance.
3653
3654   """
3655   HPATH = "mirrors-replace"
3656   HTYPE = constants.HTYPE_INSTANCE
3657   _OP_REQP = ["instance_name", "mode", "disks"]
3658   REQ_BGL = False
3659
3660   def ExpandNames(self):
3661     self._ExpandAndLockInstance()
3662
3663     if not hasattr(self.op, "remote_node"):
3664       self.op.remote_node = None
3665
3666     ia_name = getattr(self.op, "iallocator", None)
3667     if ia_name is not None:
3668       if self.op.remote_node is not None:
3669         raise errors.OpPrereqError("Give either the iallocator or the new"
3670                                    " secondary, not both")
3671       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3672     elif self.op.remote_node is not None:
3673       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3674       if remote_node is None:
3675         raise errors.OpPrereqError("Node '%s' not known" %
3676                                    self.op.remote_node)
3677       self.op.remote_node = remote_node
3678       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3679       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3680     else:
3681       self.needed_locks[locking.LEVEL_NODE] = []
3682       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3683
3684   def DeclareLocks(self, level):
3685     # If we're not already locking all nodes in the set we have to declare the
3686     # instance's primary/secondary nodes.
3687     if (level == locking.LEVEL_NODE and
3688         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3689       self._LockInstancesNodes()
3690
3691   def _RunAllocator(self):
3692     """Compute a new secondary node using an IAllocator.
3693
3694     """
3695     ial = IAllocator(self.cfg,
3696                      mode=constants.IALLOCATOR_MODE_RELOC,
3697                      name=self.op.instance_name,
3698                      relocate_from=[self.sec_node])
3699
3700     ial.Run(self.op.iallocator)
3701
3702     if not ial.success:
3703       raise errors.OpPrereqError("Can't compute nodes using"
3704                                  " iallocator '%s': %s" % (self.op.iallocator,
3705                                                            ial.info))
3706     if len(ial.nodes) != ial.required_nodes:
3707       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3708                                  " of nodes (%s), required %s" %
3709                                  (len(ial.nodes), ial.required_nodes))
3710     self.op.remote_node = ial.nodes[0]
3711     logger.ToStdout("Selected new secondary for the instance: %s" %
3712                     self.op.remote_node)
3713
3714   def BuildHooksEnv(self):
3715     """Build hooks env.
3716
3717     This runs on the master, the primary and all the secondaries.
3718
3719     """
3720     env = {
3721       "MODE": self.op.mode,
3722       "NEW_SECONDARY": self.op.remote_node,
3723       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3724       }
3725     env.update(_BuildInstanceHookEnvByObject(self.instance))
3726     nl = [
3727       self.cfg.GetMasterNode(),
3728       self.instance.primary_node,
3729       ]
3730     if self.op.remote_node is not None:
3731       nl.append(self.op.remote_node)
3732     return env, nl, nl
3733
3734   def CheckPrereq(self):
3735     """Check prerequisites.
3736
3737     This checks that the instance is in the cluster.
3738
3739     """
3740     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3741     assert instance is not None, \
3742       "Cannot retrieve locked instance %s" % self.op.instance_name
3743     self.instance = instance
3744
3745     if instance.disk_template not in constants.DTS_NET_MIRROR:
3746       raise errors.OpPrereqError("Instance's disk layout is not"
3747                                  " network mirrored.")
3748
3749     if len(instance.secondary_nodes) != 1:
3750       raise errors.OpPrereqError("The instance has a strange layout,"
3751                                  " expected one secondary but found %d" %
3752                                  len(instance.secondary_nodes))
3753
3754     self.sec_node = instance.secondary_nodes[0]
3755
3756     ia_name = getattr(self.op, "iallocator", None)
3757     if ia_name is not None:
3758       self._RunAllocator()
3759
3760     remote_node = self.op.remote_node
3761     if remote_node is not None:
3762       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3763       assert self.remote_node_info is not None, \
3764         "Cannot retrieve locked node %s" % remote_node
3765     else:
3766       self.remote_node_info = None
3767     if remote_node == instance.primary_node:
3768       raise errors.OpPrereqError("The specified node is the primary node of"
3769                                  " the instance.")
3770     elif remote_node == self.sec_node:
3771       if self.op.mode == constants.REPLACE_DISK_SEC:
3772         # this is for DRBD8, where we can't execute the same mode of
3773         # replacement as for drbd7 (no different port allocated)
3774         raise errors.OpPrereqError("Same secondary given, cannot execute"
3775                                    " replacement")
3776     if instance.disk_template == constants.DT_DRBD8:
3777       if (self.op.mode == constants.REPLACE_DISK_ALL and
3778           remote_node is not None):
3779         # switch to replace secondary mode
3780         self.op.mode = constants.REPLACE_DISK_SEC
3781
3782       if self.op.mode == constants.REPLACE_DISK_ALL:
3783         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3784                                    " secondary disk replacement, not"
3785                                    " both at once")
3786       elif self.op.mode == constants.REPLACE_DISK_PRI:
3787         if remote_node is not None:
3788           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3789                                      " the secondary while doing a primary"
3790                                      " node disk replacement")
3791         self.tgt_node = instance.primary_node
3792         self.oth_node = instance.secondary_nodes[0]
3793       elif self.op.mode == constants.REPLACE_DISK_SEC:
3794         self.new_node = remote_node # this can be None, in which case
3795                                     # we don't change the secondary
3796         self.tgt_node = instance.secondary_nodes[0]
3797         self.oth_node = instance.primary_node
3798       else:
3799         raise errors.ProgrammerError("Unhandled disk replace mode")
3800
3801     for name in self.op.disks:
3802       if instance.FindDisk(name) is None:
3803         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3804                                    (name, instance.name))
3805
3806   def _ExecD8DiskOnly(self, feedback_fn):
3807     """Replace a disk on the primary or secondary for dbrd8.
3808
3809     The algorithm for replace is quite complicated:
3810       - for each disk to be replaced:
3811         - create new LVs on the target node with unique names
3812         - detach old LVs from the drbd device
3813         - rename old LVs to name_replaced.<time_t>
3814         - rename new LVs to old LVs
3815         - attach the new LVs (with the old names now) to the drbd device
3816       - wait for sync across all devices
3817       - for each modified disk:
3818         - remove old LVs (which have the name name_replaces.<time_t>)
3819
3820     Failures are not very well handled.
3821
3822     """
3823     steps_total = 6
3824     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3825     instance = self.instance
3826     iv_names = {}
3827     vgname = self.cfg.GetVGName()
3828     # start of work
3829     cfg = self.cfg
3830     tgt_node = self.tgt_node
3831     oth_node = self.oth_node
3832
3833     # Step: check device activation
3834     self.proc.LogStep(1, steps_total, "check device existence")
3835     info("checking volume groups")
3836     my_vg = cfg.GetVGName()
3837     results = rpc.call_vg_list([oth_node, tgt_node])
3838     if not results:
3839       raise errors.OpExecError("Can't list volume groups on the nodes")
3840     for node in oth_node, tgt_node:
3841       res = results.get(node, False)
3842       if not res or my_vg not in res:
3843         raise errors.OpExecError("Volume group '%s' not found on %s" %
3844                                  (my_vg, node))
3845     for dev in instance.disks:
3846       if not dev.iv_name in self.op.disks:
3847         continue
3848       for node in tgt_node, oth_node:
3849         info("checking %s on %s" % (dev.iv_name, node))
3850         cfg.SetDiskID(dev, node)
3851         if not rpc.call_blockdev_find(node, dev):
3852           raise errors.OpExecError("Can't find device %s on node %s" %
3853                                    (dev.iv_name, node))
3854
3855     # Step: check other node consistency
3856     self.proc.LogStep(2, steps_total, "check peer consistency")
3857     for dev in instance.disks:
3858       if not dev.iv_name in self.op.disks:
3859         continue
3860       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3861       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3862                                    oth_node==instance.primary_node):
3863         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3864                                  " to replace disks on this node (%s)" %
3865                                  (oth_node, tgt_node))
3866
3867     # Step: create new storage
3868     self.proc.LogStep(3, steps_total, "allocate new storage")
3869     for dev in instance.disks:
3870       if not dev.iv_name in self.op.disks:
3871         continue
3872       size = dev.size
3873       cfg.SetDiskID(dev, tgt_node)
3874       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3875       names = _GenerateUniqueNames(cfg, lv_names)
3876       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3877                              logical_id=(vgname, names[0]))
3878       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3879                              logical_id=(vgname, names[1]))
3880       new_lvs = [lv_data, lv_meta]
3881       old_lvs = dev.children
3882       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3883       info("creating new local storage on %s for %s" %
3884            (tgt_node, dev.iv_name))
3885       # since we *always* want to create this LV, we use the
3886       # _Create...OnPrimary (which forces the creation), even if we
3887       # are talking about the secondary node
3888       for new_lv in new_lvs:
3889         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3890                                         _GetInstanceInfoText(instance)):
3891           raise errors.OpExecError("Failed to create new LV named '%s' on"
3892                                    " node '%s'" %
3893                                    (new_lv.logical_id[1], tgt_node))
3894
3895     # Step: for each lv, detach+rename*2+attach
3896     self.proc.LogStep(4, steps_total, "change drbd configuration")
3897     for dev, old_lvs, new_lvs in iv_names.itervalues():
3898       info("detaching %s drbd from local storage" % dev.iv_name)
3899       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3900         raise errors.OpExecError("Can't detach drbd from local storage on node"
3901                                  " %s for device %s" % (tgt_node, dev.iv_name))
3902       #dev.children = []
3903       #cfg.Update(instance)
3904
3905       # ok, we created the new LVs, so now we know we have the needed
3906       # storage; as such, we proceed on the target node to rename
3907       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3908       # using the assumption that logical_id == physical_id (which in
3909       # turn is the unique_id on that node)
3910
3911       # FIXME(iustin): use a better name for the replaced LVs
3912       temp_suffix = int(time.time())
3913       ren_fn = lambda d, suff: (d.physical_id[0],
3914                                 d.physical_id[1] + "_replaced-%s" % suff)
3915       # build the rename list based on what LVs exist on the node
3916       rlist = []
3917       for to_ren in old_lvs:
3918         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3919         if find_res is not None: # device exists
3920           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3921
3922       info("renaming the old LVs on the target node")
3923       if not rpc.call_blockdev_rename(tgt_node, rlist):
3924         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3925       # now we rename the new LVs to the old LVs
3926       info("renaming the new LVs on the target node")
3927       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3928       if not rpc.call_blockdev_rename(tgt_node, rlist):
3929         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3930
3931       for old, new in zip(old_lvs, new_lvs):
3932         new.logical_id = old.logical_id
3933         cfg.SetDiskID(new, tgt_node)
3934
3935       for disk in old_lvs:
3936         disk.logical_id = ren_fn(disk, temp_suffix)
3937         cfg.SetDiskID(disk, tgt_node)
3938
3939       # now that the new lvs have the old name, we can add them to the device
3940       info("adding new mirror component on %s" % tgt_node)
3941       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3942         for new_lv in new_lvs:
3943           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3944             warning("Can't rollback device %s", hint="manually cleanup unused"
3945                     " logical volumes")
3946         raise errors.OpExecError("Can't add local storage to drbd")
3947
3948       dev.children = new_lvs
3949       cfg.Update(instance)
3950
3951     # Step: wait for sync
3952
3953     # this can fail as the old devices are degraded and _WaitForSync
3954     # does a combined result over all disks, so we don't check its
3955     # return value
3956     self.proc.LogStep(5, steps_total, "sync devices")
3957     _WaitForSync(cfg, instance, self.proc, unlock=True)
3958
3959     # so check manually all the devices
3960     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3961       cfg.SetDiskID(dev, instance.primary_node)
3962       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3963       if is_degr:
3964         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3965
3966     # Step: remove old storage
3967     self.proc.LogStep(6, steps_total, "removing old storage")
3968     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3969       info("remove logical volumes for %s" % name)
3970       for lv in old_lvs:
3971         cfg.SetDiskID(lv, tgt_node)
3972         if not rpc.call_blockdev_remove(tgt_node, lv):
3973           warning("Can't remove old LV", hint="manually remove unused LVs")
3974           continue
3975
3976   def _ExecD8Secondary(self, feedback_fn):
3977     """Replace the secondary node for drbd8.
3978
3979     The algorithm for replace is quite complicated:
3980       - for all disks of the instance:
3981         - create new LVs on the new node with same names
3982         - shutdown the drbd device on the old secondary
3983         - disconnect the drbd network on the primary
3984         - create the drbd device on the new secondary
3985         - network attach the drbd on the primary, using an artifice:
3986           the drbd code for Attach() will connect to the network if it
3987           finds a device which is connected to the good local disks but
3988           not network enabled
3989       - wait for sync across all devices
3990       - remove all disks from the old secondary
3991
3992     Failures are not very well handled.
3993
3994     """
3995     steps_total = 6
3996     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3997     instance = self.instance
3998     iv_names = {}
3999     vgname = self.cfg.GetVGName()
4000     # start of work
4001     cfg = self.cfg
4002     old_node = self.tgt_node
4003     new_node = self.new_node
4004     pri_node = instance.primary_node
4005
4006     # Step: check device activation
4007     self.proc.LogStep(1, steps_total, "check device existence")
4008     info("checking volume groups")
4009     my_vg = cfg.GetVGName()
4010     results = rpc.call_vg_list([pri_node, new_node])
4011     if not results:
4012       raise errors.OpExecError("Can't list volume groups on the nodes")
4013     for node in pri_node, new_node:
4014       res = results.get(node, False)
4015       if not res or my_vg not in res:
4016         raise errors.OpExecError("Volume group '%s' not found on %s" %
4017                                  (my_vg, node))
4018     for dev in instance.disks:
4019       if not dev.iv_name in self.op.disks:
4020         continue
4021       info("checking %s on %s" % (dev.iv_name, pri_node))
4022       cfg.SetDiskID(dev, pri_node)
4023       if not rpc.call_blockdev_find(pri_node, dev):
4024         raise errors.OpExecError("Can't find device %s on node %s" %
4025                                  (dev.iv_name, pri_node))
4026
4027     # Step: check other node consistency
4028     self.proc.LogStep(2, steps_total, "check peer consistency")
4029     for dev in instance.disks:
4030       if not dev.iv_name in self.op.disks:
4031         continue
4032       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4033       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4034         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4035                                  " unsafe to replace the secondary" %
4036                                  pri_node)
4037
4038     # Step: create new storage
4039     self.proc.LogStep(3, steps_total, "allocate new storage")
4040     for dev in instance.disks:
4041       size = dev.size
4042       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4043       # since we *always* want to create this LV, we use the
4044       # _Create...OnPrimary (which forces the creation), even if we
4045       # are talking about the secondary node
4046       for new_lv in dev.children:
4047         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4048                                         _GetInstanceInfoText(instance)):
4049           raise errors.OpExecError("Failed to create new LV named '%s' on"
4050                                    " node '%s'" %
4051                                    (new_lv.logical_id[1], new_node))
4052
4053
4054     # Step 4: dbrd minors and drbd setups changes
4055     # after this, we must manually remove the drbd minors on both the
4056     # error and the success paths
4057     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4058                                    instance.name)
4059     logging.debug("Allocated minors %s" % (minors,))
4060     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4061     for dev, new_minor in zip(instance.disks, minors):
4062       size = dev.size
4063       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4064       # create new devices on new_node
4065       if pri_node == dev.logical_id[0]:
4066         new_logical_id = (pri_node, new_node,
4067                           dev.logical_id[2], dev.logical_id[3], new_minor,
4068                           dev.logical_id[5])
4069       else:
4070         new_logical_id = (new_node, pri_node,
4071                           dev.logical_id[2], new_minor, dev.logical_id[4],
4072                           dev.logical_id[5])
4073       iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4074       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4075                     new_logical_id)
4076       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4077                               logical_id=new_logical_id,
4078                               children=dev.children)
4079       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4080                                         new_drbd, False,
4081                                       _GetInstanceInfoText(instance)):
4082         self.cfg.ReleaseDRBDMinors(instance.name)
4083         raise errors.OpExecError("Failed to create new DRBD on"
4084                                  " node '%s'" % new_node)
4085
4086     for dev in instance.disks:
4087       # we have new devices, shutdown the drbd on the old secondary
4088       info("shutting down drbd for %s on old node" % dev.iv_name)
4089       cfg.SetDiskID(dev, old_node)
4090       if not rpc.call_blockdev_shutdown(old_node, dev):
4091         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4092                 hint="Please cleanup this device manually as soon as possible")
4093
4094     info("detaching primary drbds from the network (=> standalone)")
4095     done = 0
4096     for dev in instance.disks:
4097       cfg.SetDiskID(dev, pri_node)
4098       # set the network part of the physical (unique in bdev terms) id
4099       # to None, meaning detach from network
4100       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4101       # and 'find' the device, which will 'fix' it to match the
4102       # standalone state
4103       if rpc.call_blockdev_find(pri_node, dev):
4104         done += 1
4105       else:
4106         warning("Failed to detach drbd %s from network, unusual case" %
4107                 dev.iv_name)
4108
4109     if not done:
4110       # no detaches succeeded (very unlikely)
4111       self.cfg.ReleaseDRBDMinors(instance.name)
4112       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4113
4114     # if we managed to detach at least one, we update all the disks of
4115     # the instance to point to the new secondary
4116     info("updating instance configuration")
4117     for dev, _, new_logical_id in iv_names.itervalues():
4118       dev.logical_id = new_logical_id
4119       cfg.SetDiskID(dev, pri_node)
4120     cfg.Update(instance)
4121     # we can remove now the temp minors as now the new values are
4122     # written to the config file (and therefore stable)
4123     self.cfg.ReleaseDRBDMinors(instance.name)
4124
4125     # and now perform the drbd attach
4126     info("attaching primary drbds to new secondary (standalone => connected)")
4127     failures = []
4128     for dev in instance.disks:
4129       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4130       # since the attach is smart, it's enough to 'find' the device,
4131       # it will automatically activate the network, if the physical_id
4132       # is correct
4133       cfg.SetDiskID(dev, pri_node)
4134       logging.debug("Disk to attach: %s", dev)
4135       if not rpc.call_blockdev_find(pri_node, dev):
4136         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4137                 "please do a gnt-instance info to see the status of disks")
4138
4139     # this can fail as the old devices are degraded and _WaitForSync
4140     # does a combined result over all disks, so we don't check its
4141     # return value
4142     self.proc.LogStep(5, steps_total, "sync devices")
4143     _WaitForSync(cfg, instance, self.proc, unlock=True)
4144
4145     # so check manually all the devices
4146     for name, (dev, old_lvs, _) in iv_names.iteritems():
4147       cfg.SetDiskID(dev, pri_node)
4148       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4149       if is_degr:
4150         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4151
4152     self.proc.LogStep(6, steps_total, "removing old storage")
4153     for name, (dev, old_lvs, _) in iv_names.iteritems():
4154       info("remove logical volumes for %s" % name)
4155       for lv in old_lvs:
4156         cfg.SetDiskID(lv, old_node)
4157         if not rpc.call_blockdev_remove(old_node, lv):
4158           warning("Can't remove LV on old secondary",
4159                   hint="Cleanup stale volumes by hand")
4160
4161   def Exec(self, feedback_fn):
4162     """Execute disk replacement.
4163
4164     This dispatches the disk replacement to the appropriate handler.
4165
4166     """
4167     instance = self.instance
4168
4169     # Activate the instance disks if we're replacing them on a down instance
4170     if instance.status == "down":
4171       _StartInstanceDisks(self.cfg, instance, True)
4172
4173     if instance.disk_template == constants.DT_DRBD8:
4174       if self.op.remote_node is None:
4175         fn = self._ExecD8DiskOnly
4176       else:
4177         fn = self._ExecD8Secondary
4178     else:
4179       raise errors.ProgrammerError("Unhandled disk replacement case")
4180
4181     ret = fn(feedback_fn)
4182
4183     # Deactivate the instance disks if we're replacing them on a down instance
4184     if instance.status == "down":
4185       _SafeShutdownInstanceDisks(instance, self.cfg)
4186
4187     return ret
4188
4189
4190 class LUGrowDisk(LogicalUnit):
4191   """Grow a disk of an instance.
4192
4193   """
4194   HPATH = "disk-grow"
4195   HTYPE = constants.HTYPE_INSTANCE
4196   _OP_REQP = ["instance_name", "disk", "amount"]
4197   REQ_BGL = False
4198
4199   def ExpandNames(self):
4200     self._ExpandAndLockInstance()
4201     self.needed_locks[locking.LEVEL_NODE] = []
4202     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4203
4204   def DeclareLocks(self, level):
4205     if level == locking.LEVEL_NODE:
4206       self._LockInstancesNodes()
4207
4208   def BuildHooksEnv(self):
4209     """Build hooks env.
4210
4211     This runs on the master, the primary and all the secondaries.
4212
4213     """
4214     env = {
4215       "DISK": self.op.disk,
4216       "AMOUNT": self.op.amount,
4217       }
4218     env.update(_BuildInstanceHookEnvByObject(self.instance))
4219     nl = [
4220       self.cfg.GetMasterNode(),
4221       self.instance.primary_node,
4222       ]
4223     return env, nl, nl
4224
4225   def CheckPrereq(self):
4226     """Check prerequisites.
4227
4228     This checks that the instance is in the cluster.
4229
4230     """
4231     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4232     assert instance is not None, \
4233       "Cannot retrieve locked instance %s" % self.op.instance_name
4234
4235     self.instance = instance
4236
4237     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4238       raise errors.OpPrereqError("Instance's disk layout does not support"
4239                                  " growing.")
4240
4241     if instance.FindDisk(self.op.disk) is None:
4242       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4243                                  (self.op.disk, instance.name))
4244
4245     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4246     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4247     for node in nodenames:
4248       info = nodeinfo.get(node, None)
4249       if not info:
4250         raise errors.OpPrereqError("Cannot get current information"
4251                                    " from node '%s'" % node)
4252       vg_free = info.get('vg_free', None)
4253       if not isinstance(vg_free, int):
4254         raise errors.OpPrereqError("Can't compute free disk space on"
4255                                    " node %s" % node)
4256       if self.op.amount > info['vg_free']:
4257         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4258                                    " %d MiB available, %d MiB required" %
4259                                    (node, info['vg_free'], self.op.amount))
4260
4261   def Exec(self, feedback_fn):
4262     """Execute disk grow.
4263
4264     """
4265     instance = self.instance
4266     disk = instance.FindDisk(self.op.disk)
4267     for node in (instance.secondary_nodes + (instance.primary_node,)):
4268       self.cfg.SetDiskID(disk, node)
4269       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4270       if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
4271         raise errors.OpExecError("grow request failed to node %s" % node)
4272       elif not result[0]:
4273         raise errors.OpExecError("grow request failed to node %s: %s" %
4274                                  (node, result[1]))
4275     disk.RecordGrow(self.op.amount)
4276     self.cfg.Update(instance)
4277     return
4278
4279
4280 class LUQueryInstanceData(NoHooksLU):
4281   """Query runtime instance data.
4282
4283   """
4284   _OP_REQP = ["instances"]
4285   REQ_BGL = False
4286
4287   def ExpandNames(self):
4288     self.needed_locks = {}
4289     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4290
4291     if not isinstance(self.op.instances, list):
4292       raise errors.OpPrereqError("Invalid argument type 'instances'")
4293
4294     if self.op.instances:
4295       self.wanted_names = []
4296       for name in self.op.instances:
4297         full_name = self.cfg.ExpandInstanceName(name)
4298         if full_name is None:
4299           raise errors.OpPrereqError("Instance '%s' not known" %
4300                                      self.op.instance_name)
4301         self.wanted_names.append(full_name)
4302       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4303     else:
4304       self.wanted_names = None
4305       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4306
4307     self.needed_locks[locking.LEVEL_NODE] = []
4308     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4309
4310   def DeclareLocks(self, level):
4311     if level == locking.LEVEL_NODE:
4312       self._LockInstancesNodes()
4313
4314   def CheckPrereq(self):
4315     """Check prerequisites.
4316
4317     This only checks the optional instance list against the existing names.
4318
4319     """
4320     if self.wanted_names is None:
4321       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4322
4323     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4324                              in self.wanted_names]
4325     return
4326
4327   def _ComputeDiskStatus(self, instance, snode, dev):
4328     """Compute block device status.
4329
4330     """
4331     self.cfg.SetDiskID(dev, instance.primary_node)
4332     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4333     if dev.dev_type in constants.LDS_DRBD:
4334       # we change the snode then (otherwise we use the one passed in)
4335       if dev.logical_id[0] == instance.primary_node:
4336         snode = dev.logical_id[1]
4337       else:
4338         snode = dev.logical_id[0]
4339
4340     if snode:
4341       self.cfg.SetDiskID(dev, snode)
4342       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4343     else:
4344       dev_sstatus = None
4345
4346     if dev.children:
4347       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4348                       for child in dev.children]
4349     else:
4350       dev_children = []
4351
4352     data = {
4353       "iv_name": dev.iv_name,
4354       "dev_type": dev.dev_type,
4355       "logical_id": dev.logical_id,
4356       "physical_id": dev.physical_id,
4357       "pstatus": dev_pstatus,
4358       "sstatus": dev_sstatus,
4359       "children": dev_children,
4360       }
4361
4362     return data
4363
4364   def Exec(self, feedback_fn):
4365     """Gather and return data"""
4366     result = {}
4367     for instance in self.wanted_instances:
4368       remote_info = rpc.call_instance_info(instance.primary_node,
4369                                                 instance.name)
4370       if remote_info and "state" in remote_info:
4371         remote_state = "up"
4372       else:
4373         remote_state = "down"
4374       if instance.status == "down":
4375         config_state = "down"
4376       else:
4377         config_state = "up"
4378
4379       disks = [self._ComputeDiskStatus(instance, None, device)
4380                for device in instance.disks]
4381
4382       idict = {
4383         "name": instance.name,
4384         "config_state": config_state,
4385         "run_state": remote_state,
4386         "pnode": instance.primary_node,
4387         "snodes": instance.secondary_nodes,
4388         "os": instance.os,
4389         "memory": instance.memory,
4390         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4391         "disks": disks,
4392         "vcpus": instance.vcpus,
4393         }
4394
4395       htkind = self.cfg.GetHypervisorType()
4396       if htkind == constants.HT_XEN_PVM30:
4397         idict["kernel_path"] = instance.kernel_path
4398         idict["initrd_path"] = instance.initrd_path
4399
4400       if htkind == constants.HT_XEN_HVM31:
4401         idict["hvm_boot_order"] = instance.hvm_boot_order
4402         idict["hvm_acpi"] = instance.hvm_acpi
4403         idict["hvm_pae"] = instance.hvm_pae
4404         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4405         idict["hvm_nic_type"] = instance.hvm_nic_type
4406         idict["hvm_disk_type"] = instance.hvm_disk_type
4407
4408       if htkind in constants.HTS_REQ_PORT:
4409         if instance.vnc_bind_address is None:
4410           vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4411         else:
4412           vnc_bind_address = instance.vnc_bind_address
4413         if instance.network_port is None:
4414           vnc_console_port = None
4415         elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4416           vnc_console_port = "%s:%s" % (instance.primary_node,
4417                                        instance.network_port)
4418         elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4419           vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4420                                                    instance.network_port,
4421                                                    instance.primary_node)
4422         else:
4423           vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4424                                         instance.network_port)
4425         idict["vnc_console_port"] = vnc_console_port
4426         idict["vnc_bind_address"] = vnc_bind_address
4427         idict["network_port"] = instance.network_port
4428
4429       result[instance.name] = idict
4430
4431     return result
4432
4433
4434 class LUSetInstanceParams(LogicalUnit):
4435   """Modifies an instances's parameters.
4436
4437   """
4438   HPATH = "instance-modify"
4439   HTYPE = constants.HTYPE_INSTANCE
4440   _OP_REQP = ["instance_name"]
4441   REQ_BGL = False
4442
4443   def ExpandNames(self):
4444     self._ExpandAndLockInstance()
4445
4446   def BuildHooksEnv(self):
4447     """Build hooks env.
4448
4449     This runs on the master, primary and secondaries.
4450
4451     """
4452     args = dict()
4453     if self.mem:
4454       args['memory'] = self.mem
4455     if self.vcpus:
4456       args['vcpus'] = self.vcpus
4457     if self.do_ip or self.do_bridge or self.mac:
4458       if self.do_ip:
4459         ip = self.ip
4460       else:
4461         ip = self.instance.nics[0].ip
4462       if self.bridge:
4463         bridge = self.bridge
4464       else:
4465         bridge = self.instance.nics[0].bridge
4466       if self.mac:
4467         mac = self.mac
4468       else:
4469         mac = self.instance.nics[0].mac
4470       args['nics'] = [(ip, bridge, mac)]
4471     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4472     nl = [self.cfg.GetMasterNode(),
4473           self.instance.primary_node] + list(self.instance.secondary_nodes)
4474     return env, nl, nl
4475
4476   def CheckPrereq(self):
4477     """Check prerequisites.
4478
4479     This only checks the instance list against the existing names.
4480
4481     """
4482     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4483     # a separate CheckArguments function, if we implement one, so the operation
4484     # can be aborted without waiting for any lock, should it have an error...
4485     self.mem = getattr(self.op, "mem", None)
4486     self.vcpus = getattr(self.op, "vcpus", None)
4487     self.ip = getattr(self.op, "ip", None)
4488     self.mac = getattr(self.op, "mac", None)
4489     self.bridge = getattr(self.op, "bridge", None)
4490     self.kernel_path = getattr(self.op, "kernel_path", None)
4491     self.initrd_path = getattr(self.op, "initrd_path", None)
4492     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4493     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4494     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4495     self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
4496     self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
4497     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4498     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4499     self.force = getattr(self.op, "force", None)
4500     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4501                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4502                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4503                  self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
4504     if all_parms.count(None) == len(all_parms):
4505       raise errors.OpPrereqError("No changes submitted")
4506     if self.mem is not None:
4507       try:
4508         self.mem = int(self.mem)
4509       except ValueError, err:
4510         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4511     if self.vcpus is not None:
4512       try:
4513         self.vcpus = int(self.vcpus)
4514       except ValueError, err:
4515         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4516     if self.ip is not None:
4517       self.do_ip = True
4518       if self.ip.lower() == "none":
4519         self.ip = None
4520       else:
4521         if not utils.IsValidIP(self.ip):
4522           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4523     else:
4524       self.do_ip = False
4525     self.do_bridge = (self.bridge is not None)
4526     if self.mac is not None:
4527       if self.cfg.IsMacInUse(self.mac):
4528         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4529                                    self.mac)
4530       if not utils.IsValidMac(self.mac):
4531         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4532
4533     if self.kernel_path is not None:
4534       self.do_kernel_path = True
4535       if self.kernel_path == constants.VALUE_NONE:
4536         raise errors.OpPrereqError("Can't set instance to no kernel")
4537
4538       if self.kernel_path != constants.VALUE_DEFAULT:
4539         if not os.path.isabs(self.kernel_path):
4540           raise errors.OpPrereqError("The kernel path must be an absolute"
4541                                     " filename")
4542     else:
4543       self.do_kernel_path = False
4544
4545     if self.initrd_path is not None:
4546       self.do_initrd_path = True
4547       if self.initrd_path not in (constants.VALUE_NONE,
4548                                   constants.VALUE_DEFAULT):
4549         if not os.path.isabs(self.initrd_path):
4550           raise errors.OpPrereqError("The initrd path must be an absolute"
4551                                     " filename")
4552     else:
4553       self.do_initrd_path = False
4554
4555     # boot order verification
4556     if self.hvm_boot_order is not None:
4557       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4558         if len(self.hvm_boot_order.strip("acdn")) != 0:
4559           raise errors.OpPrereqError("invalid boot order specified,"
4560                                      " must be one or more of [acdn]"
4561                                      " or 'default'")
4562
4563     # hvm_cdrom_image_path verification
4564     if self.op.hvm_cdrom_image_path is not None:
4565       if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4566               self.op.hvm_cdrom_image_path.lower() == "none"):
4567         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4568                                    " be an absolute path or None, not %s" %
4569                                    self.op.hvm_cdrom_image_path)
4570       if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4571               self.op.hvm_cdrom_image_path.lower() == "none"):
4572         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4573                                    " regular file or a symlink pointing to"
4574                                    " an existing regular file, not %s" %
4575                                    self.op.hvm_cdrom_image_path)
4576
4577     # vnc_bind_address verification
4578     if self.op.vnc_bind_address is not None:
4579       if not utils.IsValidIP(self.op.vnc_bind_address):
4580         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4581                                    " like a valid IP address" %
4582                                    self.op.vnc_bind_address)
4583
4584     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4585     assert self.instance is not None, \
4586       "Cannot retrieve locked instance %s" % self.op.instance_name
4587     self.warn = []
4588     if self.mem is not None and not self.force:
4589       pnode = self.instance.primary_node
4590       nodelist = [pnode]
4591       nodelist.extend(instance.secondary_nodes)
4592       instance_info = rpc.call_instance_info(pnode, instance.name)
4593       nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
4594
4595       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4596         # Assume the primary node is unreachable and go ahead
4597         self.warn.append("Can't get info from primary node %s" % pnode)
4598       else:
4599         if instance_info:
4600           current_mem = instance_info['memory']
4601         else:
4602           # Assume instance not running
4603           # (there is a slight race condition here, but it's not very probable,
4604           # and we have no other way to check)
4605           current_mem = 0
4606         miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
4607         if miss_mem > 0:
4608           raise errors.OpPrereqError("This change will prevent the instance"
4609                                      " from starting, due to %d MB of memory"
4610                                      " missing on its primary node" % miss_mem)
4611
4612       for node in instance.secondary_nodes:
4613         if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4614           self.warn.append("Can't get info from secondary node %s" % node)
4615         elif self.mem > nodeinfo[node]['memory_free']:
4616           self.warn.append("Not enough memory to failover instance to secondary"
4617                            " node %s" % node)
4618
4619     # Xen HVM device type checks
4620     if self.cfg.GetHypervisorType() == constants.HT_XEN_HVM31:
4621       if self.op.hvm_nic_type is not None:
4622         if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
4623           raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
4624                                      " HVM  hypervisor" % self.op.hvm_nic_type)
4625       if self.op.hvm_disk_type is not None:
4626         if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
4627           raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
4628                                      " HVM hypervisor" % self.op.hvm_disk_type)
4629
4630     return
4631
4632   def Exec(self, feedback_fn):
4633     """Modifies an instance.
4634
4635     All parameters take effect only at the next restart of the instance.
4636     """
4637     # Process here the warnings from CheckPrereq, as we don't have a
4638     # feedback_fn there.
4639     for warn in self.warn:
4640       feedback_fn("WARNING: %s" % warn)
4641
4642     result = []
4643     instance = self.instance
4644     if self.mem:
4645       instance.memory = self.mem
4646       result.append(("mem", self.mem))
4647     if self.vcpus:
4648       instance.vcpus = self.vcpus
4649       result.append(("vcpus",  self.vcpus))
4650     if self.do_ip:
4651       instance.nics[0].ip = self.ip
4652       result.append(("ip", self.ip))
4653     if self.bridge:
4654       instance.nics[0].bridge = self.bridge
4655       result.append(("bridge", self.bridge))
4656     if self.mac:
4657       instance.nics[0].mac = self.mac
4658       result.append(("mac", self.mac))
4659     if self.do_kernel_path:
4660       instance.kernel_path = self.kernel_path
4661       result.append(("kernel_path", self.kernel_path))
4662     if self.do_initrd_path:
4663       instance.initrd_path = self.initrd_path
4664       result.append(("initrd_path", self.initrd_path))
4665     if self.hvm_boot_order:
4666       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4667         instance.hvm_boot_order = None
4668       else:
4669         instance.hvm_boot_order = self.hvm_boot_order
4670       result.append(("hvm_boot_order", self.hvm_boot_order))
4671     if self.hvm_acpi is not None:
4672       instance.hvm_acpi = self.hvm_acpi
4673       result.append(("hvm_acpi", self.hvm_acpi))
4674     if self.hvm_pae is not None:
4675       instance.hvm_pae = self.hvm_pae
4676       result.append(("hvm_pae", self.hvm_pae))
4677     if self.hvm_nic_type is not None:
4678       instance.hvm_nic_type = self.hvm_nic_type
4679       result.append(("hvm_nic_type", self.hvm_nic_type))
4680     if self.hvm_disk_type is not None:
4681       instance.hvm_disk_type = self.hvm_disk_type
4682       result.append(("hvm_disk_type", self.hvm_disk_type))
4683     if self.hvm_cdrom_image_path:
4684       if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4685         instance.hvm_cdrom_image_path = None
4686       else:
4687         instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4688       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4689     if self.vnc_bind_address:
4690       instance.vnc_bind_address = self.vnc_bind_address
4691       result.append(("vnc_bind_address", self.vnc_bind_address))
4692
4693     self.cfg.Update(instance)
4694
4695     return result
4696
4697
4698 class LUQueryExports(NoHooksLU):
4699   """Query the exports list
4700
4701   """
4702   _OP_REQP = ['nodes']
4703   REQ_BGL = False
4704
4705   def ExpandNames(self):
4706     self.needed_locks = {}
4707     self.share_locks[locking.LEVEL_NODE] = 1
4708     if not self.op.nodes:
4709       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4710     else:
4711       self.needed_locks[locking.LEVEL_NODE] = \
4712         _GetWantedNodes(self, self.op.nodes)
4713
4714   def CheckPrereq(self):
4715     """Check prerequisites.
4716
4717     """
4718     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4719
4720   def Exec(self, feedback_fn):
4721     """Compute the list of all the exported system images.
4722
4723     Returns:
4724       a dictionary with the structure node->(export-list)
4725       where export-list is a list of the instances exported on
4726       that node.
4727
4728     """
4729     return rpc.call_export_list(self.nodes)
4730
4731
4732 class LUExportInstance(LogicalUnit):
4733   """Export an instance to an image in the cluster.
4734
4735   """
4736   HPATH = "instance-export"
4737   HTYPE = constants.HTYPE_INSTANCE
4738   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4739   REQ_BGL = False
4740
4741   def ExpandNames(self):
4742     self._ExpandAndLockInstance()
4743     # FIXME: lock only instance primary and destination node
4744     #
4745     # Sad but true, for now we have do lock all nodes, as we don't know where
4746     # the previous export might be, and and in this LU we search for it and
4747     # remove it from its current node. In the future we could fix this by:
4748     #  - making a tasklet to search (share-lock all), then create the new one,
4749     #    then one to remove, after
4750     #  - removing the removal operation altoghether
4751     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4752
4753   def DeclareLocks(self, level):
4754     """Last minute lock declaration."""
4755     # All nodes are locked anyway, so nothing to do here.
4756
4757   def BuildHooksEnv(self):
4758     """Build hooks env.
4759
4760     This will run on the master, primary node and target node.
4761
4762     """
4763     env = {
4764       "EXPORT_NODE": self.op.target_node,
4765       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4766       }
4767     env.update(_BuildInstanceHookEnvByObject(self.instance))
4768     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4769           self.op.target_node]
4770     return env, nl, nl
4771
4772   def CheckPrereq(self):
4773     """Check prerequisites.
4774
4775     This checks that the instance and node names are valid.
4776
4777     """
4778     instance_name = self.op.instance_name
4779     self.instance = self.cfg.GetInstanceInfo(instance_name)
4780     assert self.instance is not None, \
4781           "Cannot retrieve locked instance %s" % self.op.instance_name
4782
4783     self.dst_node = self.cfg.GetNodeInfo(
4784       self.cfg.ExpandNodeName(self.op.target_node))
4785
4786     assert self.dst_node is not None, \
4787           "Cannot retrieve locked node %s" % self.op.target_node
4788
4789     # instance disk type verification
4790     for disk in self.instance.disks:
4791       if disk.dev_type == constants.LD_FILE:
4792         raise errors.OpPrereqError("Export not supported for instances with"
4793                                    " file-based disks")
4794
4795   def Exec(self, feedback_fn):
4796     """Export an instance to an image in the cluster.
4797
4798     """
4799     instance = self.instance
4800     dst_node = self.dst_node
4801     src_node = instance.primary_node
4802     if self.op.shutdown:
4803       # shutdown the instance, but not the disks
4804       if not rpc.call_instance_shutdown(src_node, instance):
4805         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4806                                  (instance.name, src_node))
4807
4808     vgname = self.cfg.GetVGName()
4809
4810     snap_disks = []
4811
4812     try:
4813       for disk in instance.disks:
4814         if disk.iv_name == "sda":
4815           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4816           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4817
4818           if not new_dev_name:
4819             logger.Error("could not snapshot block device %s on node %s" %
4820                          (disk.logical_id[1], src_node))
4821           else:
4822             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4823                                       logical_id=(vgname, new_dev_name),
4824                                       physical_id=(vgname, new_dev_name),
4825                                       iv_name=disk.iv_name)
4826             snap_disks.append(new_dev)
4827
4828     finally:
4829       if self.op.shutdown and instance.status == "up":
4830         if not rpc.call_instance_start(src_node, instance, None):
4831           _ShutdownInstanceDisks(instance, self.cfg)
4832           raise errors.OpExecError("Could not start instance")
4833
4834     # TODO: check for size
4835
4836     cluster_name = self.cfg.GetClusterName()
4837     for dev in snap_disks:
4838       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4839                                       instance, cluster_name):
4840         logger.Error("could not export block device %s from node %s to node %s"
4841                      % (dev.logical_id[1], src_node, dst_node.name))
4842       if not rpc.call_blockdev_remove(src_node, dev):
4843         logger.Error("could not remove snapshot block device %s from node %s" %
4844                      (dev.logical_id[1], src_node))
4845
4846     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4847       logger.Error("could not finalize export for instance %s on node %s" %
4848                    (instance.name, dst_node.name))
4849
4850     nodelist = self.cfg.GetNodeList()
4851     nodelist.remove(dst_node.name)
4852
4853     # on one-node clusters nodelist will be empty after the removal
4854     # if we proceed the backup would be removed because OpQueryExports
4855     # substitutes an empty list with the full cluster node list.
4856     if nodelist:
4857       exportlist = rpc.call_export_list(nodelist)
4858       for node in exportlist:
4859         if instance.name in exportlist[node]:
4860           if not rpc.call_export_remove(node, instance.name):
4861             logger.Error("could not remove older export for instance %s"
4862                          " on node %s" % (instance.name, node))
4863
4864
4865 class LURemoveExport(NoHooksLU):
4866   """Remove exports related to the named instance.
4867
4868   """
4869   _OP_REQP = ["instance_name"]
4870   REQ_BGL = False
4871
4872   def ExpandNames(self):
4873     self.needed_locks = {}
4874     # We need all nodes to be locked in order for RemoveExport to work, but we
4875     # don't need to lock the instance itself, as nothing will happen to it (and
4876     # we can remove exports also for a removed instance)
4877     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4878
4879   def CheckPrereq(self):
4880     """Check prerequisites.
4881     """
4882     pass
4883
4884   def Exec(self, feedback_fn):
4885     """Remove any export.
4886
4887     """
4888     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4889     # If the instance was not found we'll try with the name that was passed in.
4890     # This will only work if it was an FQDN, though.
4891     fqdn_warn = False
4892     if not instance_name:
4893       fqdn_warn = True
4894       instance_name = self.op.instance_name
4895
4896     exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE])
4897     found = False
4898     for node in exportlist:
4899       if instance_name in exportlist[node]:
4900         found = True
4901         if not rpc.call_export_remove(node, instance_name):
4902           logger.Error("could not remove export for instance %s"
4903                        " on node %s" % (instance_name, node))
4904
4905     if fqdn_warn and not found:
4906       feedback_fn("Export not found. If trying to remove an export belonging"
4907                   " to a deleted instance please use its Fully Qualified"
4908                   " Domain Name.")
4909
4910
4911 class TagsLU(NoHooksLU):
4912   """Generic tags LU.
4913
4914   This is an abstract class which is the parent of all the other tags LUs.
4915
4916   """
4917
4918   def ExpandNames(self):
4919     self.needed_locks = {}
4920     if self.op.kind == constants.TAG_NODE:
4921       name = self.cfg.ExpandNodeName(self.op.name)
4922       if name is None:
4923         raise errors.OpPrereqError("Invalid node name (%s)" %
4924                                    (self.op.name,))
4925       self.op.name = name
4926       self.needed_locks[locking.LEVEL_NODE] = name
4927     elif self.op.kind == constants.TAG_INSTANCE:
4928       name = self.cfg.ExpandInstanceName(self.op.name)
4929       if name is None:
4930         raise errors.OpPrereqError("Invalid instance name (%s)" %
4931                                    (self.op.name,))
4932       self.op.name = name
4933       self.needed_locks[locking.LEVEL_INSTANCE] = name
4934
4935   def CheckPrereq(self):
4936     """Check prerequisites.
4937
4938     """
4939     if self.op.kind == constants.TAG_CLUSTER:
4940       self.target = self.cfg.GetClusterInfo()
4941     elif self.op.kind == constants.TAG_NODE:
4942       self.target = self.cfg.GetNodeInfo(self.op.name)
4943     elif self.op.kind == constants.TAG_INSTANCE:
4944       self.target = self.cfg.GetInstanceInfo(self.op.name)
4945     else:
4946       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4947                                  str(self.op.kind))
4948
4949
4950 class LUGetTags(TagsLU):
4951   """Returns the tags of a given object.
4952
4953   """
4954   _OP_REQP = ["kind", "name"]
4955   REQ_BGL = False
4956
4957   def Exec(self, feedback_fn):
4958     """Returns the tag list.
4959
4960     """
4961     return list(self.target.GetTags())
4962
4963
4964 class LUSearchTags(NoHooksLU):
4965   """Searches the tags for a given pattern.
4966
4967   """
4968   _OP_REQP = ["pattern"]
4969   REQ_BGL = False
4970
4971   def ExpandNames(self):
4972     self.needed_locks = {}
4973
4974   def CheckPrereq(self):
4975     """Check prerequisites.
4976
4977     This checks the pattern passed for validity by compiling it.
4978
4979     """
4980     try:
4981       self.re = re.compile(self.op.pattern)
4982     except re.error, err:
4983       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4984                                  (self.op.pattern, err))
4985
4986   def Exec(self, feedback_fn):
4987     """Returns the tag list.
4988
4989     """
4990     cfg = self.cfg
4991     tgts = [("/cluster", cfg.GetClusterInfo())]
4992     ilist = cfg.GetAllInstancesInfo().values()
4993     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4994     nlist = cfg.GetAllNodesInfo().values()
4995     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4996     results = []
4997     for path, target in tgts:
4998       for tag in target.GetTags():
4999         if self.re.search(tag):
5000           results.append((path, tag))
5001     return results
5002
5003
5004 class LUAddTags(TagsLU):
5005   """Sets a tag on a given object.
5006
5007   """
5008   _OP_REQP = ["kind", "name", "tags"]
5009   REQ_BGL = False
5010
5011   def CheckPrereq(self):
5012     """Check prerequisites.
5013
5014     This checks the type and length of the tag name and value.
5015
5016     """
5017     TagsLU.CheckPrereq(self)
5018     for tag in self.op.tags:
5019       objects.TaggableObject.ValidateTag(tag)
5020
5021   def Exec(self, feedback_fn):
5022     """Sets the tag.
5023
5024     """
5025     try:
5026       for tag in self.op.tags:
5027         self.target.AddTag(tag)
5028     except errors.TagError, err:
5029       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5030     try:
5031       self.cfg.Update(self.target)
5032     except errors.ConfigurationError:
5033       raise errors.OpRetryError("There has been a modification to the"
5034                                 " config file and the operation has been"
5035                                 " aborted. Please retry.")
5036
5037
5038 class LUDelTags(TagsLU):
5039   """Delete a list of tags from a given object.
5040
5041   """
5042   _OP_REQP = ["kind", "name", "tags"]
5043   REQ_BGL = False
5044
5045   def CheckPrereq(self):
5046     """Check prerequisites.
5047
5048     This checks that we have the given tag.
5049
5050     """
5051     TagsLU.CheckPrereq(self)
5052     for tag in self.op.tags:
5053       objects.TaggableObject.ValidateTag(tag)
5054     del_tags = frozenset(self.op.tags)
5055     cur_tags = self.target.GetTags()
5056     if not del_tags <= cur_tags:
5057       diff_tags = del_tags - cur_tags
5058       diff_names = ["'%s'" % tag for tag in diff_tags]
5059       diff_names.sort()
5060       raise errors.OpPrereqError("Tag(s) %s not found" %
5061                                  (",".join(diff_names)))
5062
5063   def Exec(self, feedback_fn):
5064     """Remove the tag from the object.
5065
5066     """
5067     for tag in self.op.tags:
5068       self.target.RemoveTag(tag)
5069     try:
5070       self.cfg.Update(self.target)
5071     except errors.ConfigurationError:
5072       raise errors.OpRetryError("There has been a modification to the"
5073                                 " config file and the operation has been"
5074                                 " aborted. Please retry.")
5075
5076
5077 class LUTestDelay(NoHooksLU):
5078   """Sleep for a specified amount of time.
5079
5080   This LU sleeps on the master and/or nodes for a specified amount of
5081   time.
5082
5083   """
5084   _OP_REQP = ["duration", "on_master", "on_nodes"]
5085   REQ_BGL = False
5086
5087   def ExpandNames(self):
5088     """Expand names and set required locks.
5089
5090     This expands the node list, if any.
5091
5092     """
5093     self.needed_locks = {}
5094     if self.op.on_nodes:
5095       # _GetWantedNodes can be used here, but is not always appropriate to use
5096       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5097       # more information.
5098       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5099       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5100
5101   def CheckPrereq(self):
5102     """Check prerequisites.
5103
5104     """
5105
5106   def Exec(self, feedback_fn):
5107     """Do the actual sleep.
5108
5109     """
5110     if self.op.on_master:
5111       if not utils.TestDelay(self.op.duration):
5112         raise errors.OpExecError("Error during master delay test")
5113     if self.op.on_nodes:
5114       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5115       if not result:
5116         raise errors.OpExecError("Complete failure from rpc call")
5117       for node, node_result in result.items():
5118         if not node_result:
5119           raise errors.OpExecError("Failure during rpc call to node %s,"
5120                                    " result: %s" % (node, node_result))
5121
5122
5123 class IAllocator(object):
5124   """IAllocator framework.
5125
5126   An IAllocator instance has three sets of attributes:
5127     - cfg that is needed to query the cluster
5128     - input data (all members of the _KEYS class attribute are required)
5129     - four buffer attributes (in|out_data|text), that represent the
5130       input (to the external script) in text and data structure format,
5131       and the output from it, again in two formats
5132     - the result variables from the script (success, info, nodes) for
5133       easy usage
5134
5135   """
5136   _ALLO_KEYS = [
5137     "mem_size", "disks", "disk_template",
5138     "os", "tags", "nics", "vcpus",
5139     ]
5140   _RELO_KEYS = [
5141     "relocate_from",
5142     ]
5143
5144   def __init__(self, cfg, mode, name, **kwargs):
5145     self.cfg = cfg
5146     # init buffer variables
5147     self.in_text = self.out_text = self.in_data = self.out_data = None
5148     # init all input fields so that pylint is happy
5149     self.mode = mode
5150     self.name = name
5151     self.mem_size = self.disks = self.disk_template = None
5152     self.os = self.tags = self.nics = self.vcpus = None
5153     self.relocate_from = None
5154     # computed fields
5155     self.required_nodes = None
5156     # init result fields
5157     self.success = self.info = self.nodes = None
5158     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5159       keyset = self._ALLO_KEYS
5160     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5161       keyset = self._RELO_KEYS
5162     else:
5163       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5164                                    " IAllocator" % self.mode)
5165     for key in kwargs:
5166       if key not in keyset:
5167         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5168                                      " IAllocator" % key)
5169       setattr(self, key, kwargs[key])
5170     for key in keyset:
5171       if key not in kwargs:
5172         raise errors.ProgrammerError("Missing input parameter '%s' to"
5173                                      " IAllocator" % key)
5174     self._BuildInputData()
5175
5176   def _ComputeClusterData(self):
5177     """Compute the generic allocator input data.
5178
5179     This is the data that is independent of the actual operation.
5180
5181     """
5182     cfg = self.cfg
5183     # cluster data
5184     data = {
5185       "version": 1,
5186       "cluster_name": self.cfg.GetClusterName(),
5187       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5188       "hypervisor_type": self.cfg.GetHypervisorType(),
5189       # we don't have job IDs
5190       }
5191
5192     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5193
5194     # node data
5195     node_results = {}
5196     node_list = cfg.GetNodeList()
5197     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5198     for nname in node_list:
5199       ninfo = cfg.GetNodeInfo(nname)
5200       if nname not in node_data or not isinstance(node_data[nname], dict):
5201         raise errors.OpExecError("Can't get data for node %s" % nname)
5202       remote_info = node_data[nname]
5203       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5204                    'vg_size', 'vg_free', 'cpu_total']:
5205         if attr not in remote_info:
5206           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5207                                    (nname, attr))
5208         try:
5209           remote_info[attr] = int(remote_info[attr])
5210         except ValueError, err:
5211           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5212                                    " %s" % (nname, attr, str(err)))
5213       # compute memory used by primary instances
5214       i_p_mem = i_p_up_mem = 0
5215       for iinfo in i_list:
5216         if iinfo.primary_node == nname:
5217           i_p_mem += iinfo.memory
5218           if iinfo.status == "up":
5219             i_p_up_mem += iinfo.memory
5220
5221       # compute memory used by instances
5222       pnr = {
5223         "tags": list(ninfo.GetTags()),
5224         "total_memory": remote_info['memory_total'],
5225         "reserved_memory": remote_info['memory_dom0'],
5226         "free_memory": remote_info['memory_free'],
5227         "i_pri_memory": i_p_mem,
5228         "i_pri_up_memory": i_p_up_mem,
5229         "total_disk": remote_info['vg_size'],
5230         "free_disk": remote_info['vg_free'],
5231         "primary_ip": ninfo.primary_ip,
5232         "secondary_ip": ninfo.secondary_ip,
5233         "total_cpus": remote_info['cpu_total'],
5234         }
5235       node_results[nname] = pnr
5236     data["nodes"] = node_results
5237
5238     # instance data
5239     instance_data = {}
5240     for iinfo in i_list:
5241       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5242                   for n in iinfo.nics]
5243       pir = {
5244         "tags": list(iinfo.GetTags()),
5245         "should_run": iinfo.status == "up",
5246         "vcpus": iinfo.vcpus,
5247         "memory": iinfo.memory,
5248         "os": iinfo.os,
5249         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5250         "nics": nic_data,
5251         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5252         "disk_template": iinfo.disk_template,
5253         }
5254       instance_data[iinfo.name] = pir
5255
5256     data["instances"] = instance_data
5257
5258     self.in_data = data
5259
5260   def _AddNewInstance(self):
5261     """Add new instance data to allocator structure.
5262
5263     This in combination with _AllocatorGetClusterData will create the
5264     correct structure needed as input for the allocator.
5265
5266     The checks for the completeness of the opcode must have already been
5267     done.
5268
5269     """
5270     data = self.in_data
5271     if len(self.disks) != 2:
5272       raise errors.OpExecError("Only two-disk configurations supported")
5273
5274     disk_space = _ComputeDiskSize(self.disk_template,
5275                                   self.disks[0]["size"], self.disks[1]["size"])
5276
5277     if self.disk_template in constants.DTS_NET_MIRROR:
5278       self.required_nodes = 2
5279     else:
5280       self.required_nodes = 1
5281     request = {
5282       "type": "allocate",
5283       "name": self.name,
5284       "disk_template": self.disk_template,
5285       "tags": self.tags,
5286       "os": self.os,
5287       "vcpus": self.vcpus,
5288       "memory": self.mem_size,
5289       "disks": self.disks,
5290       "disk_space_total": disk_space,
5291       "nics": self.nics,
5292       "required_nodes": self.required_nodes,
5293       }
5294     data["request"] = request
5295
5296   def _AddRelocateInstance(self):
5297     """Add relocate instance data to allocator structure.
5298
5299     This in combination with _IAllocatorGetClusterData will create the
5300     correct structure needed as input for the allocator.
5301
5302     The checks for the completeness of the opcode must have already been
5303     done.
5304
5305     """
5306     instance = self.cfg.GetInstanceInfo(self.name)
5307     if instance is None:
5308       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5309                                    " IAllocator" % self.name)
5310
5311     if instance.disk_template not in constants.DTS_NET_MIRROR:
5312       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5313
5314     if len(instance.secondary_nodes) != 1:
5315       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5316
5317     self.required_nodes = 1
5318
5319     disk_space = _ComputeDiskSize(instance.disk_template,
5320                                   instance.disks[0].size,
5321                                   instance.disks[1].size)
5322
5323     request = {
5324       "type": "relocate",
5325       "name": self.name,
5326       "disk_space_total": disk_space,
5327       "required_nodes": self.required_nodes,
5328       "relocate_from": self.relocate_from,
5329       }
5330     self.in_data["request"] = request
5331
5332   def _BuildInputData(self):
5333     """Build input data structures.
5334
5335     """
5336     self._ComputeClusterData()
5337
5338     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5339       self._AddNewInstance()
5340     else:
5341       self._AddRelocateInstance()
5342
5343     self.in_text = serializer.Dump(self.in_data)
5344
5345   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5346     """Run an instance allocator and return the results.
5347
5348     """
5349     data = self.in_text
5350
5351     result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
5352
5353     if not isinstance(result, (list, tuple)) or len(result) != 4:
5354       raise errors.OpExecError("Invalid result from master iallocator runner")
5355
5356     rcode, stdout, stderr, fail = result
5357
5358     if rcode == constants.IARUN_NOTFOUND:
5359       raise errors.OpExecError("Can't find allocator '%s'" % name)
5360     elif rcode == constants.IARUN_FAILURE:
5361       raise errors.OpExecError("Instance allocator call failed: %s,"
5362                                " output: %s" % (fail, stdout+stderr))
5363     self.out_text = stdout
5364     if validate:
5365       self._ValidateResult()
5366
5367   def _ValidateResult(self):
5368     """Process the allocator results.
5369
5370     This will process and if successful save the result in
5371     self.out_data and the other parameters.
5372
5373     """
5374     try:
5375       rdict = serializer.Load(self.out_text)
5376     except Exception, err:
5377       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5378
5379     if not isinstance(rdict, dict):
5380       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5381
5382     for key in "success", "info", "nodes":
5383       if key not in rdict:
5384         raise errors.OpExecError("Can't parse iallocator results:"
5385                                  " missing key '%s'" % key)
5386       setattr(self, key, rdict[key])
5387
5388     if not isinstance(rdict["nodes"], list):
5389       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5390                                " is not a list")
5391     self.out_data = rdict
5392
5393
5394 class LUTestAllocator(NoHooksLU):
5395   """Run allocator tests.
5396
5397   This LU runs the allocator tests
5398
5399   """
5400   _OP_REQP = ["direction", "mode", "name"]
5401
5402   def CheckPrereq(self):
5403     """Check prerequisites.
5404
5405     This checks the opcode parameters depending on the director and mode test.
5406
5407     """
5408     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5409       for attr in ["name", "mem_size", "disks", "disk_template",
5410                    "os", "tags", "nics", "vcpus"]:
5411         if not hasattr(self.op, attr):
5412           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5413                                      attr)
5414       iname = self.cfg.ExpandInstanceName(self.op.name)
5415       if iname is not None:
5416         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5417                                    iname)
5418       if not isinstance(self.op.nics, list):
5419         raise errors.OpPrereqError("Invalid parameter 'nics'")
5420       for row in self.op.nics:
5421         if (not isinstance(row, dict) or
5422             "mac" not in row or
5423             "ip" not in row or
5424             "bridge" not in row):
5425           raise errors.OpPrereqError("Invalid contents of the"
5426                                      " 'nics' parameter")
5427       if not isinstance(self.op.disks, list):
5428         raise errors.OpPrereqError("Invalid parameter 'disks'")
5429       if len(self.op.disks) != 2:
5430         raise errors.OpPrereqError("Only two-disk configurations supported")
5431       for row in self.op.disks:
5432         if (not isinstance(row, dict) or
5433             "size" not in row or
5434             not isinstance(row["size"], int) or
5435             "mode" not in row or
5436             row["mode"] not in ['r', 'w']):
5437           raise errors.OpPrereqError("Invalid contents of the"
5438                                      " 'disks' parameter")
5439     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5440       if not hasattr(self.op, "name"):
5441         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5442       fname = self.cfg.ExpandInstanceName(self.op.name)
5443       if fname is None:
5444         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5445                                    self.op.name)
5446       self.op.name = fname
5447       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5448     else:
5449       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5450                                  self.op.mode)
5451
5452     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5453       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5454         raise errors.OpPrereqError("Missing allocator name")
5455     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5456       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5457                                  self.op.direction)
5458
5459   def Exec(self, feedback_fn):
5460     """Run the allocator test.
5461
5462     """
5463     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5464       ial = IAllocator(self.cfg,
5465                        mode=self.op.mode,
5466                        name=self.op.name,
5467                        mem_size=self.op.mem_size,
5468                        disks=self.op.disks,
5469                        disk_template=self.op.disk_template,
5470                        os=self.op.os,
5471                        tags=self.op.tags,
5472                        nics=self.op.nics,
5473                        vcpus=self.op.vcpus,
5474                        )
5475     else:
5476       ial = IAllocator(self.cfg,
5477                        mode=self.op.mode,
5478                        name=self.op.name,
5479                        relocate_from=list(self.relocate_from),
5480                        )
5481
5482     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5483       result = ial.in_text
5484     else:
5485       ial.Run(self.op.allocator, validate=False)
5486       result = ial.out_text
5487     return result