4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement ExpandNames
53 - implement CheckPrereq
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements:
58 REQ_MASTER: the LU needs to run on the master node
59 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61 Note that all commands require root permissions.
70 def __init__(self, processor, op, context, rpc):
71 """Constructor for LogicalUnit.
73 This needs to be overridden in derived classes in order to check op validity.
79 self.cfg = context.cfg
80 self.context = context
82 # Dicts used to declare locking needs to mcpu
83 self.needed_locks = None
84 self.acquired_locks = {}
85 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
87 self.remove_locks = {}
88 # Used to force good behavior when calling helper functions
89 self.recalculate_locks = {}
92 for attr_name in self._OP_REQP:
93 attr_val = getattr(op, attr_name, None)
95 raise errors.OpPrereqError("Required parameter '%s' missing" %
98 if not self.cfg.IsCluster():
99 raise errors.OpPrereqError("Cluster not initialized yet,"
100 " use 'gnt-cluster init' first.")
102 master = self.cfg.GetMasterNode()
103 if master != utils.HostInfo().name:
104 raise errors.OpPrereqError("Commands must be run on the master"
108 """Returns the SshRunner object
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115 ssh = property(fget=__GetSSH)
117 def ExpandNames(self):
118 """Expand names for this LU.
120 This method is called before starting to execute the opcode, and it should
121 update all the parameters of the opcode to their canonical form (e.g. a
122 short node name must be fully expanded after this method has successfully
123 completed). This way locking, hooks, logging, etc. can work correctly.
125 LUs which implement this method must also populate the self.needed_locks
126 member, as a dict with lock levels as keys, and a list of needed lock names as values:
128 - Use an empty dict if you don't need any lock
129 - If you don't need any lock at a particular level omit that level
130 - Don't put anything for the BGL level
131 - If you want all locks at a level use locking.ALL_SET as a value
133 If you need to share locks (rather than acquire them exclusively) at one
134 level you can modify self.share_locks, setting a true value (usually 1) for
135 that level. By default locks are not shared.
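    # To take node locks in shared mode instead (illustrative sketch; the
    # same one-liner is used by several LUs further below):
    self.share_locks[locking.LEVEL_NODE] = 1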
138 # Acquire all nodes and one instance
139 self.needed_locks = {
140 locking.LEVEL_NODE: locking.ALL_SET,
141 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
143 # Acquire just two nodes
144 self.needed_locks = {
145 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
148 self.needed_locks = {} # No, you can't leave it to the default value None
151 # The implementation of this method is mandatory only if the new LU is
152 # concurrent, so that old LUs don't need to be changed all at the same
155 self.needed_locks = {} # Exclusive LUs don't need locks.
157 raise NotImplementedError
159 def DeclareLocks(self, level):
160 """Declare LU locking needs for a level
162 While most LUs can just declare their locking needs at ExpandNames time,
163 sometimes there's the need to calculate some locks after having acquired
164 the ones before. This function is called just before acquiring locks at a
165 particular level, but after acquiring the ones at lower levels, and permits
166 such calculations. It can be used to modify self.needed_locks, and by
167 default it does nothing.
169 This function is only called if you have something already set in
170 self.needed_locks for the level.
172 @param level: Locking level which is going to be locked
173 @type level: member of ganeti.locking.LEVELS
177 def CheckPrereq(self):
178 """Check prerequisites for this LU.
180 This method should check that the prerequisites for the execution
181 of this LU are fulfilled. It can do internode communication, but
182 it should be idempotent - no cluster or system changes are allowed.
185 The method should raise errors.OpPrereqError in case something is
186 not fulfilled. Its return value is ignored.
188 This method should also update all the parameters of the opcode to
189 their canonical form if it hasn't been done by ExpandNames before.
192 raise NotImplementedError
194 def Exec(self, feedback_fn):
197 This method should implement the actual work. It should raise
198 errors.OpExecError for failures that are somewhat dealt with in code, or expected.
202 raise NotImplementedError
204 def BuildHooksEnv(self):
205 """Build hooks environment for this LU.
207 This method should return a three-element tuple consisting of: a dict
208 containing the environment that will be used for running the
209 specific hook for this LU, a list of node names on which the hook
210 should run before the execution, and a list of node names on which
211 the hook should run after the execution.
213 The keys of the dict must not have 'GANETI_' prefixed as this will
214 be handled in the hooks runner. Also note additional keys will be
215 added by the hooks runner. If the LU doesn't define any
216 environment, an empty dict (and not None) should be returned.
218 If no nodes are to be returned, an empty list (and not None) should be used.
220 Note that if the HPATH for a LU class is None, this function will not be called.
224 raise NotImplementedError
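  # A minimal sketch of what concrete LUs return here (see e.g.
  # LURenameCluster and LUStartupInstance further below): the env dict plus
  # the pre- and post-hook node lists, typically master + instance nodes.
  #
  #   def BuildHooksEnv(self):
  #     env = {"OP_TARGET": self.op.instance_name}
  #     nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
  #     return env, nl, nl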
226 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
227 """Notify the LU about the results of its hooks.
229 This method is called every time a hooks phase is executed, and notifies
230 the Logical Unit about the hooks' result. The LU can then use it to alter
231 its result based on the hooks. By default the method does nothing and the
232 previous result is passed back unchanged, but any LU can override it if it
233 wants to use the local cluster hook-scripts somehow.
236 phase: the hooks phase that has just been run
237 hook_results: the results of the multi-node hooks rpc call
238 feedback_fn: function to send feedback back to the caller
239 lu_result: the previous result this LU had, or None in the PRE phase.
244 def _ExpandAndLockInstance(self):
245 """Helper function to expand and lock an instance.
247 Many LUs that work on an instance take its name in self.op.instance_name
248 and need to expand it and then declare the expanded name for locking. This
249 function does it, and then updates self.op.instance_name to the expanded
250 name. It also initializes needed_locks as a dict, if this hasn't been done before.
254 if self.needed_locks is None:
255 self.needed_locks = {}
257 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
258 "_ExpandAndLockInstance called with instance-level locks set"
259 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
260 if expanded_name is None:
261 raise errors.OpPrereqError("Instance '%s' not known" %
262 self.op.instance_name)
263 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
264 self.op.instance_name = expanded_name
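  # Typical usage in an instance LU (sketch; the same pattern appears in
  # LUActivateInstanceDisks and LUStartupInstance below):
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()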
266 def _LockInstancesNodes(self, primary_only=False):
267 """Helper function to declare instances' nodes for locking.
269 This function should be called after locking one or more instances to lock
270 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
271 with all primary or secondary nodes for instances already locked and
272 present in self.needed_locks[locking.LEVEL_INSTANCE].
274 It should be called from DeclareLocks, and for safety only works if
275 self.recalculate_locks[locking.LEVEL_NODE] is set.
277 In the future it may grow parameters to just lock some instance's nodes, or
278 to just lock primaries or secondary nodes, if needed.
280 It should be called from DeclareLocks in a way similar to:
282 if level == locking.LEVEL_NODE:
283 self._LockInstancesNodes()
285 @type primary_only: boolean
286 @param primary_only: only lock primary nodes of locked instances
289 assert locking.LEVEL_NODE in self.recalculate_locks, \
290 "_LockInstancesNodes helper function called with no nodes to recalculate"
292 # TODO: check if we've really been called with the instance locks held
294 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
295 # future we might want to have different behaviors depending on the value
296 # of self.recalculate_locks[locking.LEVEL_NODE]
298 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
299 instance = self.context.cfg.GetInstanceInfo(instance_name)
300 wanted_nodes.append(instance.primary_node)
302 wanted_nodes.extend(instance.secondary_nodes)
304 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
305 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
306 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
307 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
309 del self.recalculate_locks[locking.LEVEL_NODE]
312 class NoHooksLU(LogicalUnit):
313 """Simple LU which runs no hooks.
315 This LU is intended as a parent for other LogicalUnits which will
316 run no hooks, in order to reduce duplicate code.
323 def _GetWantedNodes(lu, nodes):
324 """Returns list of checked and expanded node names.
327 nodes: List of nodes (strings) or None for all
330 if not isinstance(nodes, list):
331 raise errors.OpPrereqError("Invalid argument type 'nodes'")
334 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
335 " non-empty list of nodes whose name is to be expanded.")
339 node = lu.cfg.ExpandNodeName(name)
341 raise errors.OpPrereqError("No such node name '%s'" % name)
344 return utils.NiceSort(wanted)
347 def _GetWantedInstances(lu, instances):
348 """Returns list of checked and expanded instance names.
351 instances: List of instances (strings) or None for all
354 if not isinstance(instances, list):
355 raise errors.OpPrereqError("Invalid argument type 'instances'")
360 for name in instances:
361 instance = lu.cfg.ExpandInstanceName(name)
363 raise errors.OpPrereqError("No such instance name '%s'" % name)
364 wanted.append(instance)
367 wanted = lu.cfg.GetInstanceList()
368 return utils.NiceSort(wanted)
371 def _CheckOutputFields(static, dynamic, selected):
372 """Checks whether all selected fields are valid.
375 static: Static fields
376 dynamic: Dynamic fields
379 static_fields = frozenset(static)
380 dynamic_fields = frozenset(dynamic)
382 all_fields = static_fields | dynamic_fields
384 if not all_fields.issuperset(selected):
385 raise errors.OpPrereqError("Unknown output fields selected: %s"
386 % ",".join(frozenset(selected).
387 difference(all_fields)))
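# Usage sketch (hypothetical field names): with static=["name"] and
# dynamic=["mfree"], selecting ["name", "mfree"] is accepted, while the
# call below raises OpPrereqError naming the unknown field "bogus":
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "bogus"])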
390 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
391 memory, vcpus, nics):
392 """Builds instance related env variables for hooks from single variables.
395 secondary_nodes: List of secondary nodes as strings
399 "INSTANCE_NAME": name,
400 "INSTANCE_PRIMARY": primary_node,
401 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
402 "INSTANCE_OS_TYPE": os_type,
403 "INSTANCE_STATUS": status,
404 "INSTANCE_MEMORY": memory,
405 "INSTANCE_VCPUS": vcpus,
409 nic_count = len(nics)
410 for idx, (ip, bridge, mac) in enumerate(nics):
413 env["INSTANCE_NIC%d_IP" % idx] = ip
414 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
415 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
419 env["INSTANCE_NIC_COUNT"] = nic_count
424 def _BuildInstanceHookEnvByObject(instance, override=None):
425 """Builds instance related env variables for hooks from an object.
428 instance: objects.Instance object of instance
429 override: dict of values to override
432 'name': instance.name,
433 'primary_node': instance.primary_node,
434 'secondary_nodes': instance.secondary_nodes,
435 'os_type': instance.os,
436 'status': instance.status,
437 'memory': instance.memory,
438 'vcpus': instance.vcpus,
439 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
442 args.update(override)
443 return _BuildInstanceHookEnv(**args)
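# Example (sketch; "new_memory" is a hypothetical local variable): callers
# may override single values before the env is built, e.g. to announce a
# planned memory change to the hooks:
#
#   env = _BuildInstanceHookEnvByObject(instance,
#                                       override={"memory": new_memory})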
446 def _CheckInstanceBridgesExist(lu, instance):
447 """Check that the bridges needed by an instance exist.
450 # check bridges existence
451 brlist = [nic.bridge for nic in instance.nics]
452 if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
453 raise errors.OpPrereqError("one or more target bridges %s do not"
454 " exist on destination node '%s'" %
455 (brlist, instance.primary_node))
458 class LUDestroyCluster(NoHooksLU):
459 """Logical unit for destroying the cluster.
464 def CheckPrereq(self):
465 """Check prerequisites.
467 This checks whether the cluster is empty.
469 Any errors are signalled by raising errors.OpPrereqError.
472 master = self.cfg.GetMasterNode()
474 nodelist = self.cfg.GetNodeList()
475 if len(nodelist) != 1 or nodelist[0] != master:
476 raise errors.OpPrereqError("There are still %d node(s) in"
477 " this cluster." % (len(nodelist) - 1))
478 instancelist = self.cfg.GetInstanceList()
480 raise errors.OpPrereqError("There are still %d instance(s) in"
481 " this cluster." % len(instancelist))
483 def Exec(self, feedback_fn):
484 """Destroys the cluster.
487 master = self.cfg.GetMasterNode()
488 if not self.rpc.call_node_stop_master(master, False):
489 raise errors.OpExecError("Could not disable the master role")
490 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
491 utils.CreateBackup(priv_key)
492 utils.CreateBackup(pub_key)
496 class LUVerifyCluster(LogicalUnit):
497 """Verifies the cluster status.
500 HPATH = "cluster-verify"
501 HTYPE = constants.HTYPE_CLUSTER
502 _OP_REQP = ["skip_checks"]
505 def ExpandNames(self):
506 self.needed_locks = {
507 locking.LEVEL_NODE: locking.ALL_SET,
508 locking.LEVEL_INSTANCE: locking.ALL_SET,
510 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
512 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
513 remote_version, feedback_fn):
514 """Run multiple tests against a node.
517 - compares ganeti version
518 - checks vg existence and size > 20G
519 - checks config file checksum
520 - checks ssh to other nodes
523 node: name of the node to check
524 file_list: required list of files
525 local_cksum: dictionary of local files and their checksums
528 # compares ganeti version
529 local_version = constants.PROTOCOL_VERSION
530 if not remote_version:
531 feedback_fn(" - ERROR: connection to %s failed" % (node))
534 if local_version != remote_version:
535 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
536 (local_version, node, remote_version))
539 # checks vg existence and size > 20G
543 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
547 vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
548 constants.MIN_VG_SIZE)
550 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
554 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
557 # checks config file checksum
560 if 'filelist' not in node_result:
562 feedback_fn(" - ERROR: node hasn't returned file checksum data")
564 remote_cksum = node_result['filelist']
565 for file_name in file_list:
566 if file_name not in remote_cksum:
568 feedback_fn(" - ERROR: file '%s' missing" % file_name)
569 elif remote_cksum[file_name] != local_cksum[file_name]:
571 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
573 if 'nodelist' not in node_result:
575 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
577 if node_result['nodelist']:
579 for node in node_result['nodelist']:
580 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
581 (node, node_result['nodelist'][node]))
582 if 'node-net-test' not in node_result:
584 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
586 if node_result['node-net-test']:
588 nlist = utils.NiceSort(node_result['node-net-test'].keys())
590 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
591 (node, node_result['node-net-test'][node]))
593 hyp_result = node_result.get('hypervisor', None)
594 if isinstance(hyp_result, dict):
595 for hv_name, hv_result in hyp_result.iteritems():
596 if hv_result is not None:
597 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
598 (hv_name, hv_result))
601 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
602 node_instance, feedback_fn):
603 """Verify an instance.
605 This function checks to see if the required block devices are
606 available on the instance's node.
611 node_current = instanceconfig.primary_node
614 instanceconfig.MapLVsByNode(node_vol_should)
616 for node in node_vol_should:
617 for volume in node_vol_should[node]:
618 if node not in node_vol_is or volume not in node_vol_is[node]:
619 feedback_fn(" - ERROR: volume %s missing on node %s" %
623 if instanceconfig.status != 'down':
624 if (node_current not in node_instance or
625 not instance in node_instance[node_current]):
626 feedback_fn(" - ERROR: instance %s not running on node %s" %
627 (instance, node_current))
630 for node in node_instance:
631 if node != node_current:
632 if instance in node_instance[node]:
633 feedback_fn(" - ERROR: instance %s should not run on node %s" %
639 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
640 """Verify if there are any unknown volumes in the cluster.
642 The .os, .swap and backup volumes are ignored. All other volumes are reported as unknown.
648 for node in node_vol_is:
649 for volume in node_vol_is[node]:
650 if node not in node_vol_should or volume not in node_vol_should[node]:
651 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
656 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
657 """Verify the list of running instances.
659 This checks what instances are running but unknown to the cluster.
663 for node in node_instance:
664 for runninginstance in node_instance[node]:
665 if runninginstance not in instancelist:
666 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
667 (runninginstance, node))
671 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
672 """Verify N+1 Memory Resilience.
674 Check that if one single node dies we can still start all the instances it was primary for.
680 for node, nodeinfo in node_info.iteritems():
681 # This code checks that every node which is now listed as secondary has
682 # enough memory to host all instances it is supposed to should a single
683 # other node in the cluster fail.
684 # FIXME: not ready for failover to an arbitrary node
685 # FIXME: does not support file-backed instances
686 # WARNING: we currently take into account down instances as well as up
687 # ones, considering that even if they're down someone might want to start
688 # them even in the event of a node failure.
689 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
691 for instance in instances:
692 needed_mem += instance_cfg[instance].memory
693 if nodeinfo['mfree'] < needed_mem:
694 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
695 " failovers should node %s fail" % (node, prinode))
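        # Worked example (hypothetical numbers): with
        # sinst-by-pnode = {"nodeA": [i1, i2]}, i1.memory = 512 and
        # i2.memory = 1024, needed_mem for a failure of nodeA is 1536 MiB;
        # if this node's mfree is only 1024 the error above is reported.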
699 def CheckPrereq(self):
700 """Check prerequisites.
702 Transform the list of checks we're going to skip into a set and check that
703 all its members are valid.
706 self.skip_set = frozenset(self.op.skip_checks)
707 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
708 raise errors.OpPrereqError("Invalid checks to be skipped specified")
710 def BuildHooksEnv(self):
713 Cluster-Verify hooks just run in the post phase, and their failure causes
714 their output to be logged in the verify output and the verification to fail.
717 all_nodes = self.cfg.GetNodeList()
718 # TODO: populate the environment with useful information for verify hooks
720 return env, [], all_nodes
722 def Exec(self, feedback_fn):
723 """Verify integrity of cluster, performing various tests on nodes.
727 feedback_fn("* Verifying global settings")
728 for msg in self.cfg.VerifyConfig():
729 feedback_fn(" - ERROR: %s" % msg)
731 vg_name = self.cfg.GetVGName()
732 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
733 nodelist = utils.NiceSort(self.cfg.GetNodeList())
734 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
735 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
736 i_non_redundant = [] # Non redundant instances
742 # FIXME: verify OS list
745 file_names.append(constants.SSL_CERT_FILE)
746 file_names.append(constants.CLUSTER_CONF_FILE)
747 local_checksums = utils.FingerprintFiles(file_names)
749 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
750 all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
751 all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
752 all_vglist = self.rpc.call_vg_list(nodelist)
753 node_verify_param = {
754 'filelist': file_names,
755 'nodelist': nodelist,
756 'hypervisor': hypervisors,
757 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
758 for node in nodeinfo]
760 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
761 self.cfg.GetClusterName())
762 all_rversion = self.rpc.call_version(nodelist)
763 all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
764 self.cfg.GetHypervisorType())
766 for node in nodelist:
767 feedback_fn("* Verifying node %s" % node)
768 result = self._VerifyNode(node, file_names, local_checksums,
769 all_vglist[node], all_nvinfo[node],
770 all_rversion[node], feedback_fn)
774 volumeinfo = all_volumeinfo[node]
776 if isinstance(volumeinfo, basestring):
777 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
778 (node, volumeinfo[-400:].encode('string_escape')))
780 node_volume[node] = {}
781 elif not isinstance(volumeinfo, dict):
782 feedback_fn(" - ERROR: connection to %s failed" % (node,))
786 node_volume[node] = volumeinfo
789 nodeinstance = all_instanceinfo[node]
790 if not isinstance(nodeinstance, list):
791 feedback_fn(" - ERROR: connection to %s failed" % (node,))
795 node_instance[node] = nodeinstance
798 nodeinfo = all_ninfo[node]
799 if not isinstance(nodeinfo, dict):
800 feedback_fn(" - ERROR: connection to %s failed" % (node,))
806 "mfree": int(nodeinfo['memory_free']),
807 "dfree": int(nodeinfo['vg_free']),
810 # dictionary holding all instances this node is secondary for,
811 # grouped by their primary node. Each key is a cluster node, and each
812 # value is a list of instances which have the key as primary and the
813 # current node as secondary. This is handy to calculate N+1 memory
814 # availability if you can only failover from a primary to its secondary.
816 "sinst-by-pnode": {},
819 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
825 for instance in instancelist:
826 feedback_fn("* Verifying instance %s" % instance)
827 inst_config = self.cfg.GetInstanceInfo(instance)
828 result = self._VerifyInstance(instance, inst_config, node_volume,
829 node_instance, feedback_fn)
832 inst_config.MapLVsByNode(node_vol_should)
834 instance_cfg[instance] = inst_config
836 pnode = inst_config.primary_node
837 if pnode in node_info:
838 node_info[pnode]['pinst'].append(instance)
840 feedback_fn(" - ERROR: instance %s, connection to primary node"
841 " %s failed" % (instance, pnode))
844 # If the instance is non-redundant we cannot survive losing its primary
845 # node, so we are not N+1 compliant. On the other hand we have no disk
846 # templates with more than one secondary so that situation is not well supported either.
848 # FIXME: does not support file-backed instances
849 if len(inst_config.secondary_nodes) == 0:
850 i_non_redundant.append(instance)
851 elif len(inst_config.secondary_nodes) > 1:
852 feedback_fn(" - WARNING: multiple secondaries for instance %s"
855 for snode in inst_config.secondary_nodes:
856 if snode in node_info:
857 node_info[snode]['sinst'].append(instance)
858 if pnode not in node_info[snode]['sinst-by-pnode']:
859 node_info[snode]['sinst-by-pnode'][pnode] = []
860 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
862 feedback_fn(" - ERROR: instance %s, connection to secondary node"
863 " %s failed" % (instance, snode))
865 feedback_fn("* Verifying orphan volumes")
866 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
870 feedback_fn("* Verifying remaining instances")
871 result = self._VerifyOrphanInstances(instancelist, node_instance,
875 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
876 feedback_fn("* Verifying N+1 Memory redundancy")
877 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
880 feedback_fn("* Other Notes")
882 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
883 % len(i_non_redundant))
887 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
888 """Analyze the post-hooks' result, handle it, and send some
889 nicely-formatted feedback back to the user.
892 phase: the hooks phase that has just been run
893 hooks_results: the results of the multi-node hooks rpc call
894 feedback_fn: function to send feedback back to the caller
895 lu_result: previous Exec result
898 # We only really run POST phase hooks, and are only interested in their results.
900 if phase == constants.HOOKS_PHASE_POST:
901 # Used to change hooks' output to proper indentation
902 indent_re = re.compile('^', re.M)
903 feedback_fn("* Hooks Results")
904 if not hooks_results:
905 feedback_fn(" - ERROR: general communication failure")
908 for node_name in hooks_results:
909 show_node_header = True
910 res = hooks_results[node_name]
911 if res is False or not isinstance(res, list):
912 feedback_fn(" Communication failure")
915 for script, hkr, output in res:
916 if hkr == constants.HKR_FAIL:
917 # The node header is only shown once, if there are
918 # failing hooks on that node
920 feedback_fn(" Node %s:" % node_name)
921 show_node_header = False
922 feedback_fn(" ERROR: Script %s failed, output:" % script)
923 output = indent_re.sub(' ', output)
924 feedback_fn("%s" % output)
930 class LUVerifyDisks(NoHooksLU):
931 """Verifies the cluster disks status.
937 def ExpandNames(self):
938 self.needed_locks = {
939 locking.LEVEL_NODE: locking.ALL_SET,
940 locking.LEVEL_INSTANCE: locking.ALL_SET,
942 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
944 def CheckPrereq(self):
945 """Check prerequisites.
947 This has no prerequisites.
952 def Exec(self, feedback_fn):
953 """Verify integrity of cluster disks.
956 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
958 vg_name = self.cfg.GetVGName()
959 nodes = utils.NiceSort(self.cfg.GetNodeList())
960 instances = [self.cfg.GetInstanceInfo(name)
961 for name in self.cfg.GetInstanceList()]
964 for inst in instances:
966 if (inst.status != "up" or
967 inst.disk_template not in constants.DTS_NET_MIRROR):
969 inst.MapLVsByNode(inst_lvs)
970 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
971 for node, vol_list in inst_lvs.iteritems():
973 nv_dict[(node, vol)] = inst
978 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
985 if isinstance(lvs, basestring):
986 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
988 elif not isinstance(lvs, dict):
989 logger.Info("connection to node %s failed or invalid data returned" %
991 res_nodes.append(node)
994 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
995 inst = nv_dict.pop((node, lv_name), None)
996 if (not lv_online and inst is not None
997 and inst.name not in res_instances):
998 res_instances.append(inst.name)
1000 # any leftover items in nv_dict are missing LVs, let's arrange the data better
1002 for key, inst in nv_dict.iteritems():
1003 if inst.name not in res_missing:
1004 res_missing[inst.name] = []
1005 res_missing[inst.name].append(key)
1010 class LURenameCluster(LogicalUnit):
1011 """Rename the cluster.
1014 HPATH = "cluster-rename"
1015 HTYPE = constants.HTYPE_CLUSTER
1018 def BuildHooksEnv(self):
1023 "OP_TARGET": self.cfg.GetClusterName(),
1024 "NEW_NAME": self.op.name,
1026 mn = self.cfg.GetMasterNode()
1027 return env, [mn], [mn]
1029 def CheckPrereq(self):
1030 """Verify that the passed name is a valid one.
1033 hostname = utils.HostInfo(self.op.name)
1035 new_name = hostname.name
1036 self.ip = new_ip = hostname.ip
1037 old_name = self.cfg.GetClusterName()
1038 old_ip = self.cfg.GetMasterIP()
1039 if new_name == old_name and new_ip == old_ip:
1040 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1041 " cluster has changed")
1042 if new_ip != old_ip:
1043 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1044 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1045 " reachable on the network. Aborting." %
1048 self.op.name = new_name
1050 def Exec(self, feedback_fn):
1051 """Rename the cluster.
1054 clustername = self.op.name
1057 # shutdown the master IP
1058 master = self.cfg.GetMasterNode()
1059 if not self.rpc.call_node_stop_master(master, False):
1060 raise errors.OpExecError("Could not disable the master role")
1065 ss.SetKey(ss.SS_MASTER_IP, ip)
1066 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1068 # Distribute updated ss config to all nodes
1069 myself = self.cfg.GetNodeInfo(master)
1070 dist_nodes = self.cfg.GetNodeList()
1071 if myself.name in dist_nodes:
1072 dist_nodes.remove(myself.name)
1074 logger.Debug("Copying updated ssconf data to all nodes")
1075 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1076 fname = ss.KeyToFilename(keyname)
1077 result = self.rpc.call_upload_file(dist_nodes, fname)
1078 for to_node in dist_nodes:
1079 if not result[to_node]:
1080 logger.Error("copy of file %s to node %s failed" %
1083 if not self.rpc.call_node_start_master(master, False):
1084 logger.Error("Could not re-enable the master role on the master,"
1085 " please restart manually.")
1088 def _RecursiveCheckIfLVMBased(disk):
1089 """Check if the given disk or its children are lvm-based.
1092 disk: ganeti.objects.Disk object
1095 boolean indicating whether a LD_LV dev_type was found or not
1099 for chdisk in disk.children:
1100 if _RecursiveCheckIfLVMBased(chdisk):
1102 return disk.dev_type == constants.LD_LV
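# Example (sketch): a plain LV disk returns True directly, while a DRBD disk
# backed by LV children returns True through the recursion above; this is
# how LUSetClusterParams below refuses to disable lvm storage, e.g.:
#
#   if _RecursiveCheckIfLVMBased(disk):
#     raise errors.OpPrereqError("Cannot disable lvm storage while"
#                                " lvm-based instances exist")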
1105 class LUSetClusterParams(LogicalUnit):
1106 """Change the parameters of the cluster.
1109 HPATH = "cluster-modify"
1110 HTYPE = constants.HTYPE_CLUSTER
1114 def ExpandNames(self):
1115 # FIXME: in the future maybe other cluster params won't require checking on
1116 # all nodes to be modified.
1117 self.needed_locks = {
1118 locking.LEVEL_NODE: locking.ALL_SET,
1120 self.share_locks[locking.LEVEL_NODE] = 1
1122 def BuildHooksEnv(self):
1127 "OP_TARGET": self.cfg.GetClusterName(),
1128 "NEW_VG_NAME": self.op.vg_name,
1130 mn = self.cfg.GetMasterNode()
1131 return env, [mn], [mn]
1133 def CheckPrereq(self):
1134 """Check prerequisites.
1136 This checks whether the given params don't conflict and
1137 if the given volume group is valid.
1140 # FIXME: This only works because there is only one parameter that can be
1141 # changed or removed.
1142 if not self.op.vg_name:
1143 instances = self.cfg.GetAllInstancesInfo().values()
1144 for inst in instances:
1145 for disk in inst.disks:
1146 if _RecursiveCheckIfLVMBased(disk):
1147 raise errors.OpPrereqError("Cannot disable lvm storage while"
1148 " lvm-based instances exist")
1150 # if vg_name is not None, check the given volume group on all nodes
1152 node_list = self.acquired_locks[locking.LEVEL_NODE]
1153 vglist = self.rpc.call_vg_list(node_list)
1154 for node in node_list:
1155 vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1156 constants.MIN_VG_SIZE)
1158 raise errors.OpPrereqError("Error on node '%s': %s" %
1161 def Exec(self, feedback_fn):
1162 """Change the parameters of the cluster.
1165 if self.op.vg_name != self.cfg.GetVGName():
1166 self.cfg.SetVGName(self.op.vg_name)
1168 feedback_fn("Cluster LVM configuration already in desired"
1169 " state, not changing")
1172 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1173 """Sleep and poll for an instance's disk to sync.
1176 if not instance.disks:
1180 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1182 node = instance.primary_node
1184 for dev in instance.disks:
1185 lu.cfg.SetDiskID(dev, node)
1191 cumul_degraded = False
1192 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1194 lu.proc.LogWarning("Can't get any data from node %s" % node)
1197 raise errors.RemoteError("Can't contact node %s for mirror data,"
1198 " aborting." % node)
1202 for i in range(len(rstats)):
1205 lu.proc.LogWarning("Can't compute data for node %s/%s" %
1206 (node, instance.disks[i].iv_name))
1208 # we ignore the ldisk parameter
1209 perc_done, est_time, is_degraded, _ = mstat
1210 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1211 if perc_done is not None:
1213 if est_time is not None:
1214 rem_time = "%d estimated seconds remaining" % est_time
1217 rem_time = "no time estimate"
1218 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1219 (instance.disks[i].iv_name, perc_done, rem_time))
1223 time.sleep(min(60, max_time))
1226 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1227 return not cumul_degraded
1230 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1231 """Check that mirrors are not degraded.
1233 The ldisk parameter, if True, will change the test from the
1234 is_degraded attribute (which represents overall non-ok status for
1235 the device(s)) to the ldisk (representing the local storage status).
1238 lu.cfg.SetDiskID(dev, node)
1245 if on_primary or dev.AssembleOnSecondary():
1246 rstats = lu.rpc.call_blockdev_find(node, dev)
1248 logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1251 result = result and (not rstats[idx])
1253 for child in dev.children:
1254 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1259 class LUDiagnoseOS(NoHooksLU):
1260 """Logical unit for OS diagnose/query.
1263 _OP_REQP = ["output_fields", "names"]
1266 def ExpandNames(self):
1268 raise errors.OpPrereqError("Selective OS query not supported")
1270 self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1271 _CheckOutputFields(static=[],
1272 dynamic=self.dynamic_fields,
1273 selected=self.op.output_fields)
1275 # Lock all nodes, in shared mode
1276 self.needed_locks = {}
1277 self.share_locks[locking.LEVEL_NODE] = 1
1278 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1280 def CheckPrereq(self):
1281 """Check prerequisites.
1286 def _DiagnoseByOS(node_list, rlist):
1287 """Remaps a per-node return list into a per-os per-node dictionary
1290 node_list: a list with the names of all nodes
1291 rlist: a map with node names as keys and OS objects as values
1294 map: a map with osnames as keys and as value another map, with
1296 keys and list of OS objects as values
1297 e.g. {"debian-etch": {"node1": [<object>,...],
1298 "node2": [<object>,]}
1303 for node_name, nr in rlist.iteritems():
1307 if os_obj.name not in all_os:
1308 # build a list of nodes for this os containing empty lists
1309 # for each node in node_list
1310 all_os[os_obj.name] = {}
1311 for nname in node_list:
1312 all_os[os_obj.name][nname] = []
1313 all_os[os_obj.name][node_name].append(os_obj)
1316 def Exec(self, feedback_fn):
1317 """Compute the list of OSes.
1320 node_list = self.acquired_locks[locking.LEVEL_NODE]
1321 node_data = self.rpc.call_os_diagnose(node_list)
1322 if node_data == False:
1323 raise errors.OpExecError("Can't gather the list of OSes")
1324 pol = self._DiagnoseByOS(node_list, node_data)
1326 for os_name, os_data in pol.iteritems():
1328 for field in self.op.output_fields:
1331 elif field == "valid":
1332 val = utils.all([osl and osl[0] for osl in os_data.values()])
1333 elif field == "node_status":
1335 for node_name, nos_list in os_data.iteritems():
1336 val[node_name] = [(v.status, v.path) for v in nos_list]
1338 raise errors.ParameterError(field)
1345 class LURemoveNode(LogicalUnit):
1346 """Logical unit for removing a node.
1349 HPATH = "node-remove"
1350 HTYPE = constants.HTYPE_NODE
1351 _OP_REQP = ["node_name"]
1353 def BuildHooksEnv(self):
1356 This doesn't run on the target node in the pre phase as a failed
1357 node would then be impossible to remove.
1361 "OP_TARGET": self.op.node_name,
1362 "NODE_NAME": self.op.node_name,
1364 all_nodes = self.cfg.GetNodeList()
1365 all_nodes.remove(self.op.node_name)
1366 return env, all_nodes, all_nodes
1368 def CheckPrereq(self):
1369 """Check prerequisites.
1372 - the node exists in the configuration
1373 - it does not have primary or secondary instances
1374 - it's not the master
1376 Any errors are signalled by raising errors.OpPrereqError.
1379 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1381 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1383 instance_list = self.cfg.GetInstanceList()
1385 masternode = self.cfg.GetMasterNode()
1386 if node.name == masternode:
1387 raise errors.OpPrereqError("Node is the master node,"
1388 " you need to failover first.")
1390 for instance_name in instance_list:
1391 instance = self.cfg.GetInstanceInfo(instance_name)
1392 if node.name == instance.primary_node:
1393 raise errors.OpPrereqError("Instance %s still running on the node,"
1394 " please remove first." % instance_name)
1395 if node.name in instance.secondary_nodes:
1396 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1397 " please remove first." % instance_name)
1398 self.op.node_name = node.name
1401 def Exec(self, feedback_fn):
1402 """Removes the node from the cluster.
1406 logger.Info("stopping the node daemon and removing configs from node %s" %
1409 self.context.RemoveNode(node.name)
1411 self.rpc.call_node_leave_cluster(node.name)
1414 class LUQueryNodes(NoHooksLU):
1415 """Logical unit for querying nodes.
1418 _OP_REQP = ["output_fields", "names"]
1421 def ExpandNames(self):
1422 self.dynamic_fields = frozenset([
1424 "mtotal", "mnode", "mfree",
1429 self.static_fields = frozenset([
1430 "name", "pinst_cnt", "sinst_cnt",
1431 "pinst_list", "sinst_list",
1432 "pip", "sip", "tags",
1436 _CheckOutputFields(static=self.static_fields,
1437 dynamic=self.dynamic_fields,
1438 selected=self.op.output_fields)
1440 self.needed_locks = {}
1441 self.share_locks[locking.LEVEL_NODE] = 1
1444 self.wanted = _GetWantedNodes(self, self.op.names)
1446 self.wanted = locking.ALL_SET
1448 self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1450 # if we don't request only static fields, we need to lock the nodes
1451 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1454 def CheckPrereq(self):
1455 """Check prerequisites.
1458 # The validation of the node list is done in _GetWantedNodes, if
1459 # non-empty; if empty, there's no validation to do
1462 def Exec(self, feedback_fn):
1463 """Computes the list of nodes and their attributes.
1466 all_info = self.cfg.GetAllNodesInfo()
1468 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1469 elif self.wanted != locking.ALL_SET:
1470 nodenames = self.wanted
1471 missing = set(nodenames).difference(all_info.keys())
1473 raise errors.OpExecError(
1474 "Some nodes were removed before retrieving their data: %s" % missing)
1476 nodenames = all_info.keys()
1477 nodelist = [all_info[name] for name in nodenames]
1479 # begin data gathering
1481 if self.dynamic_fields.intersection(self.op.output_fields):
1483 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1484 self.cfg.GetHypervisorType())
1485 for name in nodenames:
1486 nodeinfo = node_data.get(name, None)
1489 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1490 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1491 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1492 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1493 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1494 "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1495 "bootid": nodeinfo['bootid'],
1498 live_data[name] = {}
1500 live_data = dict.fromkeys(nodenames, {})
1502 node_to_primary = dict([(name, set()) for name in nodenames])
1503 node_to_secondary = dict([(name, set()) for name in nodenames])
1505 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1506 "sinst_cnt", "sinst_list"))
1507 if inst_fields & frozenset(self.op.output_fields):
1508 instancelist = self.cfg.GetInstanceList()
1510 for instance_name in instancelist:
1511 inst = self.cfg.GetInstanceInfo(instance_name)
1512 if inst.primary_node in node_to_primary:
1513 node_to_primary[inst.primary_node].add(inst.name)
1514 for secnode in inst.secondary_nodes:
1515 if secnode in node_to_secondary:
1516 node_to_secondary[secnode].add(inst.name)
1518 # end data gathering
1521 for node in nodelist:
1523 for field in self.op.output_fields:
1526 elif field == "pinst_list":
1527 val = list(node_to_primary[node.name])
1528 elif field == "sinst_list":
1529 val = list(node_to_secondary[node.name])
1530 elif field == "pinst_cnt":
1531 val = len(node_to_primary[node.name])
1532 elif field == "sinst_cnt":
1533 val = len(node_to_secondary[node.name])
1534 elif field == "pip":
1535 val = node.primary_ip
1536 elif field == "sip":
1537 val = node.secondary_ip
1538 elif field == "tags":
1539 val = list(node.GetTags())
1540 elif field == "serial_no":
1541 val = node.serial_no
1542 elif field in self.dynamic_fields:
1543 val = live_data[node.name].get(field, None)
1545 raise errors.ParameterError(field)
1546 node_output.append(val)
1547 output.append(node_output)
1552 class LUQueryNodeVolumes(NoHooksLU):
1553 """Logical unit for getting volumes on node(s).
1556 _OP_REQP = ["nodes", "output_fields"]
1559 def ExpandNames(self):
1560 _CheckOutputFields(static=["node"],
1561 dynamic=["phys", "vg", "name", "size", "instance"],
1562 selected=self.op.output_fields)
1564 self.needed_locks = {}
1565 self.share_locks[locking.LEVEL_NODE] = 1
1566 if not self.op.nodes:
1567 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1569 self.needed_locks[locking.LEVEL_NODE] = \
1570 _GetWantedNodes(self, self.op.nodes)
1572 def CheckPrereq(self):
1573 """Check prerequisites.
1575 This checks that the fields required are valid output fields.
1578 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1580 def Exec(self, feedback_fn):
1581 """Computes the list of nodes and their attributes.
1584 nodenames = self.nodes
1585 volumes = self.rpc.call_node_volumes(nodenames)
1587 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1588 in self.cfg.GetInstanceList()]
1590 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1593 for node in nodenames:
1594 if node not in volumes or not volumes[node]:
1597 node_vols = volumes[node][:]
1598 node_vols.sort(key=lambda vol: vol['dev'])
1600 for vol in node_vols:
1602 for field in self.op.output_fields:
1605 elif field == "phys":
1609 elif field == "name":
1611 elif field == "size":
1612 val = int(float(vol['size']))
1613 elif field == "instance":
1615 if node not in lv_by_node[inst]:
1617 if vol['name'] in lv_by_node[inst][node]:
1623 raise errors.ParameterError(field)
1624 node_output.append(str(val))
1626 output.append(node_output)
1631 class LUAddNode(LogicalUnit):
1632 """Logical unit for adding node to the cluster.
1636 HTYPE = constants.HTYPE_NODE
1637 _OP_REQP = ["node_name"]
1639 def BuildHooksEnv(self):
1642 This will run on all nodes before, and on all nodes + the new node after.
1646 "OP_TARGET": self.op.node_name,
1647 "NODE_NAME": self.op.node_name,
1648 "NODE_PIP": self.op.primary_ip,
1649 "NODE_SIP": self.op.secondary_ip,
1651 nodes_0 = self.cfg.GetNodeList()
1652 nodes_1 = nodes_0 + [self.op.node_name, ]
1653 return env, nodes_0, nodes_1
1655 def CheckPrereq(self):
1656 """Check prerequisites.
1659 - the new node is not already in the config
1661 - its parameters (single/dual homed) match the cluster
1663 Any errors are signalled by raising errors.OpPrereqError.
1666 node_name = self.op.node_name
1669 dns_data = utils.HostInfo(node_name)
1671 node = dns_data.name
1672 primary_ip = self.op.primary_ip = dns_data.ip
1673 secondary_ip = getattr(self.op, "secondary_ip", None)
1674 if secondary_ip is None:
1675 secondary_ip = primary_ip
1676 if not utils.IsValidIP(secondary_ip):
1677 raise errors.OpPrereqError("Invalid secondary IP given")
1678 self.op.secondary_ip = secondary_ip
1680 node_list = cfg.GetNodeList()
1681 if not self.op.readd and node in node_list:
1682 raise errors.OpPrereqError("Node %s is already in the configuration" %
1684 elif self.op.readd and node not in node_list:
1685 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1687 for existing_node_name in node_list:
1688 existing_node = cfg.GetNodeInfo(existing_node_name)
1690 if self.op.readd and node == existing_node_name:
1691 if (existing_node.primary_ip != primary_ip or
1692 existing_node.secondary_ip != secondary_ip):
1693 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1694 " address configuration as before")
1697 if (existing_node.primary_ip == primary_ip or
1698 existing_node.secondary_ip == primary_ip or
1699 existing_node.primary_ip == secondary_ip or
1700 existing_node.secondary_ip == secondary_ip):
1701 raise errors.OpPrereqError("New node ip address(es) conflict with"
1702 " existing node %s" % existing_node.name)
1704 # check that the type of the node (single versus dual homed) is the
1705 # same as for the master
1706 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1707 master_singlehomed = myself.secondary_ip == myself.primary_ip
1708 newbie_singlehomed = secondary_ip == primary_ip
1709 if master_singlehomed != newbie_singlehomed:
1710 if master_singlehomed:
1711 raise errors.OpPrereqError("The master has no private ip but the"
1712 " new node has one")
1714 raise errors.OpPrereqError("The master has a private ip but the"
1715 " new node doesn't have one")
1717 # checks reachability
1718 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1719 raise errors.OpPrereqError("Node not reachable by ping")
1721 if not newbie_singlehomed:
1722 # check reachability from my secondary ip to newbie's secondary ip
1723 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1724 source=myself.secondary_ip):
1725 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1726 " based ping to noded port")
1728 self.new_node = objects.Node(name=node,
1729 primary_ip=primary_ip,
1730 secondary_ip=secondary_ip)
1732 def Exec(self, feedback_fn):
1733 """Adds the new node to the cluster.
1736 new_node = self.new_node
1737 node = new_node.name
1739 # check connectivity
1740 result = self.rpc.call_version([node])[node]
1742 if constants.PROTOCOL_VERSION == result:
1743 logger.Info("communication to node %s fine, sw version %s match" %
1746 raise errors.OpExecError("Version mismatch master version %s,"
1747 " node version %s" %
1748 (constants.PROTOCOL_VERSION, result))
1750 raise errors.OpExecError("Cannot get version from the new node")
1753 logger.Info("copy ssh key to node %s" % node)
1754 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1756 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1757 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1763 keyarray.append(f.read())
1767 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1769 keyarray[3], keyarray[4], keyarray[5])
1772 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1774 # Add node to our /etc/hosts, and add key to known_hosts
1775 utils.AddHostToEtcHosts(new_node.name)
1777 if new_node.secondary_ip != new_node.primary_ip:
1778 if not self.rpc.call_node_has_ip_address(new_node.name,
1779 new_node.secondary_ip):
1780 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1781 " you gave (%s). Please fix and re-run this"
1782 " command." % new_node.secondary_ip)
1784 node_verify_list = [self.cfg.GetMasterNode()]
1785 node_verify_param = {
1787 # TODO: do a node-net-test as well?
1790 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1791 self.cfg.GetClusterName())
1792 for verifier in node_verify_list:
1793 if not result[verifier]:
1794 raise errors.OpExecError("Cannot communicate with %s's node daemon"
1795 " for remote verification" % verifier)
1796 if result[verifier]['nodelist']:
1797 for failed in result[verifier]['nodelist']:
1798 feedback_fn("ssh/hostname verification failed %s -> %s" %
1799 (verifier, result[verifier]['nodelist'][failed]))
1800 raise errors.OpExecError("ssh/hostname verification failed.")
1802 # Distribute updated /etc/hosts and known_hosts to all nodes,
1803 # including the node just added
1804 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1805 dist_nodes = self.cfg.GetNodeList()
1806 if not self.op.readd:
1807 dist_nodes.append(node)
1808 if myself.name in dist_nodes:
1809 dist_nodes.remove(myself.name)
1811 logger.Debug("Copying hosts and known_hosts to all nodes")
1812 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1813 result = self.rpc.call_upload_file(dist_nodes, fname)
1814 for to_node in dist_nodes:
1815 if not result[to_node]:
1816 logger.Error("copy of file %s to node %s failed" %
1820 if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1821 to_copy.append(constants.VNC_PASSWORD_FILE)
1822 for fname in to_copy:
1823 result = self.rpc.call_upload_file([node], fname)
1824 if not result[node]:
1825 logger.Error("could not copy file %s to node %s" % (fname, node))
1828 self.context.ReaddNode(new_node)
1830 self.context.AddNode(new_node)
1833 class LUQueryClusterInfo(NoHooksLU):
1834 """Query cluster configuration.
1841 def ExpandNames(self):
1842 self.needed_locks = {}
1844 def CheckPrereq(self):
1845 """No prerequisites needed for this LU.
1850 def Exec(self, feedback_fn):
1851 """Return cluster config.
1855 "name": self.cfg.GetClusterName(),
1856 "software_version": constants.RELEASE_VERSION,
1857 "protocol_version": constants.PROTOCOL_VERSION,
1858 "config_version": constants.CONFIG_VERSION,
1859 "os_api_version": constants.OS_API_VERSION,
1860 "export_version": constants.EXPORT_VERSION,
1861 "master": self.cfg.GetMasterNode(),
1862 "architecture": (platform.architecture()[0], platform.machine()),
1863 "hypervisor_type": self.cfg.GetHypervisorType(),
1864 "enabled_hypervisors": self.cfg.GetClusterInfo().enabled_hypervisors,
1870 class LUQueryConfigValues(NoHooksLU):
1871 """Return configuration values.
1877 def ExpandNames(self):
1878 self.needed_locks = {}
1880 static_fields = ["cluster_name", "master_node"]
1881 _CheckOutputFields(static=static_fields,
1883 selected=self.op.output_fields)
1885 def CheckPrereq(self):
1886 """No prerequisites.
1891 def Exec(self, feedback_fn):
1892 """Dump a representation of the cluster config to the standard output.
1896 for field in self.op.output_fields:
1897 if field == "cluster_name":
1898 values.append(self.cfg.GetClusterName())
1899 elif field == "master_node":
1900 values.append(self.cfg.GetMasterNode())
1902 raise errors.ParameterError(field)
1906 class LUActivateInstanceDisks(NoHooksLU):
1907 """Bring up an instance's disks.
1910 _OP_REQP = ["instance_name"]
1913 def ExpandNames(self):
1914 self._ExpandAndLockInstance()
1915 self.needed_locks[locking.LEVEL_NODE] = []
1916 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1918 def DeclareLocks(self, level):
1919 if level == locking.LEVEL_NODE:
1920 self._LockInstancesNodes()
1922 def CheckPrereq(self):
1923 """Check prerequisites.
1925 This checks that the instance is in the cluster.
1928 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1929 assert self.instance is not None, \
1930 "Cannot retrieve locked instance %s" % self.op.instance_name
1932 def Exec(self, feedback_fn):
1933 """Activate the disks.
1936 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
1938 raise errors.OpExecError("Cannot activate block devices")
1943 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
1944 """Prepare the block devices for an instance.
1946 This sets up the block devices on all nodes.
1949 instance: a ganeti.objects.Instance object
1950 ignore_secondaries: if true, errors on secondary nodes won't result
1951 in an error return from the function
1954 false if the operation failed
1955 list of (host, instance_visible_name, node_visible_name) if the operation
1956 succeeded, with the mapping from node devices to instance devices
1960 iname = instance.name
1961 # With the two passes mechanism we try to reduce the window of
1962 # opportunity for the race condition of switching DRBD to primary
1963 before handshaking occurred, but we do not eliminate it
1965 # The proper fix would be to wait (with some limits) until the
1966 # connection has been made and drbd transitions from WFConnection
1967 # into any other network-connected state (Connected, SyncTarget,
1970 # 1st pass, assemble on all nodes in secondary mode
1971 for inst_disk in instance.disks:
1972 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1973 lu.cfg.SetDiskID(node_disk, node)
1974 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
1976 logger.Error("could not prepare block device %s on node %s"
1977 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1978 if not ignore_secondaries:
1981 # FIXME: race condition on drbd migration to primary
1983 # 2nd pass, do only the primary node
1984 for inst_disk in instance.disks:
1985 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1986 if node != instance.primary_node:
1988 lu.cfg.SetDiskID(node_disk, node)
1989 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
1991 logger.Error("could not prepare block device %s on node %s"
1992 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1994 device_info.append((instance.primary_node, inst_disk.iv_name, result))
1996 # leave the disks configured for the primary node
1997 # this is a workaround that would be fixed better by
1998 # improving the logical/physical id handling
1999 for disk in instance.disks:
2000 lu.cfg.SetDiskID(disk, instance.primary_node)
2002 return disks_ok, device_info
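# Usage sketch: callers test disks_ok and may report the device mapping;
# each device_info entry is (primary node, disk iv_name, assemble result):
#
#   disks_ok, device_info = _AssembleInstanceDisks(self, self.instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")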
2005 def _StartInstanceDisks(lu, instance, force):
2006 """Start the disks of an instance.
2009 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2010 ignore_secondaries=force)
2012 _ShutdownInstanceDisks(lu, instance)
2013 if force is not None and not force:
2014 logger.Error("If the message above refers to a secondary node,"
2015 " you can retry the operation using '--force'.")
2016 raise errors.OpExecError("Disk consistency error")
2019 class LUDeactivateInstanceDisks(NoHooksLU):
2020 """Shutdown an instance's disks.
2023 _OP_REQP = ["instance_name"]
2026 def ExpandNames(self):
2027 self._ExpandAndLockInstance()
2028 self.needed_locks[locking.LEVEL_NODE] = []
2029 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2031 def DeclareLocks(self, level):
2032 if level == locking.LEVEL_NODE:
2033 self._LockInstancesNodes()
2035 def CheckPrereq(self):
2036 """Check prerequisites.
2038 This checks that the instance is in the cluster.
2041 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2042 assert self.instance is not None, \
2043 "Cannot retrieve locked instance %s" % self.op.instance_name
2045 def Exec(self, feedback_fn):
2046 """Deactivate the disks
2049 instance = self.instance
2050 _SafeShutdownInstanceDisks(self, instance)
2053 def _SafeShutdownInstanceDisks(lu, instance):
2054 """Shutdown block devices of an instance.
2056 This function checks if an instance is running, before calling
2057 _ShutdownInstanceDisks.
2060 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2061 [instance.hypervisor])
2062 ins_l = ins_l[instance.primary_node]
2063 if not isinstance(ins_l, list):
2064 raise errors.OpExecError("Can't contact node '%s'" %
2065 instance.primary_node)
2067 if instance.name in ins_l:
2068 raise errors.OpExecError("Instance is running, can't shutdown"
2071 _ShutdownInstanceDisks(lu, instance)
2074 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2075 """Shutdown block devices of an instance.
2077 This does the shutdown on all nodes of the instance.
2079 If ignore_primary is false, errors on the primary node are
2084 for disk in instance.disks:
2085 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2086 lu.cfg.SetDiskID(top_disk, node)
2087 if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2088 logger.Error("could not shutdown block device %s on node %s" %
2089 (disk.iv_name, node))
2090 if not ignore_primary or node != instance.primary_node:
2095 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2096 """Checks if a node has enough free memory.
2098 This function checks if a given node has the needed amount of free
2099 memory. In case the node has less memory or we cannot get the
2100 information from the node, this function raises an OpPrereqError
2103 @type lu: C{LogicalUnit}
2104 @param lu: a logical unit from which we get configuration data
2106 @param node: the node to check
2107 @type reason: C{str}
2108 @param reason: string to use in the error message
2109 @type requested: C{int}
2110 @param requested: the amount of memory in MiB to check for
2111 @type hypervisor: C{str}
2112 @param hypervisor: the hypervisor to ask for memory stats
2113 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2114 we cannot check the node
2117 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2118 if not nodeinfo or not isinstance(nodeinfo, dict):
2119 raise errors.OpPrereqError("Could not contact node %s for resource"
2120 " information" % (node,))
2122 free_mem = nodeinfo[node].get('memory_free')
2123 if not isinstance(free_mem, int):
2124 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2125 " was '%s'" % (node, free_mem))
2126 if requested > free_mem:
2127 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2128 " needed %s MiB, available %s MiB" %
2129 (node, reason, requested, free_mem))
2132 class LUStartupInstance(LogicalUnit):
2133 """Starts an instance.
2136 HPATH = "instance-start"
2137 HTYPE = constants.HTYPE_INSTANCE
2138 _OP_REQP = ["instance_name", "force"]
2141 def ExpandNames(self):
2142 self._ExpandAndLockInstance()
2143 self.needed_locks[locking.LEVEL_NODE] = []
2144 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2146 def DeclareLocks(self, level):
2147 if level == locking.LEVEL_NODE:
2148 self._LockInstancesNodes()
2150 def BuildHooksEnv(self):
2153 This runs on master, primary and secondary nodes of the instance.
2157 "FORCE": self.op.force,
2159 env.update(_BuildInstanceHookEnvByObject(self.instance))
2160 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2161 list(self.instance.secondary_nodes))
2164 def CheckPrereq(self):
2165 """Check prerequisites.
2167 This checks that the instance is in the cluster.
2170 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2171 assert self.instance is not None, \
2172 "Cannot retrieve locked instance %s" % self.op.instance_name
2174 # check bridges existence
2175 _CheckInstanceBridgesExist(self, instance)
2177 _CheckNodeFreeMemory(self, instance.primary_node,
2178 "starting instance %s" % instance.name,
2179 instance.memory, instance.hypervisor)
2181 def Exec(self, feedback_fn):
2182 """Start the instance.
2185 instance = self.instance
2186 force = self.op.force
2187 extra_args = getattr(self.op, "extra_args", "")
2189 self.cfg.MarkInstanceUp(instance.name)
2191 node_current = instance.primary_node
2193 _StartInstanceDisks(self, instance, force)
2195 if not self.rpc.call_instance_start(node_current, instance, extra_args):
2196 _ShutdownInstanceDisks(self, instance)
2197 raise errors.OpExecError("Could not start instance")
2200 class LURebootInstance(LogicalUnit):
2201 """Reboot an instance.
2204 HPATH = "instance-reboot"
2205 HTYPE = constants.HTYPE_INSTANCE
2206 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2209 def ExpandNames(self):
2210 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2211 constants.INSTANCE_REBOOT_HARD,
2212 constants.INSTANCE_REBOOT_FULL]:
2213 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2214 (constants.INSTANCE_REBOOT_SOFT,
2215 constants.INSTANCE_REBOOT_HARD,
2216 constants.INSTANCE_REBOOT_FULL))
2217 self._ExpandAndLockInstance()
2218 self.needed_locks[locking.LEVEL_NODE] = []
2219 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2221 def DeclareLocks(self, level):
2222 if level == locking.LEVEL_NODE:
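      # Only a full reboot touches the instance's disks and hence its
      # secondary nodes; soft and hard reboots act on the primary node only.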
2223 primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2224 self._LockInstancesNodes(primary_only=primary_only)
2226 def BuildHooksEnv(self):
2229 This runs on master, primary and secondary nodes of the instance.
2233 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2235 env.update(_BuildInstanceHookEnvByObject(self.instance))
2236 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2237 list(self.instance.secondary_nodes))
2240 def CheckPrereq(self):
2241 """Check prerequisites.
2243 This checks that the instance is in the cluster.
2246 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2247 assert self.instance is not None, \
2248 "Cannot retrieve locked instance %s" % self.op.instance_name
2250 # check bridges existence
2251 _CheckInstanceBridgesExist(self, instance)
2253 def Exec(self, feedback_fn):
2254 """Reboot the instance.
2257 instance = self.instance
2258 ignore_secondaries = self.op.ignore_secondaries
2259 reboot_type = self.op.reboot_type
2260 extra_args = getattr(self.op, "extra_args", "")
2262 node_current = instance.primary_node
2264 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2265 constants.INSTANCE_REBOOT_HARD]:
2266 if not self.rpc.call_instance_reboot(node_current, instance,
2267 reboot_type, extra_args):
2268 raise errors.OpExecError("Could not reboot instance")
2270 if not self.rpc.call_instance_shutdown(node_current, instance):
2271 raise errors.OpExecError("Could not shutdown instance for full reboot")
2272 _ShutdownInstanceDisks(self, instance)
2273 _StartInstanceDisks(self, instance, ignore_secondaries)
2274 if not self.rpc.call_instance_start(node_current, instance, extra_args):
2275 _ShutdownInstanceDisks(self, instance)
2276 raise errors.OpExecError("Could not start instance for full reboot")
2278 self.cfg.MarkInstanceUp(instance.name)
2281 class LUShutdownInstance(LogicalUnit):
2282 """Shutdown an instance.
2285 HPATH = "instance-stop"
2286 HTYPE = constants.HTYPE_INSTANCE
2287 _OP_REQP = ["instance_name"]
2290 def ExpandNames(self):
2291 self._ExpandAndLockInstance()
2292 self.needed_locks[locking.LEVEL_NODE] = []
2293 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2295 def DeclareLocks(self, level):
2296 if level == locking.LEVEL_NODE:
2297 self._LockInstancesNodes()
2299 def BuildHooksEnv(self):
2302 This runs on master, primary and secondary nodes of the instance.
2305 env = _BuildInstanceHookEnvByObject(self.instance)
2306 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2307 list(self.instance.secondary_nodes))
2310 def CheckPrereq(self):
2311 """Check prerequisites.
2313 This checks that the instance is in the cluster.
2316 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2317 assert self.instance is not None, \
2318 "Cannot retrieve locked instance %s" % self.op.instance_name
2320 def Exec(self, feedback_fn):
2321 """Shutdown the instance.
2324 instance = self.instance
2325 node_current = instance.primary_node
2326 self.cfg.MarkInstanceDown(instance.name)
2327 if not self.rpc.call_instance_shutdown(node_current, instance):
2328 logger.Error("could not shutdown instance")
2330 _ShutdownInstanceDisks(self, instance)
2333 class LUReinstallInstance(LogicalUnit):
2334 """Reinstall an instance.
2337 HPATH = "instance-reinstall"
2338 HTYPE = constants.HTYPE_INSTANCE
2339 _OP_REQP = ["instance_name"]
2342 def ExpandNames(self):
2343 self._ExpandAndLockInstance()
2344 self.needed_locks[locking.LEVEL_NODE] = []
2345 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2347 def DeclareLocks(self, level):
2348 if level == locking.LEVEL_NODE:
2349 self._LockInstancesNodes()
2351 def BuildHooksEnv(self):
2354 This runs on master, primary and secondary nodes of the instance.
2357 env = _BuildInstanceHookEnvByObject(self.instance)
2358 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2359 list(self.instance.secondary_nodes))
2362 def CheckPrereq(self):
2363 """Check prerequisites.
2365 This checks that the instance is in the cluster and is not running.
2368 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2369 assert instance is not None, \
2370 "Cannot retrieve locked instance %s" % self.op.instance_name
2372 if instance.disk_template == constants.DT_DISKLESS:
2373 raise errors.OpPrereqError("Instance '%s' has no disks" %
2374 self.op.instance_name)
2375 if instance.status != "down":
2376 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2377 self.op.instance_name)
2378 remote_info = self.rpc.call_instance_info(instance.primary_node,
2380 instance.hypervisor)
2382 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2383 (self.op.instance_name,
2384 instance.primary_node))
2386 self.op.os_type = getattr(self.op, "os_type", None)
2387 if self.op.os_type is not None:
2389 pnode = self.cfg.GetNodeInfo(
2390 self.cfg.ExpandNodeName(instance.primary_node))
2392 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2394 os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2396 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2397 " primary node" % self.op.os_type)
2399 self.instance = instance
2401 def Exec(self, feedback_fn):
2402 """Reinstall the instance.
2405 inst = self.instance
2407 if self.op.os_type is not None:
2408 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2409 inst.os = self.op.os_type
2410 self.cfg.Update(inst)
2412 _StartInstanceDisks(self, inst, None)
2414 feedback_fn("Running the instance OS create scripts...")
2415 if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2417 raise errors.OpExecError("Could not install OS for instance %s"
2419 (inst.name, inst.primary_node))
2421 _ShutdownInstanceDisks(self, inst)
2424 class LURenameInstance(LogicalUnit):
2425 """Rename an instance.
2428 HPATH = "instance-rename"
2429 HTYPE = constants.HTYPE_INSTANCE
2430 _OP_REQP = ["instance_name", "new_name"]
2432 def BuildHooksEnv(self):
2435 This runs on master, primary and secondary nodes of the instance.
2438 env = _BuildInstanceHookEnvByObject(self.instance)
2439 env["INSTANCE_NEW_NAME"] = self.op.new_name
2440 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2441 list(self.instance.secondary_nodes))
2444 def CheckPrereq(self):
2445 """Check prerequisites.
2447 This checks that the instance is in the cluster and is not running.
2450 instance = self.cfg.GetInstanceInfo(
2451 self.cfg.ExpandInstanceName(self.op.instance_name))
2452 if instance is None:
2453 raise errors.OpPrereqError("Instance '%s' not known" %
2454 self.op.instance_name)
2455 if instance.status != "down":
2456 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2457 self.op.instance_name)
2458 remote_info = self.rpc.call_instance_info(instance.primary_node,
2460 instance.hypervisor)
2462 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2463 (self.op.instance_name,
2464 instance.primary_node))
2465 self.instance = instance
2467 # new name verification
2468 name_info = utils.HostInfo(self.op.new_name)
2470 self.op.new_name = new_name = name_info.name
2471 instance_list = self.cfg.GetInstanceList()
2472 if new_name in instance_list:
2473 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2476 if not getattr(self.op, "ignore_ip", False):
2477 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2478 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2479 (name_info.ip, new_name))
2482 def Exec(self, feedback_fn):
2483 """Reinstall the instance.
2486 inst = self.instance
2487 old_name = inst.name
2489 if inst.disk_template == constants.DT_FILE:
2490 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2492 self.cfg.RenameInstance(inst.name, self.op.new_name)
2493 # Change the instance lock. This is definitely safe while we hold the BGL
2494 self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2495 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2497 # re-read the instance from the configuration after rename
2498 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2500 if inst.disk_template == constants.DT_FILE:
2501 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2502 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2503 old_file_storage_dir,
2504 new_file_storage_dir)
2507 raise errors.OpExecError("Could not connect to node '%s' to rename"
2508 " directory '%s' to '%s' (but the instance"
2509 " has been renamed in Ganeti)" % (
2510 inst.primary_node, old_file_storage_dir,
2511 new_file_storage_dir))
2514 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2515 " (but the instance has been renamed in"
2516 " Ganeti)" % (old_file_storage_dir,
2517 new_file_storage_dir))
2519 _StartInstanceDisks(self, inst, None)
2521 if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2524 msg = ("Could not run OS rename script for instance %s on node %s"
2525 " (but the instance has been renamed in Ganeti)" %
2526 (inst.name, inst.primary_node))
2529 _ShutdownInstanceDisks(self, inst)
2532 class LURemoveInstance(LogicalUnit):
2533 """Remove an instance.
2536 HPATH = "instance-remove"
2537 HTYPE = constants.HTYPE_INSTANCE
2538 _OP_REQP = ["instance_name", "ignore_failures"]
2541 def ExpandNames(self):
2542 self._ExpandAndLockInstance()
2543 self.needed_locks[locking.LEVEL_NODE] = []
2544 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2546 def DeclareLocks(self, level):
2547 if level == locking.LEVEL_NODE:
2548 self._LockInstancesNodes()
2550 def BuildHooksEnv(self):
2553 This runs on master, primary and secondary nodes of the instance.
2556 env = _BuildInstanceHookEnvByObject(self.instance)
2557 nl = [self.cfg.GetMasterNode()]
2560 def CheckPrereq(self):
2561 """Check prerequisites.
2563 This checks that the instance is in the cluster.
2566 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2567 assert self.instance is not None, \
2568 "Cannot retrieve locked instance %s" % self.op.instance_name
2570 def Exec(self, feedback_fn):
2571 """Remove the instance.
2574 instance = self.instance
2575 logger.Info("shutting down instance %s on node %s" %
2576 (instance.name, instance.primary_node))
2578 if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2579 if self.op.ignore_failures:
2580 feedback_fn("Warning: can't shutdown instance")
2582 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2583 (instance.name, instance.primary_node))
2585 logger.Info("removing block devices for instance %s" % instance.name)
2587 if not _RemoveDisks(self, instance):
2588 if self.op.ignore_failures:
2589 feedback_fn("Warning: can't remove instance's disks")
2591 raise errors.OpExecError("Can't remove instance's disks")
2593 logger.Info("removing instance %s out of cluster config" % instance.name)
2595 self.cfg.RemoveInstance(instance.name)
2596 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2599 class LUQueryInstances(NoHooksLU):
2600 """Logical unit for querying instances.
2603 _OP_REQP = ["output_fields", "names"]
2606 def ExpandNames(self):
2607 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2608 self.static_fields = frozenset([
2609 "name", "os", "pnode", "snodes",
2610 "admin_state", "admin_ram",
2611 "disk_template", "ip", "mac", "bridge",
2612 "sda_size", "sdb_size", "vcpus", "tags",
2614 "serial_no", "hypervisor", "hvparams",
2615 ] + ["hv/%s" % name for name in constants.HVS_PARAMETERS])
2616 _CheckOutputFields(static=self.static_fields,
2617 dynamic=self.dynamic_fields,
2618 selected=self.op.output_fields)
2620 self.needed_locks = {}
2621 self.share_locks[locking.LEVEL_INSTANCE] = 1
2622 self.share_locks[locking.LEVEL_NODE] = 1
2625 self.wanted = _GetWantedInstances(self, self.op.names)
2627 self.wanted = locking.ALL_SET
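    # Live status fields require contacting the nodes, so instance and node
    # locks are only needed when at least one non-static field was requested.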
2629 self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2631 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2632 self.needed_locks[locking.LEVEL_NODE] = []
2633 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2635 def DeclareLocks(self, level):
2636 if level == locking.LEVEL_NODE and self.do_locking:
2637 self._LockInstancesNodes()
2639 def CheckPrereq(self):
2640 """Check prerequisites.
2645 def Exec(self, feedback_fn):
2646 """Computes the list of nodes and their attributes.
2649 all_info = self.cfg.GetAllInstancesInfo()
2651 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2652 elif self.wanted != locking.ALL_SET:
2653 instance_names = self.wanted
2654 missing = set(instance_names).difference(all_info.keys())
2656 raise errors.OpExecError(
2657 "Some instances were removed before retrieving their data: %s"
2660 instance_names = all_info.keys()
2661 instance_list = [all_info[iname] for iname in instance_names]
2663 # begin data gathering
2665 nodes = frozenset([inst.primary_node for inst in instance_list])
2666 hv_list = list(set([inst.hypervisor for inst in instance_list]))
2669 if self.dynamic_fields.intersection(self.op.output_fields):
2671 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2673 result = node_data[name]
2675 live_data.update(result)
2676 elif result == False:
2677 bad_nodes.append(name)
2678 # else no instance is alive
2680 live_data = dict([(name, {}) for name in instance_names])
2682 # end data gathering
2686 for instance in instance_list:
2688 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2689 for field in self.op.output_fields:
2694 elif field == "pnode":
2695 val = instance.primary_node
2696 elif field == "snodes":
2697 val = list(instance.secondary_nodes)
2698 elif field == "admin_state":
2699 val = (instance.status != "down")
2700 elif field == "oper_state":
2701 if instance.primary_node in bad_nodes:
2704 val = bool(live_data.get(instance.name))
2705 elif field == "status":
2706 if instance.primary_node in bad_nodes:
2707 val = "ERROR_nodedown"
2709 running = bool(live_data.get(instance.name))
2711 if instance.status != "down":
2716 if instance.status != "down":
2720 elif field == "admin_ram":
2721 val = instance.memory
2722 elif field == "oper_ram":
2723 if instance.primary_node in bad_nodes:
2725 elif instance.name in live_data:
2726 val = live_data[instance.name].get("memory", "?")
2729 elif field == "disk_template":
2730 val = instance.disk_template
2732 val = instance.nics[0].ip
2733 elif field == "bridge":
2734 val = instance.nics[0].bridge
2735 elif field == "mac":
2736 val = instance.nics[0].mac
2737 elif field == "sda_size" or field == "sdb_size":
2738 disk = instance.FindDisk(field[:3])
2743 elif field == "vcpus":
2744 val = instance.vcpus
2745 elif field == "tags":
2746 val = list(instance.GetTags())
2747 elif field == "serial_no":
2748 val = instance.serial_no
2749 elif field == "network_port":
2750 val = instance.network_port
2751 elif (field.startswith(HVPREFIX) and
2752 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2753 val = i_hv.get(field[len(HVPREFIX):], None)
2754 elif field == "hvparams":
2756 elif field == "hypervisor":
2757 val = instance.hypervisor
2759 raise errors.ParameterError(field)
2766 class LUFailoverInstance(LogicalUnit):
2767 """Failover an instance.
2770 HPATH = "instance-failover"
2771 HTYPE = constants.HTYPE_INSTANCE
2772 _OP_REQP = ["instance_name", "ignore_consistency"]
2775 def ExpandNames(self):
2776 self._ExpandAndLockInstance()
2777 self.needed_locks[locking.LEVEL_NODE] = []
2778 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2780 def DeclareLocks(self, level):
2781 if level == locking.LEVEL_NODE:
2782 self._LockInstancesNodes()
2784 def BuildHooksEnv(self):
2787 This runs on master, primary and secondary nodes of the instance.
2791 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2793 env.update(_BuildInstanceHookEnvByObject(self.instance))
2794 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2797 def CheckPrereq(self):
2798 """Check prerequisites.
2800 This checks that the instance is in the cluster.
2803 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2804 assert self.instance is not None, \
2805 "Cannot retrieve locked instance %s" % self.op.instance_name
2807 if instance.disk_template not in constants.DTS_NET_MIRROR:
2808 raise errors.OpPrereqError("Instance's disk layout is not"
2809 " network mirrored, cannot failover.")
2811 secondary_nodes = instance.secondary_nodes
2812 if not secondary_nodes:
2813 raise errors.ProgrammerError("no secondary node but using "
2814 "a mirrored disk template")
2816 target_node = secondary_nodes[0]
2817 # check memory requirements on the secondary node
2818 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2819 instance.name, instance.memory,
2820 instance.hypervisor)
2822 # check bridge existence
2823 brlist = [nic.bridge for nic in instance.nics]
2824 if not self.rpc.call_bridges_exist(target_node, brlist):
2825 raise errors.OpPrereqError("One or more target bridges %s do not"
2826 " exist on destination node '%s'" %
2827 (brlist, target_node))
2829 def Exec(self, feedback_fn):
2830 """Failover an instance.
2832 The failover is done by shutting it down on its present node and
2833 starting it on the secondary.
2836 instance = self.instance
2838 source_node = instance.primary_node
2839 target_node = instance.secondary_nodes[0]
2841 feedback_fn("* checking disk consistency between source and target")
2842 for dev in instance.disks:
2843 # for drbd, these are drbd over lvm
2844 if not _CheckDiskConsistency(self, dev, target_node, False):
2845 if instance.status == "up" and not self.op.ignore_consistency:
2846 raise errors.OpExecError("Disk %s is degraded on target node,"
2847 " aborting failover." % dev.iv_name)
2849 feedback_fn("* shutting down instance on source node")
2850 logger.Info("Shutting down instance %s on node %s" %
2851 (instance.name, source_node))
2853 if not self.rpc.call_instance_shutdown(source_node, instance):
2854 if self.op.ignore_consistency:
2855 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2856 " anyway. Please make sure node %s is down" %
2857 (instance.name, source_node, source_node))
2859 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2860 (instance.name, source_node))
2862 feedback_fn("* deactivating the instance's disks on source node")
2863 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2864 raise errors.OpExecError("Can't shut down the instance's disks.")
2866 instance.primary_node = target_node
2867 # distribute new instance config to the other nodes
2868 self.cfg.Update(instance)
2870 # Only start the instance if it's marked as up
2871 if instance.status == "up":
2872 feedback_fn("* activating the instance's disks on target node")
2873 logger.Info("Starting instance %s on node %s" %
2874 (instance.name, target_node))
2876 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2877 ignore_secondaries=True)
2879 _ShutdownInstanceDisks(self, instance)
2880 raise errors.OpExecError("Can't activate the instance's disks")
2882 feedback_fn("* starting the instance on the target node")
2883 if not self.rpc.call_instance_start(target_node, instance, None):
2884 _ShutdownInstanceDisks(self, instance)
2885 raise errors.OpExecError("Could not start instance %s on node %s." %
2886 (instance.name, target_node))
2889 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2890 """Create a tree of block devices on the primary node.
2892 This always creates all devices.
2896 for child in device.children:
2897 if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2900 lu.cfg.SetDiskID(device, node)
2901 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2902 instance.name, True, info)
2905 if device.physical_id is None:
2906 device.physical_id = new_id
2910 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2911 """Create a tree of block devices on a secondary node.
2913 If this device type has to be created on secondaries, create it and
2916 If not, just recurse to children keeping the same 'force' value.
2919 if device.CreateOnSecondary():
2922 for child in device.children:
2923 if not _CreateBlockDevOnSecondary(lu, node, instance,
2924 child, force, info):
2929 lu.cfg.SetDiskID(device, node)
2930 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2931 instance.name, False, info)
2934 if device.physical_id is None:
2935 device.physical_id = new_id
2939 def _GenerateUniqueNames(lu, exts):
2940 """Generate a suitable LV name.
2942 This will generate a logical volume name for the given instance.
2947 new_id = lu.cfg.GenerateUniqueID()
2948 results.append("%s%s" % (new_id, val))
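# Illustrative example only; the IDs are opaque unique strings produced by
# cfg.GenerateUniqueID() and are shown here as <uuid1>/<uuid2> for readability:
#
#   >>> _GenerateUniqueNames(lu, [".sda_data", ".sda_meta"])
#   ['<uuid1>.sda_data', '<uuid2>.sda_meta']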
2952 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
2954 """Generate a drbd8 device complete with its children.
2957 port = lu.cfg.AllocatePort()
2958 vgname = lu.cfg.GetVGName()
2959 shared_secret = lu.cfg.GenerateDRBDSecret()
2960 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2961 logical_id=(vgname, names[0]))
2962 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2963 logical_id=(vgname, names[1]))
2964 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2965 logical_id=(primary, secondary, port,
2968 children=[dev_data, dev_meta],
2973 def _GenerateDiskTemplate(lu, template_name,
2974 instance_name, primary_node,
2975 secondary_nodes, disk_sz, swap_sz,
2976 file_storage_dir, file_driver):
2977 """Generate the entire disk layout for a given template type.
2980 #TODO: compute space requirements
2982 vgname = lu.cfg.GetVGName()
2983 if template_name == constants.DT_DISKLESS:
2985 elif template_name == constants.DT_PLAIN:
2986 if len(secondary_nodes) != 0:
2987 raise errors.ProgrammerError("Wrong template configuration")
2989 names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
2990 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2991 logical_id=(vgname, names[0]),
2993 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2994 logical_id=(vgname, names[1]),
2996 disks = [sda_dev, sdb_dev]
2997 elif template_name == constants.DT_DRBD8:
2998 if len(secondary_nodes) != 1:
2999 raise errors.ProgrammerError("Wrong template configuration")
3000 remote_node = secondary_nodes[0]
3001 (minor_pa, minor_pb,
3002 minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3003 [primary_node, primary_node, remote_node, remote_node], instance_name)
3005 names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3006 ".sdb_data", ".sdb_meta"])
3007 drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3008 disk_sz, names[0:2], "sda",
3010 drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3011 swap_sz, names[2:4], "sdb",
3013 disks = [drbd_sda_dev, drbd_sdb_dev]
3014 elif template_name == constants.DT_FILE:
3015 if len(secondary_nodes) != 0:
3016 raise errors.ProgrammerError("Wrong template configuration")
3018 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3019 iv_name="sda", logical_id=(file_driver,
3020 "%s/sda" % file_storage_dir))
3021 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3022 iv_name="sdb", logical_id=(file_driver,
3023 "%s/sdb" % file_storage_dir))
3024 disks = [file_sda_dev, file_sdb_dev]
3026 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
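# Illustrative summary of the layouts generated above (not executed):
#   - DT_DISKLESS: an empty disk list
#   - DT_PLAIN:    two plain LVs, "sda" (disk_sz) and "sdb" (swap_sz)
#   - DT_DRBD8:    two LD_DRBD8 devices, each backed by a data LV and a
#                  128 MiB metadata LV via _GenerateDRBD8Branch
#   - DT_FILE:     two LD_FILE devices stored under file_storage_dir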
3030 def _GetInstanceInfoText(instance):
3031 """Compute that text that should be added to the disk's metadata.
3034 return "originstname+%s" % instance.name
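# Example (illustrative): for an instance named "web1.example.com" this
# returns 'originstname+web1.example.com', which is then passed as the info
# text when the instance's block devices are created.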
3037 def _CreateDisks(lu, instance):
3038 """Create all disks for an instance.
3040 This abstracts away some work from AddInstance.
3043 instance: the instance object
3046 True or False showing the success of the creation process
3049 info = _GetInstanceInfoText(instance)
3051 if instance.disk_template == constants.DT_FILE:
3052 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3053 result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3057 logger.Error("Could not connect to node '%s'" % instance.primary_node)
3061 logger.Error("failed to create directory '%s'" % file_storage_dir)
3064 for device in instance.disks:
3065 logger.Info("creating volume %s for instance %s" %
3066 (device.iv_name, instance.name))
3068 for secondary_node in instance.secondary_nodes:
3069 if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3070 device, False, info):
3071 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3072 (device.iv_name, device, secondary_node))
3075 if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3076 instance, device, info):
3077 logger.Error("failed to create volume %s on primary!" %
3084 def _RemoveDisks(lu, instance):
3085 """Remove all disks for an instance.
3087 This abstracts away some work from `AddInstance()` and
3088 `RemoveInstance()`. Note that in case some of the devices couldn't
3089 be removed, the removal will continue with the other ones (compare
3090 with `_CreateDisks()`).
3093 instance: the instance object
3096 True or False showing the success of the removal process
3099 logger.Info("removing block devices for instance %s" % instance.name)
3102 for device in instance.disks:
3103 for node, disk in device.ComputeNodeTree(instance.primary_node):
3104 lu.cfg.SetDiskID(disk, node)
3105 if not lu.rpc.call_blockdev_remove(node, disk):
3106 logger.Error("could not remove block device %s on node %s,"
3107 " continuing anyway" %
3108 (device.iv_name, node))
3111 if instance.disk_template == constants.DT_FILE:
3112 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3113 if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3115 logger.Error("could not remove directory '%s'" % file_storage_dir)
3121 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3122 """Compute disk size requirements in the volume group
3124 This is currently hard-coded for the two-drive layout.
3127 # Required free disk space as a function of disk and swap space
3129 constants.DT_DISKLESS: None,
3130 constants.DT_PLAIN: disk_size + swap_size,
3131 # 256 MB are added for drbd metadata, 128 MB for each drbd device
3132 constants.DT_DRBD8: disk_size + swap_size + 256,
3133 constants.DT_FILE: None,
3136 if disk_template not in req_size_dict:
3137 raise errors.ProgrammerError("Disk template '%s' size requirement"
3138 " is unknown" % disk_template)
3140 return req_size_dict[disk_template]
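# Worked example (illustrative figures): a drbd8 instance with a 10240 MiB
# data disk and a 4096 MiB swap disk needs room in the volume group for both
# LVs plus 256 MiB of DRBD metadata (128 MiB per drbd device):
#
#   >>> _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096)
#   14592
#   >>> _ComputeDiskSize(constants.DT_FILE, 10240, 4096) is None
#   True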
3143 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3144 """Hypervisor parameter validation.
3146 This function abstracts the hypervisor parameter validation to be
3147 used in both instance create and instance modify.
3149 @type lu: L{LogicalUnit}
3150 @param lu: the logical unit for which we check
3151 @type nodenames: list
3152 @param nodenames: the list of nodes on which we should check
3153 @type hvname: string
3154 @param hvname: the name of the hypervisor we should use
3155 @type hvparams: dict
3156 @param hvparams: the parameters which we need to check
3157 @raise errors.OpPrereqError: if the parameters are not valid
3160 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3163 for node in nodenames:
3164 info = hvinfo.get(node, None)
3165 if not info or not isinstance(info, (tuple, list)):
3166 raise errors.OpPrereqError("Cannot get current information"
3167 " from node '%s' (%s)" % (node, info))
3169 raise errors.OpPrereqError("Hypervisor parameter validation failed:"
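# Minimal usage sketch (mirrors the call made in LUCreateInstance.CheckPrereq
# below; the node list and opcode fields are that LU's, assumed here only for
# illustration):
#
#   _CheckHVParams(self, [pnode.name] + self.secondaries,
#                  self.op.hypervisor, self.op.hvparams)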
3173 class LUCreateInstance(LogicalUnit):
3174 """Create an instance.
3177 HPATH = "instance-add"
3178 HTYPE = constants.HTYPE_INSTANCE
3179 _OP_REQP = ["instance_name", "mem_size", "disk_size",
3180 "disk_template", "swap_size", "mode", "start", "vcpus",
3181 "wait_for_sync", "ip_check", "mac", "hvparams"]
3184 def _ExpandNode(self, node):
3185 """Expands and checks one node name.
3188 node_full = self.cfg.ExpandNodeName(node)
3189 if node_full is None:
3190 raise errors.OpPrereqError("Unknown node %s" % node)
3193 def ExpandNames(self):
3194 """ExpandNames for CreateInstance.
3196 Figure out the right locks for instance creation.
3199 self.needed_locks = {}
3201 # set optional parameters to none if they don't exist
3202 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3203 if not hasattr(self.op, attr):
3204 setattr(self.op, attr, None)
3206 # cheap checks, mostly valid constants given
3208 # verify creation mode
3209 if self.op.mode not in (constants.INSTANCE_CREATE,
3210 constants.INSTANCE_IMPORT):
3211 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3214 # disk template and mirror node verification
3215 if self.op.disk_template not in constants.DISK_TEMPLATES:
3216 raise errors.OpPrereqError("Invalid disk template name")
3218 if self.op.hypervisor is None:
3219 self.op.hypervisor = self.cfg.GetHypervisorType()
3221 enabled_hvs = self.cfg.GetClusterInfo().enabled_hypervisors
3222 if self.op.hypervisor not in enabled_hvs:
3223 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3224 " cluster (%s)" % (self.op.hypervisor,
3225 ",".join(enabled_hvs)))
3227 # check hypervisor parameter syntax (locally)
3229 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3230 hv_type.CheckParameterSyntax(self.op.hvparams)
3232 #### instance parameters check
3234 # instance name verification
3235 hostname1 = utils.HostInfo(self.op.instance_name)
3236 self.op.instance_name = instance_name = hostname1.name
3238 # this is just a preventive check, but someone might still add this
3239 # instance in the meantime, and creation will fail at lock-add time
3240 if instance_name in self.cfg.GetInstanceList():
3241 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3244 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3246 # ip validity checks
3247 ip = getattr(self.op, "ip", None)
3248 if ip is None or ip.lower() == "none":
3250 elif ip.lower() == "auto":
3251 inst_ip = hostname1.ip
3253 if not utils.IsValidIP(ip):
3254 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3255 " like a valid IP" % ip)
3257 self.inst_ip = self.op.ip = inst_ip
3258 # used in CheckPrereq for ip ping check
3259 self.check_ip = hostname1.ip
3261 # MAC address verification
3262 if self.op.mac != "auto":
3263 if not utils.IsValidMac(self.op.mac.lower()):
3264 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3267 # file storage checks
3268 if (self.op.file_driver and
3269 not self.op.file_driver in constants.FILE_DRIVER):
3270 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3271 self.op.file_driver)
3273 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3274 raise errors.OpPrereqError("File storage directory path not absolute")
3276 ### Node/iallocator related checks
3277 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3278 raise errors.OpPrereqError("One and only one of iallocator and primary"
3279 " node must be given")
3281 if self.op.iallocator:
3282 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3284 self.op.pnode = self._ExpandNode(self.op.pnode)
3285 nodelist = [self.op.pnode]
3286 if self.op.snode is not None:
3287 self.op.snode = self._ExpandNode(self.op.snode)
3288 nodelist.append(self.op.snode)
3289 self.needed_locks[locking.LEVEL_NODE] = nodelist
3291 # in case of import lock the source node too
3292 if self.op.mode == constants.INSTANCE_IMPORT:
3293 src_node = getattr(self.op, "src_node", None)
3294 src_path = getattr(self.op, "src_path", None)
3296 if src_node is None or src_path is None:
3297 raise errors.OpPrereqError("Importing an instance requires source"
3298 " node and path options")
3300 if not os.path.isabs(src_path):
3301 raise errors.OpPrereqError("The source path must be absolute")
3303 self.op.src_node = src_node = self._ExpandNode(src_node)
3304 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3305 self.needed_locks[locking.LEVEL_NODE].append(src_node)
3307 else: # INSTANCE_CREATE
3308 if getattr(self.op, "os_type", None) is None:
3309 raise errors.OpPrereqError("No guest OS specified")
3311 def _RunAllocator(self):
3312 """Run the allocator based on input opcode.
3315 disks = [{"size": self.op.disk_size, "mode": "w"},
3316 {"size": self.op.swap_size, "mode": "w"}]
3317 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3318 "bridge": self.op.bridge}]
3319 ial = IAllocator(self,
3320 mode=constants.IALLOCATOR_MODE_ALLOC,
3321 name=self.op.instance_name,
3322 disk_template=self.op.disk_template,
3325 vcpus=self.op.vcpus,
3326 mem_size=self.op.mem_size,
3331 ial.Run(self.op.iallocator)
3334 raise errors.OpPrereqError("Can't compute nodes using"
3335 " iallocator '%s': %s" % (self.op.iallocator,
3337 if len(ial.nodes) != ial.required_nodes:
3338 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3339 " of nodes (%s), required %s" %
3340 (self.op.iallocator, len(ial.nodes),
3341 ial.required_nodes))
3342 self.op.pnode = ial.nodes[0]
3343 logger.ToStdout("Selected nodes for the instance: %s" %
3344 (", ".join(ial.nodes),))
3345 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3346 (self.op.instance_name, self.op.iallocator, ial.nodes))
3347 if ial.required_nodes == 2:
3348 self.op.snode = ial.nodes[1]
3350 def BuildHooksEnv(self):
3353 This runs on master, primary and secondary nodes of the instance.
3357 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3358 "INSTANCE_DISK_SIZE": self.op.disk_size,
3359 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3360 "INSTANCE_ADD_MODE": self.op.mode,
3362 if self.op.mode == constants.INSTANCE_IMPORT:
3363 env["INSTANCE_SRC_NODE"] = self.op.src_node
3364 env["INSTANCE_SRC_PATH"] = self.op.src_path
3365 env["INSTANCE_SRC_IMAGE"] = self.src_image
3367 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3368 primary_node=self.op.pnode,
3369 secondary_nodes=self.secondaries,
3370 status=self.instance_status,
3371 os_type=self.op.os_type,
3372 memory=self.op.mem_size,
3373 vcpus=self.op.vcpus,
3374 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3377 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3382 def CheckPrereq(self):
3383 """Check prerequisites.
3386 if (not self.cfg.GetVGName() and
3387 self.op.disk_template not in constants.DTS_NOT_LVM):
3388 raise errors.OpPrereqError("Cluster does not support lvm-based"
3392 if self.op.mode == constants.INSTANCE_IMPORT:
3393 src_node = self.op.src_node
3394 src_path = self.op.src_path
3396 export_info = self.rpc.call_export_info(src_node, src_path)
3399 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3401 if not export_info.has_section(constants.INISECT_EXP):
3402 raise errors.ProgrammerError("Corrupted export config")
3404 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3405 if (int(ei_version) != constants.EXPORT_VERSION):
3406 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3407 (ei_version, constants.EXPORT_VERSION))
3409 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3410 raise errors.OpPrereqError("Can't import instance with more than"
3413 # FIXME: are the old os-es, disk sizes, etc. useful?
3414 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3415 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3417 self.src_image = diskimage
3419 # ip ping checks (we use the same ip that was resolved in ExpandNames)
3421 if self.op.start and not self.op.ip_check:
3422 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3423 " adding an instance in start mode")
3425 if self.op.ip_check:
3426 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3427 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3428 (self.check_ip, self.op.instance_name))
3430 # bridge verification
3431 bridge = getattr(self.op, "bridge", None)
3433 self.op.bridge = self.cfg.GetDefBridge()
3435 self.op.bridge = bridge
3439 if self.op.iallocator is not None:
3440 self._RunAllocator()
3442 #### node related checks
3444 # check primary node
3445 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3446 assert self.pnode is not None, \
3447 "Cannot retrieve locked node %s" % self.op.pnode
3448 self.secondaries = []
3450 # mirror node verification
3451 if self.op.disk_template in constants.DTS_NET_MIRROR:
3452 if self.op.snode is None:
3453 raise errors.OpPrereqError("The networked disk templates need"
3455 if self.op.snode == pnode.name:
3456 raise errors.OpPrereqError("The secondary node cannot be"
3457 " the primary node.")
3458 self.secondaries.append(self.op.snode)
3460 nodenames = [pnode.name] + self.secondaries
3462 req_size = _ComputeDiskSize(self.op.disk_template,
3463 self.op.disk_size, self.op.swap_size)
3465 # Check lv size requirements
3466 if req_size is not None:
3467 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3469 for node in nodenames:
3470 info = nodeinfo.get(node, None)
3472 raise errors.OpPrereqError("Cannot get current information"
3473 " from node '%s'" % node)
3474 vg_free = info.get('vg_free', None)
3475 if not isinstance(vg_free, int):
3476 raise errors.OpPrereqError("Can't compute free disk space on"
3478 if req_size > info['vg_free']:
3479 raise errors.OpPrereqError("Not enough disk space on target node %s."
3480 " %d MB available, %d MB required" %
3481 (node, info['vg_free'], req_size))
3483 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3486 os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3488 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3489 " primary node" % self.op.os_type)
3491 # bridge check on primary node
3492 if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3493 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3494 " destination node '%s'" %
3495 (self.op.bridge, pnode.name))
3497 # memory check on primary node
3499 _CheckNodeFreeMemory(self, self.pnode.name,
3500 "creating instance %s" % self.op.instance_name,
3501 self.op.mem_size, self.op.hypervisor)
3504 self.instance_status = 'up'
3506 self.instance_status = 'down'
3508 def Exec(self, feedback_fn):
3509 """Create and add the instance to the cluster.
3512 instance = self.op.instance_name
3513 pnode_name = self.pnode.name
3515 if self.op.mac == "auto":
3516 mac_address = self.cfg.GenerateMAC()
3518 mac_address = self.op.mac
3520 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3521 if self.inst_ip is not None:
3522 nic.ip = self.inst_ip
3524 ht_kind = self.op.hypervisor
3525 if ht_kind in constants.HTS_REQ_PORT:
3526 network_port = self.cfg.AllocatePort()
3530 ##if self.op.vnc_bind_address is None:
3531 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3533 # this is needed because os.path.join does not accept None arguments
3534 if self.op.file_storage_dir is None:
3535 string_file_storage_dir = ""
3537 string_file_storage_dir = self.op.file_storage_dir
3539 # build the full file storage dir path
3540 file_storage_dir = os.path.normpath(os.path.join(
3541 self.cfg.GetFileStorageDir(),
3542 string_file_storage_dir, instance))
3545 disks = _GenerateDiskTemplate(self,
3546 self.op.disk_template,
3547 instance, pnode_name,
3548 self.secondaries, self.op.disk_size,
3551 self.op.file_driver)
3553 iobj = objects.Instance(name=instance, os=self.op.os_type,
3554 primary_node=pnode_name,
3555 memory=self.op.mem_size,
3556 vcpus=self.op.vcpus,
3557 nics=[nic], disks=disks,
3558 disk_template=self.op.disk_template,
3559 status=self.instance_status,
3560 network_port=network_port,
3561 hvparams=self.op.hvparams,
3562 hypervisor=self.op.hypervisor,
3565 feedback_fn("* creating instance disks...")
3566 if not _CreateDisks(self, iobj):
3567 _RemoveDisks(self, iobj)
3568 self.cfg.ReleaseDRBDMinors(instance)
3569 raise errors.OpExecError("Device creation failed, reverting...")
3571 feedback_fn("adding instance %s to cluster config" % instance)
3573 self.cfg.AddInstance(iobj)
3574 # Declare that we don't want to remove the instance lock anymore, as we've
3575 # added the instance to the config
3576 del self.remove_locks[locking.LEVEL_INSTANCE]
3577 # Remove the temp. assignments for the instance's drbds
3578 self.cfg.ReleaseDRBDMinors(instance)
3580 if self.op.wait_for_sync:
3581 disk_abort = not _WaitForSync(self, iobj)
3582 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3583 # make sure the disks are not degraded (still sync-ing is ok)
3585 feedback_fn("* checking mirrors status")
3586 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3591 _RemoveDisks(self, iobj)
3592 self.cfg.RemoveInstance(iobj.name)
3593 # Make sure the instance lock gets removed
3594 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3595 raise errors.OpExecError("There are some degraded disks for"
3598 feedback_fn("creating os for instance %s on node %s" %
3599 (instance, pnode_name))
3601 if iobj.disk_template != constants.DT_DISKLESS:
3602 if self.op.mode == constants.INSTANCE_CREATE:
3603 feedback_fn("* running the instance OS create scripts...")
3604 if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3605 raise errors.OpExecError("could not add os for instance %s"
3607 (instance, pnode_name))
3609 elif self.op.mode == constants.INSTANCE_IMPORT:
3610 feedback_fn("* running the instance OS import scripts...")
3611 src_node = self.op.src_node
3612 src_image = self.src_image
3613 cluster_name = self.cfg.GetClusterName()
3614 if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3615 src_node, src_image,
3617 raise errors.OpExecError("Could not import os for instance"
3619 (instance, pnode_name))
3621 # also checked in the prereq part
3622 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3626 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3627 feedback_fn("* starting instance...")
3628 if not self.rpc.call_instance_start(pnode_name, iobj, None):
3629 raise errors.OpExecError("Could not start instance")
3632 class LUConnectConsole(NoHooksLU):
3633 """Connect to an instance's console.
3635 This is somewhat special in that it returns the command line that
3636 you need to run on the master node in order to connect to the
3640 _OP_REQP = ["instance_name"]
3643 def ExpandNames(self):
3644 self._ExpandAndLockInstance()
3646 def CheckPrereq(self):
3647 """Check prerequisites.
3649 This checks that the instance is in the cluster.
3652 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3653 assert self.instance is not None, \
3654 "Cannot retrieve locked instance %s" % self.op.instance_name
3656 def Exec(self, feedback_fn):
3657 """Connect to the console of an instance
3660 instance = self.instance
3661 node = instance.primary_node
3663 node_insts = self.rpc.call_instance_list([node],
3664 [instance.hypervisor])[node]
3665 if node_insts is False:
3666 raise errors.OpExecError("Can't connect to node %s." % node)
3668 if instance.name not in node_insts:
3669 raise errors.OpExecError("Instance %s is not running." % instance.name)
3671 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3673 hyper = hypervisor.GetHypervisor(instance.hypervisor)
3674 console_cmd = hyper.GetShellCommandForConsole(instance)
3677 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3680 class LUReplaceDisks(LogicalUnit):
3681 """Replace the disks of an instance.
3684 HPATH = "mirrors-replace"
3685 HTYPE = constants.HTYPE_INSTANCE
3686 _OP_REQP = ["instance_name", "mode", "disks"]
3689 def ExpandNames(self):
3690 self._ExpandAndLockInstance()
3692 if not hasattr(self.op, "remote_node"):
3693 self.op.remote_node = None
3695 ia_name = getattr(self.op, "iallocator", None)
3696 if ia_name is not None:
3697 if self.op.remote_node is not None:
3698 raise errors.OpPrereqError("Give either the iallocator or the new"
3699 " secondary, not both")
3700 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3701 elif self.op.remote_node is not None:
3702 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3703 if remote_node is None:
3704 raise errors.OpPrereqError("Node '%s' not known" %
3705 self.op.remote_node)
3706 self.op.remote_node = remote_node
3707 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3708 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3710 self.needed_locks[locking.LEVEL_NODE] = []
3711 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3713 def DeclareLocks(self, level):
3714 # If we're not already locking all nodes in the set we have to declare the
3715 # instance's primary/secondary nodes.
3716 if (level == locking.LEVEL_NODE and
3717 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3718 self._LockInstancesNodes()
3720 def _RunAllocator(self):
3721 """Compute a new secondary node using an IAllocator.
3724 ial = IAllocator(self,
3725 mode=constants.IALLOCATOR_MODE_RELOC,
3726 name=self.op.instance_name,
3727 relocate_from=[self.sec_node])
3729 ial.Run(self.op.iallocator)
3732 raise errors.OpPrereqError("Can't compute nodes using"
3733 " iallocator '%s': %s" % (self.op.iallocator,
3735 if len(ial.nodes) != ial.required_nodes:
3736 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3737 " of nodes (%s), required %s" %
3738 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3739 self.op.remote_node = ial.nodes[0]
3740 logger.ToStdout("Selected new secondary for the instance: %s" %
3741 self.op.remote_node)
3743 def BuildHooksEnv(self):
3746 This runs on the master, the primary and all the secondaries.
3750 "MODE": self.op.mode,
3751 "NEW_SECONDARY": self.op.remote_node,
3752 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3754 env.update(_BuildInstanceHookEnvByObject(self.instance))
3756 self.cfg.GetMasterNode(),
3757 self.instance.primary_node,
3759 if self.op.remote_node is not None:
3760 nl.append(self.op.remote_node)
3763 def CheckPrereq(self):
3764 """Check prerequisites.
3766 This checks that the instance is in the cluster.
3769 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3770 assert instance is not None, \
3771 "Cannot retrieve locked instance %s" % self.op.instance_name
3772 self.instance = instance
3774 if instance.disk_template not in constants.DTS_NET_MIRROR:
3775 raise errors.OpPrereqError("Instance's disk layout is not"
3776 " network mirrored.")
3778 if len(instance.secondary_nodes) != 1:
3779 raise errors.OpPrereqError("The instance has a strange layout,"
3780 " expected one secondary but found %d" %
3781 len(instance.secondary_nodes))
3783 self.sec_node = instance.secondary_nodes[0]
3785 ia_name = getattr(self.op, "iallocator", None)
3786 if ia_name is not None:
3787 self._RunAllocator()
3789 remote_node = self.op.remote_node
3790 if remote_node is not None:
3791 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3792 assert self.remote_node_info is not None, \
3793 "Cannot retrieve locked node %s" % remote_node
3795 self.remote_node_info = None
3796 if remote_node == instance.primary_node:
3797 raise errors.OpPrereqError("The specified node is the primary node of"
3799 elif remote_node == self.sec_node:
3800 if self.op.mode == constants.REPLACE_DISK_SEC:
3801 # this is for DRBD8, where we can't execute the same mode of
3802 # replacement as for drbd7 (no different port allocated)
3803 raise errors.OpPrereqError("Same secondary given, cannot execute"
3805 if instance.disk_template == constants.DT_DRBD8:
3806 if (self.op.mode == constants.REPLACE_DISK_ALL and
3807 remote_node is not None):
3808 # switch to replace secondary mode
3809 self.op.mode = constants.REPLACE_DISK_SEC
3811 if self.op.mode == constants.REPLACE_DISK_ALL:
3812 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3813 " secondary disk replacement, not"
3815 elif self.op.mode == constants.REPLACE_DISK_PRI:
3816 if remote_node is not None:
3817 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3818 " the secondary while doing a primary"
3819 " node disk replacement")
3820 self.tgt_node = instance.primary_node
3821 self.oth_node = instance.secondary_nodes[0]
3822 elif self.op.mode == constants.REPLACE_DISK_SEC:
3823 self.new_node = remote_node # this can be None, in which case
3824 # we don't change the secondary
3825 self.tgt_node = instance.secondary_nodes[0]
3826 self.oth_node = instance.primary_node
3828 raise errors.ProgrammerError("Unhandled disk replace mode")
3830 for name in self.op.disks:
3831 if instance.FindDisk(name) is None:
3832 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3833 (name, instance.name))
3835 def _ExecD8DiskOnly(self, feedback_fn):
3836 """Replace a disk on the primary or secondary for dbrd8.
3838 The algorithm for replace is quite complicated:
3839 - for each disk to be replaced:
3840 - create new LVs on the target node with unique names
3841 - detach old LVs from the drbd device
3842 - rename old LVs to name_replaced.<time_t>
3843 - rename new LVs to old LVs
3844 - attach the new LVs (with the old names now) to the drbd device
3845 - wait for sync across all devices
3846 - for each modified disk:
3847 - remove old LVs (which have the name name_replaced.<time_t>)
3849 Failures are not very well handled.
3853 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3854 instance = self.instance
3856 vgname = self.cfg.GetVGName()
3859 tgt_node = self.tgt_node
3860 oth_node = self.oth_node
3862 # Step: check device activation
3863 self.proc.LogStep(1, steps_total, "check device existence")
3864 info("checking volume groups")
3865 my_vg = cfg.GetVGName()
3866 results = self.rpc.call_vg_list([oth_node, tgt_node])
3868 raise errors.OpExecError("Can't list volume groups on the nodes")
3869 for node in oth_node, tgt_node:
3870 res = results.get(node, False)
3871 if not res or my_vg not in res:
3872 raise errors.OpExecError("Volume group '%s' not found on %s" %
3874 for dev in instance.disks:
3875 if not dev.iv_name in self.op.disks:
3877 for node in tgt_node, oth_node:
3878 info("checking %s on %s" % (dev.iv_name, node))
3879 cfg.SetDiskID(dev, node)
3880 if not self.rpc.call_blockdev_find(node, dev):
3881 raise errors.OpExecError("Can't find device %s on node %s" %
3882 (dev.iv_name, node))
3884 # Step: check other node consistency
3885 self.proc.LogStep(2, steps_total, "check peer consistency")
3886 for dev in instance.disks:
3887 if not dev.iv_name in self.op.disks:
3889 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3890 if not _CheckDiskConsistency(self, dev, oth_node,
3891 oth_node==instance.primary_node):
3892 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3893 " to replace disks on this node (%s)" %
3894 (oth_node, tgt_node))
3896 # Step: create new storage
3897 self.proc.LogStep(3, steps_total, "allocate new storage")
3898 for dev in instance.disks:
3899 if not dev.iv_name in self.op.disks:
3902 cfg.SetDiskID(dev, tgt_node)
3903 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3904 names = _GenerateUniqueNames(self, lv_names)
3905 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3906 logical_id=(vgname, names[0]))
3907 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3908 logical_id=(vgname, names[1]))
3909 new_lvs = [lv_data, lv_meta]
3910 old_lvs = dev.children
3911 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3912 info("creating new local storage on %s for %s" %
3913 (tgt_node, dev.iv_name))
3914 # since we *always* want to create this LV, we use the
3915 # _Create...OnPrimary (which forces the creation), even if we
3916 # are talking about the secondary node
3917 for new_lv in new_lvs:
3918 if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
3919 _GetInstanceInfoText(instance)):
3920 raise errors.OpExecError("Failed to create new LV named '%s' on"
3922 (new_lv.logical_id[1], tgt_node))
3924 # Step: for each lv, detach+rename*2+attach
3925 self.proc.LogStep(4, steps_total, "change drbd configuration")
3926 for dev, old_lvs, new_lvs in iv_names.itervalues():
3927 info("detaching %s drbd from local storage" % dev.iv_name)
3928 if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3929 raise errors.OpExecError("Can't detach drbd from local storage on node"
3930 " %s for device %s" % (tgt_node, dev.iv_name))
3932 #cfg.Update(instance)
3934 # ok, we created the new LVs, so now we know we have the needed
3935 # storage; as such, we proceed on the target node to rename
3936 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3937 # using the assumption that logical_id == physical_id (which in
3938 # turn is the unique_id on that node)
3940 # FIXME(iustin): use a better name for the replaced LVs
3941 temp_suffix = int(time.time())
3942 ren_fn = lambda d, suff: (d.physical_id[0],
3943 d.physical_id[1] + "_replaced-%s" % suff)
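# e.g. a data LV ('xenvg', 'sda_data') would become
# ('xenvg', 'sda_data_replaced-1199145600'); the VG name and timestamp
# here are purely illustrative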
3944 # build the rename list based on what LVs exist on the node
3945 rlist = []
3946 for to_ren in old_lvs:
3947 find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
3948 if find_res is not None: # device exists
3949 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3951 info("renaming the old LVs on the target node")
3952 if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3953 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3954 # now we rename the new LVs to the old LVs
3955 info("renaming the new LVs on the target node")
3956 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3957 if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3958 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3960 for old, new in zip(old_lvs, new_lvs):
3961 new.logical_id = old.logical_id
3962 cfg.SetDiskID(new, tgt_node)
3964 for disk in old_lvs:
3965 disk.logical_id = ren_fn(disk, temp_suffix)
3966 cfg.SetDiskID(disk, tgt_node)
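# the old LVs now carry their temporary _replaced-<time_t> names; they are
# only deleted in the "remove old storage" step, so a failure from here on
# can still be cleaned up by hand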
3968 # now that the new lvs have the old name, we can add them to the device
3969 info("adding new mirror component on %s" % tgt_node)
3970 if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3971 for new_lv in new_lvs:
3972 if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
3973 warning("Can't rollback device %s", hint="manually cleanup unused"
3975 raise errors.OpExecError("Can't add local storage to drbd")
3977 dev.children = new_lvs
3978 cfg.Update(instance)
3980 # Step: wait for sync
3982 # this can fail as the old devices are degraded and _WaitForSync
3983 # does a combined result over all disks, so we don't check its
3984 # return value
3985 self.proc.LogStep(5, steps_total, "sync devices")
3986 _WaitForSync(self, instance, unlock=True)
3988 # so check manually all the devices
3989 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3990 cfg.SetDiskID(dev, instance.primary_node)
3991 is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
3993 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3995 # Step: remove old storage
3996 self.proc.LogStep(6, steps_total, "removing old storage")
3997 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3998 info("remove logical volumes for %s" % name)
3999 for lv in old_lvs:
4000 cfg.SetDiskID(lv, tgt_node)
4001 if not self.rpc.call_blockdev_remove(tgt_node, lv):
4002 warning("Can't remove old LV", hint="manually remove unused LVs")
4005 def _ExecD8Secondary(self, feedback_fn):
4006 """Replace the secondary node for drbd8.
4008 The algorithm for replace is quite complicated:
4009 - for all disks of the instance:
4010 - create new LVs on the new node with same names
4011 - shutdown the drbd device on the old secondary
4012 - disconnect the drbd network on the primary
4013 - create the drbd device on the new secondary
4014 - network attach the drbd on the primary, using an artifice:
4015 the drbd code for Attach() will connect to the network if it
4016 finds a device which is connected to the good local disks but
4017 not network enabled
4018 - wait for sync across all devices
4019 - remove all disks from the old secondary
4021 Failures are not very well handled.
4025 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4026 instance = self.instance
4028 vgname = self.cfg.GetVGName()
4031 old_node = self.tgt_node
4032 new_node = self.new_node
4033 pri_node = instance.primary_node
4035 # Step: check device activation
4036 self.proc.LogStep(1, steps_total, "check device existence")
4037 info("checking volume groups")
4038 my_vg = cfg.GetVGName()
4039 results = self.rpc.call_vg_list([pri_node, new_node])
4041 raise errors.OpExecError("Can't list volume groups on the nodes")
4042 for node in pri_node, new_node:
4043 res = results.get(node, False)
4044 if not res or my_vg not in res:
4045 raise errors.OpExecError("Volume group '%s' not found on %s" %
4047 for dev in instance.disks:
4048 if not dev.iv_name in self.op.disks:
4050 info("checking %s on %s" % (dev.iv_name, pri_node))
4051 cfg.SetDiskID(dev, pri_node)
4052 if not self.rpc.call_blockdev_find(pri_node, dev):
4053 raise errors.OpExecError("Can't find device %s on node %s" %
4054 (dev.iv_name, pri_node))
4056 # Step: check other node consistency
4057 self.proc.LogStep(2, steps_total, "check peer consistency")
4058 for dev in instance.disks:
4059 if not dev.iv_name in self.op.disks:
4060 continue
4061 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4062 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4063 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4064 " unsafe to replace the secondary" %
4067 # Step: create new storage
4068 self.proc.LogStep(3, steps_total, "allocate new storage")
4069 for dev in instance.disks:
4071 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4072 # since we *always* want to create this LV, we use the
4073 # _Create...OnPrimary (which forces the creation), even if we
4074 # are talking about the secondary node
4075 for new_lv in dev.children:
4076 if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4077 _GetInstanceInfoText(instance)):
4078 raise errors.OpExecError("Failed to create new LV named '%s' on"
4080 (new_lv.logical_id[1], new_node))
4083 # Step 4: drbd minors and drbd setup changes
4084 # after this, we must manually remove the drbd minors on both the
4085 # error and the success paths
4086 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4087 instance.name)
4088 logging.debug("Allocated minors %s" % (minors,))
4089 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4090 for dev, new_minor in zip(instance.disks, minors):
4092 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4093 # create new devices on new_node
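# note: for LD_DRBD8 the logical_id is a tuple of the form
# (node_a, node_b, port, minor_a, minor_b, ...); below we keep the port and
# the primary's minor and substitute the new secondary and its new minor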
4094 if pri_node == dev.logical_id[0]:
4095 new_logical_id = (pri_node, new_node,
4096 dev.logical_id[2], dev.logical_id[3], new_minor,
4097 dev.logical_id[5])
4098 else:
4099 new_logical_id = (new_node, pri_node,
4100 dev.logical_id[2], new_minor, dev.logical_id[4],
4101 dev.logical_id[5])
4102 iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4103 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4105 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4106 logical_id=new_logical_id,
4107 children=dev.children)
4108 if not _CreateBlockDevOnSecondary(self, new_node, instance,
4109 new_drbd, False,
4110 _GetInstanceInfoText(instance)):
4111 self.cfg.ReleaseDRBDMinors(instance.name)
4112 raise errors.OpExecError("Failed to create new DRBD on"
4113 " node '%s'" % new_node)
4115 for dev in instance.disks:
4116 # we have new devices, shutdown the drbd on the old secondary
4117 info("shutting down drbd for %s on old node" % dev.iv_name)
4118 cfg.SetDiskID(dev, old_node)
4119 if not self.rpc.call_blockdev_shutdown(old_node, dev):
4120 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4121 hint="Please cleanup this device manually as soon as possible")
4123 info("detaching primary drbds from the network (=> standalone)")
4125 for dev in instance.disks:
4126 cfg.SetDiskID(dev, pri_node)
4127 # set the network part of the physical (unique in bdev terms) id
4128 # to None, meaning detach from network
4129 dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4130 # and 'find' the device, which will 'fix' it to match the
4132 if self.rpc.call_blockdev_find(pri_node, dev):
4135 warning("Failed to detach drbd %s from network, unusual case" %
4139 # no detaches succeeded (very unlikely)
4140 self.cfg.ReleaseDRBDMinors(instance.name)
4141 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4143 # if we managed to detach at least one, we update all the disks of
4144 # the instance to point to the new secondary
4145 info("updating instance configuration")
4146 for dev, _, new_logical_id in iv_names.itervalues():
4147 dev.logical_id = new_logical_id
4148 cfg.SetDiskID(dev, pri_node)
4149 cfg.Update(instance)
4150 # we can remove now the temp minors as now the new values are
4151 # written to the config file (and therefore stable)
4152 self.cfg.ReleaseDRBDMinors(instance.name)
4154 # and now perform the drbd attach
4155 info("attaching primary drbds to new secondary (standalone => connected)")
4157 for dev in instance.disks:
4158 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4159 # since the attach is smart, it's enough to 'find' the device,
4160 # it will automatically activate the network, if the physical_id
4161 # is correct
4162 cfg.SetDiskID(dev, pri_node)
4163 logging.debug("Disk to attach: %s", dev)
4164 if not self.rpc.call_blockdev_find(pri_node, dev):
4165 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4166 "please do a gnt-instance info to see the status of disks")
4168 # this can fail as the old devices are degraded and _WaitForSync
4169 # does a combined result over all disks, so we don't check its
4170 # return value
4171 self.proc.LogStep(5, steps_total, "sync devices")
4172 _WaitForSync(self, instance, unlock=True)
4174 # so check manually all the devices
4175 for name, (dev, old_lvs, _) in iv_names.iteritems():
4176 cfg.SetDiskID(dev, pri_node)
4177 is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4179 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4181 self.proc.LogStep(6, steps_total, "removing old storage")
4182 for name, (dev, old_lvs, _) in iv_names.iteritems():
4183 info("remove logical volumes for %s" % name)
4184 for lv in old_lvs:
4185 cfg.SetDiskID(lv, old_node)
4186 if not self.rpc.call_blockdev_remove(old_node, lv):
4187 warning("Can't remove LV on old secondary",
4188 hint="Cleanup stale volumes by hand")
4190 def Exec(self, feedback_fn):
4191 """Execute disk replacement.
4193 This dispatches the disk replacement to the appropriate handler.
4196 instance = self.instance
4198 # Activate the instance disks if we're replacing them on a down instance
4199 if instance.status == "down":
4200 _StartInstanceDisks(self, instance, True)
4202 if instance.disk_template == constants.DT_DRBD8:
4203 if self.op.remote_node is None:
4204 fn = self._ExecD8DiskOnly
4205 else:
4206 fn = self._ExecD8Secondary
4207 else:
4208 raise errors.ProgrammerError("Unhandled disk replacement case")
4210 ret = fn(feedback_fn)
4212 # Deactivate the instance disks if we're replacing them on a down instance
4213 if instance.status == "down":
4214 _SafeShutdownInstanceDisks(self, instance)
4216 return ret
4219 class LUGrowDisk(LogicalUnit):
4220 """Grow a disk of an instance.
4222 """
4223 HPATH = "grow-disk"
4224 HTYPE = constants.HTYPE_INSTANCE
4225 _OP_REQP = ["instance_name", "disk", "amount"]
4228 def ExpandNames(self):
4229 self._ExpandAndLockInstance()
4230 self.needed_locks[locking.LEVEL_NODE] = []
4231 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4233 def DeclareLocks(self, level):
4234 if level == locking.LEVEL_NODE:
4235 self._LockInstancesNodes()
4237 def BuildHooksEnv(self):
4238 """Build hooks env.
4240 This runs on the master, the primary and all the secondaries.
4242 """
4243 env = {
4244 "DISK": self.op.disk,
4245 "AMOUNT": self.op.amount,
4246 }
4247 env.update(_BuildInstanceHookEnvByObject(self.instance))
4248 nl = [
4249 self.cfg.GetMasterNode(),
4250 self.instance.primary_node,
4251 ]
4252 return env, nl, nl
4254 def CheckPrereq(self):
4255 """Check prerequisites.
4257 This checks that the instance is in the cluster.
4260 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4261 assert instance is not None, \
4262 "Cannot retrieve locked instance %s" % self.op.instance_name
4264 self.instance = instance
4266 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4267 raise errors.OpPrereqError("Instance's disk layout does not support"
4270 if instance.FindDisk(self.op.disk) is None:
4271 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4272 (self.op.disk, instance.name))
4274 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4275 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4276 instance.hypervisor)
4277 for node in nodenames:
4278 info = nodeinfo.get(node, None)
4280 raise errors.OpPrereqError("Cannot get current information"
4281 " from node '%s'" % node)
4282 vg_free = info.get('vg_free', None)
4283 if not isinstance(vg_free, int):
4284 raise errors.OpPrereqError("Can't compute free disk space on"
4286 if self.op.amount > info['vg_free']:
4287 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4288 " %d MiB available, %d MiB required" %
4289 (node, info['vg_free'], self.op.amount))
4291 def Exec(self, feedback_fn):
4292 """Execute disk grow.
4295 instance = self.instance
4296 disk = instance.FindDisk(self.op.disk)
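# note the node order used below: all secondary nodes first, the primary
# node last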
4297 for node in (instance.secondary_nodes + (instance.primary_node,)):
4298 self.cfg.SetDiskID(disk, node)
4299 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4300 if (not result or not isinstance(result, (list, tuple)) or
4301 len(result) != 2):
4302 raise errors.OpExecError("grow request failed to node %s" % node)
4303 elif not result[0]:
4304 raise errors.OpExecError("grow request failed to node %s: %s" %
4305 (node, result[1]))
4306 disk.RecordGrow(self.op.amount)
4307 self.cfg.Update(instance)
4311 class LUQueryInstanceData(NoHooksLU):
4312 """Query runtime instance data.
4315 _OP_REQP = ["instances"]
4318 def ExpandNames(self):
4319 self.needed_locks = {}
4320 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4322 if not isinstance(self.op.instances, list):
4323 raise errors.OpPrereqError("Invalid argument type 'instances'")
4325 if self.op.instances:
4326 self.wanted_names = []
4327 for name in self.op.instances:
4328 full_name = self.cfg.ExpandInstanceName(name)
4329 if full_name is None:
4330 raise errors.OpPrereqError("Instance '%s' not known" %
4331 name)
4332 self.wanted_names.append(full_name)
4333 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4334 else:
4335 self.wanted_names = None
4336 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4338 self.needed_locks[locking.LEVEL_NODE] = []
4339 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4341 def DeclareLocks(self, level):
4342 if level == locking.LEVEL_NODE:
4343 self._LockInstancesNodes()
4345 def CheckPrereq(self):
4346 """Check prerequisites.
4348 This only checks the optional instance list against the existing names.
4351 if self.wanted_names is None:
4352 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4354 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4355 in self.wanted_names]
4358 def _ComputeDiskStatus(self, instance, snode, dev):
4359 """Compute block device status.
4362 self.cfg.SetDiskID(dev, instance.primary_node)
4363 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4364 if dev.dev_type in constants.LDS_DRBD:
4365 # we change the snode then (otherwise we use the one passed in)
4366 if dev.logical_id[0] == instance.primary_node:
4367 snode = dev.logical_id[1]
4368 else:
4369 snode = dev.logical_id[0]
4371 if snode:
4372 self.cfg.SetDiskID(dev, snode)
4373 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4374 else:
4375 dev_sstatus = None
4377 if dev.children:
4378 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4379 for child in dev.children]
4380 else:
4381 dev_children = []
4383 data = {
4384 "iv_name": dev.iv_name,
4385 "dev_type": dev.dev_type,
4386 "logical_id": dev.logical_id,
4387 "physical_id": dev.physical_id,
4388 "pstatus": dev_pstatus,
4389 "sstatus": dev_sstatus,
4390 "children": dev_children,
4395 def Exec(self, feedback_fn):
4396 """Gather and return data"""
4398 for instance in self.wanted_instances:
4399 remote_info = self.rpc.call_instance_info(instance.primary_node,
4401 instance.hypervisor)
4402 if remote_info and "state" in remote_info:
4403 remote_state = "up"
4404 else:
4405 remote_state = "down"
4406 if instance.status == "down":
4407 config_state = "down"
4408 else:
4409 config_state = "up"
4411 disks = [self._ComputeDiskStatus(instance, None, device)
4412 for device in instance.disks]
4415 "name": instance.name,
4416 "config_state": config_state,
4417 "run_state": remote_state,
4418 "pnode": instance.primary_node,
4419 "snodes": instance.secondary_nodes,
4421 "memory": instance.memory,
4422 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4424 "vcpus": instance.vcpus,
4425 "hypervisor": instance.hypervisor,
4428 htkind = instance.hypervisor
4429 if htkind == constants.HT_XEN_PVM:
4430 idict["kernel_path"] = instance.kernel_path
4431 idict["initrd_path"] = instance.initrd_path
4433 if htkind == constants.HT_XEN_HVM:
4434 idict["hvm_boot_order"] = instance.hvm_boot_order
4435 idict["hvm_acpi"] = instance.hvm_acpi
4436 idict["hvm_pae"] = instance.hvm_pae
4437 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4438 idict["hvm_nic_type"] = instance.hvm_nic_type
4439 idict["hvm_disk_type"] = instance.hvm_disk_type
4441 if htkind in constants.HTS_REQ_PORT:
4442 if instance.vnc_bind_address is None:
4443 vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4445 vnc_bind_address = instance.vnc_bind_address
4446 if instance.network_port is None:
4447 vnc_console_port = None
4448 elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4449 vnc_console_port = "%s:%s" % (instance.primary_node,
4450 instance.network_port)
4451 elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4452 vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4453 instance.network_port,
4454 instance.primary_node)
4456 vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4457 instance.network_port)
4458 idict["vnc_console_port"] = vnc_console_port
4459 idict["vnc_bind_address"] = vnc_bind_address
4460 idict["network_port"] = instance.network_port
4462 result[instance.name] = idict
4464 return result
4467 class LUSetInstanceParams(LogicalUnit):
4468 """Modifies an instances's parameters.
4471 HPATH = "instance-modify"
4472 HTYPE = constants.HTYPE_INSTANCE
4473 _OP_REQP = ["instance_name", "hvparams"]
4476 def ExpandNames(self):
4477 self._ExpandAndLockInstance()
4478 self.needed_locks[locking.LEVEL_NODE] = []
4479 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4482 def DeclareLocks(self, level):
4483 if level == locking.LEVEL_NODE:
4484 self._LockInstancesNodes()
4486 def BuildHooksEnv(self):
4487 """Build hooks env.
4489 This runs on the master, primary and secondaries.
4491 """
4492 args = dict()
4493 if self.mem:
4494 args['memory'] = self.mem
4495 if self.vcpus:
4496 args['vcpus'] = self.vcpus
4497 if self.do_ip or self.do_bridge or self.mac:
4498 if self.do_ip:
4499 ip = self.ip
4500 else:
4501 ip = self.instance.nics[0].ip
4502 if self.bridge:
4503 bridge = self.bridge
4504 else:
4505 bridge = self.instance.nics[0].bridge
4506 if self.mac:
4507 mac = self.mac
4508 else:
4509 mac = self.instance.nics[0].mac
4510 args['nics'] = [(ip, bridge, mac)]
4511 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4512 nl = [self.cfg.GetMasterNode(),
4513 self.instance.primary_node] + list(self.instance.secondary_nodes)
4514 return env, nl, nl
4516 def CheckPrereq(self):
4517 """Check prerequisites.
4519 This only checks the instance list against the existing names.
4522 # FIXME: all the parameters could be checked before, in ExpandNames, or in
4523 # a separate CheckArguments function, if we implement one, so the operation
4524 # can be aborted without waiting for any lock, should it have an error...
4525 self.mem = getattr(self.op, "mem", None)
4526 self.vcpus = getattr(self.op, "vcpus", None)
4527 self.ip = getattr(self.op, "ip", None)
4528 self.mac = getattr(self.op, "mac", None)
4529 self.bridge = getattr(self.op, "bridge", None)
4530 self.kernel_path = getattr(self.op, "kernel_path", None)
4531 self.initrd_path = getattr(self.op, "initrd_path", None)
4532 self.force = getattr(self.op, "force", None)
4533 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac]
4534 if all_parms.count(None) == len(all_parms) and not self.op.hvparams:
4535 raise errors.OpPrereqError("No changes submitted")
4536 if self.mem is not None:
4537 try:
4538 self.mem = int(self.mem)
4539 except ValueError, err:
4540 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4541 if self.vcpus is not None:
4542 try:
4543 self.vcpus = int(self.vcpus)
4544 except ValueError, err:
4545 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4546 if self.ip is not None:
4547 self.do_ip = True
4548 if self.ip.lower() == "none":
4549 self.ip = None
4550 else:
4551 if not utils.IsValidIP(self.ip):
4552 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4553 else:
4554 self.do_ip = False
4555 self.do_bridge = (self.bridge is not None)
4556 if self.mac is not None:
4557 if self.cfg.IsMacInUse(self.mac):
4558 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4559 self.mac)
4560 if not utils.IsValidMac(self.mac):
4561 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4563 # checking the new params on the primary/secondary nodes
4565 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4566 assert self.instance is not None, \
4567 "Cannot retrieve locked instance %s" % self.op.instance_name
4568 pnode = self.instance.primary_node
4569 nodelist = [pnode]
4570 nodelist.extend(instance.secondary_nodes)
4572 if self.op.hvparams:
4573 i_hvdict = copy.deepcopy(instance.hvparams)
4574 for key, val in self.op.hvparams.iteritems():
4575 if val is None:
4576 try:
4577 del i_hvdict[key]
4578 except KeyError:
4579 pass
4580 else:
4581 i_hvdict[key] = val
4582 cluster = self.cfg.GetClusterInfo()
4583 hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4584 i_hvdict)
4586 hypervisor.GetHypervisor(
4587 instance.hypervisor).CheckParameterSyntax(hv_new)
4588 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4589 self.hv_new = hv_new
4591 self.warn = []
4592 if self.mem is not None and not self.force:
4593 instance_info = self.rpc.call_instance_info(pnode, instance.name,
4594 instance.hypervisor)
4595 nodeinfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
4596 instance.hypervisor)
4598 if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4599 # Assume the primary node is unreachable and go ahead
4600 self.warn.append("Can't get info from primary node %s" % pnode)
4603 current_mem = instance_info['memory']
4605 # Assume instance not running
4606 # (there is a slight race condition here, but it's not very probable,
4607 # and we have no other way to check)
4609 miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
4611 raise errors.OpPrereqError("This change will prevent the instance"
4612 " from starting, due to %d MB of memory"
4613 " missing on its primary node" % miss_mem)
4615 for node in instance.secondary_nodes:
4616 if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4617 self.warn.append("Can't get info from secondary node %s" % node)
4618 elif self.mem > nodeinfo[node]['memory_free']:
4619 self.warn.append("Not enough memory to failover instance to"
4620 " secondary node %s" % node)
4624 def Exec(self, feedback_fn):
4625 """Modifies an instance.
4627 All parameters take effect only at the next restart of the instance.
4629 # Process here the warnings from CheckPrereq, as we don't have a
4630 # feedback_fn there.
4631 for warn in self.warn:
4632 feedback_fn("WARNING: %s" % warn)
4635 instance = self.instance
4637 instance.memory = self.mem
4638 result.append(("mem", self.mem))
4640 instance.vcpus = self.vcpus
4641 result.append(("vcpus", self.vcpus))
4643 instance.nics[0].ip = self.ip
4644 result.append(("ip", self.ip))
4646 instance.nics[0].bridge = self.bridge
4647 result.append(("bridge", self.bridge))
4649 instance.nics[0].mac = self.mac
4650 result.append(("mac", self.mac))
4651 if self.op.hvparams:
4652 instance.hvparams = self.hv_new
4653 for key, val in self.op.hvparams.iteritems():
4654 result.append(("hv/%s" % key, val))
4656 self.cfg.Update(instance)
4658 return result
4661 class LUQueryExports(NoHooksLU):
4662 """Query the exports list
4665 _OP_REQP = ['nodes']
4668 def ExpandNames(self):
4669 self.needed_locks = {}
4670 self.share_locks[locking.LEVEL_NODE] = 1
4671 if not self.op.nodes:
4672 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4673 else:
4674 self.needed_locks[locking.LEVEL_NODE] = \
4675 _GetWantedNodes(self, self.op.nodes)
4677 def CheckPrereq(self):
4678 """Check prerequisites.
4681 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4683 def Exec(self, feedback_fn):
4684 """Compute the list of all the exported system images.
4687 a dictionary with the structure node->(export-list)
4688 where export-list is a list of the instances exported on
4692 return self.rpc.call_export_list(self.nodes)
4695 class LUExportInstance(LogicalUnit):
4696 """Export an instance to an image in the cluster.
4699 HPATH = "instance-export"
4700 HTYPE = constants.HTYPE_INSTANCE
4701 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4704 def ExpandNames(self):
4705 self._ExpandAndLockInstance()
4706 # FIXME: lock only instance primary and destination node
4708 # Sad but true, for now we have to lock all nodes, as we don't know where
4709 # the previous export might be, and in this LU we search for it and
4710 # remove it from its current node. In the future we could fix this by:
4711 # - making a tasklet to search (share-lock all), then create the new one,
4712 # then one to remove, after
4713 # - removing the removal operation altogether
4714 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4716 def DeclareLocks(self, level):
4717 """Last minute lock declaration."""
4718 # All nodes are locked anyway, so nothing to do here.
4720 def BuildHooksEnv(self):
4723 This will run on the master, primary node and target node.
4727 "EXPORT_NODE": self.op.target_node,
4728 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4729 }
4730 env.update(_BuildInstanceHookEnvByObject(self.instance))
4731 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4732 self.op.target_node]
4733 return env, nl, nl
4735 def CheckPrereq(self):
4736 """Check prerequisites.
4738 This checks that the instance and node names are valid.
4741 instance_name = self.op.instance_name
4742 self.instance = self.cfg.GetInstanceInfo(instance_name)
4743 assert self.instance is not None, \
4744 "Cannot retrieve locked instance %s" % self.op.instance_name
4746 self.dst_node = self.cfg.GetNodeInfo(
4747 self.cfg.ExpandNodeName(self.op.target_node))
4749 assert self.dst_node is not None, \
4750 "Cannot retrieve locked node %s" % self.op.target_node
4752 # instance disk type verification
4753 for disk in self.instance.disks:
4754 if disk.dev_type == constants.LD_FILE:
4755 raise errors.OpPrereqError("Export not supported for instances with"
4756 " file-based disks")
4758 def Exec(self, feedback_fn):
4759 """Export an instance to an image in the cluster.
4762 instance = self.instance
4763 dst_node = self.dst_node
4764 src_node = instance.primary_node
4765 if self.op.shutdown:
4766 # shutdown the instance, but not the disks
4767 if not self.rpc.call_instance_shutdown(src_node, instance):
4768 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4769 (instance.name, src_node))
4771 vgname = self.cfg.GetVGName()
4773 snap_disks = []
4775 try:
4776 for disk in instance.disks:
4777 if disk.iv_name == "sda":
4778 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4779 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4781 if not new_dev_name:
4782 logger.Error("could not snapshot block device %s on node %s" %
4783 (disk.logical_id[1], src_node))
4784 else:
4785 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4786 logical_id=(vgname, new_dev_name),
4787 physical_id=(vgname, new_dev_name),
4788 iv_name=disk.iv_name)
4789 snap_disks.append(new_dev)
4791 finally:
4792 if self.op.shutdown and instance.status == "up":
4793 if not self.rpc.call_instance_start(src_node, instance, None):
4794 _ShutdownInstanceDisks(self, instance)
4795 raise errors.OpExecError("Could not start instance")
4797 # TODO: check for size
4799 cluster_name = self.cfg.GetClusterName()
4800 for dev in snap_disks:
4801 if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4802 instance, cluster_name):
4803 logger.Error("could not export block device %s from node %s to node %s"
4804 % (dev.logical_id[1], src_node, dst_node.name))
4805 if not self.rpc.call_blockdev_remove(src_node, dev):
4806 logger.Error("could not remove snapshot block device %s from node %s" %
4807 (dev.logical_id[1], src_node))
4809 if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4810 logger.Error("could not finalize export for instance %s on node %s" %
4811 (instance.name, dst_node.name))
4813 nodelist = self.cfg.GetNodeList()
4814 nodelist.remove(dst_node.name)
4816 # on one-node clusters nodelist will be empty after the removal
4817 # if we proceed the backup would be removed because OpQueryExports
4818 # substitutes an empty list with the full cluster node list.
4819 if nodelist:
4820 exportlist = self.rpc.call_export_list(nodelist)
4821 for node in exportlist:
4822 if instance.name in exportlist[node]:
4823 if not self.rpc.call_export_remove(node, instance.name):
4824 logger.Error("could not remove older export for instance %s"
4825 " on node %s" % (instance.name, node))
4828 class LURemoveExport(NoHooksLU):
4829 """Remove exports related to the named instance.
4832 _OP_REQP = ["instance_name"]
4835 def ExpandNames(self):
4836 self.needed_locks = {}
4837 # We need all nodes to be locked in order for RemoveExport to work, but we
4838 # don't need to lock the instance itself, as nothing will happen to it (and
4839 # we can remove exports also for a removed instance)
4840 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4842 def CheckPrereq(self):
4843 """Check prerequisites.
4847 def Exec(self, feedback_fn):
4848 """Remove any export.
4851 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4852 # If the instance was not found we'll try with the name that was passed in.
4853 # This will only work if it was an FQDN, though.
4854 fqdn_warn = False
4855 if not instance_name:
4856 fqdn_warn = True
4857 instance_name = self.op.instance_name
4859 exportlist = self.rpc.call_export_list(self.acquired_locks[
4860 locking.LEVEL_NODE])
4861 found = False
4862 for node in exportlist:
4863 if instance_name in exportlist[node]:
4864 found = True
4865 if not self.rpc.call_export_remove(node, instance_name):
4866 logger.Error("could not remove export for instance %s"
4867 " on node %s" % (instance_name, node))
4869 if fqdn_warn and not found:
4870 feedback_fn("Export not found. If trying to remove an export belonging"
4871 " to a deleted instance please use its Fully Qualified"
4875 class TagsLU(NoHooksLU):
4878 This is an abstract class which is the parent of all the other tags LUs.
4882 def ExpandNames(self):
4883 self.needed_locks = {}
4884 if self.op.kind == constants.TAG_NODE:
4885 name = self.cfg.ExpandNodeName(self.op.name)
4886 if name is None:
4887 raise errors.OpPrereqError("Invalid node name (%s)" %
4888 (self.op.name,))
4889 self.op.name = name
4890 self.needed_locks[locking.LEVEL_NODE] = name
4891 elif self.op.kind == constants.TAG_INSTANCE:
4892 name = self.cfg.ExpandInstanceName(self.op.name)
4893 if name is None:
4894 raise errors.OpPrereqError("Invalid instance name (%s)" %
4895 (self.op.name,))
4896 self.op.name = name
4897 self.needed_locks[locking.LEVEL_INSTANCE] = name
4899 def CheckPrereq(self):
4900 """Check prerequisites.
4903 if self.op.kind == constants.TAG_CLUSTER:
4904 self.target = self.cfg.GetClusterInfo()
4905 elif self.op.kind == constants.TAG_NODE:
4906 self.target = self.cfg.GetNodeInfo(self.op.name)
4907 elif self.op.kind == constants.TAG_INSTANCE:
4908 self.target = self.cfg.GetInstanceInfo(self.op.name)
4909 else:
4910 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4911 (self.op.kind,))
4914 class LUGetTags(TagsLU):
4915 """Returns the tags of a given object.
4918 _OP_REQP = ["kind", "name"]
4921 def Exec(self, feedback_fn):
4922 """Returns the tag list.
4925 return list(self.target.GetTags())
4928 class LUSearchTags(NoHooksLU):
4929 """Searches the tags for a given pattern.
4932 _OP_REQP = ["pattern"]
4935 def ExpandNames(self):
4936 self.needed_locks = {}
4938 def CheckPrereq(self):
4939 """Check prerequisites.
4941 This checks the pattern passed for validity by compiling it.
4944 try:
4945 self.re = re.compile(self.op.pattern)
4946 except re.error, err:
4947 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4948 (self.op.pattern, err))
4950 def Exec(self, feedback_fn):
4951 """Returns the tag list.
4955 tgts = [("/cluster", cfg.GetClusterInfo())]
4956 ilist = cfg.GetAllInstancesInfo().values()
4957 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4958 nlist = cfg.GetAllNodesInfo().values()
4959 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4960 results = []
4961 for path, target in tgts:
4962 for tag in target.GetTags():
4963 if self.re.search(tag):
4964 results.append((path, tag))
4966 return results
4968 class LUAddTags(TagsLU):
4969 """Sets a tag on a given object.
4972 _OP_REQP = ["kind", "name", "tags"]
4975 def CheckPrereq(self):
4976 """Check prerequisites.
4978 This checks the type and length of the tag name and value.
4981 TagsLU.CheckPrereq(self)
4982 for tag in self.op.tags:
4983 objects.TaggableObject.ValidateTag(tag)
4985 def Exec(self, feedback_fn):
4989 try:
4990 for tag in self.op.tags:
4991 self.target.AddTag(tag)
4992 except errors.TagError, err:
4993 raise errors.OpExecError("Error while setting tag: %s" % str(err))
4994 try:
4995 self.cfg.Update(self.target)
4996 except errors.ConfigurationError:
4997 raise errors.OpRetryError("There has been a modification to the"
4998 " config file and the operation has been"
4999 " aborted. Please retry.")
5002 class LUDelTags(TagsLU):
5003 """Delete a list of tags from a given object.
5006 _OP_REQP = ["kind", "name", "tags"]
5009 def CheckPrereq(self):
5010 """Check prerequisites.
5012 This checks that we have the given tag.
5015 TagsLU.CheckPrereq(self)
5016 for tag in self.op.tags:
5017 objects.TaggableObject.ValidateTag(tag)
5018 del_tags = frozenset(self.op.tags)
5019 cur_tags = self.target.GetTags()
5020 if not del_tags <= cur_tags:
5021 diff_tags = del_tags - cur_tags
5022 diff_names = ["'%s'" % tag for tag in diff_tags]
5023 diff_names.sort()
5024 raise errors.OpPrereqError("Tag(s) %s not found" %
5025 (",".join(diff_names)))
5027 def Exec(self, feedback_fn):
5028 """Remove the tag from the object.
5031 for tag in self.op.tags:
5032 self.target.RemoveTag(tag)
5033 try:
5034 self.cfg.Update(self.target)
5035 except errors.ConfigurationError:
5036 raise errors.OpRetryError("There has been a modification to the"
5037 " config file and the operation has been"
5038 " aborted. Please retry.")
5041 class LUTestDelay(NoHooksLU):
5042 """Sleep for a specified amount of time.
5044 This LU sleeps on the master and/or nodes for a specified amount of
5045 time.
5048 _OP_REQP = ["duration", "on_master", "on_nodes"]
5051 def ExpandNames(self):
5052 """Expand names and set required locks.
5054 This expands the node list, if any.
5057 self.needed_locks = {}
5058 if self.op.on_nodes:
5059 # _GetWantedNodes can be used here, but is not always appropriate to use
5060 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5061 # more information.
5062 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5063 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5065 def CheckPrereq(self):
5066 """Check prerequisites.
5070 def Exec(self, feedback_fn):
5071 """Do the actual sleep.
5074 if self.op.on_master:
5075 if not utils.TestDelay(self.op.duration):
5076 raise errors.OpExecError("Error during master delay test")
5077 if self.op.on_nodes:
5078 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5080 raise errors.OpExecError("Complete failure from rpc call")
5081 for node, node_result in result.items():
5083 raise errors.OpExecError("Failure during rpc call to node %s,"
5084 " result: %s" % (node, node_result))
5087 class IAllocator(object):
5088 """IAllocator framework.
5090 An IAllocator instance has three sets of attributes:
5091 - cfg that is needed to query the cluster
5092 - input data (all members of the _KEYS class attribute are required)
5093 - four buffer attributes (in|out_data|text), that represent the
5094 input (to the external script) in text and data structure format,
5095 and the output from it, again in two formats
5096 - the result variables from the script (success, info, nodes) for
5097 easy usage
5099 """
5100 _ALLO_KEYS = [
5101 "mem_size", "disks", "disk_template",
5102 "os", "tags", "nics", "vcpus",
5103 ]
5104 _RELO_KEYS = [
5105 "relocate_from",
5106 ]
5108 def __init__(self, lu, mode, name, **kwargs):
5109 self.lu = lu
5110 # init buffer variables
5111 self.in_text = self.out_text = self.in_data = self.out_data = None
5112 # init all input fields so that pylint is happy
5113 self.mode = mode
5114 self.name = name
5115 self.mem_size = self.disks = self.disk_template = None
5116 self.os = self.tags = self.nics = self.vcpus = None
5117 self.relocate_from = None
5119 self.required_nodes = None
5120 # init result fields
5121 self.success = self.info = self.nodes = None
5122 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5123 keyset = self._ALLO_KEYS
5124 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5125 keyset = self._RELO_KEYS
5126 else:
5127 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5128 " IAllocator" % self.mode)
5129 for key in kwargs:
5130 if key not in keyset:
5131 raise errors.ProgrammerError("Invalid input parameter '%s' to"
5132 " IAllocator" % key)
5133 setattr(self, key, kwargs[key])
5134 for key in keyset:
5135 if key not in kwargs:
5136 raise errors.ProgrammerError("Missing input parameter '%s' to"
5137 " IAllocator" % key)
5138 self._BuildInputData()
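# A minimal usage sketch (hypothetical caller and allocator name, shown only
# for illustration):
#   ial = IAllocator(self, mode=constants.IALLOCATOR_MODE_RELOC,
#                    name=instance.name, relocate_from=[old_node])
#   ial.Run(allocator_name)
#   if not ial.success:
#     raise errors.OpExecError("allocator failed: %s" % ial.info)
#   new_node = ial.nodes[0]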
5140 def _ComputeClusterData(self):
5141 """Compute the generic allocator input data.
5143 This is the data that is independent of the actual operation.
5145 """
5146 cfg = self.lu.cfg
5147 cluster_info = cfg.GetClusterInfo()
5149 data = {
5150 "version": 1,
5151 "cluster_name": cfg.GetClusterName(),
5152 "cluster_tags": list(cluster_info.GetTags()),
5153 "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5154 # we don't have job IDs
5155 }
5157 i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5160 node_results = {}
5161 node_list = cfg.GetNodeList()
5162 # FIXME: here we have only one hypervisor information, but
5163 # instance can belong to different hypervisors
5164 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5165 cfg.GetHypervisorType())
5166 for nname in node_list:
5167 ninfo = cfg.GetNodeInfo(nname)
5168 if nname not in node_data or not isinstance(node_data[nname], dict):
5169 raise errors.OpExecError("Can't get data for node %s" % nname)
5170 remote_info = node_data[nname]
5171 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5172 'vg_size', 'vg_free', 'cpu_total']:
5173 if attr not in remote_info:
5174 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5177 remote_info[attr] = int(remote_info[attr])
5178 except ValueError, err:
5179 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5180 " %s" % (nname, attr, str(err)))
5181 # compute memory used by primary instances
5182 i_p_mem = i_p_up_mem = 0
5183 for iinfo in i_list:
5184 if iinfo.primary_node == nname:
5185 i_p_mem += iinfo.memory
5186 if iinfo.status == "up":
5187 i_p_up_mem += iinfo.memory
5189 # compute memory used by instances
5191 "tags": list(ninfo.GetTags()),
5192 "total_memory": remote_info['memory_total'],
5193 "reserved_memory": remote_info['memory_dom0'],
5194 "free_memory": remote_info['memory_free'],
5195 "i_pri_memory": i_p_mem,
5196 "i_pri_up_memory": i_p_up_mem,
5197 "total_disk": remote_info['vg_size'],
5198 "free_disk": remote_info['vg_free'],
5199 "primary_ip": ninfo.primary_ip,
5200 "secondary_ip": ninfo.secondary_ip,
5201 "total_cpus": remote_info['cpu_total'],
5202 }
5203 node_results[nname] = pnr
5204 data["nodes"] = node_results
5207 instance_data = {}
5208 for iinfo in i_list:
5209 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5210 for n in iinfo.nics]
5212 "tags": list(iinfo.GetTags()),
5213 "should_run": iinfo.status == "up",
5214 "vcpus": iinfo.vcpus,
5215 "memory": iinfo.memory,
5217 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5219 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5220 "disk_template": iinfo.disk_template,
5221 "hypervisor": iinfo.hypervisor,
5222 }
5223 instance_data[iinfo.name] = pir
5225 data["instances"] = instance_data
5227 self.in_data = data
5229 def _AddNewInstance(self):
5230 """Add new instance data to allocator structure.
5232 This in combination with _AllocatorGetClusterData will create the
5233 correct structure needed as input for the allocator.
5235 The checks for the completeness of the opcode must have already been
5236 done.
5238 """
5239 data = self.in_data
5240 if len(self.disks) != 2:
5241 raise errors.OpExecError("Only two-disk configurations supported")
5243 disk_space = _ComputeDiskSize(self.disk_template,
5244 self.disks[0]["size"], self.disks[1]["size"])
5246 if self.disk_template in constants.DTS_NET_MIRROR:
5247 self.required_nodes = 2
5249 self.required_nodes = 1
5253 "disk_template": self.disk_template,
5256 "vcpus": self.vcpus,
5257 "memory": self.mem_size,
5258 "disks": self.disks,
5259 "disk_space_total": disk_space,
5261 "required_nodes": self.required_nodes,
5263 data["request"] = request
5265 def _AddRelocateInstance(self):
5266 """Add relocate instance data to allocator structure.
5268 This in combination with _IAllocatorGetClusterData will create the
5269 correct structure needed as input for the allocator.
5271 The checks for the completeness of the opcode must have already been
5272 done.
5274 """
5275 instance = self.lu.cfg.GetInstanceInfo(self.name)
5276 if instance is None:
5277 raise errors.ProgrammerError("Unknown instance '%s' passed to"
5278 " IAllocator" % self.name)
5280 if instance.disk_template not in constants.DTS_NET_MIRROR:
5281 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5283 if len(instance.secondary_nodes) != 1:
5284 raise errors.OpPrereqError("Instance has not exactly one secondary node")
5286 self.required_nodes = 1
5288 disk_space = _ComputeDiskSize(instance.disk_template,
5289 instance.disks[0].size,
5290 instance.disks[1].size)
5295 "disk_space_total": disk_space,
5296 "required_nodes": self.required_nodes,
5297 "relocate_from": self.relocate_from,
5299 self.in_data["request"] = request
5301 def _BuildInputData(self):
5302 """Build input data structures.
5305 self._ComputeClusterData()
5307 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5308 self._AddNewInstance()
5309 else:
5310 self._AddRelocateInstance()
5312 self.in_text = serializer.Dump(self.in_data)
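# in_text is the serialized (JSON) form of in_data: the cluster-wide
# "nodes" and "instances" sections built above plus the mode-specific
# "request" section; this is what is handed to the external allocator via
# the iallocator runner RPC in Run() below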
5314 def Run(self, name, validate=True, call_fn=None):
5315 """Run an instance allocator and return the results.
5318 if call_fn is None:
5319 call_fn = self.lu.rpc.call_iallocator_runner
5322 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5324 if not isinstance(result, (list, tuple)) or len(result) != 4:
5325 raise errors.OpExecError("Invalid result from master iallocator runner")
5327 rcode, stdout, stderr, fail = result
5329 if rcode == constants.IARUN_NOTFOUND:
5330 raise errors.OpExecError("Can't find allocator '%s'" % name)
5331 elif rcode == constants.IARUN_FAILURE:
5332 raise errors.OpExecError("Instance allocator call failed: %s,"
5333 " output: %s" % (fail, stdout+stderr))
5334 self.out_text = stdout
5335 if validate:
5336 self._ValidateResult()
5338 def _ValidateResult(self):
5339 """Process the allocator results.
5341 This will process and if successful save the result in
5342 self.out_data and the other parameters.
5345 try:
5346 rdict = serializer.Load(self.out_text)
5347 except Exception, err:
5348 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5350 if not isinstance(rdict, dict):
5351 raise errors.OpExecError("Can't parse iallocator results: not a dict")
5353 for key in "success", "info", "nodes":
5354 if key not in rdict:
5355 raise errors.OpExecError("Can't parse iallocator results:"
5356 " missing key '%s'" % key)
5357 setattr(self, key, rdict[key])
5359 if not isinstance(rdict["nodes"], list):
5360 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5362 self.out_data = rdict
5365 class LUTestAllocator(NoHooksLU):
5366 """Run allocator tests.
5368 This LU runs the allocator tests
5371 _OP_REQP = ["direction", "mode", "name"]
5373 def CheckPrereq(self):
5374 """Check prerequisites.
5376 This checks the opcode parameters depending on the direction and mode test.
5379 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5380 for attr in ["name", "mem_size", "disks", "disk_template",
5381 "os", "tags", "nics", "vcpus"]:
5382 if not hasattr(self.op, attr):
5383 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5385 iname = self.cfg.ExpandInstanceName(self.op.name)
5386 if iname is not None:
5387 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5389 if not isinstance(self.op.nics, list):
5390 raise errors.OpPrereqError("Invalid parameter 'nics'")
5391 for row in self.op.nics:
5392 if (not isinstance(row, dict) or
5395 "bridge" not in row):
5396 raise errors.OpPrereqError("Invalid contents of the"
5397 " 'nics' parameter")
5398 if not isinstance(self.op.disks, list):
5399 raise errors.OpPrereqError("Invalid parameter 'disks'")
5400 if len(self.op.disks) != 2:
5401 raise errors.OpPrereqError("Only two-disk configurations supported")
5402 for row in self.op.disks:
5403 if (not isinstance(row, dict) or
5404 "size" not in row or
5405 not isinstance(row["size"], int) or
5406 "mode" not in row or
5407 row["mode"] not in ['r', 'w']):
5408 raise errors.OpPrereqError("Invalid contents of the"
5409 " 'disks' parameter")
5410 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5411 if not hasattr(self.op, "name"):
5412 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5413 fname = self.cfg.ExpandInstanceName(self.op.name)
5415 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5417 self.op.name = fname
5418 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5420 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5423 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5424 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5425 raise errors.OpPrereqError("Missing allocator name")
5426 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5427 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5430 def Exec(self, feedback_fn):
5431 """Run the allocator test.
5434 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5435 ial = IAllocator(self,
5436 mode=self.op.mode,
5437 name=self.op.name,
5438 mem_size=self.op.mem_size,
5439 disks=self.op.disks,
5440 disk_template=self.op.disk_template,
5441 os=self.op.os,
5442 tags=self.op.tags,
5443 nics=self.op.nics,
5444 vcpus=self.op.vcpus,
5445 )
5446 else:
5447 ial = IAllocator(self,
5448 mode=self.op.mode,
5449 name=self.op.name,
5450 relocate_from=list(self.relocate_from),
5451 )
5453 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5454 result = ial.in_text
5455 else:
5456 ial.Run(self.op.allocator, validate=False)
5457 result = ial.out_text
5458 return result