Mainloop: handle sigterm
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_MASTER: the LU needs to run on the master node
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_MASTER = True
68   REQ_BGL = True
69
70   def __init__(self, processor, op, context, rpc):
71     """Constructor for LogicalUnit.
72
73     This needs to be overriden in derived classes in order to check op
74     validity.
75
76     """
77     self.proc = processor
78     self.op = op
79     self.cfg = context.cfg
80     self.context = context
81     self.rpc = rpc
82     # Dicts used to declare locking needs to mcpu
83     self.needed_locks = None
84     self.acquired_locks = {}
85     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86     self.add_locks = {}
87     self.remove_locks = {}
88     # Used to force good behavior when calling helper functions
89     self.recalculate_locks = {}
90     self.__ssh = None
91
92     for attr_name in self._OP_REQP:
93       attr_val = getattr(op, attr_name, None)
94       if attr_val is None:
95         raise errors.OpPrereqError("Required parameter '%s' missing" %
96                                    attr_name)
97
98     if not self.cfg.IsCluster():
99       raise errors.OpPrereqError("Cluster not initialized yet,"
100                                  " use 'gnt-cluster init' first.")
101     if self.REQ_MASTER:
102       master = self.cfg.GetMasterNode()
103       if master != utils.HostInfo().name:
104         raise errors.OpPrereqError("Commands must be run on the master"
105                                    " node %s" % master)
106
107   def __GetSSH(self):
108     """Returns the SshRunner object
109
110     """
111     if not self.__ssh:
112       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
113     return self.__ssh
114
115   ssh = property(fget=__GetSSH)
116
117   def ExpandNames(self):
118     """Expand names for this LU.
119
120     This method is called before starting to execute the opcode, and it should
121     update all the parameters of the opcode to their canonical form (e.g. a
122     short node name must be fully expanded after this method has successfully
123     completed). This way locking, hooks, logging, ecc. can work correctly.
124
125     LUs which implement this method must also populate the self.needed_locks
126     member, as a dict with lock levels as keys, and a list of needed lock names
127     as values. Rules:
128       - Use an empty dict if you don't need any lock
129       - If you don't need any lock at a particular level omit that level
130       - Don't put anything for the BGL level
131       - If you want all locks at a level use locking.ALL_SET as a value
132
133     If you need to share locks (rather than acquire them exclusively) at one
134     level you can modify self.share_locks, setting a true value (usually 1) for
135     that level. By default locks are not shared.
136
137     Examples:
138     # Acquire all nodes and one instance
139     self.needed_locks = {
140       locking.LEVEL_NODE: locking.ALL_SET,
141       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
142     }
143     # Acquire just two nodes
144     self.needed_locks = {
145       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
146     }
147     # Acquire no locks
148     self.needed_locks = {} # No, you can't leave it to the default value None
149
150     """
151     # The implementation of this method is mandatory only if the new LU is
152     # concurrent, so that old LUs don't need to be changed all at the same
153     # time.
154     if self.REQ_BGL:
155       self.needed_locks = {} # Exclusive LUs don't need locks.
156     else:
157       raise NotImplementedError
158
159   def DeclareLocks(self, level):
160     """Declare LU locking needs for a level
161
162     While most LUs can just declare their locking needs at ExpandNames time,
163     sometimes there's the need to calculate some locks after having acquired
164     the ones before. This function is called just before acquiring locks at a
165     particular level, but after acquiring the ones at lower levels, and permits
166     such calculations. It can be used to modify self.needed_locks, and by
167     default it does nothing.
168
169     This function is only called if you have something already set in
170     self.needed_locks for the level.
171
172     @param level: Locking level which is going to be locked
173     @type level: member of ganeti.locking.LEVELS
174
175     """
176
177   def CheckPrereq(self):
178     """Check prerequisites for this LU.
179
180     This method should check that the prerequisites for the execution
181     of this LU are fulfilled. It can do internode communication, but
182     it should be idempotent - no cluster or system changes are
183     allowed.
184
185     The method should raise errors.OpPrereqError in case something is
186     not fulfilled. Its return value is ignored.
187
188     This method should also update all the parameters of the opcode to
189     their canonical form if it hasn't been done by ExpandNames before.
190
191     """
192     raise NotImplementedError
193
194   def Exec(self, feedback_fn):
195     """Execute the LU.
196
197     This method should implement the actual work. It should raise
198     errors.OpExecError for failures that are somewhat dealt with in
199     code, or expected.
200
201     """
202     raise NotImplementedError
203
204   def BuildHooksEnv(self):
205     """Build hooks environment for this LU.
206
207     This method should return a three-node tuple consisting of: a dict
208     containing the environment that will be used for running the
209     specific hook for this LU, a list of node names on which the hook
210     should run before the execution, and a list of node names on which
211     the hook should run after the execution.
212
213     The keys of the dict must not have 'GANETI_' prefixed as this will
214     be handled in the hooks runner. Also note additional keys will be
215     added by the hooks runner. If the LU doesn't define any
216     environment, an empty dict (and not None) should be returned.
217
218     No nodes should be returned as an empty list (and not None).
219
220     Note that if the HPATH for a LU class is None, this function will
221     not be called.
222
223     """
224     raise NotImplementedError
225
226   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
227     """Notify the LU about the results of its hooks.
228
229     This method is called every time a hooks phase is executed, and notifies
230     the Logical Unit about the hooks' result. The LU can then use it to alter
231     its result based on the hooks.  By default the method does nothing and the
232     previous result is passed back unchanged but any LU can define it if it
233     wants to use the local cluster hook-scripts somehow.
234
235     Args:
236       phase: the hooks phase that has just been run
237       hooks_results: the results of the multi-node hooks rpc call
238       feedback_fn: function to send feedback back to the caller
239       lu_result: the previous result this LU had, or None in the PRE phase.
240
241     """
242     return lu_result
243
244   def _ExpandAndLockInstance(self):
245     """Helper function to expand and lock an instance.
246
247     Many LUs that work on an instance take its name in self.op.instance_name
248     and need to expand it and then declare the expanded name for locking. This
249     function does it, and then updates self.op.instance_name to the expanded
250     name. It also initializes needed_locks as a dict, if this hasn't been done
251     before.
252
253     """
254     if self.needed_locks is None:
255       self.needed_locks = {}
256     else:
257       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
258         "_ExpandAndLockInstance called with instance-level locks set"
259     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
260     if expanded_name is None:
261       raise errors.OpPrereqError("Instance '%s' not known" %
262                                   self.op.instance_name)
263     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
264     self.op.instance_name = expanded_name
265
266   def _LockInstancesNodes(self, primary_only=False):
267     """Helper function to declare instances' nodes for locking.
268
269     This function should be called after locking one or more instances to lock
270     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
271     with all primary or secondary nodes for instances already locked and
272     present in self.needed_locks[locking.LEVEL_INSTANCE].
273
274     It should be called from DeclareLocks, and for safety only works if
275     self.recalculate_locks[locking.LEVEL_NODE] is set.
276
277     In the future it may grow parameters to just lock some instance's nodes, or
278     to just lock primaries or secondary nodes, if needed.
279
280     If should be called in DeclareLocks in a way similar to:
281
282     if level == locking.LEVEL_NODE:
283       self._LockInstancesNodes()
284
285     @type primary_only: boolean
286     @param primary_only: only lock primary nodes of locked instances
287
288     """
289     assert locking.LEVEL_NODE in self.recalculate_locks, \
290       "_LockInstancesNodes helper function called with no nodes to recalculate"
291
292     # TODO: check if we're really been called with the instance locks held
293
294     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
295     # future we might want to have different behaviors depending on the value
296     # of self.recalculate_locks[locking.LEVEL_NODE]
297     wanted_nodes = []
298     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
299       instance = self.context.cfg.GetInstanceInfo(instance_name)
300       wanted_nodes.append(instance.primary_node)
301       if not primary_only:
302         wanted_nodes.extend(instance.secondary_nodes)
303
304     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
305       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
306     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
307       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
308
309     del self.recalculate_locks[locking.LEVEL_NODE]
310
311
312 class NoHooksLU(LogicalUnit):
313   """Simple LU which runs no hooks.
314
315   This LU is intended as a parent for other LogicalUnits which will
316   run no hooks, in order to reduce duplicate code.
317
318   """
319   HPATH = None
320   HTYPE = None
321
322
323 def _GetWantedNodes(lu, nodes):
324   """Returns list of checked and expanded node names.
325
326   Args:
327     nodes: List of nodes (strings) or None for all
328
329   """
330   if not isinstance(nodes, list):
331     raise errors.OpPrereqError("Invalid argument type 'nodes'")
332
333   if not nodes:
334     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
335       " non-empty list of nodes whose name is to be expanded.")
336
337   wanted = []
338   for name in nodes:
339     node = lu.cfg.ExpandNodeName(name)
340     if node is None:
341       raise errors.OpPrereqError("No such node name '%s'" % name)
342     wanted.append(node)
343
344   return utils.NiceSort(wanted)
345
346
347 def _GetWantedInstances(lu, instances):
348   """Returns list of checked and expanded instance names.
349
350   Args:
351     instances: List of instances (strings) or None for all
352
353   """
354   if not isinstance(instances, list):
355     raise errors.OpPrereqError("Invalid argument type 'instances'")
356
357   if instances:
358     wanted = []
359
360     for name in instances:
361       instance = lu.cfg.ExpandInstanceName(name)
362       if instance is None:
363         raise errors.OpPrereqError("No such instance name '%s'" % name)
364       wanted.append(instance)
365
366   else:
367     wanted = lu.cfg.GetInstanceList()
368   return utils.NiceSort(wanted)
369
370
371 def _CheckOutputFields(static, dynamic, selected):
372   """Checks whether all selected fields are valid.
373
374   Args:
375     static: Static fields
376     dynamic: Dynamic fields
377
378   """
379   static_fields = frozenset(static)
380   dynamic_fields = frozenset(dynamic)
381
382   all_fields = static_fields | dynamic_fields
383
384   if not all_fields.issuperset(selected):
385     raise errors.OpPrereqError("Unknown output fields selected: %s"
386                                % ",".join(frozenset(selected).
387                                           difference(all_fields)))
388
389
390 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
391                           memory, vcpus, nics):
392   """Builds instance related env variables for hooks from single variables.
393
394   Args:
395     secondary_nodes: List of secondary nodes as strings
396   """
397   env = {
398     "OP_TARGET": name,
399     "INSTANCE_NAME": name,
400     "INSTANCE_PRIMARY": primary_node,
401     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
402     "INSTANCE_OS_TYPE": os_type,
403     "INSTANCE_STATUS": status,
404     "INSTANCE_MEMORY": memory,
405     "INSTANCE_VCPUS": vcpus,
406   }
407
408   if nics:
409     nic_count = len(nics)
410     for idx, (ip, bridge, mac) in enumerate(nics):
411       if ip is None:
412         ip = ""
413       env["INSTANCE_NIC%d_IP" % idx] = ip
414       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
415       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
416   else:
417     nic_count = 0
418
419   env["INSTANCE_NIC_COUNT"] = nic_count
420
421   return env
422
423
424 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
425   """Builds instance related env variables for hooks from an object.
426
427   Args:
428     instance: objects.Instance object of instance
429     override: dict of values to override
430   """
431   bep = lu.cfg.GetClusterInfo().FillBE(instance)
432   args = {
433     'name': instance.name,
434     'primary_node': instance.primary_node,
435     'secondary_nodes': instance.secondary_nodes,
436     'os_type': instance.os,
437     'status': instance.os,
438     'memory': bep[constants.BE_MEMORY],
439     'vcpus': bep[constants.BE_VCPUS],
440     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
441   }
442   if override:
443     args.update(override)
444   return _BuildInstanceHookEnv(**args)
445
446
447 def _CheckInstanceBridgesExist(lu, instance):
448   """Check that the brigdes needed by an instance exist.
449
450   """
451   # check bridges existance
452   brlist = [nic.bridge for nic in instance.nics]
453   if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
454     raise errors.OpPrereqError("one or more target bridges %s does not"
455                                " exist on destination node '%s'" %
456                                (brlist, instance.primary_node))
457
458
459 class LUDestroyCluster(NoHooksLU):
460   """Logical unit for destroying the cluster.
461
462   """
463   _OP_REQP = []
464
465   def CheckPrereq(self):
466     """Check prerequisites.
467
468     This checks whether the cluster is empty.
469
470     Any errors are signalled by raising errors.OpPrereqError.
471
472     """
473     master = self.cfg.GetMasterNode()
474
475     nodelist = self.cfg.GetNodeList()
476     if len(nodelist) != 1 or nodelist[0] != master:
477       raise errors.OpPrereqError("There are still %d node(s) in"
478                                  " this cluster." % (len(nodelist) - 1))
479     instancelist = self.cfg.GetInstanceList()
480     if instancelist:
481       raise errors.OpPrereqError("There are still %d instance(s) in"
482                                  " this cluster." % len(instancelist))
483
484   def Exec(self, feedback_fn):
485     """Destroys the cluster.
486
487     """
488     master = self.cfg.GetMasterNode()
489     if not self.rpc.call_node_stop_master(master, False):
490       raise errors.OpExecError("Could not disable the master role")
491     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
492     utils.CreateBackup(priv_key)
493     utils.CreateBackup(pub_key)
494     return master
495
496
497 class LUVerifyCluster(LogicalUnit):
498   """Verifies the cluster status.
499
500   """
501   HPATH = "cluster-verify"
502   HTYPE = constants.HTYPE_CLUSTER
503   _OP_REQP = ["skip_checks"]
504   REQ_BGL = False
505
506   def ExpandNames(self):
507     self.needed_locks = {
508       locking.LEVEL_NODE: locking.ALL_SET,
509       locking.LEVEL_INSTANCE: locking.ALL_SET,
510     }
511     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
512
513   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
514                   remote_version, feedback_fn):
515     """Run multiple tests against a node.
516
517     Test list:
518       - compares ganeti version
519       - checks vg existance and size > 20G
520       - checks config file checksum
521       - checks ssh to other nodes
522
523     Args:
524       node: name of the node to check
525       file_list: required list of files
526       local_cksum: dictionary of local files and their checksums
527
528     """
529     # compares ganeti version
530     local_version = constants.PROTOCOL_VERSION
531     if not remote_version:
532       feedback_fn("  - ERROR: connection to %s failed" % (node))
533       return True
534
535     if local_version != remote_version:
536       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
537                       (local_version, node, remote_version))
538       return True
539
540     # checks vg existance and size > 20G
541
542     bad = False
543     if not vglist:
544       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
545                       (node,))
546       bad = True
547     else:
548       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
549                                             constants.MIN_VG_SIZE)
550       if vgstatus:
551         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
552         bad = True
553
554     if not node_result:
555       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
556       return True
557
558     # checks config file checksum
559     # checks ssh to any
560
561     if 'filelist' not in node_result:
562       bad = True
563       feedback_fn("  - ERROR: node hasn't returned file checksum data")
564     else:
565       remote_cksum = node_result['filelist']
566       for file_name in file_list:
567         if file_name not in remote_cksum:
568           bad = True
569           feedback_fn("  - ERROR: file '%s' missing" % file_name)
570         elif remote_cksum[file_name] != local_cksum[file_name]:
571           bad = True
572           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
573
574     if 'nodelist' not in node_result:
575       bad = True
576       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
577     else:
578       if node_result['nodelist']:
579         bad = True
580         for node in node_result['nodelist']:
581           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
582                           (node, node_result['nodelist'][node]))
583     if 'node-net-test' not in node_result:
584       bad = True
585       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
586     else:
587       if node_result['node-net-test']:
588         bad = True
589         nlist = utils.NiceSort(node_result['node-net-test'].keys())
590         for node in nlist:
591           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
592                           (node, node_result['node-net-test'][node]))
593
594     hyp_result = node_result.get('hypervisor', None)
595     if isinstance(hyp_result, dict):
596       for hv_name, hv_result in hyp_result.iteritems():
597         if hv_result is not None:
598           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
599                       (hv_name, hv_result))
600     return bad
601
602   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
603                       node_instance, feedback_fn):
604     """Verify an instance.
605
606     This function checks to see if the required block devices are
607     available on the instance's node.
608
609     """
610     bad = False
611
612     node_current = instanceconfig.primary_node
613
614     node_vol_should = {}
615     instanceconfig.MapLVsByNode(node_vol_should)
616
617     for node in node_vol_should:
618       for volume in node_vol_should[node]:
619         if node not in node_vol_is or volume not in node_vol_is[node]:
620           feedback_fn("  - ERROR: volume %s missing on node %s" %
621                           (volume, node))
622           bad = True
623
624     if not instanceconfig.status == 'down':
625       if (node_current not in node_instance or
626           not instance in node_instance[node_current]):
627         feedback_fn("  - ERROR: instance %s not running on node %s" %
628                         (instance, node_current))
629         bad = True
630
631     for node in node_instance:
632       if (not node == node_current):
633         if instance in node_instance[node]:
634           feedback_fn("  - ERROR: instance %s should not run on node %s" %
635                           (instance, node))
636           bad = True
637
638     return bad
639
640   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
641     """Verify if there are any unknown volumes in the cluster.
642
643     The .os, .swap and backup volumes are ignored. All other volumes are
644     reported as unknown.
645
646     """
647     bad = False
648
649     for node in node_vol_is:
650       for volume in node_vol_is[node]:
651         if node not in node_vol_should or volume not in node_vol_should[node]:
652           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
653                       (volume, node))
654           bad = True
655     return bad
656
657   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
658     """Verify the list of running instances.
659
660     This checks what instances are running but unknown to the cluster.
661
662     """
663     bad = False
664     for node in node_instance:
665       for runninginstance in node_instance[node]:
666         if runninginstance not in instancelist:
667           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
668                           (runninginstance, node))
669           bad = True
670     return bad
671
672   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
673     """Verify N+1 Memory Resilience.
674
675     Check that if one single node dies we can still start all the instances it
676     was primary for.
677
678     """
679     bad = False
680
681     for node, nodeinfo in node_info.iteritems():
682       # This code checks that every node which is now listed as secondary has
683       # enough memory to host all instances it is supposed to should a single
684       # other node in the cluster fail.
685       # FIXME: not ready for failover to an arbitrary node
686       # FIXME: does not support file-backed instances
687       # WARNING: we currently take into account down instances as well as up
688       # ones, considering that even if they're down someone might want to start
689       # them even in the event of a node failure.
690       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
691         needed_mem = 0
692         for instance in instances:
693           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
694           if bep[constants.BE_AUTO_BALANCE]:
695             needed_mem += bep[constants.BE_MEMORY]
696         if nodeinfo['mfree'] < needed_mem:
697           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
698                       " failovers should node %s fail" % (node, prinode))
699           bad = True
700     return bad
701
702   def CheckPrereq(self):
703     """Check prerequisites.
704
705     Transform the list of checks we're going to skip into a set and check that
706     all its members are valid.
707
708     """
709     self.skip_set = frozenset(self.op.skip_checks)
710     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
711       raise errors.OpPrereqError("Invalid checks to be skipped specified")
712
713   def BuildHooksEnv(self):
714     """Build hooks env.
715
716     Cluster-Verify hooks just rone in the post phase and their failure makes
717     the output be logged in the verify output and the verification to fail.
718
719     """
720     all_nodes = self.cfg.GetNodeList()
721     # TODO: populate the environment with useful information for verify hooks
722     env = {}
723     return env, [], all_nodes
724
725   def Exec(self, feedback_fn):
726     """Verify integrity of cluster, performing various test on nodes.
727
728     """
729     bad = False
730     feedback_fn("* Verifying global settings")
731     for msg in self.cfg.VerifyConfig():
732       feedback_fn("  - ERROR: %s" % msg)
733
734     vg_name = self.cfg.GetVGName()
735     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
736     nodelist = utils.NiceSort(self.cfg.GetNodeList())
737     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
738     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
739     i_non_redundant = [] # Non redundant instances
740     i_non_a_balanced = [] # Non auto-balanced instances
741     node_volume = {}
742     node_instance = {}
743     node_info = {}
744     instance_cfg = {}
745
746     # FIXME: verify OS list
747     # do local checksums
748     file_names = []
749     file_names.append(constants.SSL_CERT_FILE)
750     file_names.append(constants.CLUSTER_CONF_FILE)
751     local_checksums = utils.FingerprintFiles(file_names)
752
753     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
754     all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
755     all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
756     all_vglist = self.rpc.call_vg_list(nodelist)
757     node_verify_param = {
758       'filelist': file_names,
759       'nodelist': nodelist,
760       'hypervisor': hypervisors,
761       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
762                         for node in nodeinfo]
763       }
764     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
765                                            self.cfg.GetClusterName())
766     all_rversion = self.rpc.call_version(nodelist)
767     all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
768                                         self.cfg.GetHypervisorType())
769
770     cluster = self.cfg.GetClusterInfo()
771     for node in nodelist:
772       feedback_fn("* Verifying node %s" % node)
773       result = self._VerifyNode(node, file_names, local_checksums,
774                                 all_vglist[node], all_nvinfo[node],
775                                 all_rversion[node], feedback_fn)
776       bad = bad or result
777
778       # node_volume
779       volumeinfo = all_volumeinfo[node]
780
781       if isinstance(volumeinfo, basestring):
782         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
783                     (node, volumeinfo[-400:].encode('string_escape')))
784         bad = True
785         node_volume[node] = {}
786       elif not isinstance(volumeinfo, dict):
787         feedback_fn("  - ERROR: connection to %s failed" % (node,))
788         bad = True
789         continue
790       else:
791         node_volume[node] = volumeinfo
792
793       # node_instance
794       nodeinstance = all_instanceinfo[node]
795       if type(nodeinstance) != list:
796         feedback_fn("  - ERROR: connection to %s failed" % (node,))
797         bad = True
798         continue
799
800       node_instance[node] = nodeinstance
801
802       # node_info
803       nodeinfo = all_ninfo[node]
804       if not isinstance(nodeinfo, dict):
805         feedback_fn("  - ERROR: connection to %s failed" % (node,))
806         bad = True
807         continue
808
809       try:
810         node_info[node] = {
811           "mfree": int(nodeinfo['memory_free']),
812           "dfree": int(nodeinfo['vg_free']),
813           "pinst": [],
814           "sinst": [],
815           # dictionary holding all instances this node is secondary for,
816           # grouped by their primary node. Each key is a cluster node, and each
817           # value is a list of instances which have the key as primary and the
818           # current node as secondary.  this is handy to calculate N+1 memory
819           # availability if you can only failover from a primary to its
820           # secondary.
821           "sinst-by-pnode": {},
822         }
823       except ValueError:
824         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
825         bad = True
826         continue
827
828     node_vol_should = {}
829
830     for instance in instancelist:
831       feedback_fn("* Verifying instance %s" % instance)
832       inst_config = self.cfg.GetInstanceInfo(instance)
833       result =  self._VerifyInstance(instance, inst_config, node_volume,
834                                      node_instance, feedback_fn)
835       bad = bad or result
836
837       inst_config.MapLVsByNode(node_vol_should)
838
839       instance_cfg[instance] = inst_config
840
841       pnode = inst_config.primary_node
842       if pnode in node_info:
843         node_info[pnode]['pinst'].append(instance)
844       else:
845         feedback_fn("  - ERROR: instance %s, connection to primary node"
846                     " %s failed" % (instance, pnode))
847         bad = True
848
849       # If the instance is non-redundant we cannot survive losing its primary
850       # node, so we are not N+1 compliant. On the other hand we have no disk
851       # templates with more than one secondary so that situation is not well
852       # supported either.
853       # FIXME: does not support file-backed instances
854       if len(inst_config.secondary_nodes) == 0:
855         i_non_redundant.append(instance)
856       elif len(inst_config.secondary_nodes) > 1:
857         feedback_fn("  - WARNING: multiple secondaries for instance %s"
858                     % instance)
859
860       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
861         i_non_a_balanced.append(instance)
862
863       for snode in inst_config.secondary_nodes:
864         if snode in node_info:
865           node_info[snode]['sinst'].append(instance)
866           if pnode not in node_info[snode]['sinst-by-pnode']:
867             node_info[snode]['sinst-by-pnode'][pnode] = []
868           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
869         else:
870           feedback_fn("  - ERROR: instance %s, connection to secondary node"
871                       " %s failed" % (instance, snode))
872
873     feedback_fn("* Verifying orphan volumes")
874     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
875                                        feedback_fn)
876     bad = bad or result
877
878     feedback_fn("* Verifying remaining instances")
879     result = self._VerifyOrphanInstances(instancelist, node_instance,
880                                          feedback_fn)
881     bad = bad or result
882
883     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
884       feedback_fn("* Verifying N+1 Memory redundancy")
885       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
886       bad = bad or result
887
888     feedback_fn("* Other Notes")
889     if i_non_redundant:
890       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
891                   % len(i_non_redundant))
892
893     if i_non_a_balanced:
894       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
895                   % len(i_non_a_balanced))
896
897     return not bad
898
899   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
900     """Analize the post-hooks' result, handle it, and send some
901     nicely-formatted feedback back to the user.
902
903     Args:
904       phase: the hooks phase that has just been run
905       hooks_results: the results of the multi-node hooks rpc call
906       feedback_fn: function to send feedback back to the caller
907       lu_result: previous Exec result
908
909     """
910     # We only really run POST phase hooks, and are only interested in
911     # their results
912     if phase == constants.HOOKS_PHASE_POST:
913       # Used to change hooks' output to proper indentation
914       indent_re = re.compile('^', re.M)
915       feedback_fn("* Hooks Results")
916       if not hooks_results:
917         feedback_fn("  - ERROR: general communication failure")
918         lu_result = 1
919       else:
920         for node_name in hooks_results:
921           show_node_header = True
922           res = hooks_results[node_name]
923           if res is False or not isinstance(res, list):
924             feedback_fn("    Communication failure")
925             lu_result = 1
926             continue
927           for script, hkr, output in res:
928             if hkr == constants.HKR_FAIL:
929               # The node header is only shown once, if there are
930               # failing hooks on that node
931               if show_node_header:
932                 feedback_fn("  Node %s:" % node_name)
933                 show_node_header = False
934               feedback_fn("    ERROR: Script %s failed, output:" % script)
935               output = indent_re.sub('      ', output)
936               feedback_fn("%s" % output)
937               lu_result = 1
938
939       return lu_result
940
941
942 class LUVerifyDisks(NoHooksLU):
943   """Verifies the cluster disks status.
944
945   """
946   _OP_REQP = []
947   REQ_BGL = False
948
949   def ExpandNames(self):
950     self.needed_locks = {
951       locking.LEVEL_NODE: locking.ALL_SET,
952       locking.LEVEL_INSTANCE: locking.ALL_SET,
953     }
954     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
955
956   def CheckPrereq(self):
957     """Check prerequisites.
958
959     This has no prerequisites.
960
961     """
962     pass
963
964   def Exec(self, feedback_fn):
965     """Verify integrity of cluster disks.
966
967     """
968     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
969
970     vg_name = self.cfg.GetVGName()
971     nodes = utils.NiceSort(self.cfg.GetNodeList())
972     instances = [self.cfg.GetInstanceInfo(name)
973                  for name in self.cfg.GetInstanceList()]
974
975     nv_dict = {}
976     for inst in instances:
977       inst_lvs = {}
978       if (inst.status != "up" or
979           inst.disk_template not in constants.DTS_NET_MIRROR):
980         continue
981       inst.MapLVsByNode(inst_lvs)
982       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
983       for node, vol_list in inst_lvs.iteritems():
984         for vol in vol_list:
985           nv_dict[(node, vol)] = inst
986
987     if not nv_dict:
988       return result
989
990     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
991
992     to_act = set()
993     for node in nodes:
994       # node_volume
995       lvs = node_lvs[node]
996
997       if isinstance(lvs, basestring):
998         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
999         res_nlvm[node] = lvs
1000       elif not isinstance(lvs, dict):
1001         logger.Info("connection to node %s failed or invalid data returned" %
1002                     (node,))
1003         res_nodes.append(node)
1004         continue
1005
1006       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1007         inst = nv_dict.pop((node, lv_name), None)
1008         if (not lv_online and inst is not None
1009             and inst.name not in res_instances):
1010           res_instances.append(inst.name)
1011
1012     # any leftover items in nv_dict are missing LVs, let's arrange the
1013     # data better
1014     for key, inst in nv_dict.iteritems():
1015       if inst.name not in res_missing:
1016         res_missing[inst.name] = []
1017       res_missing[inst.name].append(key)
1018
1019     return result
1020
1021
1022 class LURenameCluster(LogicalUnit):
1023   """Rename the cluster.
1024
1025   """
1026   HPATH = "cluster-rename"
1027   HTYPE = constants.HTYPE_CLUSTER
1028   _OP_REQP = ["name"]
1029
1030   def BuildHooksEnv(self):
1031     """Build hooks env.
1032
1033     """
1034     env = {
1035       "OP_TARGET": self.cfg.GetClusterName(),
1036       "NEW_NAME": self.op.name,
1037       }
1038     mn = self.cfg.GetMasterNode()
1039     return env, [mn], [mn]
1040
1041   def CheckPrereq(self):
1042     """Verify that the passed name is a valid one.
1043
1044     """
1045     hostname = utils.HostInfo(self.op.name)
1046
1047     new_name = hostname.name
1048     self.ip = new_ip = hostname.ip
1049     old_name = self.cfg.GetClusterName()
1050     old_ip = self.cfg.GetMasterIP()
1051     if new_name == old_name and new_ip == old_ip:
1052       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1053                                  " cluster has changed")
1054     if new_ip != old_ip:
1055       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1056         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1057                                    " reachable on the network. Aborting." %
1058                                    new_ip)
1059
1060     self.op.name = new_name
1061
1062   def Exec(self, feedback_fn):
1063     """Rename the cluster.
1064
1065     """
1066     clustername = self.op.name
1067     ip = self.ip
1068
1069     # shutdown the master IP
1070     master = self.cfg.GetMasterNode()
1071     if not self.rpc.call_node_stop_master(master, False):
1072       raise errors.OpExecError("Could not disable the master role")
1073
1074     try:
1075       # modify the sstore
1076       # TODO: sstore
1077       ss.SetKey(ss.SS_MASTER_IP, ip)
1078       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1079
1080       # Distribute updated ss config to all nodes
1081       myself = self.cfg.GetNodeInfo(master)
1082       dist_nodes = self.cfg.GetNodeList()
1083       if myself.name in dist_nodes:
1084         dist_nodes.remove(myself.name)
1085
1086       logger.Debug("Copying updated ssconf data to all nodes")
1087       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1088         fname = ss.KeyToFilename(keyname)
1089         result = self.rpc.call_upload_file(dist_nodes, fname)
1090         for to_node in dist_nodes:
1091           if not result[to_node]:
1092             logger.Error("copy of file %s to node %s failed" %
1093                          (fname, to_node))
1094     finally:
1095       if not self.rpc.call_node_start_master(master, False):
1096         logger.Error("Could not re-enable the master role on the master,"
1097                      " please restart manually.")
1098
1099
1100 def _RecursiveCheckIfLVMBased(disk):
1101   """Check if the given disk or its children are lvm-based.
1102
1103   Args:
1104     disk: ganeti.objects.Disk object
1105
1106   Returns:
1107     boolean indicating whether a LD_LV dev_type was found or not
1108
1109   """
1110   if disk.children:
1111     for chdisk in disk.children:
1112       if _RecursiveCheckIfLVMBased(chdisk):
1113         return True
1114   return disk.dev_type == constants.LD_LV
1115
1116
1117 class LUSetClusterParams(LogicalUnit):
1118   """Change the parameters of the cluster.
1119
1120   """
1121   HPATH = "cluster-modify"
1122   HTYPE = constants.HTYPE_CLUSTER
1123   _OP_REQP = []
1124   REQ_BGL = False
1125
1126   def ExpandNames(self):
1127     # FIXME: in the future maybe other cluster params won't require checking on
1128     # all nodes to be modified.
1129     self.needed_locks = {
1130       locking.LEVEL_NODE: locking.ALL_SET,
1131     }
1132     self.share_locks[locking.LEVEL_NODE] = 1
1133
1134   def BuildHooksEnv(self):
1135     """Build hooks env.
1136
1137     """
1138     env = {
1139       "OP_TARGET": self.cfg.GetClusterName(),
1140       "NEW_VG_NAME": self.op.vg_name,
1141       }
1142     mn = self.cfg.GetMasterNode()
1143     return env, [mn], [mn]
1144
1145   def CheckPrereq(self):
1146     """Check prerequisites.
1147
1148     This checks whether the given params don't conflict and
1149     if the given volume group is valid.
1150
1151     """
1152     # FIXME: This only works because there is only one parameter that can be
1153     # changed or removed.
1154     if self.op.vg_name is not None and not self.op.vg_name:
1155       instances = self.cfg.GetAllInstancesInfo().values()
1156       for inst in instances:
1157         for disk in inst.disks:
1158           if _RecursiveCheckIfLVMBased(disk):
1159             raise errors.OpPrereqError("Cannot disable lvm storage while"
1160                                        " lvm-based instances exist")
1161
1162     node_list = self.acquired_locks[locking.LEVEL_NODE]
1163
1164     # if vg_name not None, checks given volume group on all nodes
1165     if self.op.vg_name:
1166       vglist = self.rpc.call_vg_list(node_list)
1167       for node in node_list:
1168         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1169                                               constants.MIN_VG_SIZE)
1170         if vgstatus:
1171           raise errors.OpPrereqError("Error on node '%s': %s" %
1172                                      (node, vgstatus))
1173
1174     self.cluster = cluster = self.cfg.GetClusterInfo()
1175     # beparams changes do not need validation (we can't validate?),
1176     # but we still process here
1177     if self.op.beparams:
1178       self.new_beparams = cluster.FillDict(
1179         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1180
1181     # hypervisor list/parameters
1182     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1183     if self.op.hvparams:
1184       if not isinstance(self.op.hvparams, dict):
1185         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1186       for hv_name, hv_dict in self.op.hvparams.items():
1187         if hv_name not in self.new_hvparams:
1188           self.new_hvparams[hv_name] = hv_dict
1189         else:
1190           self.new_hvparams[hv_name].update(hv_dict)
1191
1192     if self.op.enabled_hypervisors is not None:
1193       self.hv_list = self.op.enabled_hypervisors
1194     else:
1195       self.hv_list = cluster.enabled_hypervisors
1196
1197     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1198       # either the enabled list has changed, or the parameters have, validate
1199       for hv_name, hv_params in self.new_hvparams.items():
1200         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1201             (self.op.enabled_hypervisors and
1202              hv_name in self.op.enabled_hypervisors)):
1203           # either this is a new hypervisor, or its parameters have changed
1204           hv_class = hypervisor.GetHypervisor(hv_name)
1205           hv_class.CheckParameterSyntax(hv_params)
1206           _CheckHVParams(self, node_list, hv_name, hv_params)
1207
1208   def Exec(self, feedback_fn):
1209     """Change the parameters of the cluster.
1210
1211     """
1212     if self.op.vg_name is not None:
1213       if self.op.vg_name != self.cfg.GetVGName():
1214         self.cfg.SetVGName(self.op.vg_name)
1215       else:
1216         feedback_fn("Cluster LVM configuration already in desired"
1217                     " state, not changing")
1218     if self.op.hvparams:
1219       self.cluster.hvparams = self.new_hvparams
1220     if self.op.enabled_hypervisors is not None:
1221       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1222     if self.op.beparams:
1223       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1224     self.cfg.Update(self.cluster)
1225
1226
1227 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1228   """Sleep and poll for an instance's disk to sync.
1229
1230   """
1231   if not instance.disks:
1232     return True
1233
1234   if not oneshot:
1235     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1236
1237   node = instance.primary_node
1238
1239   for dev in instance.disks:
1240     lu.cfg.SetDiskID(dev, node)
1241
1242   retries = 0
1243   while True:
1244     max_time = 0
1245     done = True
1246     cumul_degraded = False
1247     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1248     if not rstats:
1249       lu.proc.LogWarning("Can't get any data from node %s" % node)
1250       retries += 1
1251       if retries >= 10:
1252         raise errors.RemoteError("Can't contact node %s for mirror data,"
1253                                  " aborting." % node)
1254       time.sleep(6)
1255       continue
1256     retries = 0
1257     for i in range(len(rstats)):
1258       mstat = rstats[i]
1259       if mstat is None:
1260         lu.proc.LogWarning("Can't compute data for node %s/%s" %
1261                            (node, instance.disks[i].iv_name))
1262         continue
1263       # we ignore the ldisk parameter
1264       perc_done, est_time, is_degraded, _ = mstat
1265       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1266       if perc_done is not None:
1267         done = False
1268         if est_time is not None:
1269           rem_time = "%d estimated seconds remaining" % est_time
1270           max_time = est_time
1271         else:
1272           rem_time = "no time estimate"
1273         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1274                         (instance.disks[i].iv_name, perc_done, rem_time))
1275     if done or oneshot:
1276       break
1277
1278     time.sleep(min(60, max_time))
1279
1280   if done:
1281     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1282   return not cumul_degraded
1283
1284
1285 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1286   """Check that mirrors are not degraded.
1287
1288   The ldisk parameter, if True, will change the test from the
1289   is_degraded attribute (which represents overall non-ok status for
1290   the device(s)) to the ldisk (representing the local storage status).
1291
1292   """
1293   lu.cfg.SetDiskID(dev, node)
1294   if ldisk:
1295     idx = 6
1296   else:
1297     idx = 5
1298
1299   result = True
1300   if on_primary or dev.AssembleOnSecondary():
1301     rstats = lu.rpc.call_blockdev_find(node, dev)
1302     if not rstats:
1303       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1304       result = False
1305     else:
1306       result = result and (not rstats[idx])
1307   if dev.children:
1308     for child in dev.children:
1309       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1310
1311   return result
1312
1313
1314 class LUDiagnoseOS(NoHooksLU):
1315   """Logical unit for OS diagnose/query.
1316
1317   """
1318   _OP_REQP = ["output_fields", "names"]
1319   REQ_BGL = False
1320
1321   def ExpandNames(self):
1322     if self.op.names:
1323       raise errors.OpPrereqError("Selective OS query not supported")
1324
1325     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1326     _CheckOutputFields(static=[],
1327                        dynamic=self.dynamic_fields,
1328                        selected=self.op.output_fields)
1329
1330     # Lock all nodes, in shared mode
1331     self.needed_locks = {}
1332     self.share_locks[locking.LEVEL_NODE] = 1
1333     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1334
1335   def CheckPrereq(self):
1336     """Check prerequisites.
1337
1338     """
1339
1340   @staticmethod
1341   def _DiagnoseByOS(node_list, rlist):
1342     """Remaps a per-node return list into an a per-os per-node dictionary
1343
1344       Args:
1345         node_list: a list with the names of all nodes
1346         rlist: a map with node names as keys and OS objects as values
1347
1348       Returns:
1349         map: a map with osnames as keys and as value another map, with
1350              nodes as
1351              keys and list of OS objects as values
1352              e.g. {"debian-etch": {"node1": [<object>,...],
1353                                    "node2": [<object>,]}
1354                   }
1355
1356     """
1357     all_os = {}
1358     for node_name, nr in rlist.iteritems():
1359       if not nr:
1360         continue
1361       for os_obj in nr:
1362         if os_obj.name not in all_os:
1363           # build a list of nodes for this os containing empty lists
1364           # for each node in node_list
1365           all_os[os_obj.name] = {}
1366           for nname in node_list:
1367             all_os[os_obj.name][nname] = []
1368         all_os[os_obj.name][node_name].append(os_obj)
1369     return all_os
1370
1371   def Exec(self, feedback_fn):
1372     """Compute the list of OSes.
1373
1374     """
1375     node_list = self.acquired_locks[locking.LEVEL_NODE]
1376     node_data = self.rpc.call_os_diagnose(node_list)
1377     if node_data == False:
1378       raise errors.OpExecError("Can't gather the list of OSes")
1379     pol = self._DiagnoseByOS(node_list, node_data)
1380     output = []
1381     for os_name, os_data in pol.iteritems():
1382       row = []
1383       for field in self.op.output_fields:
1384         if field == "name":
1385           val = os_name
1386         elif field == "valid":
1387           val = utils.all([osl and osl[0] for osl in os_data.values()])
1388         elif field == "node_status":
1389           val = {}
1390           for node_name, nos_list in os_data.iteritems():
1391             val[node_name] = [(v.status, v.path) for v in nos_list]
1392         else:
1393           raise errors.ParameterError(field)
1394         row.append(val)
1395       output.append(row)
1396
1397     return output
1398
1399
1400 class LURemoveNode(LogicalUnit):
1401   """Logical unit for removing a node.
1402
1403   """
1404   HPATH = "node-remove"
1405   HTYPE = constants.HTYPE_NODE
1406   _OP_REQP = ["node_name"]
1407
1408   def BuildHooksEnv(self):
1409     """Build hooks env.
1410
1411     This doesn't run on the target node in the pre phase as a failed
1412     node would then be impossible to remove.
1413
1414     """
1415     env = {
1416       "OP_TARGET": self.op.node_name,
1417       "NODE_NAME": self.op.node_name,
1418       }
1419     all_nodes = self.cfg.GetNodeList()
1420     all_nodes.remove(self.op.node_name)
1421     return env, all_nodes, all_nodes
1422
1423   def CheckPrereq(self):
1424     """Check prerequisites.
1425
1426     This checks:
1427      - the node exists in the configuration
1428      - it does not have primary or secondary instances
1429      - it's not the master
1430
1431     Any errors are signalled by raising errors.OpPrereqError.
1432
1433     """
1434     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1435     if node is None:
1436       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1437
1438     instance_list = self.cfg.GetInstanceList()
1439
1440     masternode = self.cfg.GetMasterNode()
1441     if node.name == masternode:
1442       raise errors.OpPrereqError("Node is the master node,"
1443                                  " you need to failover first.")
1444
1445     for instance_name in instance_list:
1446       instance = self.cfg.GetInstanceInfo(instance_name)
1447       if node.name == instance.primary_node:
1448         raise errors.OpPrereqError("Instance %s still running on the node,"
1449                                    " please remove first." % instance_name)
1450       if node.name in instance.secondary_nodes:
1451         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1452                                    " please remove first." % instance_name)
1453     self.op.node_name = node.name
1454     self.node = node
1455
1456   def Exec(self, feedback_fn):
1457     """Removes the node from the cluster.
1458
1459     """
1460     node = self.node
1461     logger.Info("stopping the node daemon and removing configs from node %s" %
1462                 node.name)
1463
1464     self.context.RemoveNode(node.name)
1465
1466     self.rpc.call_node_leave_cluster(node.name)
1467
1468
1469 class LUQueryNodes(NoHooksLU):
1470   """Logical unit for querying nodes.
1471
1472   """
1473   _OP_REQP = ["output_fields", "names"]
1474   REQ_BGL = False
1475
1476   def ExpandNames(self):
1477     self.dynamic_fields = frozenset([
1478       "dtotal", "dfree",
1479       "mtotal", "mnode", "mfree",
1480       "bootid",
1481       "ctotal",
1482       ])
1483
1484     self.static_fields = frozenset([
1485       "name", "pinst_cnt", "sinst_cnt",
1486       "pinst_list", "sinst_list",
1487       "pip", "sip", "tags",
1488       "serial_no",
1489       ])
1490
1491     _CheckOutputFields(static=self.static_fields,
1492                        dynamic=self.dynamic_fields,
1493                        selected=self.op.output_fields)
1494
1495     self.needed_locks = {}
1496     self.share_locks[locking.LEVEL_NODE] = 1
1497
1498     if self.op.names:
1499       self.wanted = _GetWantedNodes(self, self.op.names)
1500     else:
1501       self.wanted = locking.ALL_SET
1502
1503     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1504     if self.do_locking:
1505       # if we don't request only static fields, we need to lock the nodes
1506       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1507
1508
1509   def CheckPrereq(self):
1510     """Check prerequisites.
1511
1512     """
1513     # The validation of the node list is done in the _GetWantedNodes,
1514     # if non empty, and if empty, there's no validation to do
1515     pass
1516
1517   def Exec(self, feedback_fn):
1518     """Computes the list of nodes and their attributes.
1519
1520     """
1521     all_info = self.cfg.GetAllNodesInfo()
1522     if self.do_locking:
1523       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1524     elif self.wanted != locking.ALL_SET:
1525       nodenames = self.wanted
1526       missing = set(nodenames).difference(all_info.keys())
1527       if missing:
1528         raise errors.OpExecError(
1529           "Some nodes were removed before retrieving their data: %s" % missing)
1530     else:
1531       nodenames = all_info.keys()
1532
1533     nodenames = utils.NiceSort(nodenames)
1534     nodelist = [all_info[name] for name in nodenames]
1535
1536     # begin data gathering
1537
1538     if self.dynamic_fields.intersection(self.op.output_fields):
1539       live_data = {}
1540       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1541                                           self.cfg.GetHypervisorType())
1542       for name in nodenames:
1543         nodeinfo = node_data.get(name, None)
1544         if nodeinfo:
1545           live_data[name] = {
1546             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1547             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1548             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1549             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1550             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1551             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1552             "bootid": nodeinfo['bootid'],
1553             }
1554         else:
1555           live_data[name] = {}
1556     else:
1557       live_data = dict.fromkeys(nodenames, {})
1558
1559     node_to_primary = dict([(name, set()) for name in nodenames])
1560     node_to_secondary = dict([(name, set()) for name in nodenames])
1561
1562     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1563                              "sinst_cnt", "sinst_list"))
1564     if inst_fields & frozenset(self.op.output_fields):
1565       instancelist = self.cfg.GetInstanceList()
1566
1567       for instance_name in instancelist:
1568         inst = self.cfg.GetInstanceInfo(instance_name)
1569         if inst.primary_node in node_to_primary:
1570           node_to_primary[inst.primary_node].add(inst.name)
1571         for secnode in inst.secondary_nodes:
1572           if secnode in node_to_secondary:
1573             node_to_secondary[secnode].add(inst.name)
1574
1575     # end data gathering
1576
1577     output = []
1578     for node in nodelist:
1579       node_output = []
1580       for field in self.op.output_fields:
1581         if field == "name":
1582           val = node.name
1583         elif field == "pinst_list":
1584           val = list(node_to_primary[node.name])
1585         elif field == "sinst_list":
1586           val = list(node_to_secondary[node.name])
1587         elif field == "pinst_cnt":
1588           val = len(node_to_primary[node.name])
1589         elif field == "sinst_cnt":
1590           val = len(node_to_secondary[node.name])
1591         elif field == "pip":
1592           val = node.primary_ip
1593         elif field == "sip":
1594           val = node.secondary_ip
1595         elif field == "tags":
1596           val = list(node.GetTags())
1597         elif field == "serial_no":
1598           val = node.serial_no
1599         elif field in self.dynamic_fields:
1600           val = live_data[node.name].get(field, None)
1601         else:
1602           raise errors.ParameterError(field)
1603         node_output.append(val)
1604       output.append(node_output)
1605
1606     return output
1607
1608
1609 class LUQueryNodeVolumes(NoHooksLU):
1610   """Logical unit for getting volumes on node(s).
1611
1612   """
1613   _OP_REQP = ["nodes", "output_fields"]
1614   REQ_BGL = False
1615
1616   def ExpandNames(self):
1617     _CheckOutputFields(static=["node"],
1618                        dynamic=["phys", "vg", "name", "size", "instance"],
1619                        selected=self.op.output_fields)
1620
1621     self.needed_locks = {}
1622     self.share_locks[locking.LEVEL_NODE] = 1
1623     if not self.op.nodes:
1624       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1625     else:
1626       self.needed_locks[locking.LEVEL_NODE] = \
1627         _GetWantedNodes(self, self.op.nodes)
1628
1629   def CheckPrereq(self):
1630     """Check prerequisites.
1631
1632     This checks that the fields required are valid output fields.
1633
1634     """
1635     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1636
1637   def Exec(self, feedback_fn):
1638     """Computes the list of nodes and their attributes.
1639
1640     """
1641     nodenames = self.nodes
1642     volumes = self.rpc.call_node_volumes(nodenames)
1643
1644     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1645              in self.cfg.GetInstanceList()]
1646
1647     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1648
1649     output = []
1650     for node in nodenames:
1651       if node not in volumes or not volumes[node]:
1652         continue
1653
1654       node_vols = volumes[node][:]
1655       node_vols.sort(key=lambda vol: vol['dev'])
1656
1657       for vol in node_vols:
1658         node_output = []
1659         for field in self.op.output_fields:
1660           if field == "node":
1661             val = node
1662           elif field == "phys":
1663             val = vol['dev']
1664           elif field == "vg":
1665             val = vol['vg']
1666           elif field == "name":
1667             val = vol['name']
1668           elif field == "size":
1669             val = int(float(vol['size']))
1670           elif field == "instance":
1671             for inst in ilist:
1672               if node not in lv_by_node[inst]:
1673                 continue
1674               if vol['name'] in lv_by_node[inst][node]:
1675                 val = inst.name
1676                 break
1677             else:
1678               val = '-'
1679           else:
1680             raise errors.ParameterError(field)
1681           node_output.append(str(val))
1682
1683         output.append(node_output)
1684
1685     return output
1686
1687
1688 class LUAddNode(LogicalUnit):
1689   """Logical unit for adding node to the cluster.
1690
1691   """
1692   HPATH = "node-add"
1693   HTYPE = constants.HTYPE_NODE
1694   _OP_REQP = ["node_name"]
1695
1696   def BuildHooksEnv(self):
1697     """Build hooks env.
1698
1699     This will run on all nodes before, and on all nodes + the new node after.
1700
1701     """
1702     env = {
1703       "OP_TARGET": self.op.node_name,
1704       "NODE_NAME": self.op.node_name,
1705       "NODE_PIP": self.op.primary_ip,
1706       "NODE_SIP": self.op.secondary_ip,
1707       }
1708     nodes_0 = self.cfg.GetNodeList()
1709     nodes_1 = nodes_0 + [self.op.node_name, ]
1710     return env, nodes_0, nodes_1
1711
1712   def CheckPrereq(self):
1713     """Check prerequisites.
1714
1715     This checks:
1716      - the new node is not already in the config
1717      - it is resolvable
1718      - its parameters (single/dual homed) matches the cluster
1719
1720     Any errors are signalled by raising errors.OpPrereqError.
1721
1722     """
1723     node_name = self.op.node_name
1724     cfg = self.cfg
1725
1726     dns_data = utils.HostInfo(node_name)
1727
1728     node = dns_data.name
1729     primary_ip = self.op.primary_ip = dns_data.ip
1730     secondary_ip = getattr(self.op, "secondary_ip", None)
1731     if secondary_ip is None:
1732       secondary_ip = primary_ip
1733     if not utils.IsValidIP(secondary_ip):
1734       raise errors.OpPrereqError("Invalid secondary IP given")
1735     self.op.secondary_ip = secondary_ip
1736
1737     node_list = cfg.GetNodeList()
1738     if not self.op.readd and node in node_list:
1739       raise errors.OpPrereqError("Node %s is already in the configuration" %
1740                                  node)
1741     elif self.op.readd and node not in node_list:
1742       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1743
1744     for existing_node_name in node_list:
1745       existing_node = cfg.GetNodeInfo(existing_node_name)
1746
1747       if self.op.readd and node == existing_node_name:
1748         if (existing_node.primary_ip != primary_ip or
1749             existing_node.secondary_ip != secondary_ip):
1750           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1751                                      " address configuration as before")
1752         continue
1753
1754       if (existing_node.primary_ip == primary_ip or
1755           existing_node.secondary_ip == primary_ip or
1756           existing_node.primary_ip == secondary_ip or
1757           existing_node.secondary_ip == secondary_ip):
1758         raise errors.OpPrereqError("New node ip address(es) conflict with"
1759                                    " existing node %s" % existing_node.name)
1760
1761     # check that the type of the node (single versus dual homed) is the
1762     # same as for the master
1763     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1764     master_singlehomed = myself.secondary_ip == myself.primary_ip
1765     newbie_singlehomed = secondary_ip == primary_ip
1766     if master_singlehomed != newbie_singlehomed:
1767       if master_singlehomed:
1768         raise errors.OpPrereqError("The master has no private ip but the"
1769                                    " new node has one")
1770       else:
1771         raise errors.OpPrereqError("The master has a private ip but the"
1772                                    " new node doesn't have one")
1773
1774     # checks reachablity
1775     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1776       raise errors.OpPrereqError("Node not reachable by ping")
1777
1778     if not newbie_singlehomed:
1779       # check reachability from my secondary ip to newbie's secondary ip
1780       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1781                            source=myself.secondary_ip):
1782         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1783                                    " based ping to noded port")
1784
1785     self.new_node = objects.Node(name=node,
1786                                  primary_ip=primary_ip,
1787                                  secondary_ip=secondary_ip)
1788
1789   def Exec(self, feedback_fn):
1790     """Adds the new node to the cluster.
1791
1792     """
1793     new_node = self.new_node
1794     node = new_node.name
1795
1796     # check connectivity
1797     result = self.rpc.call_version([node])[node]
1798     if result:
1799       if constants.PROTOCOL_VERSION == result:
1800         logger.Info("communication to node %s fine, sw version %s match" %
1801                     (node, result))
1802       else:
1803         raise errors.OpExecError("Version mismatch master version %s,"
1804                                  " node version %s" %
1805                                  (constants.PROTOCOL_VERSION, result))
1806     else:
1807       raise errors.OpExecError("Cannot get version from the new node")
1808
1809     # setup ssh on node
1810     logger.Info("copy ssh key to node %s" % node)
1811     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1812     keyarray = []
1813     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1814                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1815                 priv_key, pub_key]
1816
1817     for i in keyfiles:
1818       f = open(i, 'r')
1819       try:
1820         keyarray.append(f.read())
1821       finally:
1822         f.close()
1823
1824     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1825                                     keyarray[2],
1826                                     keyarray[3], keyarray[4], keyarray[5])
1827
1828     if not result:
1829       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1830
1831     # Add node to our /etc/hosts, and add key to known_hosts
1832     utils.AddHostToEtcHosts(new_node.name)
1833
1834     if new_node.secondary_ip != new_node.primary_ip:
1835       if not self.rpc.call_node_has_ip_address(new_node.name,
1836                                                new_node.secondary_ip):
1837         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1838                                  " you gave (%s). Please fix and re-run this"
1839                                  " command." % new_node.secondary_ip)
1840
1841     node_verify_list = [self.cfg.GetMasterNode()]
1842     node_verify_param = {
1843       'nodelist': [node],
1844       # TODO: do a node-net-test as well?
1845     }
1846
1847     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1848                                        self.cfg.GetClusterName())
1849     for verifier in node_verify_list:
1850       if not result[verifier]:
1851         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1852                                  " for remote verification" % verifier)
1853       if result[verifier]['nodelist']:
1854         for failed in result[verifier]['nodelist']:
1855           feedback_fn("ssh/hostname verification failed %s -> %s" %
1856                       (verifier, result[verifier]['nodelist'][failed]))
1857         raise errors.OpExecError("ssh/hostname verification failed.")
1858
1859     # Distribute updated /etc/hosts and known_hosts to all nodes,
1860     # including the node just added
1861     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1862     dist_nodes = self.cfg.GetNodeList()
1863     if not self.op.readd:
1864       dist_nodes.append(node)
1865     if myself.name in dist_nodes:
1866       dist_nodes.remove(myself.name)
1867
1868     logger.Debug("Copying hosts and known_hosts to all nodes")
1869     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1870       result = self.rpc.call_upload_file(dist_nodes, fname)
1871       for to_node in dist_nodes:
1872         if not result[to_node]:
1873           logger.Error("copy of file %s to node %s failed" %
1874                        (fname, to_node))
1875
1876     to_copy = []
1877     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1878       to_copy.append(constants.VNC_PASSWORD_FILE)
1879     for fname in to_copy:
1880       result = self.rpc.call_upload_file([node], fname)
1881       if not result[node]:
1882         logger.Error("could not copy file %s to node %s" % (fname, node))
1883
1884     if self.op.readd:
1885       self.context.ReaddNode(new_node)
1886     else:
1887       self.context.AddNode(new_node)
1888
1889
1890 class LUQueryClusterInfo(NoHooksLU):
1891   """Query cluster configuration.
1892
1893   """
1894   _OP_REQP = []
1895   REQ_MASTER = False
1896   REQ_BGL = False
1897
1898   def ExpandNames(self):
1899     self.needed_locks = {}
1900
1901   def CheckPrereq(self):
1902     """No prerequsites needed for this LU.
1903
1904     """
1905     pass
1906
1907   def Exec(self, feedback_fn):
1908     """Return cluster config.
1909
1910     """
1911     cluster = self.cfg.GetClusterInfo()
1912     result = {
1913       "software_version": constants.RELEASE_VERSION,
1914       "protocol_version": constants.PROTOCOL_VERSION,
1915       "config_version": constants.CONFIG_VERSION,
1916       "os_api_version": constants.OS_API_VERSION,
1917       "export_version": constants.EXPORT_VERSION,
1918       "architecture": (platform.architecture()[0], platform.machine()),
1919       "name": cluster.cluster_name,
1920       "master": cluster.master_node,
1921       "hypervisor_type": cluster.hypervisor,
1922       "enabled_hypervisors": cluster.enabled_hypervisors,
1923       "hvparams": cluster.hvparams,
1924       "beparams": cluster.beparams,
1925       }
1926
1927     return result
1928
1929
1930 class LUQueryConfigValues(NoHooksLU):
1931   """Return configuration values.
1932
1933   """
1934   _OP_REQP = []
1935   REQ_BGL = False
1936
1937   def ExpandNames(self):
1938     self.needed_locks = {}
1939
1940     static_fields = ["cluster_name", "master_node", "drain_flag"]
1941     _CheckOutputFields(static=static_fields,
1942                        dynamic=[],
1943                        selected=self.op.output_fields)
1944
1945   def CheckPrereq(self):
1946     """No prerequisites.
1947
1948     """
1949     pass
1950
1951   def Exec(self, feedback_fn):
1952     """Dump a representation of the cluster config to the standard output.
1953
1954     """
1955     values = []
1956     for field in self.op.output_fields:
1957       if field == "cluster_name":
1958         entry = self.cfg.GetClusterName()
1959       elif field == "master_node":
1960         entry = self.cfg.GetMasterNode()
1961       elif field == "drain_flag":
1962         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1963       else:
1964         raise errors.ParameterError(field)
1965       values.append(entry)
1966     return values
1967
1968
1969 class LUActivateInstanceDisks(NoHooksLU):
1970   """Bring up an instance's disks.
1971
1972   """
1973   _OP_REQP = ["instance_name"]
1974   REQ_BGL = False
1975
1976   def ExpandNames(self):
1977     self._ExpandAndLockInstance()
1978     self.needed_locks[locking.LEVEL_NODE] = []
1979     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1980
1981   def DeclareLocks(self, level):
1982     if level == locking.LEVEL_NODE:
1983       self._LockInstancesNodes()
1984
1985   def CheckPrereq(self):
1986     """Check prerequisites.
1987
1988     This checks that the instance is in the cluster.
1989
1990     """
1991     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1992     assert self.instance is not None, \
1993       "Cannot retrieve locked instance %s" % self.op.instance_name
1994
1995   def Exec(self, feedback_fn):
1996     """Activate the disks.
1997
1998     """
1999     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2000     if not disks_ok:
2001       raise errors.OpExecError("Cannot activate block devices")
2002
2003     return disks_info
2004
2005
2006 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2007   """Prepare the block devices for an instance.
2008
2009   This sets up the block devices on all nodes.
2010
2011   Args:
2012     instance: a ganeti.objects.Instance object
2013     ignore_secondaries: if true, errors on secondary nodes won't result
2014                         in an error return from the function
2015
2016   Returns:
2017     false if the operation failed
2018     list of (host, instance_visible_name, node_visible_name) if the operation
2019          suceeded with the mapping from node devices to instance devices
2020   """
2021   device_info = []
2022   disks_ok = True
2023   iname = instance.name
2024   # With the two passes mechanism we try to reduce the window of
2025   # opportunity for the race condition of switching DRBD to primary
2026   # before handshaking occured, but we do not eliminate it
2027
2028   # The proper fix would be to wait (with some limits) until the
2029   # connection has been made and drbd transitions from WFConnection
2030   # into any other network-connected state (Connected, SyncTarget,
2031   # SyncSource, etc.)
2032
2033   # 1st pass, assemble on all nodes in secondary mode
2034   for inst_disk in instance.disks:
2035     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2036       lu.cfg.SetDiskID(node_disk, node)
2037       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2038       if not result:
2039         logger.Error("could not prepare block device %s on node %s"
2040                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2041         if not ignore_secondaries:
2042           disks_ok = False
2043
2044   # FIXME: race condition on drbd migration to primary
2045
2046   # 2nd pass, do only the primary node
2047   for inst_disk in instance.disks:
2048     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2049       if node != instance.primary_node:
2050         continue
2051       lu.cfg.SetDiskID(node_disk, node)
2052       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2053       if not result:
2054         logger.Error("could not prepare block device %s on node %s"
2055                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2056         disks_ok = False
2057     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2058
2059   # leave the disks configured for the primary node
2060   # this is a workaround that would be fixed better by
2061   # improving the logical/physical id handling
2062   for disk in instance.disks:
2063     lu.cfg.SetDiskID(disk, instance.primary_node)
2064
2065   return disks_ok, device_info
2066
2067
2068 def _StartInstanceDisks(lu, instance, force):
2069   """Start the disks of an instance.
2070
2071   """
2072   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2073                                            ignore_secondaries=force)
2074   if not disks_ok:
2075     _ShutdownInstanceDisks(lu, instance)
2076     if force is not None and not force:
2077       logger.Error("If the message above refers to a secondary node,"
2078                    " you can retry the operation using '--force'.")
2079     raise errors.OpExecError("Disk consistency error")
2080
2081
2082 class LUDeactivateInstanceDisks(NoHooksLU):
2083   """Shutdown an instance's disks.
2084
2085   """
2086   _OP_REQP = ["instance_name"]
2087   REQ_BGL = False
2088
2089   def ExpandNames(self):
2090     self._ExpandAndLockInstance()
2091     self.needed_locks[locking.LEVEL_NODE] = []
2092     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2093
2094   def DeclareLocks(self, level):
2095     if level == locking.LEVEL_NODE:
2096       self._LockInstancesNodes()
2097
2098   def CheckPrereq(self):
2099     """Check prerequisites.
2100
2101     This checks that the instance is in the cluster.
2102
2103     """
2104     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2105     assert self.instance is not None, \
2106       "Cannot retrieve locked instance %s" % self.op.instance_name
2107
2108   def Exec(self, feedback_fn):
2109     """Deactivate the disks
2110
2111     """
2112     instance = self.instance
2113     _SafeShutdownInstanceDisks(self, instance)
2114
2115
2116 def _SafeShutdownInstanceDisks(lu, instance):
2117   """Shutdown block devices of an instance.
2118
2119   This function checks if an instance is running, before calling
2120   _ShutdownInstanceDisks.
2121
2122   """
2123   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2124                                       [instance.hypervisor])
2125   ins_l = ins_l[instance.primary_node]
2126   if not type(ins_l) is list:
2127     raise errors.OpExecError("Can't contact node '%s'" %
2128                              instance.primary_node)
2129
2130   if instance.name in ins_l:
2131     raise errors.OpExecError("Instance is running, can't shutdown"
2132                              " block devices.")
2133
2134   _ShutdownInstanceDisks(lu, instance)
2135
2136
2137 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2138   """Shutdown block devices of an instance.
2139
2140   This does the shutdown on all nodes of the instance.
2141
2142   If the ignore_primary is false, errors on the primary node are
2143   ignored.
2144
2145   """
2146   result = True
2147   for disk in instance.disks:
2148     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2149       lu.cfg.SetDiskID(top_disk, node)
2150       if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2151         logger.Error("could not shutdown block device %s on node %s" %
2152                      (disk.iv_name, node))
2153         if not ignore_primary or node != instance.primary_node:
2154           result = False
2155   return result
2156
2157
2158 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2159   """Checks if a node has enough free memory.
2160
2161   This function check if a given node has the needed amount of free
2162   memory. In case the node has less memory or we cannot get the
2163   information from the node, this function raise an OpPrereqError
2164   exception.
2165
2166   @type lu: C{LogicalUnit}
2167   @param lu: a logical unit from which we get configuration data
2168   @type node: C{str}
2169   @param node: the node to check
2170   @type reason: C{str}
2171   @param reason: string to use in the error message
2172   @type requested: C{int}
2173   @param requested: the amount of memory in MiB to check for
2174   @type hypervisor: C{str}
2175   @param hypervisor: the hypervisor to ask for memory stats
2176   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2177       we cannot check the node
2178
2179   """
2180   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2181   if not nodeinfo or not isinstance(nodeinfo, dict):
2182     raise errors.OpPrereqError("Could not contact node %s for resource"
2183                              " information" % (node,))
2184
2185   free_mem = nodeinfo[node].get('memory_free')
2186   if not isinstance(free_mem, int):
2187     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2188                              " was '%s'" % (node, free_mem))
2189   if requested > free_mem:
2190     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2191                              " needed %s MiB, available %s MiB" %
2192                              (node, reason, requested, free_mem))
2193
2194
2195 class LUStartupInstance(LogicalUnit):
2196   """Starts an instance.
2197
2198   """
2199   HPATH = "instance-start"
2200   HTYPE = constants.HTYPE_INSTANCE
2201   _OP_REQP = ["instance_name", "force"]
2202   REQ_BGL = False
2203
2204   def ExpandNames(self):
2205     self._ExpandAndLockInstance()
2206     self.needed_locks[locking.LEVEL_NODE] = []
2207     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2208
2209   def DeclareLocks(self, level):
2210     if level == locking.LEVEL_NODE:
2211       self._LockInstancesNodes()
2212
2213   def BuildHooksEnv(self):
2214     """Build hooks env.
2215
2216     This runs on master, primary and secondary nodes of the instance.
2217
2218     """
2219     env = {
2220       "FORCE": self.op.force,
2221       }
2222     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2223     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2224           list(self.instance.secondary_nodes))
2225     return env, nl, nl
2226
2227   def CheckPrereq(self):
2228     """Check prerequisites.
2229
2230     This checks that the instance is in the cluster.
2231
2232     """
2233     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2234     assert self.instance is not None, \
2235       "Cannot retrieve locked instance %s" % self.op.instance_name
2236
2237     bep = self.cfg.GetClusterInfo().FillBE(instance)
2238     # check bridges existance
2239     _CheckInstanceBridgesExist(self, instance)
2240
2241     _CheckNodeFreeMemory(self, instance.primary_node,
2242                          "starting instance %s" % instance.name,
2243                          bep[constants.BE_MEMORY], instance.hypervisor)
2244
2245   def Exec(self, feedback_fn):
2246     """Start the instance.
2247
2248     """
2249     instance = self.instance
2250     force = self.op.force
2251     extra_args = getattr(self.op, "extra_args", "")
2252
2253     self.cfg.MarkInstanceUp(instance.name)
2254
2255     node_current = instance.primary_node
2256
2257     _StartInstanceDisks(self, instance, force)
2258
2259     if not self.rpc.call_instance_start(node_current, instance, extra_args):
2260       _ShutdownInstanceDisks(self, instance)
2261       raise errors.OpExecError("Could not start instance")
2262
2263
2264 class LURebootInstance(LogicalUnit):
2265   """Reboot an instance.
2266
2267   """
2268   HPATH = "instance-reboot"
2269   HTYPE = constants.HTYPE_INSTANCE
2270   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2271   REQ_BGL = False
2272
2273   def ExpandNames(self):
2274     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2275                                    constants.INSTANCE_REBOOT_HARD,
2276                                    constants.INSTANCE_REBOOT_FULL]:
2277       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2278                                   (constants.INSTANCE_REBOOT_SOFT,
2279                                    constants.INSTANCE_REBOOT_HARD,
2280                                    constants.INSTANCE_REBOOT_FULL))
2281     self._ExpandAndLockInstance()
2282     self.needed_locks[locking.LEVEL_NODE] = []
2283     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2284
2285   def DeclareLocks(self, level):
2286     if level == locking.LEVEL_NODE:
2287       primary_only = not constants.INSTANCE_REBOOT_FULL
2288       self._LockInstancesNodes(primary_only=primary_only)
2289
2290   def BuildHooksEnv(self):
2291     """Build hooks env.
2292
2293     This runs on master, primary and secondary nodes of the instance.
2294
2295     """
2296     env = {
2297       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2298       }
2299     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2300     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2301           list(self.instance.secondary_nodes))
2302     return env, nl, nl
2303
2304   def CheckPrereq(self):
2305     """Check prerequisites.
2306
2307     This checks that the instance is in the cluster.
2308
2309     """
2310     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2311     assert self.instance is not None, \
2312       "Cannot retrieve locked instance %s" % self.op.instance_name
2313
2314     # check bridges existance
2315     _CheckInstanceBridgesExist(self, instance)
2316
2317   def Exec(self, feedback_fn):
2318     """Reboot the instance.
2319
2320     """
2321     instance = self.instance
2322     ignore_secondaries = self.op.ignore_secondaries
2323     reboot_type = self.op.reboot_type
2324     extra_args = getattr(self.op, "extra_args", "")
2325
2326     node_current = instance.primary_node
2327
2328     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2329                        constants.INSTANCE_REBOOT_HARD]:
2330       if not self.rpc.call_instance_reboot(node_current, instance,
2331                                            reboot_type, extra_args):
2332         raise errors.OpExecError("Could not reboot instance")
2333     else:
2334       if not self.rpc.call_instance_shutdown(node_current, instance):
2335         raise errors.OpExecError("could not shutdown instance for full reboot")
2336       _ShutdownInstanceDisks(self, instance)
2337       _StartInstanceDisks(self, instance, ignore_secondaries)
2338       if not self.rpc.call_instance_start(node_current, instance, extra_args):
2339         _ShutdownInstanceDisks(self, instance)
2340         raise errors.OpExecError("Could not start instance for full reboot")
2341
2342     self.cfg.MarkInstanceUp(instance.name)
2343
2344
2345 class LUShutdownInstance(LogicalUnit):
2346   """Shutdown an instance.
2347
2348   """
2349   HPATH = "instance-stop"
2350   HTYPE = constants.HTYPE_INSTANCE
2351   _OP_REQP = ["instance_name"]
2352   REQ_BGL = False
2353
2354   def ExpandNames(self):
2355     self._ExpandAndLockInstance()
2356     self.needed_locks[locking.LEVEL_NODE] = []
2357     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2358
2359   def DeclareLocks(self, level):
2360     if level == locking.LEVEL_NODE:
2361       self._LockInstancesNodes()
2362
2363   def BuildHooksEnv(self):
2364     """Build hooks env.
2365
2366     This runs on master, primary and secondary nodes of the instance.
2367
2368     """
2369     env = _BuildInstanceHookEnvByObject(self, self.instance)
2370     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2371           list(self.instance.secondary_nodes))
2372     return env, nl, nl
2373
2374   def CheckPrereq(self):
2375     """Check prerequisites.
2376
2377     This checks that the instance is in the cluster.
2378
2379     """
2380     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2381     assert self.instance is not None, \
2382       "Cannot retrieve locked instance %s" % self.op.instance_name
2383
2384   def Exec(self, feedback_fn):
2385     """Shutdown the instance.
2386
2387     """
2388     instance = self.instance
2389     node_current = instance.primary_node
2390     self.cfg.MarkInstanceDown(instance.name)
2391     if not self.rpc.call_instance_shutdown(node_current, instance):
2392       logger.Error("could not shutdown instance")
2393
2394     _ShutdownInstanceDisks(self, instance)
2395
2396
2397 class LUReinstallInstance(LogicalUnit):
2398   """Reinstall an instance.
2399
2400   """
2401   HPATH = "instance-reinstall"
2402   HTYPE = constants.HTYPE_INSTANCE
2403   _OP_REQP = ["instance_name"]
2404   REQ_BGL = False
2405
2406   def ExpandNames(self):
2407     self._ExpandAndLockInstance()
2408     self.needed_locks[locking.LEVEL_NODE] = []
2409     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2410
2411   def DeclareLocks(self, level):
2412     if level == locking.LEVEL_NODE:
2413       self._LockInstancesNodes()
2414
2415   def BuildHooksEnv(self):
2416     """Build hooks env.
2417
2418     This runs on master, primary and secondary nodes of the instance.
2419
2420     """
2421     env = _BuildInstanceHookEnvByObject(self, self.instance)
2422     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2423           list(self.instance.secondary_nodes))
2424     return env, nl, nl
2425
2426   def CheckPrereq(self):
2427     """Check prerequisites.
2428
2429     This checks that the instance is in the cluster and is not running.
2430
2431     """
2432     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2433     assert instance is not None, \
2434       "Cannot retrieve locked instance %s" % self.op.instance_name
2435
2436     if instance.disk_template == constants.DT_DISKLESS:
2437       raise errors.OpPrereqError("Instance '%s' has no disks" %
2438                                  self.op.instance_name)
2439     if instance.status != "down":
2440       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2441                                  self.op.instance_name)
2442     remote_info = self.rpc.call_instance_info(instance.primary_node,
2443                                               instance.name,
2444                                               instance.hypervisor)
2445     if remote_info:
2446       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2447                                  (self.op.instance_name,
2448                                   instance.primary_node))
2449
2450     self.op.os_type = getattr(self.op, "os_type", None)
2451     if self.op.os_type is not None:
2452       # OS verification
2453       pnode = self.cfg.GetNodeInfo(
2454         self.cfg.ExpandNodeName(instance.primary_node))
2455       if pnode is None:
2456         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2457                                    self.op.pnode)
2458       os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2459       if not os_obj:
2460         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2461                                    " primary node"  % self.op.os_type)
2462
2463     self.instance = instance
2464
2465   def Exec(self, feedback_fn):
2466     """Reinstall the instance.
2467
2468     """
2469     inst = self.instance
2470
2471     if self.op.os_type is not None:
2472       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2473       inst.os = self.op.os_type
2474       self.cfg.Update(inst)
2475
2476     _StartInstanceDisks(self, inst, None)
2477     try:
2478       feedback_fn("Running the instance OS create scripts...")
2479       if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2480                                            "sda", "sdb"):
2481         raise errors.OpExecError("Could not install OS for instance %s"
2482                                  " on node %s" %
2483                                  (inst.name, inst.primary_node))
2484     finally:
2485       _ShutdownInstanceDisks(self, inst)
2486
2487
2488 class LURenameInstance(LogicalUnit):
2489   """Rename an instance.
2490
2491   """
2492   HPATH = "instance-rename"
2493   HTYPE = constants.HTYPE_INSTANCE
2494   _OP_REQP = ["instance_name", "new_name"]
2495
2496   def BuildHooksEnv(self):
2497     """Build hooks env.
2498
2499     This runs on master, primary and secondary nodes of the instance.
2500
2501     """
2502     env = _BuildInstanceHookEnvByObject(self, self.instance)
2503     env["INSTANCE_NEW_NAME"] = self.op.new_name
2504     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2505           list(self.instance.secondary_nodes))
2506     return env, nl, nl
2507
2508   def CheckPrereq(self):
2509     """Check prerequisites.
2510
2511     This checks that the instance is in the cluster and is not running.
2512
2513     """
2514     instance = self.cfg.GetInstanceInfo(
2515       self.cfg.ExpandInstanceName(self.op.instance_name))
2516     if instance is None:
2517       raise errors.OpPrereqError("Instance '%s' not known" %
2518                                  self.op.instance_name)
2519     if instance.status != "down":
2520       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2521                                  self.op.instance_name)
2522     remote_info = self.rpc.call_instance_info(instance.primary_node,
2523                                               instance.name,
2524                                               instance.hypervisor)
2525     if remote_info:
2526       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2527                                  (self.op.instance_name,
2528                                   instance.primary_node))
2529     self.instance = instance
2530
2531     # new name verification
2532     name_info = utils.HostInfo(self.op.new_name)
2533
2534     self.op.new_name = new_name = name_info.name
2535     instance_list = self.cfg.GetInstanceList()
2536     if new_name in instance_list:
2537       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2538                                  new_name)
2539
2540     if not getattr(self.op, "ignore_ip", False):
2541       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2542         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2543                                    (name_info.ip, new_name))
2544
2545
2546   def Exec(self, feedback_fn):
2547     """Reinstall the instance.
2548
2549     """
2550     inst = self.instance
2551     old_name = inst.name
2552
2553     if inst.disk_template == constants.DT_FILE:
2554       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2555
2556     self.cfg.RenameInstance(inst.name, self.op.new_name)
2557     # Change the instance lock. This is definitely safe while we hold the BGL
2558     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2559     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2560
2561     # re-read the instance from the configuration after rename
2562     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2563
2564     if inst.disk_template == constants.DT_FILE:
2565       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2566       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2567                                                      old_file_storage_dir,
2568                                                      new_file_storage_dir)
2569
2570       if not result:
2571         raise errors.OpExecError("Could not connect to node '%s' to rename"
2572                                  " directory '%s' to '%s' (but the instance"
2573                                  " has been renamed in Ganeti)" % (
2574                                  inst.primary_node, old_file_storage_dir,
2575                                  new_file_storage_dir))
2576
2577       if not result[0]:
2578         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2579                                  " (but the instance has been renamed in"
2580                                  " Ganeti)" % (old_file_storage_dir,
2581                                                new_file_storage_dir))
2582
2583     _StartInstanceDisks(self, inst, None)
2584     try:
2585       if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2586                                                old_name,
2587                                                "sda", "sdb"):
2588         msg = ("Could not run OS rename script for instance %s on node %s"
2589                " (but the instance has been renamed in Ganeti)" %
2590                (inst.name, inst.primary_node))
2591         logger.Error(msg)
2592     finally:
2593       _ShutdownInstanceDisks(self, inst)
2594
2595
2596 class LURemoveInstance(LogicalUnit):
2597   """Remove an instance.
2598
2599   """
2600   HPATH = "instance-remove"
2601   HTYPE = constants.HTYPE_INSTANCE
2602   _OP_REQP = ["instance_name", "ignore_failures"]
2603   REQ_BGL = False
2604
2605   def ExpandNames(self):
2606     self._ExpandAndLockInstance()
2607     self.needed_locks[locking.LEVEL_NODE] = []
2608     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2609
2610   def DeclareLocks(self, level):
2611     if level == locking.LEVEL_NODE:
2612       self._LockInstancesNodes()
2613
2614   def BuildHooksEnv(self):
2615     """Build hooks env.
2616
2617     This runs on master, primary and secondary nodes of the instance.
2618
2619     """
2620     env = _BuildInstanceHookEnvByObject(self, self.instance)
2621     nl = [self.cfg.GetMasterNode()]
2622     return env, nl, nl
2623
2624   def CheckPrereq(self):
2625     """Check prerequisites.
2626
2627     This checks that the instance is in the cluster.
2628
2629     """
2630     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2631     assert self.instance is not None, \
2632       "Cannot retrieve locked instance %s" % self.op.instance_name
2633
2634   def Exec(self, feedback_fn):
2635     """Remove the instance.
2636
2637     """
2638     instance = self.instance
2639     logger.Info("shutting down instance %s on node %s" %
2640                 (instance.name, instance.primary_node))
2641
2642     if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2643       if self.op.ignore_failures:
2644         feedback_fn("Warning: can't shutdown instance")
2645       else:
2646         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2647                                  (instance.name, instance.primary_node))
2648
2649     logger.Info("removing block devices for instance %s" % instance.name)
2650
2651     if not _RemoveDisks(self, instance):
2652       if self.op.ignore_failures:
2653         feedback_fn("Warning: can't remove instance's disks")
2654       else:
2655         raise errors.OpExecError("Can't remove instance's disks")
2656
2657     logger.Info("removing instance %s out of cluster config" % instance.name)
2658
2659     self.cfg.RemoveInstance(instance.name)
2660     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2661
2662
2663 class LUQueryInstances(NoHooksLU):
2664   """Logical unit for querying instances.
2665
2666   """
2667   _OP_REQP = ["output_fields", "names"]
2668   REQ_BGL = False
2669
2670   def ExpandNames(self):
2671     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2672     hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
2673     bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
2674     self.static_fields = frozenset([
2675       "name", "os", "pnode", "snodes",
2676       "admin_state", "admin_ram",
2677       "disk_template", "ip", "mac", "bridge",
2678       "sda_size", "sdb_size", "vcpus", "tags",
2679       "network_port",
2680       "serial_no", "hypervisor", "hvparams",
2681       ] + hvp + bep)
2682
2683     _CheckOutputFields(static=self.static_fields,
2684                        dynamic=self.dynamic_fields,
2685                        selected=self.op.output_fields)
2686
2687     self.needed_locks = {}
2688     self.share_locks[locking.LEVEL_INSTANCE] = 1
2689     self.share_locks[locking.LEVEL_NODE] = 1
2690
2691     if self.op.names:
2692       self.wanted = _GetWantedInstances(self, self.op.names)
2693     else:
2694       self.wanted = locking.ALL_SET
2695
2696     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2697     if self.do_locking:
2698       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2699       self.needed_locks[locking.LEVEL_NODE] = []
2700       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2701
2702   def DeclareLocks(self, level):
2703     if level == locking.LEVEL_NODE and self.do_locking:
2704       self._LockInstancesNodes()
2705
2706   def CheckPrereq(self):
2707     """Check prerequisites.
2708
2709     """
2710     pass
2711
2712   def Exec(self, feedback_fn):
2713     """Computes the list of nodes and their attributes.
2714
2715     """
2716     all_info = self.cfg.GetAllInstancesInfo()
2717     if self.do_locking:
2718       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2719     elif self.wanted != locking.ALL_SET:
2720       instance_names = self.wanted
2721       missing = set(instance_names).difference(all_info.keys())
2722       if missing:
2723         raise errors.OpExecError(
2724           "Some instances were removed before retrieving their data: %s"
2725           % missing)
2726     else:
2727       instance_names = all_info.keys()
2728
2729     instance_names = utils.NiceSort(instance_names)
2730     instance_list = [all_info[iname] for iname in instance_names]
2731
2732     # begin data gathering
2733
2734     nodes = frozenset([inst.primary_node for inst in instance_list])
2735     hv_list = list(set([inst.hypervisor for inst in instance_list]))
2736
2737     bad_nodes = []
2738     if self.dynamic_fields.intersection(self.op.output_fields):
2739       live_data = {}
2740       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2741       for name in nodes:
2742         result = node_data[name]
2743         if result:
2744           live_data.update(result)
2745         elif result == False:
2746           bad_nodes.append(name)
2747         # else no instance is alive
2748     else:
2749       live_data = dict([(name, {}) for name in instance_names])
2750
2751     # end data gathering
2752
2753     HVPREFIX = "hv/"
2754     BEPREFIX = "be/"
2755     output = []
2756     for instance in instance_list:
2757       iout = []
2758       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2759       i_be = self.cfg.GetClusterInfo().FillBE(instance)
2760       for field in self.op.output_fields:
2761         if field == "name":
2762           val = instance.name
2763         elif field == "os":
2764           val = instance.os
2765         elif field == "pnode":
2766           val = instance.primary_node
2767         elif field == "snodes":
2768           val = list(instance.secondary_nodes)
2769         elif field == "admin_state":
2770           val = (instance.status != "down")
2771         elif field == "oper_state":
2772           if instance.primary_node in bad_nodes:
2773             val = None
2774           else:
2775             val = bool(live_data.get(instance.name))
2776         elif field == "status":
2777           if instance.primary_node in bad_nodes:
2778             val = "ERROR_nodedown"
2779           else:
2780             running = bool(live_data.get(instance.name))
2781             if running:
2782               if instance.status != "down":
2783                 val = "running"
2784               else:
2785                 val = "ERROR_up"
2786             else:
2787               if instance.status != "down":
2788                 val = "ERROR_down"
2789               else:
2790                 val = "ADMIN_down"
2791         elif field == "oper_ram":
2792           if instance.primary_node in bad_nodes:
2793             val = None
2794           elif instance.name in live_data:
2795             val = live_data[instance.name].get("memory", "?")
2796           else:
2797             val = "-"
2798         elif field == "disk_template":
2799           val = instance.disk_template
2800         elif field == "ip":
2801           val = instance.nics[0].ip
2802         elif field == "bridge":
2803           val = instance.nics[0].bridge
2804         elif field == "mac":
2805           val = instance.nics[0].mac
2806         elif field == "sda_size" or field == "sdb_size":
2807           disk = instance.FindDisk(field[:3])
2808           if disk is None:
2809             val = None
2810           else:
2811             val = disk.size
2812         elif field == "tags":
2813           val = list(instance.GetTags())
2814         elif field == "serial_no":
2815           val = instance.serial_no
2816         elif field == "network_port":
2817           val = instance.network_port
2818         elif field == "hypervisor":
2819           val = instance.hypervisor
2820         elif field == "hvparams":
2821           val = i_hv
2822         elif (field.startswith(HVPREFIX) and
2823               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2824           val = i_hv.get(field[len(HVPREFIX):], None)
2825         elif field == "beparams":
2826           val = i_be
2827         elif (field.startswith(BEPREFIX) and
2828               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2829           val = i_be.get(field[len(BEPREFIX):], None)
2830         else:
2831           raise errors.ParameterError(field)
2832         iout.append(val)
2833       output.append(iout)
2834
2835     return output
2836
2837
2838 class LUFailoverInstance(LogicalUnit):
2839   """Failover an instance.
2840
2841   """
2842   HPATH = "instance-failover"
2843   HTYPE = constants.HTYPE_INSTANCE
2844   _OP_REQP = ["instance_name", "ignore_consistency"]
2845   REQ_BGL = False
2846
2847   def ExpandNames(self):
2848     self._ExpandAndLockInstance()
2849     self.needed_locks[locking.LEVEL_NODE] = []
2850     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2851
2852   def DeclareLocks(self, level):
2853     if level == locking.LEVEL_NODE:
2854       self._LockInstancesNodes()
2855
2856   def BuildHooksEnv(self):
2857     """Build hooks env.
2858
2859     This runs on master, primary and secondary nodes of the instance.
2860
2861     """
2862     env = {
2863       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2864       }
2865     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2866     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2867     return env, nl, nl
2868
2869   def CheckPrereq(self):
2870     """Check prerequisites.
2871
2872     This checks that the instance is in the cluster.
2873
2874     """
2875     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2876     assert self.instance is not None, \
2877       "Cannot retrieve locked instance %s" % self.op.instance_name
2878
2879     bep = self.cfg.GetClusterInfo().FillBE(instance)
2880     if instance.disk_template not in constants.DTS_NET_MIRROR:
2881       raise errors.OpPrereqError("Instance's disk layout is not"
2882                                  " network mirrored, cannot failover.")
2883
2884     secondary_nodes = instance.secondary_nodes
2885     if not secondary_nodes:
2886       raise errors.ProgrammerError("no secondary node but using "
2887                                    "a mirrored disk template")
2888
2889     target_node = secondary_nodes[0]
2890     # check memory requirements on the secondary node
2891     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2892                          instance.name, bep[constants.BE_MEMORY],
2893                          instance.hypervisor)
2894
2895     # check bridge existance
2896     brlist = [nic.bridge for nic in instance.nics]
2897     if not self.rpc.call_bridges_exist(target_node, brlist):
2898       raise errors.OpPrereqError("One or more target bridges %s does not"
2899                                  " exist on destination node '%s'" %
2900                                  (brlist, target_node))
2901
2902   def Exec(self, feedback_fn):
2903     """Failover an instance.
2904
2905     The failover is done by shutting it down on its present node and
2906     starting it on the secondary.
2907
2908     """
2909     instance = self.instance
2910
2911     source_node = instance.primary_node
2912     target_node = instance.secondary_nodes[0]
2913
2914     feedback_fn("* checking disk consistency between source and target")
2915     for dev in instance.disks:
2916       # for drbd, these are drbd over lvm
2917       if not _CheckDiskConsistency(self, dev, target_node, False):
2918         if instance.status == "up" and not self.op.ignore_consistency:
2919           raise errors.OpExecError("Disk %s is degraded on target node,"
2920                                    " aborting failover." % dev.iv_name)
2921
2922     feedback_fn("* shutting down instance on source node")
2923     logger.Info("Shutting down instance %s on node %s" %
2924                 (instance.name, source_node))
2925
2926     if not self.rpc.call_instance_shutdown(source_node, instance):
2927       if self.op.ignore_consistency:
2928         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2929                      " anyway. Please make sure node %s is down"  %
2930                      (instance.name, source_node, source_node))
2931       else:
2932         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2933                                  (instance.name, source_node))
2934
2935     feedback_fn("* deactivating the instance's disks on source node")
2936     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2937       raise errors.OpExecError("Can't shut down the instance's disks.")
2938
2939     instance.primary_node = target_node
2940     # distribute new instance config to the other nodes
2941     self.cfg.Update(instance)
2942
2943     # Only start the instance if it's marked as up
2944     if instance.status == "up":
2945       feedback_fn("* activating the instance's disks on target node")
2946       logger.Info("Starting instance %s on node %s" %
2947                   (instance.name, target_node))
2948
2949       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2950                                                ignore_secondaries=True)
2951       if not disks_ok:
2952         _ShutdownInstanceDisks(self, instance)
2953         raise errors.OpExecError("Can't activate the instance's disks")
2954
2955       feedback_fn("* starting the instance on the target node")
2956       if not self.rpc.call_instance_start(target_node, instance, None):
2957         _ShutdownInstanceDisks(self, instance)
2958         raise errors.OpExecError("Could not start instance %s on node %s." %
2959                                  (instance.name, target_node))
2960
2961
2962 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2963   """Create a tree of block devices on the primary node.
2964
2965   This always creates all devices.
2966
2967   """
2968   if device.children:
2969     for child in device.children:
2970       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2971         return False
2972
2973   lu.cfg.SetDiskID(device, node)
2974   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2975                                        instance.name, True, info)
2976   if not new_id:
2977     return False
2978   if device.physical_id is None:
2979     device.physical_id = new_id
2980   return True
2981
2982
2983 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2984   """Create a tree of block devices on a secondary node.
2985
2986   If this device type has to be created on secondaries, create it and
2987   all its children.
2988
2989   If not, just recurse to children keeping the same 'force' value.
2990
2991   """
2992   if device.CreateOnSecondary():
2993     force = True
2994   if device.children:
2995     for child in device.children:
2996       if not _CreateBlockDevOnSecondary(lu, node, instance,
2997                                         child, force, info):
2998         return False
2999
3000   if not force:
3001     return True
3002   lu.cfg.SetDiskID(device, node)
3003   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3004                                        instance.name, False, info)
3005   if not new_id:
3006     return False
3007   if device.physical_id is None:
3008     device.physical_id = new_id
3009   return True
3010
3011
3012 def _GenerateUniqueNames(lu, exts):
3013   """Generate a suitable LV name.
3014
3015   This will generate a logical volume name for the given instance.
3016
3017   """
3018   results = []
3019   for val in exts:
3020     new_id = lu.cfg.GenerateUniqueID()
3021     results.append("%s%s" % (new_id, val))
3022   return results
3023
3024
3025 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3026                          p_minor, s_minor):
3027   """Generate a drbd8 device complete with its children.
3028
3029   """
3030   port = lu.cfg.AllocatePort()
3031   vgname = lu.cfg.GetVGName()
3032   shared_secret = lu.cfg.GenerateDRBDSecret()
3033   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3034                           logical_id=(vgname, names[0]))
3035   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3036                           logical_id=(vgname, names[1]))
3037   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3038                           logical_id=(primary, secondary, port,
3039                                       p_minor, s_minor,
3040                                       shared_secret),
3041                           children=[dev_data, dev_meta],
3042                           iv_name=iv_name)
3043   return drbd_dev
3044
3045
3046 def _GenerateDiskTemplate(lu, template_name,
3047                           instance_name, primary_node,
3048                           secondary_nodes, disk_sz, swap_sz,
3049                           file_storage_dir, file_driver):
3050   """Generate the entire disk layout for a given template type.
3051
3052   """
3053   #TODO: compute space requirements
3054
3055   vgname = lu.cfg.GetVGName()
3056   if template_name == constants.DT_DISKLESS:
3057     disks = []
3058   elif template_name == constants.DT_PLAIN:
3059     if len(secondary_nodes) != 0:
3060       raise errors.ProgrammerError("Wrong template configuration")
3061
3062     names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
3063     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3064                            logical_id=(vgname, names[0]),
3065                            iv_name = "sda")
3066     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3067                            logical_id=(vgname, names[1]),
3068                            iv_name = "sdb")
3069     disks = [sda_dev, sdb_dev]
3070   elif template_name == constants.DT_DRBD8:
3071     if len(secondary_nodes) != 1:
3072       raise errors.ProgrammerError("Wrong template configuration")
3073     remote_node = secondary_nodes[0]
3074     (minor_pa, minor_pb,
3075      minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3076       [primary_node, primary_node, remote_node, remote_node], instance_name)
3077
3078     names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3079                                       ".sdb_data", ".sdb_meta"])
3080     drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3081                                         disk_sz, names[0:2], "sda",
3082                                         minor_pa, minor_sa)
3083     drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3084                                         swap_sz, names[2:4], "sdb",
3085                                         minor_pb, minor_sb)
3086     disks = [drbd_sda_dev, drbd_sdb_dev]
3087   elif template_name == constants.DT_FILE:
3088     if len(secondary_nodes) != 0:
3089       raise errors.ProgrammerError("Wrong template configuration")
3090
3091     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3092                                 iv_name="sda", logical_id=(file_driver,
3093                                 "%s/sda" % file_storage_dir))
3094     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3095                                 iv_name="sdb", logical_id=(file_driver,
3096                                 "%s/sdb" % file_storage_dir))
3097     disks = [file_sda_dev, file_sdb_dev]
3098   else:
3099     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3100   return disks
3101
3102
3103 def _GetInstanceInfoText(instance):
3104   """Compute that text that should be added to the disk's metadata.
3105
3106   """
3107   return "originstname+%s" % instance.name
3108
3109
3110 def _CreateDisks(lu, instance):
3111   """Create all disks for an instance.
3112
3113   This abstracts away some work from AddInstance.
3114
3115   Args:
3116     instance: the instance object
3117
3118   Returns:
3119     True or False showing the success of the creation process
3120
3121   """
3122   info = _GetInstanceInfoText(instance)
3123
3124   if instance.disk_template == constants.DT_FILE:
3125     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3126     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3127                                                  file_storage_dir)
3128
3129     if not result:
3130       logger.Error("Could not connect to node '%s'" % instance.primary_node)
3131       return False
3132
3133     if not result[0]:
3134       logger.Error("failed to create directory '%s'" % file_storage_dir)
3135       return False
3136
3137   for device in instance.disks:
3138     logger.Info("creating volume %s for instance %s" %
3139                 (device.iv_name, instance.name))
3140     #HARDCODE
3141     for secondary_node in instance.secondary_nodes:
3142       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3143                                         device, False, info):
3144         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3145                      (device.iv_name, device, secondary_node))
3146         return False
3147     #HARDCODE
3148     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3149                                     instance, device, info):
3150       logger.Error("failed to create volume %s on primary!" %
3151                    device.iv_name)
3152       return False
3153
3154   return True
3155
3156
3157 def _RemoveDisks(lu, instance):
3158   """Remove all disks for an instance.
3159
3160   This abstracts away some work from `AddInstance()` and
3161   `RemoveInstance()`. Note that in case some of the devices couldn't
3162   be removed, the removal will continue with the other ones (compare
3163   with `_CreateDisks()`).
3164
3165   Args:
3166     instance: the instance object
3167
3168   Returns:
3169     True or False showing the success of the removal proces
3170
3171   """
3172   logger.Info("removing block devices for instance %s" % instance.name)
3173
3174   result = True
3175   for device in instance.disks:
3176     for node, disk in device.ComputeNodeTree(instance.primary_node):
3177       lu.cfg.SetDiskID(disk, node)
3178       if not lu.rpc.call_blockdev_remove(node, disk):
3179         logger.Error("could not remove block device %s on node %s,"
3180                      " continuing anyway" %
3181                      (device.iv_name, node))
3182         result = False
3183
3184   if instance.disk_template == constants.DT_FILE:
3185     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3186     if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3187                                                file_storage_dir):
3188       logger.Error("could not remove directory '%s'" % file_storage_dir)
3189       result = False
3190
3191   return result
3192
3193
3194 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3195   """Compute disk size requirements in the volume group
3196
3197   This is currently hard-coded for the two-drive layout.
3198
3199   """
3200   # Required free disk space as a function of disk and swap space
3201   req_size_dict = {
3202     constants.DT_DISKLESS: None,
3203     constants.DT_PLAIN: disk_size + swap_size,
3204     # 256 MB are added for drbd metadata, 128MB for each drbd device
3205     constants.DT_DRBD8: disk_size + swap_size + 256,
3206     constants.DT_FILE: None,
3207   }
3208
3209   if disk_template not in req_size_dict:
3210     raise errors.ProgrammerError("Disk template '%s' size requirement"
3211                                  " is unknown" %  disk_template)
3212
3213   return req_size_dict[disk_template]
3214
3215
3216 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3217   """Hypervisor parameter validation.
3218
3219   This function abstract the hypervisor parameter validation to be
3220   used in both instance create and instance modify.
3221
3222   @type lu: L{LogicalUnit}
3223   @param lu: the logical unit for which we check
3224   @type nodenames: list
3225   @param nodenames: the list of nodes on which we should check
3226   @type hvname: string
3227   @param hvname: the name of the hypervisor we should use
3228   @type hvparams: dict
3229   @param hvparams: the parameters which we need to check
3230   @raise errors.OpPrereqError: if the parameters are not valid
3231
3232   """
3233   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3234                                                   hvname,
3235                                                   hvparams)
3236   for node in nodenames:
3237     info = hvinfo.get(node, None)
3238     if not info or not isinstance(info, (tuple, list)):
3239       raise errors.OpPrereqError("Cannot get current information"
3240                                  " from node '%s' (%s)" % (node, info))
3241     if not info[0]:
3242       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3243                                  " %s" % info[1])
3244
3245
3246 class LUCreateInstance(LogicalUnit):
3247   """Create an instance.
3248
3249   """
3250   HPATH = "instance-add"
3251   HTYPE = constants.HTYPE_INSTANCE
3252   _OP_REQP = ["instance_name", "disk_size",
3253               "disk_template", "swap_size", "mode", "start",
3254               "wait_for_sync", "ip_check", "mac",
3255               "hvparams", "beparams"]
3256   REQ_BGL = False
3257
3258   def _ExpandNode(self, node):
3259     """Expands and checks one node name.
3260
3261     """
3262     node_full = self.cfg.ExpandNodeName(node)
3263     if node_full is None:
3264       raise errors.OpPrereqError("Unknown node %s" % node)
3265     return node_full
3266
3267   def ExpandNames(self):
3268     """ExpandNames for CreateInstance.
3269
3270     Figure out the right locks for instance creation.
3271
3272     """
3273     self.needed_locks = {}
3274
3275     # set optional parameters to none if they don't exist
3276     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3277       if not hasattr(self.op, attr):
3278         setattr(self.op, attr, None)
3279
3280     # cheap checks, mostly valid constants given
3281
3282     # verify creation mode
3283     if self.op.mode not in (constants.INSTANCE_CREATE,
3284                             constants.INSTANCE_IMPORT):
3285       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3286                                  self.op.mode)
3287
3288     # disk template and mirror node verification
3289     if self.op.disk_template not in constants.DISK_TEMPLATES:
3290       raise errors.OpPrereqError("Invalid disk template name")
3291
3292     if self.op.hypervisor is None:
3293       self.op.hypervisor = self.cfg.GetHypervisorType()
3294
3295     cluster = self.cfg.GetClusterInfo()
3296     enabled_hvs = cluster.enabled_hypervisors
3297     if self.op.hypervisor not in enabled_hvs:
3298       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3299                                  " cluster (%s)" % (self.op.hypervisor,
3300                                   ",".join(enabled_hvs)))
3301
3302     # check hypervisor parameter syntax (locally)
3303
3304     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3305                                   self.op.hvparams)
3306     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3307     hv_type.CheckParameterSyntax(filled_hvp)
3308
3309     # fill and remember the beparams dict
3310     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3311                                     self.op.beparams)
3312
3313     #### instance parameters check
3314
3315     # instance name verification
3316     hostname1 = utils.HostInfo(self.op.instance_name)
3317     self.op.instance_name = instance_name = hostname1.name
3318
3319     # this is just a preventive check, but someone might still add this
3320     # instance in the meantime, and creation will fail at lock-add time
3321     if instance_name in self.cfg.GetInstanceList():
3322       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3323                                  instance_name)
3324
3325     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3326
3327     # ip validity checks
3328     ip = getattr(self.op, "ip", None)
3329     if ip is None or ip.lower() == "none":
3330       inst_ip = None
3331     elif ip.lower() == "auto":
3332       inst_ip = hostname1.ip
3333     else:
3334       if not utils.IsValidIP(ip):
3335         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3336                                    " like a valid IP" % ip)
3337       inst_ip = ip
3338     self.inst_ip = self.op.ip = inst_ip
3339     # used in CheckPrereq for ip ping check
3340     self.check_ip = hostname1.ip
3341
3342     # MAC address verification
3343     if self.op.mac != "auto":
3344       if not utils.IsValidMac(self.op.mac.lower()):
3345         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3346                                    self.op.mac)
3347
3348     # file storage checks
3349     if (self.op.file_driver and
3350         not self.op.file_driver in constants.FILE_DRIVER):
3351       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3352                                  self.op.file_driver)
3353
3354     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3355       raise errors.OpPrereqError("File storage directory path not absolute")
3356
3357     ### Node/iallocator related checks
3358     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3359       raise errors.OpPrereqError("One and only one of iallocator and primary"
3360                                  " node must be given")
3361
3362     if self.op.iallocator:
3363       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3364     else:
3365       self.op.pnode = self._ExpandNode(self.op.pnode)
3366       nodelist = [self.op.pnode]
3367       if self.op.snode is not None:
3368         self.op.snode = self._ExpandNode(self.op.snode)
3369         nodelist.append(self.op.snode)
3370       self.needed_locks[locking.LEVEL_NODE] = nodelist
3371
3372     # in case of import lock the source node too
3373     if self.op.mode == constants.INSTANCE_IMPORT:
3374       src_node = getattr(self.op, "src_node", None)
3375       src_path = getattr(self.op, "src_path", None)
3376
3377       if src_node is None or src_path is None:
3378         raise errors.OpPrereqError("Importing an instance requires source"
3379                                    " node and path options")
3380
3381       if not os.path.isabs(src_path):
3382         raise errors.OpPrereqError("The source path must be absolute")
3383
3384       self.op.src_node = src_node = self._ExpandNode(src_node)
3385       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3386         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3387
3388     else: # INSTANCE_CREATE
3389       if getattr(self.op, "os_type", None) is None:
3390         raise errors.OpPrereqError("No guest OS specified")
3391
3392   def _RunAllocator(self):
3393     """Run the allocator based on input opcode.
3394
3395     """
3396     disks = [{"size": self.op.disk_size, "mode": "w"},
3397              {"size": self.op.swap_size, "mode": "w"}]
3398     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3399              "bridge": self.op.bridge}]
3400     ial = IAllocator(self,
3401                      mode=constants.IALLOCATOR_MODE_ALLOC,
3402                      name=self.op.instance_name,
3403                      disk_template=self.op.disk_template,
3404                      tags=[],
3405                      os=self.op.os_type,
3406                      vcpus=self.be_full[constants.BE_VCPUS],
3407                      mem_size=self.be_full[constants.BE_MEMORY],
3408                      disks=disks,
3409                      nics=nics,
3410                      )
3411
3412     ial.Run(self.op.iallocator)
3413
3414     if not ial.success:
3415       raise errors.OpPrereqError("Can't compute nodes using"
3416                                  " iallocator '%s': %s" % (self.op.iallocator,
3417                                                            ial.info))
3418     if len(ial.nodes) != ial.required_nodes:
3419       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3420                                  " of nodes (%s), required %s" %
3421                                  (self.op.iallocator, len(ial.nodes),
3422                                   ial.required_nodes))
3423     self.op.pnode = ial.nodes[0]
3424     logger.ToStdout("Selected nodes for the instance: %s" %
3425                     (", ".join(ial.nodes),))
3426     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3427                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3428     if ial.required_nodes == 2:
3429       self.op.snode = ial.nodes[1]
3430
3431   def BuildHooksEnv(self):
3432     """Build hooks env.
3433
3434     This runs on master, primary and secondary nodes of the instance.
3435
3436     """
3437     env = {
3438       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3439       "INSTANCE_DISK_SIZE": self.op.disk_size,
3440       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3441       "INSTANCE_ADD_MODE": self.op.mode,
3442       }
3443     if self.op.mode == constants.INSTANCE_IMPORT:
3444       env["INSTANCE_SRC_NODE"] = self.op.src_node
3445       env["INSTANCE_SRC_PATH"] = self.op.src_path
3446       env["INSTANCE_SRC_IMAGE"] = self.src_image
3447
3448     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3449       primary_node=self.op.pnode,
3450       secondary_nodes=self.secondaries,
3451       status=self.instance_status,
3452       os_type=self.op.os_type,
3453       memory=self.be_full[constants.BE_MEMORY],
3454       vcpus=self.be_full[constants.BE_VCPUS],
3455       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3456     ))
3457
3458     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3459           self.secondaries)
3460     return env, nl, nl
3461
3462
3463   def CheckPrereq(self):
3464     """Check prerequisites.
3465
3466     """
3467     if (not self.cfg.GetVGName() and
3468         self.op.disk_template not in constants.DTS_NOT_LVM):
3469       raise errors.OpPrereqError("Cluster does not support lvm-based"
3470                                  " instances")
3471
3472
3473     if self.op.mode == constants.INSTANCE_IMPORT:
3474       src_node = self.op.src_node
3475       src_path = self.op.src_path
3476
3477       export_info = self.rpc.call_export_info(src_node, src_path)
3478
3479       if not export_info:
3480         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3481
3482       if not export_info.has_section(constants.INISECT_EXP):
3483         raise errors.ProgrammerError("Corrupted export config")
3484
3485       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3486       if (int(ei_version) != constants.EXPORT_VERSION):
3487         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3488                                    (ei_version, constants.EXPORT_VERSION))
3489
3490       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3491         raise errors.OpPrereqError("Can't import instance with more than"
3492                                    " one data disk")
3493
3494       # FIXME: are the old os-es, disk sizes, etc. useful?
3495       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3496       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3497                                                          'disk0_dump'))
3498       self.src_image = diskimage
3499
3500     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3501
3502     if self.op.start and not self.op.ip_check:
3503       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3504                                  " adding an instance in start mode")
3505
3506     if self.op.ip_check:
3507       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3508         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3509                                    (self.check_ip, self.op.instance_name))
3510
3511     # bridge verification
3512     bridge = getattr(self.op, "bridge", None)
3513     if bridge is None:
3514       self.op.bridge = self.cfg.GetDefBridge()
3515     else:
3516       self.op.bridge = bridge
3517
3518     #### allocator run
3519
3520     if self.op.iallocator is not None:
3521       self._RunAllocator()
3522
3523     #### node related checks
3524
3525     # check primary node
3526     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3527     assert self.pnode is not None, \
3528       "Cannot retrieve locked node %s" % self.op.pnode
3529     self.secondaries = []
3530
3531     # mirror node verification
3532     if self.op.disk_template in constants.DTS_NET_MIRROR:
3533       if self.op.snode is None:
3534         raise errors.OpPrereqError("The networked disk templates need"
3535                                    " a mirror node")
3536       if self.op.snode == pnode.name:
3537         raise errors.OpPrereqError("The secondary node cannot be"
3538                                    " the primary node.")
3539       self.secondaries.append(self.op.snode)
3540
3541     nodenames = [pnode.name] + self.secondaries
3542
3543     req_size = _ComputeDiskSize(self.op.disk_template,
3544                                 self.op.disk_size, self.op.swap_size)
3545
3546     # Check lv size requirements
3547     if req_size is not None:
3548       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3549                                          self.op.hypervisor)
3550       for node in nodenames:
3551         info = nodeinfo.get(node, None)
3552         if not info:
3553           raise errors.OpPrereqError("Cannot get current information"
3554                                      " from node '%s'" % node)
3555         vg_free = info.get('vg_free', None)
3556         if not isinstance(vg_free, int):
3557           raise errors.OpPrereqError("Can't compute free disk space on"
3558                                      " node %s" % node)
3559         if req_size > info['vg_free']:
3560           raise errors.OpPrereqError("Not enough disk space on target node %s."
3561                                      " %d MB available, %d MB required" %
3562                                      (node, info['vg_free'], req_size))
3563
3564     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3565
3566     # os verification
3567     os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3568     if not os_obj:
3569       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3570                                  " primary node"  % self.op.os_type)
3571
3572     # bridge check on primary node
3573     if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3574       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3575                                  " destination node '%s'" %
3576                                  (self.op.bridge, pnode.name))
3577
3578     # memory check on primary node
3579     if self.op.start:
3580       _CheckNodeFreeMemory(self, self.pnode.name,
3581                            "creating instance %s" % self.op.instance_name,
3582                            self.be_full[constants.BE_MEMORY],
3583                            self.op.hypervisor)
3584
3585     if self.op.start:
3586       self.instance_status = 'up'
3587     else:
3588       self.instance_status = 'down'
3589
3590   def Exec(self, feedback_fn):
3591     """Create and add the instance to the cluster.
3592
3593     """
3594     instance = self.op.instance_name
3595     pnode_name = self.pnode.name
3596
3597     if self.op.mac == "auto":
3598       mac_address = self.cfg.GenerateMAC()
3599     else:
3600       mac_address = self.op.mac
3601
3602     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3603     if self.inst_ip is not None:
3604       nic.ip = self.inst_ip
3605
3606     ht_kind = self.op.hypervisor
3607     if ht_kind in constants.HTS_REQ_PORT:
3608       network_port = self.cfg.AllocatePort()
3609     else:
3610       network_port = None
3611
3612     ##if self.op.vnc_bind_address is None:
3613     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3614
3615     # this is needed because os.path.join does not accept None arguments
3616     if self.op.file_storage_dir is None:
3617       string_file_storage_dir = ""
3618     else:
3619       string_file_storage_dir = self.op.file_storage_dir
3620
3621     # build the full file storage dir path
3622     file_storage_dir = os.path.normpath(os.path.join(
3623                                         self.cfg.GetFileStorageDir(),
3624                                         string_file_storage_dir, instance))
3625
3626
3627     disks = _GenerateDiskTemplate(self,
3628                                   self.op.disk_template,
3629                                   instance, pnode_name,
3630                                   self.secondaries, self.op.disk_size,
3631                                   self.op.swap_size,
3632                                   file_storage_dir,
3633                                   self.op.file_driver)
3634
3635     iobj = objects.Instance(name=instance, os=self.op.os_type,
3636                             primary_node=pnode_name,
3637                             nics=[nic], disks=disks,
3638                             disk_template=self.op.disk_template,
3639                             status=self.instance_status,
3640                             network_port=network_port,
3641                             beparams=self.op.beparams,
3642                             hvparams=self.op.hvparams,
3643                             hypervisor=self.op.hypervisor,
3644                             )
3645
3646     feedback_fn("* creating instance disks...")
3647     if not _CreateDisks(self, iobj):
3648       _RemoveDisks(self, iobj)
3649       self.cfg.ReleaseDRBDMinors(instance)
3650       raise errors.OpExecError("Device creation failed, reverting...")
3651
3652     feedback_fn("adding instance %s to cluster config" % instance)
3653
3654     self.cfg.AddInstance(iobj)
3655     # Declare that we don't want to remove the instance lock anymore, as we've
3656     # added the instance to the config
3657     del self.remove_locks[locking.LEVEL_INSTANCE]
3658     # Remove the temp. assignements for the instance's drbds
3659     self.cfg.ReleaseDRBDMinors(instance)
3660
3661     if self.op.wait_for_sync:
3662       disk_abort = not _WaitForSync(self, iobj)
3663     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3664       # make sure the disks are not degraded (still sync-ing is ok)
3665       time.sleep(15)
3666       feedback_fn("* checking mirrors status")
3667       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3668     else:
3669       disk_abort = False
3670
3671     if disk_abort:
3672       _RemoveDisks(self, iobj)
3673       self.cfg.RemoveInstance(iobj.name)
3674       # Make sure the instance lock gets removed
3675       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3676       raise errors.OpExecError("There are some degraded disks for"
3677                                " this instance")
3678
3679     feedback_fn("creating os for instance %s on node %s" %
3680                 (instance, pnode_name))
3681
3682     if iobj.disk_template != constants.DT_DISKLESS:
3683       if self.op.mode == constants.INSTANCE_CREATE:
3684         feedback_fn("* running the instance OS create scripts...")
3685         if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3686           raise errors.OpExecError("could not add os for instance %s"
3687                                    " on node %s" %
3688                                    (instance, pnode_name))
3689
3690       elif self.op.mode == constants.INSTANCE_IMPORT:
3691         feedback_fn("* running the instance OS import scripts...")
3692         src_node = self.op.src_node
3693         src_image = self.src_image
3694         cluster_name = self.cfg.GetClusterName()
3695         if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3696                                                 src_node, src_image,
3697                                                 cluster_name):
3698           raise errors.OpExecError("Could not import os for instance"
3699                                    " %s on node %s" %
3700                                    (instance, pnode_name))
3701       else:
3702         # also checked in the prereq part
3703         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3704                                      % self.op.mode)
3705
3706     if self.op.start:
3707       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3708       feedback_fn("* starting instance...")
3709       if not self.rpc.call_instance_start(pnode_name, iobj, None):
3710         raise errors.OpExecError("Could not start instance")
3711
3712
3713 class LUConnectConsole(NoHooksLU):
3714   """Connect to an instance's console.
3715
3716   This is somewhat special in that it returns the command line that
3717   you need to run on the master node in order to connect to the
3718   console.
3719
3720   """
3721   _OP_REQP = ["instance_name"]
3722   REQ_BGL = False
3723
3724   def ExpandNames(self):
3725     self._ExpandAndLockInstance()
3726
3727   def CheckPrereq(self):
3728     """Check prerequisites.
3729
3730     This checks that the instance is in the cluster.
3731
3732     """
3733     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3734     assert self.instance is not None, \
3735       "Cannot retrieve locked instance %s" % self.op.instance_name
3736
3737   def Exec(self, feedback_fn):
3738     """Connect to the console of an instance
3739
3740     """
3741     instance = self.instance
3742     node = instance.primary_node
3743
3744     node_insts = self.rpc.call_instance_list([node],
3745                                              [instance.hypervisor])[node]
3746     if node_insts is False:
3747       raise errors.OpExecError("Can't connect to node %s." % node)
3748
3749     if instance.name not in node_insts:
3750       raise errors.OpExecError("Instance %s is not running." % instance.name)
3751
3752     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3753
3754     hyper = hypervisor.GetHypervisor(instance.hypervisor)
3755     console_cmd = hyper.GetShellCommandForConsole(instance)
3756
3757     # build ssh cmdline
3758     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3759
3760
3761 class LUReplaceDisks(LogicalUnit):
3762   """Replace the disks of an instance.
3763
3764   """
3765   HPATH = "mirrors-replace"
3766   HTYPE = constants.HTYPE_INSTANCE
3767   _OP_REQP = ["instance_name", "mode", "disks"]
3768   REQ_BGL = False
3769
3770   def ExpandNames(self):
3771     self._ExpandAndLockInstance()
3772
3773     if not hasattr(self.op, "remote_node"):
3774       self.op.remote_node = None
3775
3776     ia_name = getattr(self.op, "iallocator", None)
3777     if ia_name is not None:
3778       if self.op.remote_node is not None:
3779         raise errors.OpPrereqError("Give either the iallocator or the new"
3780                                    " secondary, not both")
3781       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3782     elif self.op.remote_node is not None:
3783       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3784       if remote_node is None:
3785         raise errors.OpPrereqError("Node '%s' not known" %
3786                                    self.op.remote_node)
3787       self.op.remote_node = remote_node
3788       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3789       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3790     else:
3791       self.needed_locks[locking.LEVEL_NODE] = []
3792       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3793
3794   def DeclareLocks(self, level):
3795     # If we're not already locking all nodes in the set we have to declare the
3796     # instance's primary/secondary nodes.
3797     if (level == locking.LEVEL_NODE and
3798         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3799       self._LockInstancesNodes()
3800
3801   def _RunAllocator(self):
3802     """Compute a new secondary node using an IAllocator.
3803
3804     """
3805     ial = IAllocator(self,
3806                      mode=constants.IALLOCATOR_MODE_RELOC,
3807                      name=self.op.instance_name,
3808                      relocate_from=[self.sec_node])
3809
3810     ial.Run(self.op.iallocator)
3811
3812     if not ial.success:
3813       raise errors.OpPrereqError("Can't compute nodes using"
3814                                  " iallocator '%s': %s" % (self.op.iallocator,
3815                                                            ial.info))
3816     if len(ial.nodes) != ial.required_nodes:
3817       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3818                                  " of nodes (%s), required %s" %
3819                                  (len(ial.nodes), ial.required_nodes))
3820     self.op.remote_node = ial.nodes[0]
3821     logger.ToStdout("Selected new secondary for the instance: %s" %
3822                     self.op.remote_node)
3823
3824   def BuildHooksEnv(self):
3825     """Build hooks env.
3826
3827     This runs on the master, the primary and all the secondaries.
3828
3829     """
3830     env = {
3831       "MODE": self.op.mode,
3832       "NEW_SECONDARY": self.op.remote_node,
3833       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3834       }
3835     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3836     nl = [
3837       self.cfg.GetMasterNode(),
3838       self.instance.primary_node,
3839       ]
3840     if self.op.remote_node is not None:
3841       nl.append(self.op.remote_node)
3842     return env, nl, nl
3843
3844   def CheckPrereq(self):
3845     """Check prerequisites.
3846
3847     This checks that the instance is in the cluster.
3848
3849     """
3850     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3851     assert instance is not None, \
3852       "Cannot retrieve locked instance %s" % self.op.instance_name
3853     self.instance = instance
3854
3855     if instance.disk_template not in constants.DTS_NET_MIRROR:
3856       raise errors.OpPrereqError("Instance's disk layout is not"
3857                                  " network mirrored.")
3858
3859     if len(instance.secondary_nodes) != 1:
3860       raise errors.OpPrereqError("The instance has a strange layout,"
3861                                  " expected one secondary but found %d" %
3862                                  len(instance.secondary_nodes))
3863
3864     self.sec_node = instance.secondary_nodes[0]
3865
3866     ia_name = getattr(self.op, "iallocator", None)
3867     if ia_name is not None:
3868       self._RunAllocator()
3869
3870     remote_node = self.op.remote_node
3871     if remote_node is not None:
3872       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3873       assert self.remote_node_info is not None, \
3874         "Cannot retrieve locked node %s" % remote_node
3875     else:
3876       self.remote_node_info = None
3877     if remote_node == instance.primary_node:
3878       raise errors.OpPrereqError("The specified node is the primary node of"
3879                                  " the instance.")
3880     elif remote_node == self.sec_node:
3881       if self.op.mode == constants.REPLACE_DISK_SEC:
3882         # this is for DRBD8, where we can't execute the same mode of
3883         # replacement as for drbd7 (no different port allocated)
3884         raise errors.OpPrereqError("Same secondary given, cannot execute"
3885                                    " replacement")
3886     if instance.disk_template == constants.DT_DRBD8:
3887       if (self.op.mode == constants.REPLACE_DISK_ALL and
3888           remote_node is not None):
3889         # switch to replace secondary mode
3890         self.op.mode = constants.REPLACE_DISK_SEC
3891
3892       if self.op.mode == constants.REPLACE_DISK_ALL:
3893         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3894                                    " secondary disk replacement, not"
3895                                    " both at once")
3896       elif self.op.mode == constants.REPLACE_DISK_PRI:
3897         if remote_node is not None:
3898           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3899                                      " the secondary while doing a primary"
3900                                      " node disk replacement")
3901         self.tgt_node = instance.primary_node
3902         self.oth_node = instance.secondary_nodes[0]
3903       elif self.op.mode == constants.REPLACE_DISK_SEC:
3904         self.new_node = remote_node # this can be None, in which case
3905                                     # we don't change the secondary
3906         self.tgt_node = instance.secondary_nodes[0]
3907         self.oth_node = instance.primary_node
3908       else:
3909         raise errors.ProgrammerError("Unhandled disk replace mode")
3910
3911     for name in self.op.disks:
3912       if instance.FindDisk(name) is None:
3913         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3914                                    (name, instance.name))
3915
3916   def _ExecD8DiskOnly(self, feedback_fn):
3917     """Replace a disk on the primary or secondary for dbrd8.
3918
3919     The algorithm for replace is quite complicated:
3920       - for each disk to be replaced:
3921         - create new LVs on the target node with unique names
3922         - detach old LVs from the drbd device
3923         - rename old LVs to name_replaced.<time_t>
3924         - rename new LVs to old LVs
3925         - attach the new LVs (with the old names now) to the drbd device
3926       - wait for sync across all devices
3927       - for each modified disk:
3928         - remove old LVs (which have the name name_replaces.<time_t>)
3929
3930     Failures are not very well handled.
3931
3932     """
3933     steps_total = 6
3934     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3935     instance = self.instance
3936     iv_names = {}
3937     vgname = self.cfg.GetVGName()
3938     # start of work
3939     cfg = self.cfg
3940     tgt_node = self.tgt_node
3941     oth_node = self.oth_node
3942
3943     # Step: check device activation
3944     self.proc.LogStep(1, steps_total, "check device existence")
3945     info("checking volume groups")
3946     my_vg = cfg.GetVGName()
3947     results = self.rpc.call_vg_list([oth_node, tgt_node])
3948     if not results:
3949       raise errors.OpExecError("Can't list volume groups on the nodes")
3950     for node in oth_node, tgt_node:
3951       res = results.get(node, False)
3952       if not res or my_vg not in res:
3953         raise errors.OpExecError("Volume group '%s' not found on %s" %
3954                                  (my_vg, node))
3955     for dev in instance.disks:
3956       if not dev.iv_name in self.op.disks:
3957         continue
3958       for node in tgt_node, oth_node:
3959         info("checking %s on %s" % (dev.iv_name, node))
3960         cfg.SetDiskID(dev, node)
3961         if not self.rpc.call_blockdev_find(node, dev):
3962           raise errors.OpExecError("Can't find device %s on node %s" %
3963                                    (dev.iv_name, node))
3964
3965     # Step: check other node consistency
3966     self.proc.LogStep(2, steps_total, "check peer consistency")
3967     for dev in instance.disks:
3968       if not dev.iv_name in self.op.disks:
3969         continue
3970       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3971       if not _CheckDiskConsistency(self, dev, oth_node,
3972                                    oth_node==instance.primary_node):
3973         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3974                                  " to replace disks on this node (%s)" %
3975                                  (oth_node, tgt_node))
3976
3977     # Step: create new storage
3978     self.proc.LogStep(3, steps_total, "allocate new storage")
3979     for dev in instance.disks:
3980       if not dev.iv_name in self.op.disks:
3981         continue
3982       size = dev.size
3983       cfg.SetDiskID(dev, tgt_node)
3984       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3985       names = _GenerateUniqueNames(self, lv_names)
3986       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3987                              logical_id=(vgname, names[0]))
3988       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3989                              logical_id=(vgname, names[1]))
3990       new_lvs = [lv_data, lv_meta]
3991       old_lvs = dev.children
3992       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3993       info("creating new local storage on %s for %s" %
3994            (tgt_node, dev.iv_name))
3995       # since we *always* want to create this LV, we use the
3996       # _Create...OnPrimary (which forces the creation), even if we
3997       # are talking about the secondary node
3998       for new_lv in new_lvs:
3999         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4000                                         _GetInstanceInfoText(instance)):
4001           raise errors.OpExecError("Failed to create new LV named '%s' on"
4002                                    " node '%s'" %
4003                                    (new_lv.logical_id[1], tgt_node))
4004
4005     # Step: for each lv, detach+rename*2+attach
4006     self.proc.LogStep(4, steps_total, "change drbd configuration")
4007     for dev, old_lvs, new_lvs in iv_names.itervalues():
4008       info("detaching %s drbd from local storage" % dev.iv_name)
4009       if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4010         raise errors.OpExecError("Can't detach drbd from local storage on node"
4011                                  " %s for device %s" % (tgt_node, dev.iv_name))
4012       #dev.children = []
4013       #cfg.Update(instance)
4014
4015       # ok, we created the new LVs, so now we know we have the needed
4016       # storage; as such, we proceed on the target node to rename
4017       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4018       # using the assumption that logical_id == physical_id (which in
4019       # turn is the unique_id on that node)
4020
4021       # FIXME(iustin): use a better name for the replaced LVs
4022       temp_suffix = int(time.time())
4023       ren_fn = lambda d, suff: (d.physical_id[0],
4024                                 d.physical_id[1] + "_replaced-%s" % suff)
4025       # build the rename list based on what LVs exist on the node
4026       rlist = []
4027       for to_ren in old_lvs:
4028         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4029         if find_res is not None: # device exists
4030           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4031
4032       info("renaming the old LVs on the target node")
4033       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4034         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4035       # now we rename the new LVs to the old LVs
4036       info("renaming the new LVs on the target node")
4037       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4038       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4039         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4040
4041       for old, new in zip(old_lvs, new_lvs):
4042         new.logical_id = old.logical_id
4043         cfg.SetDiskID(new, tgt_node)
4044
4045       for disk in old_lvs:
4046         disk.logical_id = ren_fn(disk, temp_suffix)
4047         cfg.SetDiskID(disk, tgt_node)
4048
4049       # now that the new lvs have the old name, we can add them to the device
4050       info("adding new mirror component on %s" % tgt_node)
4051       if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4052         for new_lv in new_lvs:
4053           if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4054             warning("Can't rollback device %s", hint="manually cleanup unused"
4055                     " logical volumes")
4056         raise errors.OpExecError("Can't add local storage to drbd")
4057
4058       dev.children = new_lvs
4059       cfg.Update(instance)
4060
4061     # Step: wait for sync
4062
4063     # this can fail as the old devices are degraded and _WaitForSync
4064     # does a combined result over all disks, so we don't check its
4065     # return value
4066     self.proc.LogStep(5, steps_total, "sync devices")
4067     _WaitForSync(self, instance, unlock=True)
4068
4069     # so check manually all the devices
4070     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4071       cfg.SetDiskID(dev, instance.primary_node)
4072       is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4073       if is_degr:
4074         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4075
4076     # Step: remove old storage
4077     self.proc.LogStep(6, steps_total, "removing old storage")
4078     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4079       info("remove logical volumes for %s" % name)
4080       for lv in old_lvs:
4081         cfg.SetDiskID(lv, tgt_node)
4082         if not self.rpc.call_blockdev_remove(tgt_node, lv):
4083           warning("Can't remove old LV", hint="manually remove unused LVs")
4084           continue
4085
4086   def _ExecD8Secondary(self, feedback_fn):
4087     """Replace the secondary node for drbd8.
4088
4089     The algorithm for replace is quite complicated:
4090       - for all disks of the instance:
4091         - create new LVs on the new node with same names
4092         - shutdown the drbd device on the old secondary
4093         - disconnect the drbd network on the primary
4094         - create the drbd device on the new secondary
4095         - network attach the drbd on the primary, using an artifice:
4096           the drbd code for Attach() will connect to the network if it
4097           finds a device which is connected to the good local disks but
4098           not network enabled
4099       - wait for sync across all devices
4100       - remove all disks from the old secondary
4101
4102     Failures are not very well handled.
4103
4104     """
4105     steps_total = 6
4106     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4107     instance = self.instance
4108     iv_names = {}
4109     vgname = self.cfg.GetVGName()
4110     # start of work
4111     cfg = self.cfg
4112     old_node = self.tgt_node
4113     new_node = self.new_node
4114     pri_node = instance.primary_node
4115
4116     # Step: check device activation
4117     self.proc.LogStep(1, steps_total, "check device existence")
4118     info("checking volume groups")
4119     my_vg = cfg.GetVGName()
4120     results = self.rpc.call_vg_list([pri_node, new_node])
4121     if not results:
4122       raise errors.OpExecError("Can't list volume groups on the nodes")
4123     for node in pri_node, new_node:
4124       res = results.get(node, False)
4125       if not res or my_vg not in res:
4126         raise errors.OpExecError("Volume group '%s' not found on %s" %
4127                                  (my_vg, node))
4128     for dev in instance.disks:
4129       if not dev.iv_name in self.op.disks:
4130         continue
4131       info("checking %s on %s" % (dev.iv_name, pri_node))
4132       cfg.SetDiskID(dev, pri_node)
4133       if not self.rpc.call_blockdev_find(pri_node, dev):
4134         raise errors.OpExecError("Can't find device %s on node %s" %
4135                                  (dev.iv_name, pri_node))
4136
4137     # Step: check other node consistency
4138     self.proc.LogStep(2, steps_total, "check peer consistency")
4139     for dev in instance.disks:
4140       if not dev.iv_name in self.op.disks:
4141         continue
4142       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4143       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4144         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4145                                  " unsafe to replace the secondary" %
4146                                  pri_node)
4147
4148     # Step: create new storage
4149     self.proc.LogStep(3, steps_total, "allocate new storage")
4150     for dev in instance.disks:
4151       size = dev.size
4152       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4153       # since we *always* want to create this LV, we use the
4154       # _Create...OnPrimary (which forces the creation), even if we
4155       # are talking about the secondary node
4156       for new_lv in dev.children:
4157         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4158                                         _GetInstanceInfoText(instance)):
4159           raise errors.OpExecError("Failed to create new LV named '%s' on"
4160                                    " node '%s'" %
4161                                    (new_lv.logical_id[1], new_node))
4162
4163
4164     # Step 4: dbrd minors and drbd setups changes
4165     # after this, we must manually remove the drbd minors on both the
4166     # error and the success paths
4167     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4168                                    instance.name)
4169     logging.debug("Allocated minors %s" % (minors,))
4170     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4171     for dev, new_minor in zip(instance.disks, minors):
4172       size = dev.size
4173       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4174       # create new devices on new_node
4175       if pri_node == dev.logical_id[0]:
4176         new_logical_id = (pri_node, new_node,
4177                           dev.logical_id[2], dev.logical_id[3], new_minor,
4178                           dev.logical_id[5])
4179       else:
4180         new_logical_id = (new_node, pri_node,
4181                           dev.logical_id[2], new_minor, dev.logical_id[4],
4182                           dev.logical_id[5])
4183       iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4184       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4185                     new_logical_id)
4186       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4187                               logical_id=new_logical_id,
4188                               children=dev.children)
4189       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4190                                         new_drbd, False,
4191                                         _GetInstanceInfoText(instance)):
4192         self.cfg.ReleaseDRBDMinors(instance.name)
4193         raise errors.OpExecError("Failed to create new DRBD on"
4194                                  " node '%s'" % new_node)
4195
4196     for dev in instance.disks:
4197       # we have new devices, shutdown the drbd on the old secondary
4198       info("shutting down drbd for %s on old node" % dev.iv_name)
4199       cfg.SetDiskID(dev, old_node)
4200       if not self.rpc.call_blockdev_shutdown(old_node, dev):
4201         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4202                 hint="Please cleanup this device manually as soon as possible")
4203
4204     info("detaching primary drbds from the network (=> standalone)")
4205     done = 0
4206     for dev in instance.disks:
4207       cfg.SetDiskID(dev, pri_node)
4208       # set the network part of the physical (unique in bdev terms) id
4209       # to None, meaning detach from network
4210       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4211       # and 'find' the device, which will 'fix' it to match the
4212       # standalone state
4213       if self.rpc.call_blockdev_find(pri_node, dev):
4214         done += 1
4215       else:
4216         warning("Failed to detach drbd %s from network, unusual case" %
4217                 dev.iv_name)
4218
4219     if not done:
4220       # no detaches succeeded (very unlikely)
4221       self.cfg.ReleaseDRBDMinors(instance.name)
4222       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4223
4224     # if we managed to detach at least one, we update all the disks of
4225     # the instance to point to the new secondary
4226     info("updating instance configuration")
4227     for dev, _, new_logical_id in iv_names.itervalues():
4228       dev.logical_id = new_logical_id
4229       cfg.SetDiskID(dev, pri_node)
4230     cfg.Update(instance)
4231     # we can remove now the temp minors as now the new values are
4232     # written to the config file (and therefore stable)
4233     self.cfg.ReleaseDRBDMinors(instance.name)
4234
4235     # and now perform the drbd attach
4236     info("attaching primary drbds to new secondary (standalone => connected)")
4237     failures = []
4238     for dev in instance.disks:
4239       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4240       # since the attach is smart, it's enough to 'find' the device,
4241       # it will automatically activate the network, if the physical_id
4242       # is correct
4243       cfg.SetDiskID(dev, pri_node)
4244       logging.debug("Disk to attach: %s", dev)
4245       if not self.rpc.call_blockdev_find(pri_node, dev):
4246         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4247                 "please do a gnt-instance info to see the status of disks")
4248
4249     # this can fail as the old devices are degraded and _WaitForSync
4250     # does a combined result over all disks, so we don't check its
4251     # return value
4252     self.proc.LogStep(5, steps_total, "sync devices")
4253     _WaitForSync(self, instance, unlock=True)
4254
4255     # so check manually all the devices
4256     for name, (dev, old_lvs, _) in iv_names.iteritems():
4257       cfg.SetDiskID(dev, pri_node)
4258       is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4259       if is_degr:
4260         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4261
4262     self.proc.LogStep(6, steps_total, "removing old storage")
4263     for name, (dev, old_lvs, _) in iv_names.iteritems():
4264       info("remove logical volumes for %s" % name)
4265       for lv in old_lvs:
4266         cfg.SetDiskID(lv, old_node)
4267         if not self.rpc.call_blockdev_remove(old_node, lv):
4268           warning("Can't remove LV on old secondary",
4269                   hint="Cleanup stale volumes by hand")
4270
4271   def Exec(self, feedback_fn):
4272     """Execute disk replacement.
4273
4274     This dispatches the disk replacement to the appropriate handler.
4275
4276     """
4277     instance = self.instance
4278
4279     # Activate the instance disks if we're replacing them on a down instance
4280     if instance.status == "down":
4281       _StartInstanceDisks(self, instance, True)
4282
4283     if instance.disk_template == constants.DT_DRBD8:
4284       if self.op.remote_node is None:
4285         fn = self._ExecD8DiskOnly
4286       else:
4287         fn = self._ExecD8Secondary
4288     else:
4289       raise errors.ProgrammerError("Unhandled disk replacement case")
4290
4291     ret = fn(feedback_fn)
4292
4293     # Deactivate the instance disks if we're replacing them on a down instance
4294     if instance.status == "down":
4295       _SafeShutdownInstanceDisks(self, instance)
4296
4297     return ret
4298
4299
4300 class LUGrowDisk(LogicalUnit):
4301   """Grow a disk of an instance.
4302
4303   """
4304   HPATH = "disk-grow"
4305   HTYPE = constants.HTYPE_INSTANCE
4306   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4307   REQ_BGL = False
4308
4309   def ExpandNames(self):
4310     self._ExpandAndLockInstance()
4311     self.needed_locks[locking.LEVEL_NODE] = []
4312     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4313
4314   def DeclareLocks(self, level):
4315     if level == locking.LEVEL_NODE:
4316       self._LockInstancesNodes()
4317
4318   def BuildHooksEnv(self):
4319     """Build hooks env.
4320
4321     This runs on the master, the primary and all the secondaries.
4322
4323     """
4324     env = {
4325       "DISK": self.op.disk,
4326       "AMOUNT": self.op.amount,
4327       }
4328     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4329     nl = [
4330       self.cfg.GetMasterNode(),
4331       self.instance.primary_node,
4332       ]
4333     return env, nl, nl
4334
4335   def CheckPrereq(self):
4336     """Check prerequisites.
4337
4338     This checks that the instance is in the cluster.
4339
4340     """
4341     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4342     assert instance is not None, \
4343       "Cannot retrieve locked instance %s" % self.op.instance_name
4344
4345     self.instance = instance
4346
4347     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4348       raise errors.OpPrereqError("Instance's disk layout does not support"
4349                                  " growing.")
4350
4351     if instance.FindDisk(self.op.disk) is None:
4352       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4353                                  (self.op.disk, instance.name))
4354
4355     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4356     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4357                                        instance.hypervisor)
4358     for node in nodenames:
4359       info = nodeinfo.get(node, None)
4360       if not info:
4361         raise errors.OpPrereqError("Cannot get current information"
4362                                    " from node '%s'" % node)
4363       vg_free = info.get('vg_free', None)
4364       if not isinstance(vg_free, int):
4365         raise errors.OpPrereqError("Can't compute free disk space on"
4366                                    " node %s" % node)
4367       if self.op.amount > info['vg_free']:
4368         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4369                                    " %d MiB available, %d MiB required" %
4370                                    (node, info['vg_free'], self.op.amount))
4371
4372   def Exec(self, feedback_fn):
4373     """Execute disk grow.
4374
4375     """
4376     instance = self.instance
4377     disk = instance.FindDisk(self.op.disk)
4378     for node in (instance.secondary_nodes + (instance.primary_node,)):
4379       self.cfg.SetDiskID(disk, node)
4380       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4381       if (not result or not isinstance(result, (list, tuple)) or
4382           len(result) != 2):
4383         raise errors.OpExecError("grow request failed to node %s" % node)
4384       elif not result[0]:
4385         raise errors.OpExecError("grow request failed to node %s: %s" %
4386                                  (node, result[1]))
4387     disk.RecordGrow(self.op.amount)
4388     self.cfg.Update(instance)
4389     if self.op.wait_for_sync:
4390       disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
4391       if disk_abort:
4392         logger.Error("Warning: disk sync-ing has not returned a good status.\n"
4393                      " Please check the instance.")
4394
4395
4396 class LUQueryInstanceData(NoHooksLU):
4397   """Query runtime instance data.
4398
4399   """
4400   _OP_REQP = ["instances", "static"]
4401   REQ_BGL = False
4402
4403   def ExpandNames(self):
4404     self.needed_locks = {}
4405     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4406
4407     if not isinstance(self.op.instances, list):
4408       raise errors.OpPrereqError("Invalid argument type 'instances'")
4409
4410     if self.op.instances:
4411       self.wanted_names = []
4412       for name in self.op.instances:
4413         full_name = self.cfg.ExpandInstanceName(name)
4414         if full_name is None:
4415           raise errors.OpPrereqError("Instance '%s' not known" %
4416                                      self.op.instance_name)
4417         self.wanted_names.append(full_name)
4418       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4419     else:
4420       self.wanted_names = None
4421       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4422
4423     self.needed_locks[locking.LEVEL_NODE] = []
4424     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4425
4426   def DeclareLocks(self, level):
4427     if level == locking.LEVEL_NODE:
4428       self._LockInstancesNodes()
4429
4430   def CheckPrereq(self):
4431     """Check prerequisites.
4432
4433     This only checks the optional instance list against the existing names.
4434
4435     """
4436     if self.wanted_names is None:
4437       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4438
4439     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4440                              in self.wanted_names]
4441     return
4442
4443   def _ComputeDiskStatus(self, instance, snode, dev):
4444     """Compute block device status.
4445
4446     """
4447     static = self.op.static
4448     if not static:
4449       self.cfg.SetDiskID(dev, instance.primary_node)
4450       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4451     else:
4452       dev_pstatus = None
4453
4454     if dev.dev_type in constants.LDS_DRBD:
4455       # we change the snode then (otherwise we use the one passed in)
4456       if dev.logical_id[0] == instance.primary_node:
4457         snode = dev.logical_id[1]
4458       else:
4459         snode = dev.logical_id[0]
4460
4461     if snode and not static:
4462       self.cfg.SetDiskID(dev, snode)
4463       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4464     else:
4465       dev_sstatus = None
4466
4467     if dev.children:
4468       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4469                       for child in dev.children]
4470     else:
4471       dev_children = []
4472
4473     data = {
4474       "iv_name": dev.iv_name,
4475       "dev_type": dev.dev_type,
4476       "logical_id": dev.logical_id,
4477       "physical_id": dev.physical_id,
4478       "pstatus": dev_pstatus,
4479       "sstatus": dev_sstatus,
4480       "children": dev_children,
4481       }
4482
4483     return data
4484
4485   def Exec(self, feedback_fn):
4486     """Gather and return data"""
4487     result = {}
4488
4489     cluster = self.cfg.GetClusterInfo()
4490
4491     for instance in self.wanted_instances:
4492       if not self.op.static:
4493         remote_info = self.rpc.call_instance_info(instance.primary_node,
4494                                                   instance.name,
4495                                                   instance.hypervisor)
4496         if remote_info and "state" in remote_info:
4497           remote_state = "up"
4498         else:
4499           remote_state = "down"
4500       else:
4501         remote_state = None
4502       if instance.status == "down":
4503         config_state = "down"
4504       else:
4505         config_state = "up"
4506
4507       disks = [self._ComputeDiskStatus(instance, None, device)
4508                for device in instance.disks]
4509
4510       idict = {
4511         "name": instance.name,
4512         "config_state": config_state,
4513         "run_state": remote_state,
4514         "pnode": instance.primary_node,
4515         "snodes": instance.secondary_nodes,
4516         "os": instance.os,
4517         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4518         "disks": disks,
4519         "hypervisor": instance.hypervisor,
4520         "network_port": instance.network_port,
4521         "hv_instance": instance.hvparams,
4522         "hv_actual": cluster.FillHV(instance),
4523         "be_instance": instance.beparams,
4524         "be_actual": cluster.FillBE(instance),
4525         }
4526
4527       result[instance.name] = idict
4528
4529     return result
4530
4531
4532 class LUSetInstanceParams(LogicalUnit):
4533   """Modifies an instances's parameters.
4534
4535   """
4536   HPATH = "instance-modify"
4537   HTYPE = constants.HTYPE_INSTANCE
4538   _OP_REQP = ["instance_name", "hvparams"]
4539   REQ_BGL = False
4540
4541   def ExpandNames(self):
4542     self._ExpandAndLockInstance()
4543     self.needed_locks[locking.LEVEL_NODE] = []
4544     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4545
4546
4547   def DeclareLocks(self, level):
4548     if level == locking.LEVEL_NODE:
4549       self._LockInstancesNodes()
4550
4551   def BuildHooksEnv(self):
4552     """Build hooks env.
4553
4554     This runs on the master, primary and secondaries.
4555
4556     """
4557     args = dict()
4558     if constants.BE_MEMORY in self.be_new:
4559       args['memory'] = self.be_new[constants.BE_MEMORY]
4560     if constants.BE_VCPUS in self.be_new:
4561       args['vcpus'] = self.be_bnew[constants.BE_VCPUS]
4562     if self.do_ip or self.do_bridge or self.mac:
4563       if self.do_ip:
4564         ip = self.ip
4565       else:
4566         ip = self.instance.nics[0].ip
4567       if self.bridge:
4568         bridge = self.bridge
4569       else:
4570         bridge = self.instance.nics[0].bridge
4571       if self.mac:
4572         mac = self.mac
4573       else:
4574         mac = self.instance.nics[0].mac
4575       args['nics'] = [(ip, bridge, mac)]
4576     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4577     nl = [self.cfg.GetMasterNode(),
4578           self.instance.primary_node] + list(self.instance.secondary_nodes)
4579     return env, nl, nl
4580
4581   def CheckPrereq(self):
4582     """Check prerequisites.
4583
4584     This only checks the instance list against the existing names.
4585
4586     """
4587     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4588     # a separate CheckArguments function, if we implement one, so the operation
4589     # can be aborted without waiting for any lock, should it have an error...
4590     self.ip = getattr(self.op, "ip", None)
4591     self.mac = getattr(self.op, "mac", None)
4592     self.bridge = getattr(self.op, "bridge", None)
4593     self.kernel_path = getattr(self.op, "kernel_path", None)
4594     self.initrd_path = getattr(self.op, "initrd_path", None)
4595     self.force = getattr(self.op, "force", None)
4596     all_parms = [self.ip, self.bridge, self.mac]
4597     if (all_parms.count(None) == len(all_parms) and
4598         not self.op.hvparams and
4599         not self.op.beparams):
4600       raise errors.OpPrereqError("No changes submitted")
4601     for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4602       val = self.op.beparams.get(item, None)
4603       if val is not None:
4604         try:
4605           val = int(val)
4606         except ValueError, err:
4607           raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4608         self.op.beparams[item] = val
4609     if self.ip is not None:
4610       self.do_ip = True
4611       if self.ip.lower() == "none":
4612         self.ip = None
4613       else:
4614         if not utils.IsValidIP(self.ip):
4615           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4616     else:
4617       self.do_ip = False
4618     self.do_bridge = (self.bridge is not None)
4619     if self.mac is not None:
4620       if self.cfg.IsMacInUse(self.mac):
4621         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4622                                    self.mac)
4623       if not utils.IsValidMac(self.mac):
4624         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4625
4626     # checking the new params on the primary/secondary nodes
4627
4628     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4629     assert self.instance is not None, \
4630       "Cannot retrieve locked instance %s" % self.op.instance_name
4631     pnode = self.instance.primary_node
4632     nodelist = [pnode]
4633     nodelist.extend(instance.secondary_nodes)
4634
4635     # hvparams processing
4636     if self.op.hvparams:
4637       i_hvdict = copy.deepcopy(instance.hvparams)
4638       for key, val in self.op.hvparams.iteritems():
4639         if val is None:
4640           try:
4641             del i_hvdict[key]
4642           except KeyError:
4643             pass
4644         else:
4645           i_hvdict[key] = val
4646       cluster = self.cfg.GetClusterInfo()
4647       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4648                                 i_hvdict)
4649       # local check
4650       hypervisor.GetHypervisor(
4651         instance.hypervisor).CheckParameterSyntax(hv_new)
4652       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4653       self.hv_new = hv_new # the new actual values
4654       self.hv_inst = i_hvdict # the new dict (without defaults)
4655     else:
4656       self.hv_new = self.hv_inst = {}
4657
4658     # beparams processing
4659     if self.op.beparams:
4660       i_bedict = copy.deepcopy(instance.beparams)
4661       for key, val in self.op.beparams.iteritems():
4662         if val is None:
4663           try:
4664             del i_bedict[key]
4665           except KeyError:
4666             pass
4667         else:
4668           i_bedict[key] = val
4669       cluster = self.cfg.GetClusterInfo()
4670       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4671                                 i_bedict)
4672       self.be_new = be_new # the new actual values
4673       self.be_inst = i_bedict # the new dict (without defaults)
4674     else:
4675       self.hv_new = self.hv_inst = {}
4676
4677     self.warn = []
4678
4679     if constants.BE_MEMORY in self.op.beparams and not self.force:
4680       mem_check_list = [pnode]
4681       if be_new[constants.BE_AUTO_BALANCE]:
4682         # either we changed auto_balance to yes or it was from before
4683         mem_check_list.extend(instance.secondary_nodes)
4684       instance_info = self.rpc.call_instance_info(pnode, instance.name,
4685                                                   instance.hypervisor)
4686       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4687                                          instance.hypervisor)
4688
4689       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4690         # Assume the primary node is unreachable and go ahead
4691         self.warn.append("Can't get info from primary node %s" % pnode)
4692       else:
4693         if instance_info:
4694           current_mem = instance_info['memory']
4695         else:
4696           # Assume instance not running
4697           # (there is a slight race condition here, but it's not very probable,
4698           # and we have no other way to check)
4699           current_mem = 0
4700         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4701                     nodeinfo[pnode]['memory_free'])
4702         if miss_mem > 0:
4703           raise errors.OpPrereqError("This change will prevent the instance"
4704                                      " from starting, due to %d MB of memory"
4705                                      " missing on its primary node" % miss_mem)
4706
4707       if be_new[constants.BE_AUTO_BALANCE]:
4708         for node in instance.secondary_nodes:
4709           if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4710             self.warn.append("Can't get info from secondary node %s" % node)
4711           elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4712             self.warn.append("Not enough memory to failover instance to"
4713                              " secondary node %s" % node)
4714
4715     return
4716
4717   def Exec(self, feedback_fn):
4718     """Modifies an instance.
4719
4720     All parameters take effect only at the next restart of the instance.
4721     """
4722     # Process here the warnings from CheckPrereq, as we don't have a
4723     # feedback_fn there.
4724     for warn in self.warn:
4725       feedback_fn("WARNING: %s" % warn)
4726
4727     result = []
4728     instance = self.instance
4729     if self.do_ip:
4730       instance.nics[0].ip = self.ip
4731       result.append(("ip", self.ip))
4732     if self.bridge:
4733       instance.nics[0].bridge = self.bridge
4734       result.append(("bridge", self.bridge))
4735     if self.mac:
4736       instance.nics[0].mac = self.mac
4737       result.append(("mac", self.mac))
4738     if self.op.hvparams:
4739       instance.hvparams = self.hv_new
4740       for key, val in self.op.hvparams.iteritems():
4741         result.append(("hv/%s" % key, val))
4742     if self.op.beparams:
4743       instance.beparams = self.be_inst
4744       for key, val in self.op.beparams.iteritems():
4745         result.append(("be/%s" % key, val))
4746
4747     self.cfg.Update(instance)
4748
4749     return result
4750
4751
4752 class LUQueryExports(NoHooksLU):
4753   """Query the exports list
4754
4755   """
4756   _OP_REQP = ['nodes']
4757   REQ_BGL = False
4758
4759   def ExpandNames(self):
4760     self.needed_locks = {}
4761     self.share_locks[locking.LEVEL_NODE] = 1
4762     if not self.op.nodes:
4763       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4764     else:
4765       self.needed_locks[locking.LEVEL_NODE] = \
4766         _GetWantedNodes(self, self.op.nodes)
4767
4768   def CheckPrereq(self):
4769     """Check prerequisites.
4770
4771     """
4772     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4773
4774   def Exec(self, feedback_fn):
4775     """Compute the list of all the exported system images.
4776
4777     Returns:
4778       a dictionary with the structure node->(export-list)
4779       where export-list is a list of the instances exported on
4780       that node.
4781
4782     """
4783     return self.rpc.call_export_list(self.nodes)
4784
4785
4786 class LUExportInstance(LogicalUnit):
4787   """Export an instance to an image in the cluster.
4788
4789   """
4790   HPATH = "instance-export"
4791   HTYPE = constants.HTYPE_INSTANCE
4792   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4793   REQ_BGL = False
4794
4795   def ExpandNames(self):
4796     self._ExpandAndLockInstance()
4797     # FIXME: lock only instance primary and destination node
4798     #
4799     # Sad but true, for now we have do lock all nodes, as we don't know where
4800     # the previous export might be, and and in this LU we search for it and
4801     # remove it from its current node. In the future we could fix this by:
4802     #  - making a tasklet to search (share-lock all), then create the new one,
4803     #    then one to remove, after
4804     #  - removing the removal operation altoghether
4805     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4806
4807   def DeclareLocks(self, level):
4808     """Last minute lock declaration."""
4809     # All nodes are locked anyway, so nothing to do here.
4810
4811   def BuildHooksEnv(self):
4812     """Build hooks env.
4813
4814     This will run on the master, primary node and target node.
4815
4816     """
4817     env = {
4818       "EXPORT_NODE": self.op.target_node,
4819       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4820       }
4821     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4822     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4823           self.op.target_node]
4824     return env, nl, nl
4825
4826   def CheckPrereq(self):
4827     """Check prerequisites.
4828
4829     This checks that the instance and node names are valid.
4830
4831     """
4832     instance_name = self.op.instance_name
4833     self.instance = self.cfg.GetInstanceInfo(instance_name)
4834     assert self.instance is not None, \
4835           "Cannot retrieve locked instance %s" % self.op.instance_name
4836
4837     self.dst_node = self.cfg.GetNodeInfo(
4838       self.cfg.ExpandNodeName(self.op.target_node))
4839
4840     assert self.dst_node is not None, \
4841           "Cannot retrieve locked node %s" % self.op.target_node
4842
4843     # instance disk type verification
4844     for disk in self.instance.disks:
4845       if disk.dev_type == constants.LD_FILE:
4846         raise errors.OpPrereqError("Export not supported for instances with"
4847                                    " file-based disks")
4848
4849   def Exec(self, feedback_fn):
4850     """Export an instance to an image in the cluster.
4851
4852     """
4853     instance = self.instance
4854     dst_node = self.dst_node
4855     src_node = instance.primary_node
4856     if self.op.shutdown:
4857       # shutdown the instance, but not the disks
4858       if not self.rpc.call_instance_shutdown(src_node, instance):
4859         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4860                                  (instance.name, src_node))
4861
4862     vgname = self.cfg.GetVGName()
4863
4864     snap_disks = []
4865
4866     try:
4867       for disk in instance.disks:
4868         if disk.iv_name == "sda":
4869           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4870           new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4871
4872           if not new_dev_name:
4873             logger.Error("could not snapshot block device %s on node %s" %
4874                          (disk.logical_id[1], src_node))
4875           else:
4876             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4877                                       logical_id=(vgname, new_dev_name),
4878                                       physical_id=(vgname, new_dev_name),
4879                                       iv_name=disk.iv_name)
4880             snap_disks.append(new_dev)
4881
4882     finally:
4883       if self.op.shutdown and instance.status == "up":
4884         if not self.rpc.call_instance_start(src_node, instance, None):
4885           _ShutdownInstanceDisks(self, instance)
4886           raise errors.OpExecError("Could not start instance")
4887
4888     # TODO: check for size
4889
4890     cluster_name = self.cfg.GetClusterName()
4891     for dev in snap_disks:
4892       if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4893                                       instance, cluster_name):
4894         logger.Error("could not export block device %s from node %s to node %s"
4895                      % (dev.logical_id[1], src_node, dst_node.name))
4896       if not self.rpc.call_blockdev_remove(src_node, dev):
4897         logger.Error("could not remove snapshot block device %s from node %s" %
4898                      (dev.logical_id[1], src_node))
4899
4900     if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4901       logger.Error("could not finalize export for instance %s on node %s" %
4902                    (instance.name, dst_node.name))
4903
4904     nodelist = self.cfg.GetNodeList()
4905     nodelist.remove(dst_node.name)
4906
4907     # on one-node clusters nodelist will be empty after the removal
4908     # if we proceed the backup would be removed because OpQueryExports
4909     # substitutes an empty list with the full cluster node list.
4910     if nodelist:
4911       exportlist = self.rpc.call_export_list(nodelist)
4912       for node in exportlist:
4913         if instance.name in exportlist[node]:
4914           if not self.rpc.call_export_remove(node, instance.name):
4915             logger.Error("could not remove older export for instance %s"
4916                          " on node %s" % (instance.name, node))
4917
4918
4919 class LURemoveExport(NoHooksLU):
4920   """Remove exports related to the named instance.
4921
4922   """
4923   _OP_REQP = ["instance_name"]
4924   REQ_BGL = False
4925
4926   def ExpandNames(self):
4927     self.needed_locks = {}
4928     # We need all nodes to be locked in order for RemoveExport to work, but we
4929     # don't need to lock the instance itself, as nothing will happen to it (and
4930     # we can remove exports also for a removed instance)
4931     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4932
4933   def CheckPrereq(self):
4934     """Check prerequisites.
4935     """
4936     pass
4937
4938   def Exec(self, feedback_fn):
4939     """Remove any export.
4940
4941     """
4942     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4943     # If the instance was not found we'll try with the name that was passed in.
4944     # This will only work if it was an FQDN, though.
4945     fqdn_warn = False
4946     if not instance_name:
4947       fqdn_warn = True
4948       instance_name = self.op.instance_name
4949
4950     exportlist = self.rpc.call_export_list(self.acquired_locks[
4951       locking.LEVEL_NODE])
4952     found = False
4953     for node in exportlist:
4954       if instance_name in exportlist[node]:
4955         found = True
4956         if not self.rpc.call_export_remove(node, instance_name):
4957           logger.Error("could not remove export for instance %s"
4958                        " on node %s" % (instance_name, node))
4959
4960     if fqdn_warn and not found:
4961       feedback_fn("Export not found. If trying to remove an export belonging"
4962                   " to a deleted instance please use its Fully Qualified"
4963                   " Domain Name.")
4964
4965
4966 class TagsLU(NoHooksLU):
4967   """Generic tags LU.
4968
4969   This is an abstract class which is the parent of all the other tags LUs.
4970
4971   """
4972
4973   def ExpandNames(self):
4974     self.needed_locks = {}
4975     if self.op.kind == constants.TAG_NODE:
4976       name = self.cfg.ExpandNodeName(self.op.name)
4977       if name is None:
4978         raise errors.OpPrereqError("Invalid node name (%s)" %
4979                                    (self.op.name,))
4980       self.op.name = name
4981       self.needed_locks[locking.LEVEL_NODE] = name
4982     elif self.op.kind == constants.TAG_INSTANCE:
4983       name = self.cfg.ExpandInstanceName(self.op.name)
4984       if name is None:
4985         raise errors.OpPrereqError("Invalid instance name (%s)" %
4986                                    (self.op.name,))
4987       self.op.name = name
4988       self.needed_locks[locking.LEVEL_INSTANCE] = name
4989
4990   def CheckPrereq(self):
4991     """Check prerequisites.
4992
4993     """
4994     if self.op.kind == constants.TAG_CLUSTER:
4995       self.target = self.cfg.GetClusterInfo()
4996     elif self.op.kind == constants.TAG_NODE:
4997       self.target = self.cfg.GetNodeInfo(self.op.name)
4998     elif self.op.kind == constants.TAG_INSTANCE:
4999       self.target = self.cfg.GetInstanceInfo(self.op.name)
5000     else:
5001       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5002                                  str(self.op.kind))
5003
5004
5005 class LUGetTags(TagsLU):
5006   """Returns the tags of a given object.
5007
5008   """
5009   _OP_REQP = ["kind", "name"]
5010   REQ_BGL = False
5011
5012   def Exec(self, feedback_fn):
5013     """Returns the tag list.
5014
5015     """
5016     return list(self.target.GetTags())
5017
5018
5019 class LUSearchTags(NoHooksLU):
5020   """Searches the tags for a given pattern.
5021
5022   """
5023   _OP_REQP = ["pattern"]
5024   REQ_BGL = False
5025
5026   def ExpandNames(self):
5027     self.needed_locks = {}
5028
5029   def CheckPrereq(self):
5030     """Check prerequisites.
5031
5032     This checks the pattern passed for validity by compiling it.
5033
5034     """
5035     try:
5036       self.re = re.compile(self.op.pattern)
5037     except re.error, err:
5038       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5039                                  (self.op.pattern, err))
5040
5041   def Exec(self, feedback_fn):
5042     """Returns the tag list.
5043
5044     """
5045     cfg = self.cfg
5046     tgts = [("/cluster", cfg.GetClusterInfo())]
5047     ilist = cfg.GetAllInstancesInfo().values()
5048     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5049     nlist = cfg.GetAllNodesInfo().values()
5050     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5051     results = []
5052     for path, target in tgts:
5053       for tag in target.GetTags():
5054         if self.re.search(tag):
5055           results.append((path, tag))
5056     return results
5057
5058
5059 class LUAddTags(TagsLU):
5060   """Sets a tag on a given object.
5061
5062   """
5063   _OP_REQP = ["kind", "name", "tags"]
5064   REQ_BGL = False
5065
5066   def CheckPrereq(self):
5067     """Check prerequisites.
5068
5069     This checks the type and length of the tag name and value.
5070
5071     """
5072     TagsLU.CheckPrereq(self)
5073     for tag in self.op.tags:
5074       objects.TaggableObject.ValidateTag(tag)
5075
5076   def Exec(self, feedback_fn):
5077     """Sets the tag.
5078
5079     """
5080     try:
5081       for tag in self.op.tags:
5082         self.target.AddTag(tag)
5083     except errors.TagError, err:
5084       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5085     try:
5086       self.cfg.Update(self.target)
5087     except errors.ConfigurationError:
5088       raise errors.OpRetryError("There has been a modification to the"
5089                                 " config file and the operation has been"
5090                                 " aborted. Please retry.")
5091
5092
5093 class LUDelTags(TagsLU):
5094   """Delete a list of tags from a given object.
5095
5096   """
5097   _OP_REQP = ["kind", "name", "tags"]
5098   REQ_BGL = False
5099
5100   def CheckPrereq(self):
5101     """Check prerequisites.
5102
5103     This checks that we have the given tag.
5104
5105     """
5106     TagsLU.CheckPrereq(self)
5107     for tag in self.op.tags:
5108       objects.TaggableObject.ValidateTag(tag)
5109     del_tags = frozenset(self.op.tags)
5110     cur_tags = self.target.GetTags()
5111     if not del_tags <= cur_tags:
5112       diff_tags = del_tags - cur_tags
5113       diff_names = ["'%s'" % tag for tag in diff_tags]
5114       diff_names.sort()
5115       raise errors.OpPrereqError("Tag(s) %s not found" %
5116                                  (",".join(diff_names)))
5117
5118   def Exec(self, feedback_fn):
5119     """Remove the tag from the object.
5120
5121     """
5122     for tag in self.op.tags:
5123       self.target.RemoveTag(tag)
5124     try:
5125       self.cfg.Update(self.target)
5126     except errors.ConfigurationError:
5127       raise errors.OpRetryError("There has been a modification to the"
5128                                 " config file and the operation has been"
5129                                 " aborted. Please retry.")
5130
5131
5132 class LUTestDelay(NoHooksLU):
5133   """Sleep for a specified amount of time.
5134
5135   This LU sleeps on the master and/or nodes for a specified amount of
5136   time.
5137
5138   """
5139   _OP_REQP = ["duration", "on_master", "on_nodes"]
5140   REQ_BGL = False
5141
5142   def ExpandNames(self):
5143     """Expand names and set required locks.
5144
5145     This expands the node list, if any.
5146
5147     """
5148     self.needed_locks = {}
5149     if self.op.on_nodes:
5150       # _GetWantedNodes can be used here, but is not always appropriate to use
5151       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5152       # more information.
5153       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5154       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5155
5156   def CheckPrereq(self):
5157     """Check prerequisites.
5158
5159     """
5160
5161   def Exec(self, feedback_fn):
5162     """Do the actual sleep.
5163
5164     """
5165     if self.op.on_master:
5166       if not utils.TestDelay(self.op.duration):
5167         raise errors.OpExecError("Error during master delay test")
5168     if self.op.on_nodes:
5169       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5170       if not result:
5171         raise errors.OpExecError("Complete failure from rpc call")
5172       for node, node_result in result.items():
5173         if not node_result:
5174           raise errors.OpExecError("Failure during rpc call to node %s,"
5175                                    " result: %s" % (node, node_result))
5176
5177
5178 class IAllocator(object):
5179   """IAllocator framework.
5180
5181   An IAllocator instance has three sets of attributes:
5182     - cfg that is needed to query the cluster
5183     - input data (all members of the _KEYS class attribute are required)
5184     - four buffer attributes (in|out_data|text), that represent the
5185       input (to the external script) in text and data structure format,
5186       and the output from it, again in two formats
5187     - the result variables from the script (success, info, nodes) for
5188       easy usage
5189
5190   """
5191   _ALLO_KEYS = [
5192     "mem_size", "disks", "disk_template",
5193     "os", "tags", "nics", "vcpus",
5194     ]
5195   _RELO_KEYS = [
5196     "relocate_from",
5197     ]
5198
5199   def __init__(self, lu, mode, name, **kwargs):
5200     self.lu = lu
5201     # init buffer variables
5202     self.in_text = self.out_text = self.in_data = self.out_data = None
5203     # init all input fields so that pylint is happy
5204     self.mode = mode
5205     self.name = name
5206     self.mem_size = self.disks = self.disk_template = None
5207     self.os = self.tags = self.nics = self.vcpus = None
5208     self.relocate_from = None
5209     # computed fields
5210     self.required_nodes = None
5211     # init result fields
5212     self.success = self.info = self.nodes = None
5213     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5214       keyset = self._ALLO_KEYS
5215     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5216       keyset = self._RELO_KEYS
5217     else:
5218       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5219                                    " IAllocator" % self.mode)
5220     for key in kwargs:
5221       if key not in keyset:
5222         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5223                                      " IAllocator" % key)
5224       setattr(self, key, kwargs[key])
5225     for key in keyset:
5226       if key not in kwargs:
5227         raise errors.ProgrammerError("Missing input parameter '%s' to"
5228                                      " IAllocator" % key)
5229     self._BuildInputData()
5230
5231   def _ComputeClusterData(self):
5232     """Compute the generic allocator input data.
5233
5234     This is the data that is independent of the actual operation.
5235
5236     """
5237     cfg = self.lu.cfg
5238     cluster_info = cfg.GetClusterInfo()
5239     # cluster data
5240     data = {
5241       "version": 1,
5242       "cluster_name": cfg.GetClusterName(),
5243       "cluster_tags": list(cluster_info.GetTags()),
5244       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5245       # we don't have job IDs
5246       }
5247
5248     i_list = []
5249     cluster = self.cfg.GetClusterInfo()
5250     for iname in cfg.GetInstanceList():
5251       i_obj = cfg.GetInstanceInfo(iname)
5252       i_list.append((i_obj, cluster.FillBE(i_obj)))
5253
5254     # node data
5255     node_results = {}
5256     node_list = cfg.GetNodeList()
5257     # FIXME: here we have only one hypervisor information, but
5258     # instance can belong to different hypervisors
5259     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5260                                            cfg.GetHypervisorType())
5261     for nname in node_list:
5262       ninfo = cfg.GetNodeInfo(nname)
5263       if nname not in node_data or not isinstance(node_data[nname], dict):
5264         raise errors.OpExecError("Can't get data for node %s" % nname)
5265       remote_info = node_data[nname]
5266       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5267                    'vg_size', 'vg_free', 'cpu_total']:
5268         if attr not in remote_info:
5269           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5270                                    (nname, attr))
5271         try:
5272           remote_info[attr] = int(remote_info[attr])
5273         except ValueError, err:
5274           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5275                                    " %s" % (nname, attr, str(err)))
5276       # compute memory used by primary instances
5277       i_p_mem = i_p_up_mem = 0
5278       for iinfo, beinfo in i_list:
5279         if iinfo.primary_node == nname:
5280           i_p_mem += beinfo[constants.BE_MEMORY]
5281           if iinfo.status == "up":
5282             i_p_up_mem += beinfo[constants.BE_MEMORY]
5283
5284       # compute memory used by instances
5285       pnr = {
5286         "tags": list(ninfo.GetTags()),
5287         "total_memory": remote_info['memory_total'],
5288         "reserved_memory": remote_info['memory_dom0'],
5289         "free_memory": remote_info['memory_free'],
5290         "i_pri_memory": i_p_mem,
5291         "i_pri_up_memory": i_p_up_mem,
5292         "total_disk": remote_info['vg_size'],
5293         "free_disk": remote_info['vg_free'],
5294         "primary_ip": ninfo.primary_ip,
5295         "secondary_ip": ninfo.secondary_ip,
5296         "total_cpus": remote_info['cpu_total'],
5297         }
5298       node_results[nname] = pnr
5299     data["nodes"] = node_results
5300
5301     # instance data
5302     instance_data = {}
5303     for iinfo, beinfo in i_list:
5304       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5305                   for n in iinfo.nics]
5306       pir = {
5307         "tags": list(iinfo.GetTags()),
5308         "should_run": iinfo.status == "up",
5309         "vcpus": beinfo[constants.BE_VCPUS],
5310         "memory": beinfo[constants.BE_MEMORY],
5311         "os": iinfo.os,
5312         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5313         "nics": nic_data,
5314         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5315         "disk_template": iinfo.disk_template,
5316         "hypervisor": iinfo.hypervisor,
5317         }
5318       instance_data[iinfo.name] = pir
5319
5320     data["instances"] = instance_data
5321
5322     self.in_data = data
5323
5324   def _AddNewInstance(self):
5325     """Add new instance data to allocator structure.
5326
5327     This in combination with _AllocatorGetClusterData will create the
5328     correct structure needed as input for the allocator.
5329
5330     The checks for the completeness of the opcode must have already been
5331     done.
5332
5333     """
5334     data = self.in_data
5335     if len(self.disks) != 2:
5336       raise errors.OpExecError("Only two-disk configurations supported")
5337
5338     disk_space = _ComputeDiskSize(self.disk_template,
5339                                   self.disks[0]["size"], self.disks[1]["size"])
5340
5341     if self.disk_template in constants.DTS_NET_MIRROR:
5342       self.required_nodes = 2
5343     else:
5344       self.required_nodes = 1
5345     request = {
5346       "type": "allocate",
5347       "name": self.name,
5348       "disk_template": self.disk_template,
5349       "tags": self.tags,
5350       "os": self.os,
5351       "vcpus": self.vcpus,
5352       "memory": self.mem_size,
5353       "disks": self.disks,
5354       "disk_space_total": disk_space,
5355       "nics": self.nics,
5356       "required_nodes": self.required_nodes,
5357       }
5358     data["request"] = request
5359
5360   def _AddRelocateInstance(self):
5361     """Add relocate instance data to allocator structure.
5362
5363     This in combination with _IAllocatorGetClusterData will create the
5364     correct structure needed as input for the allocator.
5365
5366     The checks for the completeness of the opcode must have already been
5367     done.
5368
5369     """
5370     instance = self.lu.cfg.GetInstanceInfo(self.name)
5371     if instance is None:
5372       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5373                                    " IAllocator" % self.name)
5374
5375     if instance.disk_template not in constants.DTS_NET_MIRROR:
5376       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5377
5378     if len(instance.secondary_nodes) != 1:
5379       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5380
5381     self.required_nodes = 1
5382
5383     disk_space = _ComputeDiskSize(instance.disk_template,
5384                                   instance.disks[0].size,
5385                                   instance.disks[1].size)
5386
5387     request = {
5388       "type": "relocate",
5389       "name": self.name,
5390       "disk_space_total": disk_space,
5391       "required_nodes": self.required_nodes,
5392       "relocate_from": self.relocate_from,
5393       }
5394     self.in_data["request"] = request
5395
5396   def _BuildInputData(self):
5397     """Build input data structures.
5398
5399     """
5400     self._ComputeClusterData()
5401
5402     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5403       self._AddNewInstance()
5404     else:
5405       self._AddRelocateInstance()
5406
5407     self.in_text = serializer.Dump(self.in_data)
5408
5409   def Run(self, name, validate=True, call_fn=None):
5410     """Run an instance allocator and return the results.
5411
5412     """
5413     if call_fn is None:
5414       call_fn = self.lu.rpc.call_iallocator_runner
5415     data = self.in_text
5416
5417     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5418
5419     if not isinstance(result, (list, tuple)) or len(result) != 4:
5420       raise errors.OpExecError("Invalid result from master iallocator runner")
5421
5422     rcode, stdout, stderr, fail = result
5423
5424     if rcode == constants.IARUN_NOTFOUND:
5425       raise errors.OpExecError("Can't find allocator '%s'" % name)
5426     elif rcode == constants.IARUN_FAILURE:
5427       raise errors.OpExecError("Instance allocator call failed: %s,"
5428                                " output: %s" % (fail, stdout+stderr))
5429     self.out_text = stdout
5430     if validate:
5431       self._ValidateResult()
5432
5433   def _ValidateResult(self):
5434     """Process the allocator results.
5435
5436     This will process and if successful save the result in
5437     self.out_data and the other parameters.
5438
5439     """
5440     try:
5441       rdict = serializer.Load(self.out_text)
5442     except Exception, err:
5443       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5444
5445     if not isinstance(rdict, dict):
5446       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5447
5448     for key in "success", "info", "nodes":
5449       if key not in rdict:
5450         raise errors.OpExecError("Can't parse iallocator results:"
5451                                  " missing key '%s'" % key)
5452       setattr(self, key, rdict[key])
5453
5454     if not isinstance(rdict["nodes"], list):
5455       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5456                                " is not a list")
5457     self.out_data = rdict
5458
5459
5460 class LUTestAllocator(NoHooksLU):
5461   """Run allocator tests.
5462
5463   This LU runs the allocator tests
5464
5465   """
5466   _OP_REQP = ["direction", "mode", "name"]
5467
5468   def CheckPrereq(self):
5469     """Check prerequisites.
5470
5471     This checks the opcode parameters depending on the director and mode test.
5472
5473     """
5474     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5475       for attr in ["name", "mem_size", "disks", "disk_template",
5476                    "os", "tags", "nics", "vcpus"]:
5477         if not hasattr(self.op, attr):
5478           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5479                                      attr)
5480       iname = self.cfg.ExpandInstanceName(self.op.name)
5481       if iname is not None:
5482         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5483                                    iname)
5484       if not isinstance(self.op.nics, list):
5485         raise errors.OpPrereqError("Invalid parameter 'nics'")
5486       for row in self.op.nics:
5487         if (not isinstance(row, dict) or
5488             "mac" not in row or
5489             "ip" not in row or
5490             "bridge" not in row):
5491           raise errors.OpPrereqError("Invalid contents of the"
5492                                      " 'nics' parameter")
5493       if not isinstance(self.op.disks, list):
5494         raise errors.OpPrereqError("Invalid parameter 'disks'")
5495       if len(self.op.disks) != 2:
5496         raise errors.OpPrereqError("Only two-disk configurations supported")
5497       for row in self.op.disks:
5498         if (not isinstance(row, dict) or
5499             "size" not in row or
5500             not isinstance(row["size"], int) or
5501             "mode" not in row or
5502             row["mode"] not in ['r', 'w']):
5503           raise errors.OpPrereqError("Invalid contents of the"
5504                                      " 'disks' parameter")
5505     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5506       if not hasattr(self.op, "name"):
5507         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5508       fname = self.cfg.ExpandInstanceName(self.op.name)
5509       if fname is None:
5510         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5511                                    self.op.name)
5512       self.op.name = fname
5513       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5514     else:
5515       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5516                                  self.op.mode)
5517
5518     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5519       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5520         raise errors.OpPrereqError("Missing allocator name")
5521     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5522       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5523                                  self.op.direction)
5524
5525   def Exec(self, feedback_fn):
5526     """Run the allocator test.
5527
5528     """
5529     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5530       ial = IAllocator(self,
5531                        mode=self.op.mode,
5532                        name=self.op.name,
5533                        mem_size=self.op.mem_size,
5534                        disks=self.op.disks,
5535                        disk_template=self.op.disk_template,
5536                        os=self.op.os,
5537                        tags=self.op.tags,
5538                        nics=self.op.nics,
5539                        vcpus=self.op.vcpus,
5540                        )
5541     else:
5542       ial = IAllocator(self,
5543                        mode=self.op.mode,
5544                        name=self.op.name,
5545                        relocate_from=list(self.relocate_from),
5546                        )
5547
5548     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5549       result = ial.in_text
5550     else:
5551       ial.Run(self.op.allocator, validate=False)
5552       result = ial.out_text
5553     return result