4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
47 class LogicalUnit(object):
48 """Logical Unit base class.
50 Subclasses must follow these rules:
51 - implement ExpandNames
52 - implement CheckPrereq
54 - implement BuildHooksEnv
55 - redefine HPATH and HTYPE
56 - optionally redefine their run requirements:
57 REQ_MASTER: the LU needs to run on the master node
58 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60 Note that all commands require root permissions.
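A minimal subclass could look like the following sketch (illustrative only;
the hook path and parameter names are hypothetical):

  class LUDoSomething(LogicalUnit):
    HPATH = "do-something"
    HTYPE = constants.HTYPE_CLUSTER
    _OP_REQP = ["some_parameter"]

    def ExpandNames(self):
      self.needed_locks = {}

    def CheckPrereq(self):
      pass

    def BuildHooksEnv(self):
      env = {"OP_TARGET": self.cfg.GetClusterName()}
      mn = self.cfg.GetMasterNode()
      return env, [mn], [mn]

    def Exec(self, feedback_fn):
      feedback_fn("doing something")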
69 def __init__(self, processor, op, context, rpc):
70 """Constructor for LogicalUnit.
72 This needs to be overridden in derived classes in order to check op validity.
78 self.cfg = context.cfg
79 self.context = context
81 # Dicts used to declare locking needs to mcpu
82 self.needed_locks = None
83 self.acquired_locks = {}
84 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86 self.remove_locks = {}
87 # Used to force good behavior when calling helper functions
88 self.recalculate_locks = {}
91 for attr_name in self._OP_REQP:
92 attr_val = getattr(op, attr_name, None)
94 raise errors.OpPrereqError("Required parameter '%s' missing" %
97 if not self.cfg.IsCluster():
98 raise errors.OpPrereqError("Cluster not initialized yet,"
99 " use 'gnt-cluster init' first.")
101 master = self.cfg.GetMasterNode()
102 if master != utils.HostInfo().name:
103 raise errors.OpPrereqError("Commands must be run on the master"
107 """Returns the SshRunner object
111 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
114 ssh = property(fget=__GetSSH)
116 def ExpandNames(self):
117 """Expand names for this LU.
119 This method is called before starting to execute the opcode, and it should
120 update all the parameters of the opcode to their canonical form (e.g. a
121 short node name must be fully expanded after this method has successfully
122 completed). This way locking, hooks, logging, etc. can work correctly.
124 LUs which implement this method must also populate the self.needed_locks
125 member, as a dict with lock levels as keys, and a list of needed lock names
127 - Use an empty dict if you don't need any lock
128 - If you don't need any lock at a particular level omit that level
129 - Don't put anything for the BGL level
130 - If you want all locks at a level use locking.ALL_SET as a value
132 If you need to share locks (rather than acquire them exclusively) at one
133 level you can modify self.share_locks, setting a true value (usually 1) for
134 that level. By default locks are not shared.
137 # Acquire all nodes and one instance
138 self.needed_locks = {
139 locking.LEVEL_NODE: locking.ALL_SET,
140 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
142 # Acquire just two nodes
143 self.needed_locks = {
144 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
147 self.needed_locks = {} # No, you can't leave it to the default value None
150 # The implementation of this method is mandatory only if the new LU is
151 # concurrent, so that old LUs don't need to be changed all at the same
154 self.needed_locks = {} # Exclusive LUs don't need locks.
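An illustrative sketch of a read-only LU that acquires all node locks in
shared mode (as the verify LUs below do):

  self.share_locks[locking.LEVEL_NODE] = 1
  self.needed_locks = {
    locking.LEVEL_NODE: locking.ALL_SET,
  }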
156 raise NotImplementedError
158 def DeclareLocks(self, level):
159 """Declare LU locking needs for a level
161 While most LUs can just declare their locking needs at ExpandNames time,
162 sometimes there's the need to calculate some locks after having acquired
163 the ones before. This function is called just before acquiring locks at a
164 particular level, but after acquiring the ones at lower levels, and permits
165 such calculations. It can be used to modify self.needed_locks, and by
166 default it does nothing.
168 This function is only called if you have something already set in
169 self.needed_locks for the level.
171 @param level: Locking level which is going to be locked
172 @type level: member of ganeti.locking.LEVELS
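A typical implementation, sketched after the instance LUs later in this
module, simply recalculates the node locks from the instance locks:

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()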
176 def CheckPrereq(self):
177 """Check prerequisites for this LU.
179 This method should check that the prerequisites for the execution
180 of this LU are fulfilled. It can do internode communication, but
181 it should be idempotent - no cluster or system changes are allowed.
184 The method should raise errors.OpPrereqError in case something is
185 not fulfilled. Its return value is ignored.
187 This method should also update all the parameters of the opcode to
188 their canonical form if it hasn't been done by ExpandNames before.
191 raise NotImplementedError
193 def Exec(self, feedback_fn):
196 This method should implement the actual work. It should raise
197 errors.OpExecError for failures that are somewhat dealt with in
201 raise NotImplementedError
203 def BuildHooksEnv(self):
204 """Build hooks environment for this LU.
206 This method should return a three-element tuple consisting of: a dict
207 containing the environment that will be used for running the
208 specific hook for this LU, a list of node names on which the hook
209 should run before the execution, and a list of node names on which
210 the hook should run after the execution.
212 The keys of the dict must not be prefixed with 'GANETI_', as this will
213 be handled in the hooks runner. Also note additional keys will be
214 added by the hooks runner. If the LU doesn't define any
215 environment, an empty dict (and not None) should be returned.
217 No nodes should be returned as an empty list (and not None).
219 Note that if the HPATH for a LU class is None, this function will not be called.
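An illustrative sketch, modelled on the node LUs below (the environment
keys shown are only examples):

  env = {"OP_TARGET": self.op.node_name, "NODE_NAME": self.op.node_name}
  all_nodes = self.cfg.GetNodeList()
  return env, all_nodes, all_nodes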
223 raise NotImplementedError
225 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
226 """Notify the LU about the results of its hooks.
228 This method is called every time a hooks phase is executed, and notifies
229 the Logical Unit about the hooks' result. The LU can then use it to alter
230 its result based on the hooks. By default the method does nothing and the
231 previous result is passed back unchanged but any LU can define it if it
232 wants to use the local cluster hook-scripts somehow.
235 phase: the hooks phase that has just been run
236 hooks_results: the results of the multi-node hooks rpc call
237 feedback_fn: function to send feedback back to the caller
238 lu_result: the previous result this LU had, or None in the PRE phase.
243 def _ExpandAndLockInstance(self):
244 """Helper function to expand and lock an instance.
246 Many LUs that work on an instance take its name in self.op.instance_name
247 and need to expand it and then declare the expanded name for locking. This
248 function does it, and then updates self.op.instance_name to the expanded
249 name. It also initializes needed_locks as a dict, if this hasn't been done before.
253 if self.needed_locks is None:
254 self.needed_locks = {}
256 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
257 "_ExpandAndLockInstance called with instance-level locks set"
258 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
259 if expanded_name is None:
260 raise errors.OpPrereqError("Instance '%s' not known" %
261 self.op.instance_name)
262 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
263 self.op.instance_name = expanded_name
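# Illustrative usage sketch (mirroring the instance LUs below): an LU's
# ExpandNames would typically do
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE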
265 def _LockInstancesNodes(self, primary_only=False):
266 """Helper function to declare instances' nodes for locking.
268 This function should be called after locking one or more instances to lock
269 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
270 with all primary or secondary nodes for instances already locked and
271 present in self.needed_locks[locking.LEVEL_INSTANCE].
273 It should be called from DeclareLocks, and for safety only works if
274 self.recalculate_locks[locking.LEVEL_NODE] is set.
276 In the future it may grow parameters to just lock some instance's nodes, or
277 to just lock primaries or secondary nodes, if needed.
279 It should be called from DeclareLocks in a way similar to:
281 if level == locking.LEVEL_NODE:
282 self._LockInstancesNodes()
284 @type primary_only: boolean
285 @param primary_only: only lock primary nodes of locked instances
288 assert locking.LEVEL_NODE in self.recalculate_locks, \
289 "_LockInstancesNodes helper function called with no nodes to recalculate"
291 # TODO: check whether we've really been called with the instance locks held
293 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
294 # future we might want to have different behaviors depending on the value
295 # of self.recalculate_locks[locking.LEVEL_NODE]
297 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
298 instance = self.context.cfg.GetInstanceInfo(instance_name)
299 wanted_nodes.append(instance.primary_node)
301 wanted_nodes.extend(instance.secondary_nodes)
303 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
304 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
305 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
306 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
308 del self.recalculate_locks[locking.LEVEL_NODE]
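# Illustrative sketch: a DeclareLocks that only needs the primary nodes of
# the already-locked instances could call
#
#   if level == locking.LEVEL_NODE:
#     self._LockInstancesNodes(primary_only=True)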
311 class NoHooksLU(LogicalUnit):
312 """Simple LU which runs no hooks.
314 This LU is intended as a parent for other LogicalUnits which will
315 run no hooks, in order to reduce duplicate code.
322 def _GetWantedNodes(lu, nodes):
323 """Returns list of checked and expanded node names.
326 nodes: List of nodes (strings) or None for all
329 if not isinstance(nodes, list):
330 raise errors.OpPrereqError("Invalid argument type 'nodes'")
333 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
334 " non-empty list of nodes whose name is to be expanded.")
338 node = lu.cfg.ExpandNodeName(name)
340 raise errors.OpPrereqError("No such node name '%s'" % name)
343 return utils.NiceSort(wanted)
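# Illustrative usage sketch (as the node query LU below does): expand the
# user-supplied names, or fall back to all nodes when none are given:
#
#   if self.op.names:
#     self.wanted = _GetWantedNodes(self, self.op.names)
#   else:
#     self.wanted = locking.ALL_SET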
346 def _GetWantedInstances(lu, instances):
347 """Returns list of checked and expanded instance names.
350 instances: List of instances (strings) or None for all
353 if not isinstance(instances, list):
354 raise errors.OpPrereqError("Invalid argument type 'instances'")
359 for name in instances:
360 instance = lu.cfg.ExpandInstanceName(name)
362 raise errors.OpPrereqError("No such instance name '%s'" % name)
363 wanted.append(instance)
366 wanted = lu.cfg.GetInstanceList()
367 return utils.NiceSort(wanted)
370 def _CheckOutputFields(static, dynamic, selected):
371 """Checks whether all selected fields are valid.
374 static: Static fields
375 dynamic: Dynamic fields
378 static_fields = frozenset(static)
379 dynamic_fields = frozenset(dynamic)
381 all_fields = static_fields | dynamic_fields
383 if not all_fields.issuperset(selected):
384 raise errors.OpPrereqError("Unknown output fields selected: %s"
385 % ",".join(frozenset(selected).
386 difference(all_fields)))
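# Illustrative usage sketch (as the volume query LU below does in
# ExpandNames):
#
#   _CheckOutputFields(static=["node"],
#                      dynamic=["phys", "vg", "name", "size", "instance"],
#                      selected=self.op.output_fields)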
389 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
390 memory, vcpus, nics):
391 """Builds instance related env variables for hooks from single variables.
394 secondary_nodes: List of secondary nodes as strings
398 "INSTANCE_NAME": name,
399 "INSTANCE_PRIMARY": primary_node,
400 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
401 "INSTANCE_OS_TYPE": os_type,
402 "INSTANCE_STATUS": status,
403 "INSTANCE_MEMORY": memory,
404 "INSTANCE_VCPUS": vcpus,
408 nic_count = len(nics)
409 for idx, (ip, bridge, mac) in enumerate(nics):
412 env["INSTANCE_NIC%d_IP" % idx] = ip
413 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
414 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
418 env["INSTANCE_NIC_COUNT"] = nic_count
423 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
424 """Builds instance related env variables for hooks from an object.
427 instance: objects.Instance object of instance
428 override: dict of values to override
430 bep = lu.cfg.GetClusterInfo().FillBE(instance)
432 'name': instance.name,
433 'primary_node': instance.primary_node,
434 'secondary_nodes': instance.secondary_nodes,
435 'os_type': instance.os,
436 'status': instance.status,
437 'memory': bep[constants.BE_MEMORY],
438 'vcpus': bep[constants.BE_VCPUS],
439 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
442 args.update(override)
443 return _BuildInstanceHookEnv(**args)
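# Illustrative sketch: callers can override selected values, e.g. forcing
# the status reported to the hooks (the value shown is hypothetical):
#
#   env = _BuildInstanceHookEnvByObject(self, instance,
#                                       override={"status": "down"})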
446 def _CheckInstanceBridgesExist(lu, instance):
447 """Check that the brigdes needed by an instance exist.
450 # check bridges existance
451 brlist = [nic.bridge for nic in instance.nics]
452 if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
453 raise errors.OpPrereqError("one or more target bridges %s does not"
454 " exist on destination node '%s'" %
455 (brlist, instance.primary_node))
458 class LUDestroyCluster(NoHooksLU):
459 """Logical unit for destroying the cluster.
464 def CheckPrereq(self):
465 """Check prerequisites.
467 This checks whether the cluster is empty.
469 Any errors are signalled by raising errors.OpPrereqError.
472 master = self.cfg.GetMasterNode()
474 nodelist = self.cfg.GetNodeList()
475 if len(nodelist) != 1 or nodelist[0] != master:
476 raise errors.OpPrereqError("There are still %d node(s) in"
477 " this cluster." % (len(nodelist) - 1))
478 instancelist = self.cfg.GetInstanceList()
480 raise errors.OpPrereqError("There are still %d instance(s) in"
481 " this cluster." % len(instancelist))
483 def Exec(self, feedback_fn):
484 """Destroys the cluster.
487 master = self.cfg.GetMasterNode()
488 if not self.rpc.call_node_stop_master(master, False):
489 raise errors.OpExecError("Could not disable the master role")
490 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
491 utils.CreateBackup(priv_key)
492 utils.CreateBackup(pub_key)
496 class LUVerifyCluster(LogicalUnit):
497 """Verifies the cluster status.
500 HPATH = "cluster-verify"
501 HTYPE = constants.HTYPE_CLUSTER
502 _OP_REQP = ["skip_checks"]
505 def ExpandNames(self):
506 self.needed_locks = {
507 locking.LEVEL_NODE: locking.ALL_SET,
508 locking.LEVEL_INSTANCE: locking.ALL_SET,
510 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
512 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
513 remote_version, feedback_fn):
514 """Run multiple tests against a node.
517 - compares ganeti version
518 - checks vg existence and size > 20G
519 - checks config file checksum
520 - checks ssh to other nodes
523 node: name of the node to check
524 file_list: required list of files
525 local_cksum: dictionary of local files and their checksums
528 # compares ganeti version
529 local_version = constants.PROTOCOL_VERSION
530 if not remote_version:
531 feedback_fn(" - ERROR: connection to %s failed" % (node))
534 if local_version != remote_version:
535 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
536 (local_version, node, remote_version))
539 # checks vg existence and size > 20G
543 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
547 vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
548 constants.MIN_VG_SIZE)
550 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
554 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
557 # checks config file checksum
560 if 'filelist' not in node_result:
562 feedback_fn(" - ERROR: node hasn't returned file checksum data")
564 remote_cksum = node_result['filelist']
565 for file_name in file_list:
566 if file_name not in remote_cksum:
568 feedback_fn(" - ERROR: file '%s' missing" % file_name)
569 elif remote_cksum[file_name] != local_cksum[file_name]:
571 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
573 if 'nodelist' not in node_result:
575 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
577 if node_result['nodelist']:
579 for node in node_result['nodelist']:
580 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
581 (node, node_result['nodelist'][node]))
582 if 'node-net-test' not in node_result:
584 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
586 if node_result['node-net-test']:
588 nlist = utils.NiceSort(node_result['node-net-test'].keys())
590 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
591 (node, node_result['node-net-test'][node]))
593 hyp_result = node_result.get('hypervisor', None)
594 if isinstance(hyp_result, dict):
595 for hv_name, hv_result in hyp_result.iteritems():
596 if hv_result is not None:
597 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
598 (hv_name, hv_result))
601 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
602 node_instance, feedback_fn):
603 """Verify an instance.
605 This function checks to see if the required block devices are
606 available on the instance's node.
611 node_current = instanceconfig.primary_node
614 instanceconfig.MapLVsByNode(node_vol_should)
616 for node in node_vol_should:
617 for volume in node_vol_should[node]:
618 if node not in node_vol_is or volume not in node_vol_is[node]:
619 feedback_fn(" - ERROR: volume %s missing on node %s" %
623 if instanceconfig.status != 'down':
624 if (node_current not in node_instance or
625 not instance in node_instance[node_current]):
626 feedback_fn(" - ERROR: instance %s not running on node %s" %
627 (instance, node_current))
630 for node in node_instance:
631 if node != node_current:
632 if instance in node_instance[node]:
633 feedback_fn(" - ERROR: instance %s should not run on node %s" %
639 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
640 """Verify if there are any unknown volumes in the cluster.
642 The .os, .swap and backup volumes are ignored. All other volumes are reported as unknown.
648 for node in node_vol_is:
649 for volume in node_vol_is[node]:
650 if node not in node_vol_should or volume not in node_vol_should[node]:
651 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
656 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
657 """Verify the list of running instances.
659 This checks what instances are running but unknown to the cluster.
663 for node in node_instance:
664 for runninginstance in node_instance[node]:
665 if runninginstance not in instancelist:
666 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
667 (runninginstance, node))
671 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
672 """Verify N+1 Memory Resilience.
674 Check that if one single node dies we can still start all the instances it was primary for.
680 for node, nodeinfo in node_info.iteritems():
681 # This code checks that every node which is now listed as secondary has
682 # enough memory to host all the instances it is supposed to take over,
683 # should a single other node in the cluster fail.
684 # FIXME: not ready for failover to an arbitrary node
685 # FIXME: does not support file-backed instances
686 # WARNING: we currently take into account down instances as well as up
687 # ones, considering that even if they're down someone might want to start
688 # them even in the event of a node failure.
689 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
691 for instance in instances:
692 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
693 if bep[constants.BE_AUTO_BALANCE]:
694 needed_mem += bep[constants.BE_MEMORY]
695 if nodeinfo['mfree'] < needed_mem:
696 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
697 " failovers should node %s fail" % (node, prinode))
701 def CheckPrereq(self):
702 """Check prerequisites.
704 Transform the list of checks we're going to skip into a set and check that
705 all its members are valid.
708 self.skip_set = frozenset(self.op.skip_checks)
709 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
710 raise errors.OpPrereqError("Invalid checks to be skipped specified")
712 def BuildHooksEnv(self):
715 Cluster-Verify hooks run only in the post phase; a hook failure causes its
716 output to be logged in the verify output and makes the verification fail.
719 all_nodes = self.cfg.GetNodeList()
720 # TODO: populate the environment with useful information for verify hooks
722 return env, [], all_nodes
724 def Exec(self, feedback_fn):
725 """Verify integrity of cluster, performing various test on nodes.
729 feedback_fn("* Verifying global settings")
730 for msg in self.cfg.VerifyConfig():
731 feedback_fn(" - ERROR: %s" % msg)
733 vg_name = self.cfg.GetVGName()
734 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
735 nodelist = utils.NiceSort(self.cfg.GetNodeList())
736 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
737 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
738 i_non_redundant = [] # Non redundant instances
739 i_non_a_balanced = [] # Non auto-balanced instances
745 # FIXME: verify OS list
748 file_names.append(constants.SSL_CERT_FILE)
749 file_names.append(constants.CLUSTER_CONF_FILE)
750 local_checksums = utils.FingerprintFiles(file_names)
752 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
753 all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
754 all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
755 all_vglist = self.rpc.call_vg_list(nodelist)
756 node_verify_param = {
757 'filelist': file_names,
758 'nodelist': nodelist,
759 'hypervisor': hypervisors,
760 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
761 for node in nodeinfo]
763 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
764 self.cfg.GetClusterName())
765 all_rversion = self.rpc.call_version(nodelist)
766 all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
767 self.cfg.GetHypervisorType())
769 cluster = self.cfg.GetClusterInfo()
770 for node in nodelist:
771 feedback_fn("* Verifying node %s" % node)
772 result = self._VerifyNode(node, file_names, local_checksums,
773 all_vglist[node], all_nvinfo[node],
774 all_rversion[node], feedback_fn)
778 volumeinfo = all_volumeinfo[node]
780 if isinstance(volumeinfo, basestring):
781 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
782 (node, volumeinfo[-400:].encode('string_escape')))
784 node_volume[node] = {}
785 elif not isinstance(volumeinfo, dict):
786 feedback_fn(" - ERROR: connection to %s failed" % (node,))
790 node_volume[node] = volumeinfo
793 nodeinstance = all_instanceinfo[node]
794 if not isinstance(nodeinstance, list):
795 feedback_fn(" - ERROR: connection to %s failed" % (node,))
799 node_instance[node] = nodeinstance
802 nodeinfo = all_ninfo[node]
803 if not isinstance(nodeinfo, dict):
804 feedback_fn(" - ERROR: connection to %s failed" % (node,))
810 "mfree": int(nodeinfo['memory_free']),
811 "dfree": int(nodeinfo['vg_free']),
814 # dictionary holding all instances this node is secondary for,
815 # grouped by their primary node. Each key is a cluster node, and each
816 # value is a list of instances which have the key as primary and the
817 # current node as secondary. this is handy to calculate N+1 memory
818 # availability if you can only failover from a primary to its secondary.
820 "sinst-by-pnode": {},
823 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
829 for instance in instancelist:
830 feedback_fn("* Verifying instance %s" % instance)
831 inst_config = self.cfg.GetInstanceInfo(instance)
832 result = self._VerifyInstance(instance, inst_config, node_volume,
833 node_instance, feedback_fn)
836 inst_config.MapLVsByNode(node_vol_should)
838 instance_cfg[instance] = inst_config
840 pnode = inst_config.primary_node
841 if pnode in node_info:
842 node_info[pnode]['pinst'].append(instance)
844 feedback_fn(" - ERROR: instance %s, connection to primary node"
845 " %s failed" % (instance, pnode))
848 # If the instance is non-redundant we cannot survive losing its primary
849 # node, so we are not N+1 compliant. On the other hand we have no disk
850 # templates with more than one secondary so that situation is not well supported.
852 # FIXME: does not support file-backed instances
853 if len(inst_config.secondary_nodes) == 0:
854 i_non_redundant.append(instance)
855 elif len(inst_config.secondary_nodes) > 1:
856 feedback_fn(" - WARNING: multiple secondaries for instance %s"
859 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
860 i_non_a_balanced.append(instance)
862 for snode in inst_config.secondary_nodes:
863 if snode in node_info:
864 node_info[snode]['sinst'].append(instance)
865 if pnode not in node_info[snode]['sinst-by-pnode']:
866 node_info[snode]['sinst-by-pnode'][pnode] = []
867 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
869 feedback_fn(" - ERROR: instance %s, connection to secondary node"
870 " %s failed" % (instance, snode))
872 feedback_fn("* Verifying orphan volumes")
873 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
877 feedback_fn("* Verifying remaining instances")
878 result = self._VerifyOrphanInstances(instancelist, node_instance,
882 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
883 feedback_fn("* Verifying N+1 Memory redundancy")
884 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
887 feedback_fn("* Other Notes")
889 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
890 % len(i_non_redundant))
893 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
894 % len(i_non_a_balanced))
898 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
899 """Analize the post-hooks' result, handle it, and send some
900 nicely-formatted feedback back to the user.
903 phase: the hooks phase that has just been run
904 hooks_results: the results of the multi-node hooks rpc call
905 feedback_fn: function to send feedback back to the caller
906 lu_result: previous Exec result
909 # We only really run POST phase hooks, and are only interested in
911 if phase == constants.HOOKS_PHASE_POST:
912 # Used to change hooks' output to proper indentation
913 indent_re = re.compile('^', re.M)
914 feedback_fn("* Hooks Results")
915 if not hooks_results:
916 feedback_fn(" - ERROR: general communication failure")
919 for node_name in hooks_results:
920 show_node_header = True
921 res = hooks_results[node_name]
922 if res is False or not isinstance(res, list):
923 feedback_fn(" Communication failure")
926 for script, hkr, output in res:
927 if hkr == constants.HKR_FAIL:
928 # The node header is only shown once, if there are
929 # failing hooks on that node
931 feedback_fn(" Node %s:" % node_name)
932 show_node_header = False
933 feedback_fn(" ERROR: Script %s failed, output:" % script)
934 output = indent_re.sub(' ', output)
935 feedback_fn("%s" % output)
941 class LUVerifyDisks(NoHooksLU):
942 """Verifies the cluster disks status.
948 def ExpandNames(self):
949 self.needed_locks = {
950 locking.LEVEL_NODE: locking.ALL_SET,
951 locking.LEVEL_INSTANCE: locking.ALL_SET,
953 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
955 def CheckPrereq(self):
956 """Check prerequisites.
958 This has no prerequisites.
963 def Exec(self, feedback_fn):
964 """Verify integrity of cluster disks.
967 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
969 vg_name = self.cfg.GetVGName()
970 nodes = utils.NiceSort(self.cfg.GetNodeList())
971 instances = [self.cfg.GetInstanceInfo(name)
972 for name in self.cfg.GetInstanceList()]
975 for inst in instances:
977 if (inst.status != "up" or
978 inst.disk_template not in constants.DTS_NET_MIRROR):
980 inst.MapLVsByNode(inst_lvs)
981 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
982 for node, vol_list in inst_lvs.iteritems():
984 nv_dict[(node, vol)] = inst
989 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
996 if isinstance(lvs, basestring):
997 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
999 elif not isinstance(lvs, dict):
1000 logging.warning("Connection to node %s failed or invalid data"
1002 res_nodes.append(node)
1005 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1006 inst = nv_dict.pop((node, lv_name), None)
1007 if (not lv_online and inst is not None
1008 and inst.name not in res_instances):
1009 res_instances.append(inst.name)
1011 # any leftover items in nv_dict are missing LVs, let's arrange the
1013 for key, inst in nv_dict.iteritems():
1014 if inst.name not in res_missing:
1015 res_missing[inst.name] = []
1016 res_missing[inst.name].append(key)
1021 class LURenameCluster(LogicalUnit):
1022 """Rename the cluster.
1025 HPATH = "cluster-rename"
1026 HTYPE = constants.HTYPE_CLUSTER
1029 def BuildHooksEnv(self):
1034 "OP_TARGET": self.cfg.GetClusterName(),
1035 "NEW_NAME": self.op.name,
1037 mn = self.cfg.GetMasterNode()
1038 return env, [mn], [mn]
1040 def CheckPrereq(self):
1041 """Verify that the passed name is a valid one.
1044 hostname = utils.HostInfo(self.op.name)
1046 new_name = hostname.name
1047 self.ip = new_ip = hostname.ip
1048 old_name = self.cfg.GetClusterName()
1049 old_ip = self.cfg.GetMasterIP()
1050 if new_name == old_name and new_ip == old_ip:
1051 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1052 " cluster has changed")
1053 if new_ip != old_ip:
1054 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1055 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1056 " reachable on the network. Aborting." %
1059 self.op.name = new_name
1061 def Exec(self, feedback_fn):
1062 """Rename the cluster.
1065 clustername = self.op.name
1068 # shutdown the master IP
1069 master = self.cfg.GetMasterNode()
1070 if not self.rpc.call_node_stop_master(master, False):
1071 raise errors.OpExecError("Could not disable the master role")
1076 ss.SetKey(ss.SS_MASTER_IP, ip)
1077 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1079 # Distribute updated ss config to all nodes
1080 myself = self.cfg.GetNodeInfo(master)
1081 dist_nodes = self.cfg.GetNodeList()
1082 if myself.name in dist_nodes:
1083 dist_nodes.remove(myself.name)
1085 logging.debug("Copying updated ssconf data to all nodes")
1086 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1087 fname = ss.KeyToFilename(keyname)
1088 result = self.rpc.call_upload_file(dist_nodes, fname)
1089 for to_node in dist_nodes:
1090 if not result[to_node]:
1091 logging.error("Copy of file %s to node %s failed", fname, to_node)
1093 if not self.rpc.call_node_start_master(master, False):
1094 logging.error("Could not re-enable the master role on the master,"
1095 " please restart manually.")
1098 def _RecursiveCheckIfLVMBased(disk):
1099 """Check if the given disk or its children are lvm-based.
1102 disk: ganeti.objects.Disk object
1105 boolean indicating whether a LD_LV dev_type was found or not
1109 for chdisk in disk.children:
1110 if _RecursiveCheckIfLVMBased(chdisk):
1112 return disk.dev_type == constants.LD_LV
1115 class LUSetClusterParams(LogicalUnit):
1116 """Change the parameters of the cluster.
1119 HPATH = "cluster-modify"
1120 HTYPE = constants.HTYPE_CLUSTER
1124 def ExpandNames(self):
1125 # FIXME: in the future maybe other cluster params won't require checking on
1126 # all nodes to be modified.
1127 self.needed_locks = {
1128 locking.LEVEL_NODE: locking.ALL_SET,
1130 self.share_locks[locking.LEVEL_NODE] = 1
1132 def BuildHooksEnv(self):
1137 "OP_TARGET": self.cfg.GetClusterName(),
1138 "NEW_VG_NAME": self.op.vg_name,
1140 mn = self.cfg.GetMasterNode()
1141 return env, [mn], [mn]
1143 def CheckPrereq(self):
1144 """Check prerequisites.
1146 This checks that the given parameters don't conflict and that
1147 the given volume group is valid.
1150 # FIXME: This only works because there is only one parameter that can be
1151 # changed or removed.
1152 if self.op.vg_name is not None and not self.op.vg_name:
1153 instances = self.cfg.GetAllInstancesInfo().values()
1154 for inst in instances:
1155 for disk in inst.disks:
1156 if _RecursiveCheckIfLVMBased(disk):
1157 raise errors.OpPrereqError("Cannot disable lvm storage while"
1158 " lvm-based instances exist")
1160 node_list = self.acquired_locks[locking.LEVEL_NODE]
1162 # if vg_name is not None, check the given volume group on all nodes
1164 vglist = self.rpc.call_vg_list(node_list)
1165 for node in node_list:
1166 vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1167 constants.MIN_VG_SIZE)
1169 raise errors.OpPrereqError("Error on node '%s': %s" %
1172 self.cluster = cluster = self.cfg.GetClusterInfo()
1173 # beparams changes do not need validation (we can't validate?),
1174 # but we still process here
1175 if self.op.beparams:
1176 self.new_beparams = cluster.FillDict(
1177 cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1179 # hypervisor list/parameters
1180 self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1181 if self.op.hvparams:
1182 if not isinstance(self.op.hvparams, dict):
1183 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1184 for hv_name, hv_dict in self.op.hvparams.items():
1185 if hv_name not in self.new_hvparams:
1186 self.new_hvparams[hv_name] = hv_dict
1188 self.new_hvparams[hv_name].update(hv_dict)
1190 if self.op.enabled_hypervisors is not None:
1191 self.hv_list = self.op.enabled_hypervisors
1193 self.hv_list = cluster.enabled_hypervisors
1195 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1196 # either the enabled list has changed, or the parameters have, validate
1197 for hv_name, hv_params in self.new_hvparams.items():
1198 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1199 (self.op.enabled_hypervisors and
1200 hv_name in self.op.enabled_hypervisors)):
1201 # either this is a new hypervisor, or its parameters have changed
1202 hv_class = hypervisor.GetHypervisor(hv_name)
1203 hv_class.CheckParameterSyntax(hv_params)
1204 _CheckHVParams(self, node_list, hv_name, hv_params)
1206 def Exec(self, feedback_fn):
1207 """Change the parameters of the cluster.
1210 if self.op.vg_name is not None:
1211 if self.op.vg_name != self.cfg.GetVGName():
1212 self.cfg.SetVGName(self.op.vg_name)
1214 feedback_fn("Cluster LVM configuration already in desired"
1215 " state, not changing")
1216 if self.op.hvparams:
1217 self.cluster.hvparams = self.new_hvparams
1218 if self.op.enabled_hypervisors is not None:
1219 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1220 if self.op.beparams:
1221 self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1222 self.cfg.Update(self.cluster)
1225 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1226 """Sleep and poll for an instance's disk to sync.
1229 if not instance.disks:
1233 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1235 node = instance.primary_node
1237 for dev in instance.disks:
1238 lu.cfg.SetDiskID(dev, node)
1244 cumul_degraded = False
1245 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1247 lu.proc.LogWarning("Can't get any data from node %s" % node)
1250 raise errors.RemoteError("Can't contact node %s for mirror data,"
1251 " aborting." % node)
1255 for i in range(len(rstats)):
1258 lu.proc.LogWarning("Can't compute data for node %s/%s" %
1259 (node, instance.disks[i].iv_name))
1261 # we ignore the ldisk parameter
1262 perc_done, est_time, is_degraded, _ = mstat
1263 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1264 if perc_done is not None:
1266 if est_time is not None:
1267 rem_time = "%d estimated seconds remaining" % est_time
1270 rem_time = "no time estimate"
1271 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1272 (instance.disks[i].iv_name, perc_done, rem_time))
1276 time.sleep(min(60, max_time))
1279 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1280 return not cumul_degraded
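# Illustrative caller sketch (assumption: callers treat a False return value
# as a degraded sync and act on it):
#
#   disk_abort = not _WaitForSync(self, instance)
#   if disk_abort:
#     raise errors.OpExecError("Disk sync-out failed")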
1283 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1284 """Check that mirrors are not degraded.
1286 The ldisk parameter, if True, will change the test from the
1287 is_degraded attribute (which represents overall non-ok status for
1288 the device(s)) to the ldisk (representing the local storage status).
1291 lu.cfg.SetDiskID(dev, node)
1298 if on_primary or dev.AssembleOnSecondary():
1299 rstats = lu.rpc.call_blockdev_find(node, dev)
1301 logging.warning("Node %s: disk degraded, not found or node down", node)
1304 result = result and (not rstats[idx])
1306 for child in dev.children:
1307 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1312 class LUDiagnoseOS(NoHooksLU):
1313 """Logical unit for OS diagnose/query.
1316 _OP_REQP = ["output_fields", "names"]
1319 def ExpandNames(self):
1321 raise errors.OpPrereqError("Selective OS query not supported")
1323 self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1324 _CheckOutputFields(static=[],
1325 dynamic=self.dynamic_fields,
1326 selected=self.op.output_fields)
1328 # Lock all nodes, in shared mode
1329 self.needed_locks = {}
1330 self.share_locks[locking.LEVEL_NODE] = 1
1331 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1333 def CheckPrereq(self):
1334 """Check prerequisites.
1339 def _DiagnoseByOS(node_list, rlist):
1340 """Remaps a per-node return list into an a per-os per-node dictionary
1343 node_list: a list with the names of all nodes
1344 rlist: a map with node names as keys and OS objects as values
1347 map: a map with osnames as keys and as value another map, with
1349 keys and list of OS objects as values
1350 e.g. {"debian-etch": {"node1": [<object>,...],
1351 "node2": [<object>,]}
1356 for node_name, nr in rlist.iteritems():
1360 if os_obj.name not in all_os:
1361 # build a list of nodes for this os containing empty lists
1362 # for each node in node_list
1363 all_os[os_obj.name] = {}
1364 for nname in node_list:
1365 all_os[os_obj.name][nname] = []
1366 all_os[os_obj.name][node_name].append(os_obj)
1369 def Exec(self, feedback_fn):
1370 """Compute the list of OSes.
1373 node_list = self.acquired_locks[locking.LEVEL_NODE]
1374 node_data = self.rpc.call_os_diagnose(node_list)
1375 if node_data == False:
1376 raise errors.OpExecError("Can't gather the list of OSes")
1377 pol = self._DiagnoseByOS(node_list, node_data)
1379 for os_name, os_data in pol.iteritems():
1381 for field in self.op.output_fields:
1384 elif field == "valid":
1385 val = utils.all([osl and osl[0] for osl in os_data.values()])
1386 elif field == "node_status":
1388 for node_name, nos_list in os_data.iteritems():
1389 val[node_name] = [(v.status, v.path) for v in nos_list]
1391 raise errors.ParameterError(field)
1398 class LURemoveNode(LogicalUnit):
1399 """Logical unit for removing a node.
1402 HPATH = "node-remove"
1403 HTYPE = constants.HTYPE_NODE
1404 _OP_REQP = ["node_name"]
1406 def BuildHooksEnv(self):
1409 This doesn't run on the target node in the pre phase as a failed
1410 node would then be impossible to remove.
1414 "OP_TARGET": self.op.node_name,
1415 "NODE_NAME": self.op.node_name,
1417 all_nodes = self.cfg.GetNodeList()
1418 all_nodes.remove(self.op.node_name)
1419 return env, all_nodes, all_nodes
1421 def CheckPrereq(self):
1422 """Check prerequisites.
1425 - the node exists in the configuration
1426 - it does not have primary or secondary instances
1427 - it's not the master
1429 Any errors are signalled by raising errors.OpPrereqError.
1432 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1434 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1436 instance_list = self.cfg.GetInstanceList()
1438 masternode = self.cfg.GetMasterNode()
1439 if node.name == masternode:
1440 raise errors.OpPrereqError("Node is the master node,"
1441 " you need to failover first.")
1443 for instance_name in instance_list:
1444 instance = self.cfg.GetInstanceInfo(instance_name)
1445 if node.name == instance.primary_node:
1446 raise errors.OpPrereqError("Instance %s still running on the node,"
1447 " please remove first." % instance_name)
1448 if node.name in instance.secondary_nodes:
1449 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1450 " please remove first." % instance_name)
1451 self.op.node_name = node.name
1454 def Exec(self, feedback_fn):
1455 """Removes the node from the cluster.
1459 logging.info("Stopping the node daemon and removing configs from node %s",
1462 self.context.RemoveNode(node.name)
1464 self.rpc.call_node_leave_cluster(node.name)
1467 class LUQueryNodes(NoHooksLU):
1468 """Logical unit for querying nodes.
1471 _OP_REQP = ["output_fields", "names"]
1474 def ExpandNames(self):
1475 self.dynamic_fields = frozenset([
1477 "mtotal", "mnode", "mfree",
1482 self.static_fields = frozenset([
1483 "name", "pinst_cnt", "sinst_cnt",
1484 "pinst_list", "sinst_list",
1485 "pip", "sip", "tags",
1489 _CheckOutputFields(static=self.static_fields,
1490 dynamic=self.dynamic_fields,
1491 selected=self.op.output_fields)
1493 self.needed_locks = {}
1494 self.share_locks[locking.LEVEL_NODE] = 1
1497 self.wanted = _GetWantedNodes(self, self.op.names)
1499 self.wanted = locking.ALL_SET
1501 self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1503 # if we don't request only static fields, we need to lock the nodes
1504 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1507 def CheckPrereq(self):
1508 """Check prerequisites.
1511 # The validation of the node list is done in _GetWantedNodes,
1512 # if non-empty; if empty, there's no validation to do
1515 def Exec(self, feedback_fn):
1516 """Computes the list of nodes and their attributes.
1519 all_info = self.cfg.GetAllNodesInfo()
1521 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1522 elif self.wanted != locking.ALL_SET:
1523 nodenames = self.wanted
1524 missing = set(nodenames).difference(all_info.keys())
1526 raise errors.OpExecError(
1527 "Some nodes were removed before retrieving their data: %s" % missing)
1529 nodenames = all_info.keys()
1531 nodenames = utils.NiceSort(nodenames)
1532 nodelist = [all_info[name] for name in nodenames]
1534 # begin data gathering
1536 if self.dynamic_fields.intersection(self.op.output_fields):
1538 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1539 self.cfg.GetHypervisorType())
1540 for name in nodenames:
1541 nodeinfo = node_data.get(name, None)
1544 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1545 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1546 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1547 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1548 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1549 "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1550 "bootid": nodeinfo['bootid'],
1553 live_data[name] = {}
1555 live_data = dict.fromkeys(nodenames, {})
1557 node_to_primary = dict([(name, set()) for name in nodenames])
1558 node_to_secondary = dict([(name, set()) for name in nodenames])
1560 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1561 "sinst_cnt", "sinst_list"))
1562 if inst_fields & frozenset(self.op.output_fields):
1563 instancelist = self.cfg.GetInstanceList()
1565 for instance_name in instancelist:
1566 inst = self.cfg.GetInstanceInfo(instance_name)
1567 if inst.primary_node in node_to_primary:
1568 node_to_primary[inst.primary_node].add(inst.name)
1569 for secnode in inst.secondary_nodes:
1570 if secnode in node_to_secondary:
1571 node_to_secondary[secnode].add(inst.name)
1573 # end data gathering
1576 for node in nodelist:
1578 for field in self.op.output_fields:
1581 elif field == "pinst_list":
1582 val = list(node_to_primary[node.name])
1583 elif field == "sinst_list":
1584 val = list(node_to_secondary[node.name])
1585 elif field == "pinst_cnt":
1586 val = len(node_to_primary[node.name])
1587 elif field == "sinst_cnt":
1588 val = len(node_to_secondary[node.name])
1589 elif field == "pip":
1590 val = node.primary_ip
1591 elif field == "sip":
1592 val = node.secondary_ip
1593 elif field == "tags":
1594 val = list(node.GetTags())
1595 elif field == "serial_no":
1596 val = node.serial_no
1597 elif field in self.dynamic_fields:
1598 val = live_data[node.name].get(field, None)
1600 raise errors.ParameterError(field)
1601 node_output.append(val)
1602 output.append(node_output)
1607 class LUQueryNodeVolumes(NoHooksLU):
1608 """Logical unit for getting volumes on node(s).
1611 _OP_REQP = ["nodes", "output_fields"]
1614 def ExpandNames(self):
1615 _CheckOutputFields(static=["node"],
1616 dynamic=["phys", "vg", "name", "size", "instance"],
1617 selected=self.op.output_fields)
1619 self.needed_locks = {}
1620 self.share_locks[locking.LEVEL_NODE] = 1
1621 if not self.op.nodes:
1622 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1624 self.needed_locks[locking.LEVEL_NODE] = \
1625 _GetWantedNodes(self, self.op.nodes)
1627 def CheckPrereq(self):
1628 """Check prerequisites.
1630 This checks that the fields required are valid output fields.
1633 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1635 def Exec(self, feedback_fn):
1636 """Computes the list of nodes and their attributes.
1639 nodenames = self.nodes
1640 volumes = self.rpc.call_node_volumes(nodenames)
1642 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1643 in self.cfg.GetInstanceList()]
1645 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1648 for node in nodenames:
1649 if node not in volumes or not volumes[node]:
1652 node_vols = volumes[node][:]
1653 node_vols.sort(key=lambda vol: vol['dev'])
1655 for vol in node_vols:
1657 for field in self.op.output_fields:
1660 elif field == "phys":
1664 elif field == "name":
1666 elif field == "size":
1667 val = int(float(vol['size']))
1668 elif field == "instance":
1670 if node not in lv_by_node[inst]:
1672 if vol['name'] in lv_by_node[inst][node]:
1678 raise errors.ParameterError(field)
1679 node_output.append(str(val))
1681 output.append(node_output)
1686 class LUAddNode(LogicalUnit):
1687 """Logical unit for adding node to the cluster.
1691 HTYPE = constants.HTYPE_NODE
1692 _OP_REQP = ["node_name"]
1694 def BuildHooksEnv(self):
1697 This will run on all nodes before, and on all nodes + the new node after.
1701 "OP_TARGET": self.op.node_name,
1702 "NODE_NAME": self.op.node_name,
1703 "NODE_PIP": self.op.primary_ip,
1704 "NODE_SIP": self.op.secondary_ip,
1706 nodes_0 = self.cfg.GetNodeList()
1707 nodes_1 = nodes_0 + [self.op.node_name, ]
1708 return env, nodes_0, nodes_1
1710 def CheckPrereq(self):
1711 """Check prerequisites.
1714 - the new node is not already in the config
1716 - its parameters (single/dual homed) match the cluster
1718 Any errors are signalled by raising errors.OpPrereqError.
1721 node_name = self.op.node_name
1724 dns_data = utils.HostInfo(node_name)
1726 node = dns_data.name
1727 primary_ip = self.op.primary_ip = dns_data.ip
1728 secondary_ip = getattr(self.op, "secondary_ip", None)
1729 if secondary_ip is None:
1730 secondary_ip = primary_ip
1731 if not utils.IsValidIP(secondary_ip):
1732 raise errors.OpPrereqError("Invalid secondary IP given")
1733 self.op.secondary_ip = secondary_ip
1735 node_list = cfg.GetNodeList()
1736 if not self.op.readd and node in node_list:
1737 raise errors.OpPrereqError("Node %s is already in the configuration" %
1739 elif self.op.readd and node not in node_list:
1740 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1742 for existing_node_name in node_list:
1743 existing_node = cfg.GetNodeInfo(existing_node_name)
1745 if self.op.readd and node == existing_node_name:
1746 if (existing_node.primary_ip != primary_ip or
1747 existing_node.secondary_ip != secondary_ip):
1748 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1749 " address configuration as before")
1752 if (existing_node.primary_ip == primary_ip or
1753 existing_node.secondary_ip == primary_ip or
1754 existing_node.primary_ip == secondary_ip or
1755 existing_node.secondary_ip == secondary_ip):
1756 raise errors.OpPrereqError("New node ip address(es) conflict with"
1757 " existing node %s" % existing_node.name)
1759 # check that the type of the node (single versus dual homed) is the
1760 # same as for the master
1761 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1762 master_singlehomed = myself.secondary_ip == myself.primary_ip
1763 newbie_singlehomed = secondary_ip == primary_ip
1764 if master_singlehomed != newbie_singlehomed:
1765 if master_singlehomed:
1766 raise errors.OpPrereqError("The master has no private ip but the"
1767 " new node has one")
1769 raise errors.OpPrereqError("The master has a private ip but the"
1770 " new node doesn't have one")
1772 # checks reachability
1773 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1774 raise errors.OpPrereqError("Node not reachable by ping")
1776 if not newbie_singlehomed:
1777 # check reachability from my secondary ip to newbie's secondary ip
1778 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1779 source=myself.secondary_ip):
1780 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1781 " based ping to noded port")
1783 self.new_node = objects.Node(name=node,
1784 primary_ip=primary_ip,
1785 secondary_ip=secondary_ip)
1787 def Exec(self, feedback_fn):
1788 """Adds the new node to the cluster.
1791 new_node = self.new_node
1792 node = new_node.name
1794 # check connectivity
1795 result = self.rpc.call_version([node])[node]
1797 if constants.PROTOCOL_VERSION == result:
1798 logging.info("Communication to node %s fine, sw version %s match",
1801 raise errors.OpExecError("Version mismatch master version %s,"
1802 " node version %s" %
1803 (constants.PROTOCOL_VERSION, result))
1805 raise errors.OpExecError("Cannot get version from the new node")
1808 logging.info("Copy ssh key to node %s", node)
1809 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1811 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1812 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1818 keyarray.append(f.read())
1822 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1824 keyarray[3], keyarray[4], keyarray[5])
1827 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1829 # Add node to our /etc/hosts, and add key to known_hosts
1830 utils.AddHostToEtcHosts(new_node.name)
1832 if new_node.secondary_ip != new_node.primary_ip:
1833 if not self.rpc.call_node_has_ip_address(new_node.name,
1834 new_node.secondary_ip):
1835 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1836 " you gave (%s). Please fix and re-run this"
1837 " command." % new_node.secondary_ip)
1839 node_verify_list = [self.cfg.GetMasterNode()]
1840 node_verify_param = {
1842 # TODO: do a node-net-test as well?
1845 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1846 self.cfg.GetClusterName())
1847 for verifier in node_verify_list:
1848 if not result[verifier]:
1849 raise errors.OpExecError("Cannot communicate with %s's node daemon"
1850 " for remote verification" % verifier)
1851 if result[verifier]['nodelist']:
1852 for failed in result[verifier]['nodelist']:
1853 feedback_fn("ssh/hostname verification failed %s -> %s" %
1854 (verifier, result[verifier]['nodelist'][failed]))
1855 raise errors.OpExecError("ssh/hostname verification failed.")
1857 # Distribute updated /etc/hosts and known_hosts to all nodes,
1858 # including the node just added
1859 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1860 dist_nodes = self.cfg.GetNodeList()
1861 if not self.op.readd:
1862 dist_nodes.append(node)
1863 if myself.name in dist_nodes:
1864 dist_nodes.remove(myself.name)
1866 logging.debug("Copying hosts and known_hosts to all nodes")
1867 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1868 result = self.rpc.call_upload_file(dist_nodes, fname)
1869 for to_node in dist_nodes:
1870 if not result[to_node]:
1871 logging.error("Copy of file %s to node %s failed", fname, to_node)
1874 if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1875 to_copy.append(constants.VNC_PASSWORD_FILE)
1876 for fname in to_copy:
1877 result = self.rpc.call_upload_file([node], fname)
1878 if not result[node]:
1879 logging.error("Could not copy file %s to node %s", fname, node)
1882 self.context.ReaddNode(new_node)
1884 self.context.AddNode(new_node)
1887 class LUQueryClusterInfo(NoHooksLU):
1888 """Query cluster configuration.
1895 def ExpandNames(self):
1896 self.needed_locks = {}
1898 def CheckPrereq(self):
1899 """No prerequsites needed for this LU.
1904 def Exec(self, feedback_fn):
1905 """Return cluster config.
1908 cluster = self.cfg.GetClusterInfo()
1910 "software_version": constants.RELEASE_VERSION,
1911 "protocol_version": constants.PROTOCOL_VERSION,
1912 "config_version": constants.CONFIG_VERSION,
1913 "os_api_version": constants.OS_API_VERSION,
1914 "export_version": constants.EXPORT_VERSION,
1915 "architecture": (platform.architecture()[0], platform.machine()),
1916 "name": cluster.cluster_name,
1917 "master": cluster.master_node,
1918 "hypervisor_type": cluster.hypervisor,
1919 "enabled_hypervisors": cluster.enabled_hypervisors,
1920 "hvparams": cluster.hvparams,
1921 "beparams": cluster.beparams,
1927 class LUQueryConfigValues(NoHooksLU):
1928 """Return configuration values.
1934 def ExpandNames(self):
1935 self.needed_locks = {}
1937 static_fields = ["cluster_name", "master_node", "drain_flag"]
1938 _CheckOutputFields(static=static_fields,
1940 selected=self.op.output_fields)
1942 def CheckPrereq(self):
1943 """No prerequisites.
1948 def Exec(self, feedback_fn):
1949 """Dump a representation of the cluster config to the standard output.
1953 for field in self.op.output_fields:
1954 if field == "cluster_name":
1955 entry = self.cfg.GetClusterName()
1956 elif field == "master_node":
1957 entry = self.cfg.GetMasterNode()
1958 elif field == "drain_flag":
1959 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1961 raise errors.ParameterError(field)
1962 values.append(entry)
1966 class LUActivateInstanceDisks(NoHooksLU):
1967 """Bring up an instance's disks.
1970 _OP_REQP = ["instance_name"]
1973 def ExpandNames(self):
1974 self._ExpandAndLockInstance()
1975 self.needed_locks[locking.LEVEL_NODE] = []
1976 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1978 def DeclareLocks(self, level):
1979 if level == locking.LEVEL_NODE:
1980 self._LockInstancesNodes()
1982 def CheckPrereq(self):
1983 """Check prerequisites.
1985 This checks that the instance is in the cluster.
1988 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1989 assert self.instance is not None, \
1990 "Cannot retrieve locked instance %s" % self.op.instance_name
1992 def Exec(self, feedback_fn):
1993 """Activate the disks.
1996 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
1998 raise errors.OpExecError("Cannot activate block devices")
2003 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2004 """Prepare the block devices for an instance.
2006 This sets up the block devices on all nodes.
2009 instance: a ganeti.objects.Instance object
2010 ignore_secondaries: if true, errors on secondary nodes won't result
2011 in an error return from the function
2014 false if the operation failed
2015 list of (host, instance_visible_name, node_visible_name) if the operation
2016 succeeded with the mapping from node devices to instance devices
2020 iname = instance.name
2021 # With the two-pass mechanism we try to reduce the window of
2022 # opportunity for the race condition of switching DRBD to primary
2023 # before handshaking occurred, but we do not eliminate it
2025 # The proper fix would be to wait (with some limits) until the
2026 # connection has been made and drbd transitions from WFConnection
2027 # into any other network-connected state (Connected, SyncTarget,
2030 # 1st pass, assemble on all nodes in secondary mode
2031 for inst_disk in instance.disks:
2032 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2033 lu.cfg.SetDiskID(node_disk, node)
2034 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2036 logging.error("Could not prepare block device %s on node %s"
2037 " (is_primary=False, pass=1)", inst_disk.iv_name, node)
2038 if not ignore_secondaries:
2041 # FIXME: race condition on drbd migration to primary
2043 # 2nd pass, do only the primary node
2044 for inst_disk in instance.disks:
2045 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2046 if node != instance.primary_node:
2048 lu.cfg.SetDiskID(node_disk, node)
2049 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2051 logging.error("Could not prepare block device %s on node %s"
2052 " (is_primary=True, pass=2)", inst_disk.iv_name, node)
2054 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2056 # leave the disks configured for the primary node
2057 # this is a workaround that would be better fixed by
2058 # improving the logical/physical id handling
2059 for disk in instance.disks:
2060 lu.cfg.SetDiskID(disk, instance.primary_node)
2062 return disks_ok, device_info
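# Hedged usage note: LUActivateInstanceDisks above does exactly
#   disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
# and raises OpExecError when disks_ok is False; the disks_info tuples
# (node, iv_name, node-visible device) are what callers can present to the
# user as the activation result.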
2065 def _StartInstanceDisks(lu, instance, force):
2066 """Start the disks of an instance.
2069 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2070 ignore_secondaries=force)
2072 _ShutdownInstanceDisks(lu, instance)
2073 if force is not None and not force:
2074 logging.error("If the message above refers to a secondary node,"
2075 " you can retry the operation using '--force'.")
2076 raise errors.OpExecError("Disk consistency error")
2079 class LUDeactivateInstanceDisks(NoHooksLU):
2080 """Shutdown an instance's disks.
2083 _OP_REQP = ["instance_name"]
2086 def ExpandNames(self):
2087 self._ExpandAndLockInstance()
2088 self.needed_locks[locking.LEVEL_NODE] = []
2089 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2091 def DeclareLocks(self, level):
2092 if level == locking.LEVEL_NODE:
2093 self._LockInstancesNodes()
2095 def CheckPrereq(self):
2096 """Check prerequisites.
2098 This checks that the instance is in the cluster.
2101 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2102 assert self.instance is not None, \
2103 "Cannot retrieve locked instance %s" % self.op.instance_name
2105 def Exec(self, feedback_fn):
2106 """Deactivate the disks
2109 instance = self.instance
2110 _SafeShutdownInstanceDisks(self, instance)
2113 def _SafeShutdownInstanceDisks(lu, instance):
2114 """Shutdown block devices of an instance.
2116 This function checks if an instance is running, before calling
2117 _ShutdownInstanceDisks.
2120 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2121 [instance.hypervisor])
2122 ins_l = ins_l[instance.primary_node]
2123 if not isinstance(ins_l, list):
2124 raise errors.OpExecError("Can't contact node '%s'" %
2125 instance.primary_node)
2127 if instance.name in ins_l:
2128 raise errors.OpExecError("Instance is running, can't shutdown"
2131 _ShutdownInstanceDisks(lu, instance)
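# Hedged note: this "safe" variant refuses to tear down disks while the
# hypervisor still reports the instance as running, which is why
# LUDeactivateInstanceDisks goes through it rather than calling
# _ShutdownInstanceDisks directly.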
2134 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2135 """Shutdown block devices of an instance.
2137 This does the shutdown on all nodes of the instance.
2139 If the ignore_primary is false, errors on the primary node are
2144 for disk in instance.disks:
2145 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2146 lu.cfg.SetDiskID(top_disk, node)
2147 if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2148 logging.error("Could not shutdown block device %s on node %s",
2150 if not ignore_primary or node != instance.primary_node:
2155 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2156 """Checks if a node has enough free memory.
2158 This function checks if a given node has the needed amount of free
2159 memory. In case the node has less memory or we cannot get the
2160 information from the node, this function raises an OpPrereqError
2163 @type lu: C{LogicalUnit}
2164 @param lu: a logical unit from which we get configuration data
2166 @param node: the node to check
2167 @type reason: C{str}
2168 @param reason: string to use in the error message
2169 @type requested: C{int}
2170 @param requested: the amount of memory in MiB to check for
2171 @type hypervisor: C{str}
2172 @param hypervisor: the hypervisor to ask for memory stats
2173 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2174 we cannot check the node
2177 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2178 if not nodeinfo or not isinstance(nodeinfo, dict):
2179 raise errors.OpPrereqError("Could not contact node %s for resource"
2180 " information" % (node,))
2182 free_mem = nodeinfo[node].get('memory_free')
2183 if not isinstance(free_mem, int):
2184 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2185 " was '%s'" % (node, free_mem))
2186 if requested > free_mem:
2187 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2188 " needed %s MiB, available %s MiB" %
2189 (node, reason, requested, free_mem))
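# Hedged usage sketch: callers invoke this from CheckPrereq with the
# instance's filled beparams, e.g. (as LUStartupInstance does below):
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)
# so that an OpPrereqError aborts the operation before any state changes.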
2192 class LUStartupInstance(LogicalUnit):
2193 """Starts an instance.
2196 HPATH = "instance-start"
2197 HTYPE = constants.HTYPE_INSTANCE
2198 _OP_REQP = ["instance_name", "force"]
2201 def ExpandNames(self):
2202 self._ExpandAndLockInstance()
2203 self.needed_locks[locking.LEVEL_NODE] = []
2204 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2206 def DeclareLocks(self, level):
2207 if level == locking.LEVEL_NODE:
2208 self._LockInstancesNodes()
2210 def BuildHooksEnv(self):
2213 This runs on master, primary and secondary nodes of the instance.
2217 "FORCE": self.op.force,
2219 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2220 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2221 list(self.instance.secondary_nodes))
2224 def CheckPrereq(self):
2225 """Check prerequisites.
2227 This checks that the instance is in the cluster.
2230 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2231 assert self.instance is not None, \
2232 "Cannot retrieve locked instance %s" % self.op.instance_name
2234 bep = self.cfg.GetClusterInfo().FillBE(instance)
2235 # check bridge existence
2236 _CheckInstanceBridgesExist(self, instance)
2238 _CheckNodeFreeMemory(self, instance.primary_node,
2239 "starting instance %s" % instance.name,
2240 bep[constants.BE_MEMORY], instance.hypervisor)
2242 def Exec(self, feedback_fn):
2243 """Start the instance.
2246 instance = self.instance
2247 force = self.op.force
2248 extra_args = getattr(self.op, "extra_args", "")
2250 self.cfg.MarkInstanceUp(instance.name)
2252 node_current = instance.primary_node
2254 _StartInstanceDisks(self, instance, force)
2256 if not self.rpc.call_instance_start(node_current, instance, extra_args):
2257 _ShutdownInstanceDisks(self, instance)
2258 raise errors.OpExecError("Could not start instance")
2261 class LURebootInstance(LogicalUnit):
2262 """Reboot an instance.
2265 HPATH = "instance-reboot"
2266 HTYPE = constants.HTYPE_INSTANCE
2267 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2270 def ExpandNames(self):
2271 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2272 constants.INSTANCE_REBOOT_HARD,
2273 constants.INSTANCE_REBOOT_FULL]:
2274 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2275 (constants.INSTANCE_REBOOT_SOFT,
2276 constants.INSTANCE_REBOOT_HARD,
2277 constants.INSTANCE_REBOOT_FULL))
2278 self._ExpandAndLockInstance()
2279 self.needed_locks[locking.LEVEL_NODE] = []
2280 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2282 def DeclareLocks(self, level):
2283 if level == locking.LEVEL_NODE:
2284 primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2285 self._LockInstancesNodes(primary_only=primary_only)
2287 def BuildHooksEnv(self):
2290 This runs on master, primary and secondary nodes of the instance.
2294 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2296 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2297 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2298 list(self.instance.secondary_nodes))
2301 def CheckPrereq(self):
2302 """Check prerequisites.
2304 This checks that the instance is in the cluster.
2307 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2308 assert self.instance is not None, \
2309 "Cannot retrieve locked instance %s" % self.op.instance_name
2311 # check bridge existence
2312 _CheckInstanceBridgesExist(self, instance)
2314 def Exec(self, feedback_fn):
2315 """Reboot the instance.
2318 instance = self.instance
2319 ignore_secondaries = self.op.ignore_secondaries
2320 reboot_type = self.op.reboot_type
2321 extra_args = getattr(self.op, "extra_args", "")
2323 node_current = instance.primary_node
2325 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2326 constants.INSTANCE_REBOOT_HARD]:
2327 if not self.rpc.call_instance_reboot(node_current, instance,
2328 reboot_type, extra_args):
2329 raise errors.OpExecError("Could not reboot instance")
2331 if not self.rpc.call_instance_shutdown(node_current, instance):
2332 raise errors.OpExecError("could not shutdown instance for full reboot")
2333 _ShutdownInstanceDisks(self, instance)
2334 _StartInstanceDisks(self, instance, ignore_secondaries)
2335 if not self.rpc.call_instance_start(node_current, instance, extra_args):
2336 _ShutdownInstanceDisks(self, instance)
2337 raise errors.OpExecError("Could not start instance for full reboot")
2339 self.cfg.MarkInstanceUp(instance.name)
2342 class LUShutdownInstance(LogicalUnit):
2343 """Shutdown an instance.
2346 HPATH = "instance-stop"
2347 HTYPE = constants.HTYPE_INSTANCE
2348 _OP_REQP = ["instance_name"]
2351 def ExpandNames(self):
2352 self._ExpandAndLockInstance()
2353 self.needed_locks[locking.LEVEL_NODE] = []
2354 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2356 def DeclareLocks(self, level):
2357 if level == locking.LEVEL_NODE:
2358 self._LockInstancesNodes()
2360 def BuildHooksEnv(self):
2363 This runs on master, primary and secondary nodes of the instance.
2366 env = _BuildInstanceHookEnvByObject(self, self.instance)
2367 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2368 list(self.instance.secondary_nodes))
2371 def CheckPrereq(self):
2372 """Check prerequisites.
2374 This checks that the instance is in the cluster.
2377 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2378 assert self.instance is not None, \
2379 "Cannot retrieve locked instance %s" % self.op.instance_name
2381 def Exec(self, feedback_fn):
2382 """Shutdown the instance.
2385 instance = self.instance
2386 node_current = instance.primary_node
2387 self.cfg.MarkInstanceDown(instance.name)
2388 if not self.rpc.call_instance_shutdown(node_current, instance):
2389 logging.error("Could not shutdown instance")
2391 _ShutdownInstanceDisks(self, instance)
2394 class LUReinstallInstance(LogicalUnit):
2395 """Reinstall an instance.
2398 HPATH = "instance-reinstall"
2399 HTYPE = constants.HTYPE_INSTANCE
2400 _OP_REQP = ["instance_name"]
2403 def ExpandNames(self):
2404 self._ExpandAndLockInstance()
2405 self.needed_locks[locking.LEVEL_NODE] = []
2406 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2408 def DeclareLocks(self, level):
2409 if level == locking.LEVEL_NODE:
2410 self._LockInstancesNodes()
2412 def BuildHooksEnv(self):
2415 This runs on master, primary and secondary nodes of the instance.
2418 env = _BuildInstanceHookEnvByObject(self, self.instance)
2419 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2420 list(self.instance.secondary_nodes))
2423 def CheckPrereq(self):
2424 """Check prerequisites.
2426 This checks that the instance is in the cluster and is not running.
2429 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2430 assert instance is not None, \
2431 "Cannot retrieve locked instance %s" % self.op.instance_name
2433 if instance.disk_template == constants.DT_DISKLESS:
2434 raise errors.OpPrereqError("Instance '%s' has no disks" %
2435 self.op.instance_name)
2436 if instance.status != "down":
2437 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2438 self.op.instance_name)
2439 remote_info = self.rpc.call_instance_info(instance.primary_node,
2441 instance.hypervisor)
2443 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2444 (self.op.instance_name,
2445 instance.primary_node))
2447 self.op.os_type = getattr(self.op, "os_type", None)
2448 if self.op.os_type is not None:
2450 pnode = self.cfg.GetNodeInfo(
2451 self.cfg.ExpandNodeName(instance.primary_node))
2453 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2455 os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2457 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2458 " primary node" % self.op.os_type)
2460 self.instance = instance
2462 def Exec(self, feedback_fn):
2463 """Reinstall the instance.
2466 inst = self.instance
2468 if self.op.os_type is not None:
2469 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2470 inst.os = self.op.os_type
2471 self.cfg.Update(inst)
2473 _StartInstanceDisks(self, inst, None)
2475 feedback_fn("Running the instance OS create scripts...")
2476 if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2478 raise errors.OpExecError("Could not install OS for instance %s"
2480 (inst.name, inst.primary_node))
2482 _ShutdownInstanceDisks(self, inst)
2485 class LURenameInstance(LogicalUnit):
2486 """Rename an instance.
2489 HPATH = "instance-rename"
2490 HTYPE = constants.HTYPE_INSTANCE
2491 _OP_REQP = ["instance_name", "new_name"]
2493 def BuildHooksEnv(self):
2496 This runs on master, primary and secondary nodes of the instance.
2499 env = _BuildInstanceHookEnvByObject(self, self.instance)
2500 env["INSTANCE_NEW_NAME"] = self.op.new_name
2501 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2502 list(self.instance.secondary_nodes))
2505 def CheckPrereq(self):
2506 """Check prerequisites.
2508 This checks that the instance is in the cluster and is not running.
2511 instance = self.cfg.GetInstanceInfo(
2512 self.cfg.ExpandInstanceName(self.op.instance_name))
2513 if instance is None:
2514 raise errors.OpPrereqError("Instance '%s' not known" %
2515 self.op.instance_name)
2516 if instance.status != "down":
2517 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2518 self.op.instance_name)
2519 remote_info = self.rpc.call_instance_info(instance.primary_node,
2521 instance.hypervisor)
2523 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2524 (self.op.instance_name,
2525 instance.primary_node))
2526 self.instance = instance
2528 # new name verification
2529 name_info = utils.HostInfo(self.op.new_name)
2531 self.op.new_name = new_name = name_info.name
2532 instance_list = self.cfg.GetInstanceList()
2533 if new_name in instance_list:
2534 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2537 if not getattr(self.op, "ignore_ip", False):
2538 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2539 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2540 (name_info.ip, new_name))
2543 def Exec(self, feedback_fn):
2544 """Reinstall the instance.
2547 inst = self.instance
2548 old_name = inst.name
2550 if inst.disk_template == constants.DT_FILE:
2551 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2553 self.cfg.RenameInstance(inst.name, self.op.new_name)
2554 # Change the instance lock. This is definitely safe while we hold the BGL
2555 self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2556 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2558 # re-read the instance from the configuration after rename
2559 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2561 if inst.disk_template == constants.DT_FILE:
2562 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2563 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2564 old_file_storage_dir,
2565 new_file_storage_dir)
2568 raise errors.OpExecError("Could not connect to node '%s' to rename"
2569 " directory '%s' to '%s' (but the instance"
2570 " has been renamed in Ganeti)" % (
2571 inst.primary_node, old_file_storage_dir,
2572 new_file_storage_dir))
2575 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2576 " (but the instance has been renamed in"
2577 " Ganeti)" % (old_file_storage_dir,
2578 new_file_storage_dir))
2580 _StartInstanceDisks(self, inst, None)
2582 if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2584 msg = ("Could not run OS rename script for instance %s on node %s"
2585 " (but the instance has been renamed in Ganeti)" %
2586 (inst.name, inst.primary_node))
2589 _ShutdownInstanceDisks(self, inst)
2592 class LURemoveInstance(LogicalUnit):
2593 """Remove an instance.
2596 HPATH = "instance-remove"
2597 HTYPE = constants.HTYPE_INSTANCE
2598 _OP_REQP = ["instance_name", "ignore_failures"]
2601 def ExpandNames(self):
2602 self._ExpandAndLockInstance()
2603 self.needed_locks[locking.LEVEL_NODE] = []
2604 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2606 def DeclareLocks(self, level):
2607 if level == locking.LEVEL_NODE:
2608 self._LockInstancesNodes()
2610 def BuildHooksEnv(self):
2613 This runs on master, primary and secondary nodes of the instance.
2616 env = _BuildInstanceHookEnvByObject(self, self.instance)
2617 nl = [self.cfg.GetMasterNode()]
2620 def CheckPrereq(self):
2621 """Check prerequisites.
2623 This checks that the instance is in the cluster.
2626 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2627 assert self.instance is not None, \
2628 "Cannot retrieve locked instance %s" % self.op.instance_name
2630 def Exec(self, feedback_fn):
2631 """Remove the instance.
2634 instance = self.instance
2635 logging.info("Shutting down instance %s on node %s",
2636 instance.name, instance.primary_node)
2638 if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2639 if self.op.ignore_failures:
2640 feedback_fn("Warning: can't shutdown instance")
2642 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2643 (instance.name, instance.primary_node))
2645 logging.info("Removing block devices for instance %s", instance.name)
2647 if not _RemoveDisks(self, instance):
2648 if self.op.ignore_failures:
2649 feedback_fn("Warning: can't remove instance's disks")
2651 raise errors.OpExecError("Can't remove instance's disks")
2653 logging.info("Removing instance %s out of cluster config", instance.name)
2655 self.cfg.RemoveInstance(instance.name)
2656 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2659 class LUQueryInstances(NoHooksLU):
2660 """Logical unit for querying instances.
2663 _OP_REQP = ["output_fields", "names"]
2666 def ExpandNames(self):
2667 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2668 hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
2669 bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
2670 self.static_fields = frozenset([
2671 "name", "os", "pnode", "snodes",
2672 "admin_state", "admin_ram",
2673 "disk_template", "ip", "mac", "bridge",
2674 "sda_size", "sdb_size", "vcpus", "tags",
2676 "serial_no", "hypervisor", "hvparams",
2679 _CheckOutputFields(static=self.static_fields,
2680 dynamic=self.dynamic_fields,
2681 selected=self.op.output_fields)
2683 self.needed_locks = {}
2684 self.share_locks[locking.LEVEL_INSTANCE] = 1
2685 self.share_locks[locking.LEVEL_NODE] = 1
2688 self.wanted = _GetWantedInstances(self, self.op.names)
2690 self.wanted = locking.ALL_SET
2692 self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2694 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2695 self.needed_locks[locking.LEVEL_NODE] = []
2696 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2698 def DeclareLocks(self, level):
2699 if level == locking.LEVEL_NODE and self.do_locking:
2700 self._LockInstancesNodes()
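# Hedged example: a query for ["name", "os", "pnode"] uses only static
# fields, so do_locking stays False and the data comes straight from the
# configuration; adding a dynamic field such as "oper_state" makes
# do_locking True, and shared instance/node locks are taken so the live
# RPC data is consistent with the locked objects.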
2702 def CheckPrereq(self):
2703 """Check prerequisites.
2708 def Exec(self, feedback_fn):
2709 """Computes the list of nodes and their attributes.
2712 all_info = self.cfg.GetAllInstancesInfo()
2714 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2715 elif self.wanted != locking.ALL_SET:
2716 instance_names = self.wanted
2717 missing = set(instance_names).difference(all_info.keys())
2719 raise errors.OpExecError(
2720 "Some instances were removed before retrieving their data: %s"
2723 instance_names = all_info.keys()
2725 instance_names = utils.NiceSort(instance_names)
2726 instance_list = [all_info[iname] for iname in instance_names]
2728 # begin data gathering
2730 nodes = frozenset([inst.primary_node for inst in instance_list])
2731 hv_list = list(set([inst.hypervisor for inst in instance_list]))
2734 if self.dynamic_fields.intersection(self.op.output_fields):
2736 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2738 result = node_data[name]
2740 live_data.update(result)
2741 elif result is False:
2742 bad_nodes.append(name)
2743 # else no instance is alive
2745 live_data = dict([(name, {}) for name in instance_names])
2747 # end data gathering
2752 for instance in instance_list:
2754 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2755 i_be = self.cfg.GetClusterInfo().FillBE(instance)
2756 for field in self.op.output_fields:
2761 elif field == "pnode":
2762 val = instance.primary_node
2763 elif field == "snodes":
2764 val = list(instance.secondary_nodes)
2765 elif field == "admin_state":
2766 val = (instance.status != "down")
2767 elif field == "oper_state":
2768 if instance.primary_node in bad_nodes:
2771 val = bool(live_data.get(instance.name))
2772 elif field == "status":
2773 if instance.primary_node in bad_nodes:
2774 val = "ERROR_nodedown"
2776 running = bool(live_data.get(instance.name))
2778 if instance.status != "down":
2783 if instance.status != "down":
2787 elif field == "oper_ram":
2788 if instance.primary_node in bad_nodes:
2790 elif instance.name in live_data:
2791 val = live_data[instance.name].get("memory", "?")
2794 elif field == "disk_template":
2795 val = instance.disk_template
2797 val = instance.nics[0].ip
2798 elif field == "bridge":
2799 val = instance.nics[0].bridge
2800 elif field == "mac":
2801 val = instance.nics[0].mac
2802 elif field == "sda_size" or field == "sdb_size":
2803 disk = instance.FindDisk(field[:3])
2808 elif field == "tags":
2809 val = list(instance.GetTags())
2810 elif field == "serial_no":
2811 val = instance.serial_no
2812 elif field == "network_port":
2813 val = instance.network_port
2814 elif field == "hypervisor":
2815 val = instance.hypervisor
2816 elif field == "hvparams":
2818 elif (field.startswith(HVPREFIX) and
2819 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2820 val = i_hv.get(field[len(HVPREFIX):], None)
2821 elif field == "beparams":
2823 elif (field.startswith(BEPREFIX) and
2824 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2825 val = i_be.get(field[len(BEPREFIX):], None)
2827 raise errors.ParameterError(field)
2834 class LUFailoverInstance(LogicalUnit):
2835 """Failover an instance.
2838 HPATH = "instance-failover"
2839 HTYPE = constants.HTYPE_INSTANCE
2840 _OP_REQP = ["instance_name", "ignore_consistency"]
2843 def ExpandNames(self):
2844 self._ExpandAndLockInstance()
2845 self.needed_locks[locking.LEVEL_NODE] = []
2846 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2848 def DeclareLocks(self, level):
2849 if level == locking.LEVEL_NODE:
2850 self._LockInstancesNodes()
2852 def BuildHooksEnv(self):
2855 This runs on master, primary and secondary nodes of the instance.
2859 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2861 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2862 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2865 def CheckPrereq(self):
2866 """Check prerequisites.
2868 This checks that the instance is in the cluster.
2871 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2872 assert self.instance is not None, \
2873 "Cannot retrieve locked instance %s" % self.op.instance_name
2875 bep = self.cfg.GetClusterInfo().FillBE(instance)
2876 if instance.disk_template not in constants.DTS_NET_MIRROR:
2877 raise errors.OpPrereqError("Instance's disk layout is not"
2878 " network mirrored, cannot failover.")
2880 secondary_nodes = instance.secondary_nodes
2881 if not secondary_nodes:
2882 raise errors.ProgrammerError("no secondary node but using "
2883 "a mirrored disk template")
2885 target_node = secondary_nodes[0]
2886 # check memory requirements on the secondary node
2887 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2888 instance.name, bep[constants.BE_MEMORY],
2889 instance.hypervisor)
2891 # check bridge existence
2892 brlist = [nic.bridge for nic in instance.nics]
2893 if not self.rpc.call_bridges_exist(target_node, brlist):
2894 raise errors.OpPrereqError("One or more target bridges %s does not"
2895 " exist on destination node '%s'" %
2896 (brlist, target_node))
2898 def Exec(self, feedback_fn):
2899 """Failover an instance.
2901 The failover is done by shutting it down on its present node and
2902 starting it on the secondary.
2905 instance = self.instance
2907 source_node = instance.primary_node
2908 target_node = instance.secondary_nodes[0]
2910 feedback_fn("* checking disk consistency between source and target")
2911 for dev in instance.disks:
2912 # for drbd, these are drbd over lvm
2913 if not _CheckDiskConsistency(self, dev, target_node, False):
2914 if instance.status == "up" and not self.op.ignore_consistency:
2915 raise errors.OpExecError("Disk %s is degraded on target node,"
2916 " aborting failover." % dev.iv_name)
2918 feedback_fn("* shutting down instance on source node")
2919 logging.info("Shutting down instance %s on node %s",
2920 instance.name, source_node)
2922 if not self.rpc.call_instance_shutdown(source_node, instance):
2923 if self.op.ignore_consistency:
2924 logging.error("Could not shutdown instance %s on node %s. Proceeding"
2925 " anyway. Please make sure node %s is down",
2926 instance.name, source_node, source_node)
2928 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2929 (instance.name, source_node))
2931 feedback_fn("* deactivating the instance's disks on source node")
2932 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2933 raise errors.OpExecError("Can't shut down the instance's disks.")
2935 instance.primary_node = target_node
2936 # distribute new instance config to the other nodes
2937 self.cfg.Update(instance)
2939 # Only start the instance if it's marked as up
2940 if instance.status == "up":
2941 feedback_fn("* activating the instance's disks on target node")
2942 logging.info("Starting instance %s on node %s",
2943 instance.name, target_node)
2945 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2946 ignore_secondaries=True)
2948 _ShutdownInstanceDisks(self, instance)
2949 raise errors.OpExecError("Can't activate the instance's disks")
2951 feedback_fn("* starting the instance on the target node")
2952 if not self.rpc.call_instance_start(target_node, instance, None):
2953 _ShutdownInstanceDisks(self, instance)
2954 raise errors.OpExecError("Could not start instance %s on node %s." %
2955 (instance.name, target_node))
2958 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2959 """Create a tree of block devices on the primary node.
2961 This always creates all devices.
2965 for child in device.children:
2966 if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2969 lu.cfg.SetDiskID(device, node)
2970 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2971 instance.name, True, info)
2974 if device.physical_id is None:
2975 device.physical_id = new_id
2979 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2980 """Create a tree of block devices on a secondary node.
2982 If this device type has to be created on secondaries, create it and
2985 If not, just recurse to children keeping the same 'force' value.
2988 if device.CreateOnSecondary():
2991 for child in device.children:
2992 if not _CreateBlockDevOnSecondary(lu, node, instance,
2993 child, force, info):
2998 lu.cfg.SetDiskID(device, node)
2999 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3000 instance.name, False, info)
3003 if device.physical_id is None:
3004 device.physical_id = new_id
3008 def _GenerateUniqueNames(lu, exts):
3009 """Generate a suitable LV name.
3011 This will generate a logical volume name for the given instance.
3016 new_id = lu.cfg.GenerateUniqueID()
3017 results.append("%s%s" % (new_id, val))
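# Hedged sketch: _GenerateUniqueNames(lu, [".sda", ".sdb"]) returns names
# like ["<unique-id>.sda", "<unique-id>.sdb"], one freshly generated unique
# ID per extension; the disk templates below use these as LV names.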
3021 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3023 """Generate a drbd8 device complete with its children.
3026 port = lu.cfg.AllocatePort()
3027 vgname = lu.cfg.GetVGName()
3028 shared_secret = lu.cfg.GenerateDRBDSecret()
3029 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3030 logical_id=(vgname, names[0]))
3031 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3032 logical_id=(vgname, names[1]))
3033 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3034 logical_id=(primary, secondary, port,
3037 children=[dev_data, dev_meta],
3042 def _GenerateDiskTemplate(lu, template_name,
3043 instance_name, primary_node,
3044 secondary_nodes, disk_sz, swap_sz,
3045 file_storage_dir, file_driver):
3046 """Generate the entire disk layout for a given template type.
3049 #TODO: compute space requirements
3051 vgname = lu.cfg.GetVGName()
3052 if template_name == constants.DT_DISKLESS:
3054 elif template_name == constants.DT_PLAIN:
3055 if len(secondary_nodes) != 0:
3056 raise errors.ProgrammerError("Wrong template configuration")
3058 names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
3059 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3060 logical_id=(vgname, names[0]),
3062 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3063 logical_id=(vgname, names[1]),
3065 disks = [sda_dev, sdb_dev]
3066 elif template_name == constants.DT_DRBD8:
3067 if len(secondary_nodes) != 1:
3068 raise errors.ProgrammerError("Wrong template configuration")
3069 remote_node = secondary_nodes[0]
3070 (minor_pa, minor_pb,
3071 minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3072 [primary_node, primary_node, remote_node, remote_node], instance_name)
3074 names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3075 ".sdb_data", ".sdb_meta"])
3076 drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3077 disk_sz, names[0:2], "sda",
3079 drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3080 swap_sz, names[2:4], "sdb",
3082 disks = [drbd_sda_dev, drbd_sdb_dev]
3083 elif template_name == constants.DT_FILE:
3084 if len(secondary_nodes) != 0:
3085 raise errors.ProgrammerError("Wrong template configuration")
3087 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3088 iv_name="sda", logical_id=(file_driver,
3089 "%s/sda" % file_storage_dir))
3090 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3091 iv_name="sdb", logical_id=(file_driver,
3092 "%s/sdb" % file_storage_dir))
3093 disks = [file_sda_dev, file_sdb_dev]
3095 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
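# Hedged illustration (assumed names and sizes): for the plain LVM template,
#   _GenerateDiskTemplate(self, constants.DT_PLAIN, "inst1.example.com",
#                         "node1.example.com", [], 10240, 4096, None, None)
# would return two LV-backed disks, sda (10240 MiB) and sdb (4096 MiB), in
# the cluster's volume group; DT_DRBD8 additionally allocates DRBD minors,
# a port and a shared secret for each mirrored device, and DT_FILE places
# both disks as files under file_storage_dir.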
3099 def _GetInstanceInfoText(instance):
3100 """Compute that text that should be added to the disk's metadata.
3103 return "originstname+%s" % instance.name
3106 def _CreateDisks(lu, instance):
3107 """Create all disks for an instance.
3109 This abstracts away some work from AddInstance.
3112 instance: the instance object
3115 True or False showing the success of the creation process
3118 info = _GetInstanceInfoText(instance)
3120 if instance.disk_template == constants.DT_FILE:
3121 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3122 result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3126 logging.error("Could not connect to node '%s'", instance.primary_node)
3130 logging.error("Failed to create directory '%s'", file_storage_dir)
3133 for device in instance.disks:
3134 logging.info("Creating volume %s for instance %s",
3135 device.iv_name, instance.name)
3137 for secondary_node in instance.secondary_nodes:
3138 if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3139 device, False, info):
3140 logging.error("Failed to create volume %s (%s) on secondary node %s!",
3141 device.iv_name, device, secondary_node)
3144 if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3145 instance, device, info):
3146 logging.error("Failed to create volume %s on primary!", device.iv_name)
3152 def _RemoveDisks(lu, instance):
3153 """Remove all disks for an instance.
3155 This abstracts away some work from `AddInstance()` and
3156 `RemoveInstance()`. Note that in case some of the devices couldn't
3157 be removed, the removal will continue with the other ones (compare
3158 with `_CreateDisks()`).
3161 instance: the instance object
3164 True or False showing the success of the removal process
3167 logging.info("Removing block devices for instance %s", instance.name)
3170 for device in instance.disks:
3171 for node, disk in device.ComputeNodeTree(instance.primary_node):
3172 lu.cfg.SetDiskID(disk, node)
3173 if not lu.rpc.call_blockdev_remove(node, disk):
3174 logging.error("Could not remove block device %s on node %s,"
3175 " continuing anyway", device.iv_name, node)
3178 if instance.disk_template == constants.DT_FILE:
3179 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3180 if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3182 logging.error("Could not remove directory '%s'", file_storage_dir)
3188 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3189 """Compute disk size requirements in the volume group
3191 This is currently hard-coded for the two-drive layout.
3194 # Required free disk space as a function of disk and swap space
3196 constants.DT_DISKLESS: None,
3197 constants.DT_PLAIN: disk_size + swap_size,
3198 # 256 MB are added for drbd metadata, 128 MB for each drbd device
3199 constants.DT_DRBD8: disk_size + swap_size + 256,
3200 constants.DT_FILE: None,
3203 if disk_template not in req_size_dict:
3204 raise errors.ProgrammerError("Disk template '%s' size requirement"
3205 " is unknown" % disk_template)
3207 return req_size_dict[disk_template]
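# Hedged worked example: for a DRBD8 instance with a 10240 MiB disk and a
# 4096 MiB swap volume,
#   _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096)
# gives 10240 + 4096 + 256 = 14592 MiB of required VG space, while
# DT_DISKLESS and DT_FILE need no LVM space at all (None).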
3210 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3211 """Hypervisor parameter validation.
3213 This function abstracts the hypervisor parameter validation to be
3214 used in both instance create and instance modify.
3216 @type lu: L{LogicalUnit}
3217 @param lu: the logical unit for which we check
3218 @type nodenames: list
3219 @param nodenames: the list of nodes on which we should check
3220 @type hvname: string
3221 @param hvname: the name of the hypervisor we should use
3222 @type hvparams: dict
3223 @param hvparams: the parameters which we need to check
3224 @raise errors.OpPrereqError: if the parameters are not valid
3227 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3230 for node in nodenames:
3231 info = hvinfo.get(node, None)
3232 if not info or not isinstance(info, (tuple, list)):
3233 raise errors.OpPrereqError("Cannot get current information"
3234 " from node '%s' (%s)" % (node, info))
3236 raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3240 class LUCreateInstance(LogicalUnit):
3241 """Create an instance.
3244 HPATH = "instance-add"
3245 HTYPE = constants.HTYPE_INSTANCE
3246 _OP_REQP = ["instance_name", "disk_size",
3247 "disk_template", "swap_size", "mode", "start",
3248 "wait_for_sync", "ip_check", "mac",
3249 "hvparams", "beparams"]
3252 def _ExpandNode(self, node):
3253 """Expands and checks one node name.
3256 node_full = self.cfg.ExpandNodeName(node)
3257 if node_full is None:
3258 raise errors.OpPrereqError("Unknown node %s" % node)
3261 def ExpandNames(self):
3262 """ExpandNames for CreateInstance.
3264 Figure out the right locks for instance creation.
3267 self.needed_locks = {}
3269 # set optional parameters to none if they don't exist
3270 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3271 if not hasattr(self.op, attr):
3272 setattr(self.op, attr, None)
3274 # cheap checks, mostly valid constants given
3276 # verify creation mode
3277 if self.op.mode not in (constants.INSTANCE_CREATE,
3278 constants.INSTANCE_IMPORT):
3279 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3282 # disk template and mirror node verification
3283 if self.op.disk_template not in constants.DISK_TEMPLATES:
3284 raise errors.OpPrereqError("Invalid disk template name")
3286 if self.op.hypervisor is None:
3287 self.op.hypervisor = self.cfg.GetHypervisorType()
3289 cluster = self.cfg.GetClusterInfo()
3290 enabled_hvs = cluster.enabled_hypervisors
3291 if self.op.hypervisor not in enabled_hvs:
3292 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3293 " cluster (%s)" % (self.op.hypervisor,
3294 ",".join(enabled_hvs)))
3296 # check hypervisor parameter syntax (locally)
3298 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3300 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3301 hv_type.CheckParameterSyntax(filled_hvp)
3303 # fill and remember the beparams dict
3304 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3307 #### instance parameters check
3309 # instance name verification
3310 hostname1 = utils.HostInfo(self.op.instance_name)
3311 self.op.instance_name = instance_name = hostname1.name
3313 # this is just a preventive check, but someone might still add this
3314 # instance in the meantime, and creation will fail at lock-add time
3315 if instance_name in self.cfg.GetInstanceList():
3316 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3319 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3321 # ip validity checks
3322 ip = getattr(self.op, "ip", None)
3323 if ip is None or ip.lower() == "none":
3325 elif ip.lower() == constants.VALUE_AUTO:
3326 inst_ip = hostname1.ip
3328 if not utils.IsValidIP(ip):
3329 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3330 " like a valid IP" % ip)
3332 self.inst_ip = self.op.ip = inst_ip
3333 # used in CheckPrereq for ip ping check
3334 self.check_ip = hostname1.ip
3336 # MAC address verification
3337 if self.op.mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3338 if not utils.IsValidMac(self.op.mac.lower()):
3339 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3342 # file storage checks
3343 if (self.op.file_driver and
3344 not self.op.file_driver in constants.FILE_DRIVER):
3345 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3346 self.op.file_driver)
3348 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3349 raise errors.OpPrereqError("File storage directory path not absolute")
3351 ### Node/iallocator related checks
3352 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3353 raise errors.OpPrereqError("One and only one of iallocator and primary"
3354 " node must be given")
3356 if self.op.iallocator:
3357 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3359 self.op.pnode = self._ExpandNode(self.op.pnode)
3360 nodelist = [self.op.pnode]
3361 if self.op.snode is not None:
3362 self.op.snode = self._ExpandNode(self.op.snode)
3363 nodelist.append(self.op.snode)
3364 self.needed_locks[locking.LEVEL_NODE] = nodelist
3366 # in case of import lock the source node too
3367 if self.op.mode == constants.INSTANCE_IMPORT:
3368 src_node = getattr(self.op, "src_node", None)
3369 src_path = getattr(self.op, "src_path", None)
3371 if src_node is None or src_path is None:
3372 raise errors.OpPrereqError("Importing an instance requires source"
3373 " node and path options")
3375 if not os.path.isabs(src_path):
3376 raise errors.OpPrereqError("The source path must be absolute")
3378 self.op.src_node = src_node = self._ExpandNode(src_node)
3379 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3380 self.needed_locks[locking.LEVEL_NODE].append(src_node)
3382 else: # INSTANCE_CREATE
3383 if getattr(self.op, "os_type", None) is None:
3384 raise errors.OpPrereqError("No guest OS specified")
3386 def _RunAllocator(self):
3387 """Run the allocator based on input opcode.
3390 disks = [{"size": self.op.disk_size, "mode": "w"},
3391 {"size": self.op.swap_size, "mode": "w"}]
3392 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3393 "bridge": self.op.bridge}]
3394 ial = IAllocator(self,
3395 mode=constants.IALLOCATOR_MODE_ALLOC,
3396 name=self.op.instance_name,
3397 disk_template=self.op.disk_template,
3400 vcpus=self.be_full[constants.BE_VCPUS],
3401 mem_size=self.be_full[constants.BE_MEMORY],
3406 ial.Run(self.op.iallocator)
3409 raise errors.OpPrereqError("Can't compute nodes using"
3410 " iallocator '%s': %s" % (self.op.iallocator,
3412 if len(ial.nodes) != ial.required_nodes:
3413 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3414 " of nodes (%s), required %s" %
3415 (self.op.iallocator, len(ial.nodes),
3416 ial.required_nodes))
3417 self.op.pnode = ial.nodes[0]
3418 feedback_fn("Selected nodes for the instance: %s" %
3419 (", ".join(ial.nodes),))
3420 logging.info("Selected nodes for instance %s via iallocator %s: %s",
3421 self.op.instance_name, self.op.iallocator, ial.nodes)
3422 if ial.required_nodes == 2:
3423 self.op.snode = ial.nodes[1]
3425 def BuildHooksEnv(self):
3428 This runs on master, primary and secondary nodes of the instance.
3432 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3433 "INSTANCE_DISK_SIZE": self.op.disk_size,
3434 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3435 "INSTANCE_ADD_MODE": self.op.mode,
3437 if self.op.mode == constants.INSTANCE_IMPORT:
3438 env["INSTANCE_SRC_NODE"] = self.op.src_node
3439 env["INSTANCE_SRC_PATH"] = self.op.src_path
3440 env["INSTANCE_SRC_IMAGE"] = self.src_image
3442 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3443 primary_node=self.op.pnode,
3444 secondary_nodes=self.secondaries,
3445 status=self.instance_status,
3446 os_type=self.op.os_type,
3447 memory=self.be_full[constants.BE_MEMORY],
3448 vcpus=self.be_full[constants.BE_VCPUS],
3449 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3452 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3457 def CheckPrereq(self):
3458 """Check prerequisites.
3461 if (not self.cfg.GetVGName() and
3462 self.op.disk_template not in constants.DTS_NOT_LVM):
3463 raise errors.OpPrereqError("Cluster does not support lvm-based"
3467 if self.op.mode == constants.INSTANCE_IMPORT:
3468 src_node = self.op.src_node
3469 src_path = self.op.src_path
3471 export_info = self.rpc.call_export_info(src_node, src_path)
3474 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3476 if not export_info.has_section(constants.INISECT_EXP):
3477 raise errors.ProgrammerError("Corrupted export config")
3479 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3480 if (int(ei_version) != constants.EXPORT_VERSION):
3481 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3482 (ei_version, constants.EXPORT_VERSION))
3484 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3485 raise errors.OpPrereqError("Can't import instance with more than"
3488 # FIXME: are the old OSes, disk sizes, etc. useful?
3489 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3490 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3492 self.src_image = diskimage
3494 if self.op.mac == constants.VALUE_AUTO:
3495 old_name = export_info.get(constants.INISECT_INS, 'name')
3496 if self.op.instance_name == old_name:
3497 # FIXME: adjust every nic, when we'll be able to create instances
3498 # with more than one
3499 if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
3500 self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')
3502 # ip ping checks (we use the same ip that was resolved in ExpandNames)
3504 if self.op.start and not self.op.ip_check:
3505 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3506 " adding an instance in start mode")
3508 if self.op.ip_check:
3509 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3510 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3511 (self.check_ip, self.op.instance_name))
3513 # bridge verification
3514 bridge = getattr(self.op, "bridge", None)
3516 self.op.bridge = self.cfg.GetDefBridge()
3518 self.op.bridge = bridge
3522 if self.op.iallocator is not None:
3523 self._RunAllocator()
3525 #### node related checks
3527 # check primary node
3528 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3529 assert self.pnode is not None, \
3530 "Cannot retrieve locked node %s" % self.op.pnode
3531 self.secondaries = []
3533 # mirror node verification
3534 if self.op.disk_template in constants.DTS_NET_MIRROR:
3535 if self.op.snode is None:
3536 raise errors.OpPrereqError("The networked disk templates need"
3538 if self.op.snode == pnode.name:
3539 raise errors.OpPrereqError("The secondary node cannot be"
3540 " the primary node.")
3541 self.secondaries.append(self.op.snode)
3543 nodenames = [pnode.name] + self.secondaries
3545 req_size = _ComputeDiskSize(self.op.disk_template,
3546 self.op.disk_size, self.op.swap_size)
3548 # Check lv size requirements
3549 if req_size is not None:
3550 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3552 for node in nodenames:
3553 info = nodeinfo.get(node, None)
3555 raise errors.OpPrereqError("Cannot get current information"
3556 " from node '%s'" % node)
3557 vg_free = info.get('vg_free', None)
3558 if not isinstance(vg_free, int):
3559 raise errors.OpPrereqError("Can't compute free disk space on"
3561 if req_size > info['vg_free']:
3562 raise errors.OpPrereqError("Not enough disk space on target node %s."
3563 " %d MB available, %d MB required" %
3564 (node, info['vg_free'], req_size))
3566 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3569 os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3571 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3572 " primary node" % self.op.os_type)
3574 # bridge check on primary node
3575 if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3576 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3577 " destination node '%s'" %
3578 (self.op.bridge, pnode.name))
3580 # memory check on primary node
3582 _CheckNodeFreeMemory(self, self.pnode.name,
3583 "creating instance %s" % self.op.instance_name,
3584 self.be_full[constants.BE_MEMORY],
3588 self.instance_status = 'up'
3590 self.instance_status = 'down'
3592 def Exec(self, feedback_fn):
3593 """Create and add the instance to the cluster.
3596 instance = self.op.instance_name
3597 pnode_name = self.pnode.name
3599 if self.op.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3600 mac_address = self.cfg.GenerateMAC()
3602 mac_address = self.op.mac
3604 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3605 if self.inst_ip is not None:
3606 nic.ip = self.inst_ip
3608 ht_kind = self.op.hypervisor
3609 if ht_kind in constants.HTS_REQ_PORT:
3610 network_port = self.cfg.AllocatePort()
3614 ##if self.op.vnc_bind_address is None:
3615 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3617 # this is needed because os.path.join does not accept None arguments
3618 if self.op.file_storage_dir is None:
3619 string_file_storage_dir = ""
3621 string_file_storage_dir = self.op.file_storage_dir
3623 # build the full file storage dir path
3624 file_storage_dir = os.path.normpath(os.path.join(
3625 self.cfg.GetFileStorageDir(),
3626 string_file_storage_dir, instance))
3629 disks = _GenerateDiskTemplate(self,
3630 self.op.disk_template,
3631 instance, pnode_name,
3632 self.secondaries, self.op.disk_size,
3635 self.op.file_driver)
3637 iobj = objects.Instance(name=instance, os=self.op.os_type,
3638 primary_node=pnode_name,
3639 nics=[nic], disks=disks,
3640 disk_template=self.op.disk_template,
3641 status=self.instance_status,
3642 network_port=network_port,
3643 beparams=self.op.beparams,
3644 hvparams=self.op.hvparams,
3645 hypervisor=self.op.hypervisor,
3648 feedback_fn("* creating instance disks...")
3649 if not _CreateDisks(self, iobj):
3650 _RemoveDisks(self, iobj)
3651 self.cfg.ReleaseDRBDMinors(instance)
3652 raise errors.OpExecError("Device creation failed, reverting...")
3654 feedback_fn("adding instance %s to cluster config" % instance)
3656 self.cfg.AddInstance(iobj)
3657 # Declare that we don't want to remove the instance lock anymore, as we've
3658 # added the instance to the config
3659 del self.remove_locks[locking.LEVEL_INSTANCE]
3660 # Remove the temp. assignments for the instance's drbds
3661 self.cfg.ReleaseDRBDMinors(instance)
3663 if self.op.wait_for_sync:
3664 disk_abort = not _WaitForSync(self, iobj)
3665 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3666 # make sure the disks are not degraded (still sync-ing is ok)
3668 feedback_fn("* checking mirrors status")
3669 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3674 _RemoveDisks(self, iobj)
3675 self.cfg.RemoveInstance(iobj.name)
3676 # Make sure the instance lock gets removed
3677 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3678 raise errors.OpExecError("There are some degraded disks for"
3681 feedback_fn("creating os for instance %s on node %s" %
3682 (instance, pnode_name))
3684 if iobj.disk_template != constants.DT_DISKLESS:
3685 if self.op.mode == constants.INSTANCE_CREATE:
3686 feedback_fn("* running the instance OS create scripts...")
3687 if not self.rpc.call_instance_os_add(pnode_name, iobj):
3688 raise errors.OpExecError("could not add os for instance %s"
3690 (instance, pnode_name))
3692 elif self.op.mode == constants.INSTANCE_IMPORT:
3693 feedback_fn("* running the instance OS import scripts...")
3694 src_node = self.op.src_node
3695 src_image = self.src_image
3696 cluster_name = self.cfg.GetClusterName()
3697 if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3698 src_node, src_image,
3700 raise errors.OpExecError("Could not import os for instance"
3702 (instance, pnode_name))
3704 # also checked in the prereq part
3705 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3709 logging.info("Starting instance %s on node %s", instance, pnode_name)
3710 feedback_fn("* starting instance...")
3711 if not self.rpc.call_instance_start(pnode_name, iobj, None):
3712 raise errors.OpExecError("Could not start instance")
3715 class LUConnectConsole(NoHooksLU):
3716 """Connect to an instance's console.
3718 This is somewhat special in that it returns the command line that
3719 you need to run on the master node in order to connect to the console.
3723 _OP_REQP = ["instance_name"]
3726 def ExpandNames(self):
3727 self._ExpandAndLockInstance()
3729 def CheckPrereq(self):
3730 """Check prerequisites.
3732 This checks that the instance is in the cluster.
3735 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3736 assert self.instance is not None, \
3737 "Cannot retrieve locked instance %s" % self.op.instance_name
3739 def Exec(self, feedback_fn):
3740 """Connect to the console of an instance
3743 instance = self.instance
3744 node = instance.primary_node
3746 node_insts = self.rpc.call_instance_list([node],
3747 [instance.hypervisor])[node]
3748 if node_insts is False:
3749 raise errors.OpExecError("Can't connect to node %s." % node)
3751 if instance.name not in node_insts:
3752 raise errors.OpExecError("Instance %s is not running." % instance.name)
3754 logging.debug("Connecting to console of %s on %s", instance.name, node)
3756 hyper = hypervisor.GetHypervisor(instance.hypervisor)
3757 console_cmd = hyper.GetShellCommandForConsole(instance)
3760 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
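# Hedged example: the returned value is the full SSH command line built by
# SshRunner.BuildCmd that, when run on the master as root with a forced TTY,
# attaches to the hypervisor's console command for the instance (for Xen
# this would be something like "xm console <instance>"; the exact command
# comes from GetShellCommandForConsole).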
3763 class LUReplaceDisks(LogicalUnit):
3764 """Replace the disks of an instance.
3767 HPATH = "mirrors-replace"
3768 HTYPE = constants.HTYPE_INSTANCE
3769 _OP_REQP = ["instance_name", "mode", "disks"]
3772 def ExpandNames(self):
3773 self._ExpandAndLockInstance()
3775 if not hasattr(self.op, "remote_node"):
3776 self.op.remote_node = None
3778 ia_name = getattr(self.op, "iallocator", None)
3779 if ia_name is not None:
3780 if self.op.remote_node is not None:
3781 raise errors.OpPrereqError("Give either the iallocator or the new"
3782 " secondary, not both")
3783 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3784 elif self.op.remote_node is not None:
3785 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3786 if remote_node is None:
3787 raise errors.OpPrereqError("Node '%s' not known" %
3788 self.op.remote_node)
3789 self.op.remote_node = remote_node
3790 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3791 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3793 self.needed_locks[locking.LEVEL_NODE] = []
3794 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3796 def DeclareLocks(self, level):
3797 # If we're not already locking all nodes in the set we have to declare the
3798 # instance's primary/secondary nodes.
3799 if (level == locking.LEVEL_NODE and
3800 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3801 self._LockInstancesNodes()
3803 def _RunAllocator(self):
3804 """Compute a new secondary node using an IAllocator.
3807 ial = IAllocator(self,
3808 mode=constants.IALLOCATOR_MODE_RELOC,
3809 name=self.op.instance_name,
3810 relocate_from=[self.sec_node])
3812 ial.Run(self.op.iallocator)
3815 raise errors.OpPrereqError("Can't compute nodes using"
3816 " iallocator '%s': %s" % (self.op.iallocator,
3818 if len(ial.nodes) != ial.required_nodes:
3819 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3820 " of nodes (%s), required %s" %
3821 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3822 self.op.remote_node = ial.nodes[0]
3823 feedback_fn("Selected new secondary for the instance: %s" %
3824 self.op.remote_node)
3826 def BuildHooksEnv(self):
3829 This runs on the master, the primary and all the secondaries.
3833 "MODE": self.op.mode,
3834 "NEW_SECONDARY": self.op.remote_node,
3835 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3837 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3839 self.cfg.GetMasterNode(),
3840 self.instance.primary_node,
3842 if self.op.remote_node is not None:
3843 nl.append(self.op.remote_node)
3846 def CheckPrereq(self):
3847 """Check prerequisites.
3849 This checks that the instance is in the cluster.
3852 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3853 assert instance is not None, \
3854 "Cannot retrieve locked instance %s" % self.op.instance_name
3855 self.instance = instance
3857 if instance.disk_template not in constants.DTS_NET_MIRROR:
3858 raise errors.OpPrereqError("Instance's disk layout is not"
3859 " network mirrored.")
3861 if len(instance.secondary_nodes) != 1:
3862 raise errors.OpPrereqError("The instance has a strange layout,"
3863 " expected one secondary but found %d" %
3864 len(instance.secondary_nodes))
3866 self.sec_node = instance.secondary_nodes[0]
3868 ia_name = getattr(self.op, "iallocator", None)
3869 if ia_name is not None:
3870 self._RunAllocator()
3872 remote_node = self.op.remote_node
3873 if remote_node is not None:
3874 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3875 assert self.remote_node_info is not None, \
3876 "Cannot retrieve locked node %s" % remote_node
3878 self.remote_node_info = None
3879 if remote_node == instance.primary_node:
3880 raise errors.OpPrereqError("The specified node is the primary node of"
3882 elif remote_node == self.sec_node:
3883 if self.op.mode == constants.REPLACE_DISK_SEC:
3884 # this is for DRBD8, where we can't execute the same mode of
3885 # replacement as for drbd7 (no different port allocated)
3886 raise errors.OpPrereqError("Same secondary given, cannot execute"
3888 if instance.disk_template == constants.DT_DRBD8:
3889 if (self.op.mode == constants.REPLACE_DISK_ALL and
3890 remote_node is not None):
3891 # switch to replace secondary mode
3892 self.op.mode = constants.REPLACE_DISK_SEC
3894 if self.op.mode == constants.REPLACE_DISK_ALL:
3895 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3896 " secondary disk replacement, not"
3898 elif self.op.mode == constants.REPLACE_DISK_PRI:
3899 if remote_node is not None:
3900 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3901 " the secondary while doing a primary"
3902 " node disk replacement")
3903 self.tgt_node = instance.primary_node
3904 self.oth_node = instance.secondary_nodes[0]
3905 elif self.op.mode == constants.REPLACE_DISK_SEC:
3906 self.new_node = remote_node # this can be None, in which case
3907 # we don't change the secondary
3908 self.tgt_node = instance.secondary_nodes[0]
3909 self.oth_node = instance.primary_node
3911 raise errors.ProgrammerError("Unhandled disk replace mode")
3913 for name in self.op.disks:
3914 if instance.FindDisk(name) is None:
3915 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3916 (name, instance.name))
3918 def _ExecD8DiskOnly(self, feedback_fn):
3919 """Replace a disk on the primary or secondary for dbrd8.
3921 The algorithm for replace is quite complicated:
3922 - for each disk to be replaced:
3923 - create new LVs on the target node with unique names
3924 - detach old LVs from the drbd device
3925 - rename old LVs to name_replaced.<time_t>
3926 - rename new LVs to old LVs
3927 - attach the new LVs (with the old names now) to the drbd device
3928 - wait for sync across all devices
3929 - for each modified disk:
3930 - remove old LVs (which have the name name_replaced.<time_t>)
3932 Failures are not very well handled.
3934 """
3935 steps_total = 6
3936 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3937 instance = self.instance
3938 iv_names = {}
3939 vgname = self.cfg.GetVGName()
3940 cfg = self.cfg
3942 tgt_node = self.tgt_node
3943 oth_node = self.oth_node
3945 # Step: check device activation
3946 self.proc.LogStep(1, steps_total, "check device existence")
3947 info("checking volume groups")
3948 my_vg = cfg.GetVGName()
3949 results = self.rpc.call_vg_list([oth_node, tgt_node])
3950 if not results:
3951 raise errors.OpExecError("Can't list volume groups on the nodes")
3952 for node in oth_node, tgt_node:
3953 res = results.get(node, False)
3954 if not res or my_vg not in res:
3955 raise errors.OpExecError("Volume group '%s' not found on %s" %
3956 (my_vg, node))
3957 for dev in instance.disks:
3958 if not dev.iv_name in self.op.disks:
3959 continue
3960 for node in tgt_node, oth_node:
3961 info("checking %s on %s" % (dev.iv_name, node))
3962 cfg.SetDiskID(dev, node)
3963 if not self.rpc.call_blockdev_find(node, dev):
3964 raise errors.OpExecError("Can't find device %s on node %s" %
3965 (dev.iv_name, node))
3967 # Step: check other node consistency
3968 self.proc.LogStep(2, steps_total, "check peer consistency")
3969 for dev in instance.disks:
3970 if not dev.iv_name in self.op.disks:
3971 continue
3972 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3973 if not _CheckDiskConsistency(self, dev, oth_node,
3974 oth_node==instance.primary_node):
3975 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3976 " to replace disks on this node (%s)" %
3977 (oth_node, tgt_node))
3979 # Step: create new storage
3980 self.proc.LogStep(3, steps_total, "allocate new storage")
3981 for dev in instance.disks:
3982 if not dev.iv_name in self.op.disks:
3983 continue
3984 size = dev.size
3985 cfg.SetDiskID(dev, tgt_node)
3986 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3987 names = _GenerateUniqueNames(self, lv_names)
3988 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3989 logical_id=(vgname, names[0]))
3990 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3991 logical_id=(vgname, names[1]))
3992 new_lvs = [lv_data, lv_meta]
3993 old_lvs = dev.children
3994 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3995 info("creating new local storage on %s for %s" %
3996 (tgt_node, dev.iv_name))
3997 # since we *always* want to create this LV, we use the
3998 # _Create...OnPrimary (which forces the creation), even if we
3999 # are talking about the secondary node
4000 for new_lv in new_lvs:
4001 if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4002 _GetInstanceInfoText(instance)):
4003 raise errors.OpExecError("Failed to create new LV named '%s' on"
4004 " node '%s'" %
4005 (new_lv.logical_id[1], tgt_node))
4007 # Step: for each lv, detach+rename*2+attach
4008 self.proc.LogStep(4, steps_total, "change drbd configuration")
4009 for dev, old_lvs, new_lvs in iv_names.itervalues():
4010 info("detaching %s drbd from local storage" % dev.iv_name)
4011 if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4012 raise errors.OpExecError("Can't detach drbd from local storage on node"
4013 " %s for device %s" % (tgt_node, dev.iv_name))
4015 #cfg.Update(instance)
4017 # ok, we created the new LVs, so now we know we have the needed
4018 # storage; as such, we proceed on the target node to rename
4019 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4020 # using the assumption that logical_id == physical_id (which in
4021 # turn is the unique_id on that node)
4023 # FIXME(iustin): use a better name for the replaced LVs
4024 temp_suffix = int(time.time())
4025 ren_fn = lambda d, suff: (d.physical_id[0],
4026 d.physical_id[1] + "_replaced-%s" % suff)
4027 # build the rename list based on what LVs exist on the node
4028 rlist = []
4029 for to_ren in old_lvs:
4030 find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4031 if find_res is not None: # device exists
4032 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4034 info("renaming the old LVs on the target node")
4035 if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4036 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4037 # now we rename the new LVs to the old LVs
4038 info("renaming the new LVs on the target node")
4039 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4040 if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4041 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4043 for old, new in zip(old_lvs, new_lvs):
4044 new.logical_id = old.logical_id
4045 cfg.SetDiskID(new, tgt_node)
4047 for disk in old_lvs:
4048 disk.logical_id = ren_fn(disk, temp_suffix)
4049 cfg.SetDiskID(disk, tgt_node)
4051 # now that the new lvs have the old name, we can add them to the device
4052 info("adding new mirror component on %s" % tgt_node)
4053 if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4054 for new_lv in new_lvs:
4055 if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4056 warning("Can't rollback device %s", hint="manually cleanup unused"
4058 raise errors.OpExecError("Can't add local storage to drbd")
4060 dev.children = new_lvs
4061 cfg.Update(instance)
4063 # Step: wait for sync
4065 # this can fail as the old devices are degraded and _WaitForSync
4066 # does a combined result over all disks, so we don't check its
4067 # return value
4068 self.proc.LogStep(5, steps_total, "sync devices")
4069 _WaitForSync(self, instance, unlock=True)
4071 # so check manually all the devices
4072 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4073 cfg.SetDiskID(dev, instance.primary_node)
4074 is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4075 if is_degr:
4076 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4078 # Step: remove old storage
4079 self.proc.LogStep(6, steps_total, "removing old storage")
4080 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4081 info("remove logical volumes for %s" % name)
4083 cfg.SetDiskID(lv, tgt_node)
4084 if not self.rpc.call_blockdev_remove(tgt_node, lv):
4085 warning("Can't remove old LV", hint="manually remove unused LVs")
4088 def _ExecD8Secondary(self, feedback_fn):
4089 """Replace the secondary node for drbd8.
4091 The algorithm for replace is quite complicated:
4092 - for all disks of the instance:
4093 - create new LVs on the new node with same names
4094 - shutdown the drbd device on the old secondary
4095 - disconnect the drbd network on the primary
4096 - create the drbd device on the new secondary
4097 - network attach the drbd on the primary, using an artifice:
4098 the drbd code for Attach() will connect to the network if it
4099 finds a device which is connected to the good local disks but
4100 not network enabled
4101 - wait for sync across all devices
4102 - remove all disks from the old secondary
4104 Failures are not very well handled.
4106 """
4107 steps_total = 6
4108 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4109 instance = self.instance
4110 iv_names = {}
4111 vgname = self.cfg.GetVGName()
4112 cfg = self.cfg
4114 old_node = self.tgt_node
4115 new_node = self.new_node
4116 pri_node = instance.primary_node
4118 # Step: check device activation
4119 self.proc.LogStep(1, steps_total, "check device existence")
4120 info("checking volume groups")
4121 my_vg = cfg.GetVGName()
4122 results = self.rpc.call_vg_list([pri_node, new_node])
4123 if not results:
4124 raise errors.OpExecError("Can't list volume groups on the nodes")
4125 for node in pri_node, new_node:
4126 res = results.get(node, False)
4127 if not res or my_vg not in res:
4128 raise errors.OpExecError("Volume group '%s' not found on %s" %
4129 (my_vg, node))
4130 for dev in instance.disks:
4131 if not dev.iv_name in self.op.disks:
4132 continue
4133 info("checking %s on %s" % (dev.iv_name, pri_node))
4134 cfg.SetDiskID(dev, pri_node)
4135 if not self.rpc.call_blockdev_find(pri_node, dev):
4136 raise errors.OpExecError("Can't find device %s on node %s" %
4137 (dev.iv_name, pri_node))
4139 # Step: check other node consistency
4140 self.proc.LogStep(2, steps_total, "check peer consistency")
4141 for dev in instance.disks:
4142 if not dev.iv_name in self.op.disks:
4143 continue
4144 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4145 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4146 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4147 " unsafe to replace the secondary" %
4150 # Step: create new storage
4151 self.proc.LogStep(3, steps_total, "allocate new storage")
4152 for dev in instance.disks:
4154 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4155 # since we *always* want to create this LV, we use the
4156 # _Create...OnPrimary (which forces the creation), even if we
4157 # are talking about the secondary node
4158 for new_lv in dev.children:
4159 if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4160 _GetInstanceInfoText(instance)):
4161 raise errors.OpExecError("Failed to create new LV named '%s' on"
4162 " node '%s'" %
4163 (new_lv.logical_id[1], new_node))
4166 # Step 4: drbd minors and drbd setup changes
4167 # after this, we must manually remove the drbd minors on both the
4168 # error and the success paths
4169 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4170 instance.name)
4171 logging.debug("Allocated minors %s" % (minors,))
4172 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4173 for dev, new_minor in zip(instance.disks, minors):
4175 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4176 # create new devices on new_node
4177 if pri_node == dev.logical_id[0]:
4178 new_logical_id = (pri_node, new_node,
4179 dev.logical_id[2], dev.logical_id[3], new_minor,
4182 new_logical_id = (new_node, pri_node,
4183 dev.logical_id[2], new_minor, dev.logical_id[4],
4185 iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4186 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4188 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4189 logical_id=new_logical_id,
4190 children=dev.children)
4191 if not _CreateBlockDevOnSecondary(self, new_node, instance,
4193 _GetInstanceInfoText(instance)):
4194 self.cfg.ReleaseDRBDMinors(instance.name)
4195 raise errors.OpExecError("Failed to create new DRBD on"
4196 " node '%s'" % new_node)
4198 for dev in instance.disks:
4199 # we have new devices, shutdown the drbd on the old secondary
4200 info("shutting down drbd for %s on old node" % dev.iv_name)
4201 cfg.SetDiskID(dev, old_node)
4202 if not self.rpc.call_blockdev_shutdown(old_node, dev):
4203 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4204 hint="Please cleanup this device manually as soon as possible")
4206 info("detaching primary drbds from the network (=> standalone)")
4208 for dev in instance.disks:
4209 cfg.SetDiskID(dev, pri_node)
4210 # set the network part of the physical (unique in bdev terms) id
4211 # to None, meaning detach from network
4212 dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4213 # and 'find' the device, which will 'fix' it to match the
4214 # standalone state
4215 if self.rpc.call_blockdev_find(pri_node, dev):
4218 warning("Failed to detach drbd %s from network, unusual case" %
4222 # no detaches succeeded (very unlikely)
4223 self.cfg.ReleaseDRBDMinors(instance.name)
4224 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4226 # if we managed to detach at least one, we update all the disks of
4227 # the instance to point to the new secondary
4228 info("updating instance configuration")
4229 for dev, _, new_logical_id in iv_names.itervalues():
4230 dev.logical_id = new_logical_id
4231 cfg.SetDiskID(dev, pri_node)
4232 cfg.Update(instance)
4233 # we can remove now the temp minors as now the new values are
4234 # written to the config file (and therefore stable)
4235 self.cfg.ReleaseDRBDMinors(instance.name)
4237 # and now perform the drbd attach
4238 info("attaching primary drbds to new secondary (standalone => connected)")
4240 for dev in instance.disks:
4241 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4242 # since the attach is smart, it's enough to 'find' the device,
4243 # it will automatically activate the network, if the physical_id
4244 # is correct
4245 cfg.SetDiskID(dev, pri_node)
4246 logging.debug("Disk to attach: %s", dev)
4247 if not self.rpc.call_blockdev_find(pri_node, dev):
4248 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4249 "please do a gnt-instance info to see the status of disks")
4251 # this can fail as the old devices are degraded and _WaitForSync
4252 # does a combined result over all disks, so we don't check its
4253 # return value
4254 self.proc.LogStep(5, steps_total, "sync devices")
4255 _WaitForSync(self, instance, unlock=True)
4257 # so check manually all the devices
4258 for name, (dev, old_lvs, _) in iv_names.iteritems():
4259 cfg.SetDiskID(dev, pri_node)
4260 is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4261 if is_degr:
4262 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4264 self.proc.LogStep(6, steps_total, "removing old storage")
4265 for name, (dev, old_lvs, _) in iv_names.iteritems():
4266 info("remove logical volumes for %s" % name)
4268 cfg.SetDiskID(lv, old_node)
4269 if not self.rpc.call_blockdev_remove(old_node, lv):
4270 warning("Can't remove LV on old secondary",
4271 hint="Cleanup stale volumes by hand")
4273 def Exec(self, feedback_fn):
4274 """Execute disk replacement.
4276 This dispatches the disk replacement to the appropriate handler.
4279 instance = self.instance
4281 # Activate the instance disks if we're replacing them on a down instance
4282 if instance.status == "down":
4283 _StartInstanceDisks(self, instance, True)
4285 if instance.disk_template == constants.DT_DRBD8:
4286 if self.op.remote_node is None:
4287 fn = self._ExecD8DiskOnly
4289 fn = self._ExecD8Secondary
4291 raise errors.ProgrammerError("Unhandled disk replacement case")
4293 ret = fn(feedback_fn)
4295 # Deactivate the instance disks if we're replacing them on a down instance
4296 if instance.status == "down":
4297 _SafeShutdownInstanceDisks(self, instance)
4299 return ret
4302 class LUGrowDisk(LogicalUnit):
4303 """Grow a disk of an instance.
4307 HTYPE = constants.HTYPE_INSTANCE
4308 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4311 def ExpandNames(self):
4312 self._ExpandAndLockInstance()
4313 self.needed_locks[locking.LEVEL_NODE] = []
4314 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4316 def DeclareLocks(self, level):
4317 if level == locking.LEVEL_NODE:
4318 self._LockInstancesNodes()
4320 def BuildHooksEnv(self):
4323 This runs on the master, the primary and all the secondaries.
4327 "DISK": self.op.disk,
4328 "AMOUNT": self.op.amount,
4330 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4332 self.cfg.GetMasterNode(),
4333 self.instance.primary_node,
4337 def CheckPrereq(self):
4338 """Check prerequisites.
4340 This checks that the instance is in the cluster.
4343 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4344 assert instance is not None, \
4345 "Cannot retrieve locked instance %s" % self.op.instance_name
4347 self.instance = instance
4349 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4350 raise errors.OpPrereqError("Instance's disk layout does not support"
4353 if instance.FindDisk(self.op.disk) is None:
4354 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4355 (self.op.disk, instance.name))
4357 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4358 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4359 instance.hypervisor)
4360 for node in nodenames:
4361 info = nodeinfo.get(node, None)
4362 if not info:
4363 raise errors.OpPrereqError("Cannot get current information"
4364 " from node '%s'" % node)
4365 vg_free = info.get('vg_free', None)
4366 if not isinstance(vg_free, int):
4367 raise errors.OpPrereqError("Can't compute free disk space on"
4369 if self.op.amount > info['vg_free']:
4370 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4371 " %d MiB available, %d MiB required" %
4372 (node, info['vg_free'], self.op.amount))
4374 def Exec(self, feedback_fn):
4375 """Execute disk grow.
4378 instance = self.instance
4379 disk = instance.FindDisk(self.op.disk)
4380 for node in (instance.secondary_nodes + (instance.primary_node,)):
4381 self.cfg.SetDiskID(disk, node)
4382 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4383 if (not result or not isinstance(result, (list, tuple)) or
4384 len(result) != 2):
4385 raise errors.OpExecError("grow request failed to node %s" % node)
4386 elif not result[0]:
4387 raise errors.OpExecError("grow request failed to node %s: %s" %
4388 (node, result[1]))
4389 disk.RecordGrow(self.op.amount)
4390 self.cfg.Update(instance)
4391 if self.op.wait_for_sync:
4392 disk_abort = not _WaitForSync(self, instance)
4393 if disk_abort:
4394 logging.error("Warning: disk sync-ing has not returned a good"
4395 " status.\nPlease check the instance.")
4398 class LUQueryInstanceData(NoHooksLU):
4399 """Query runtime instance data.
4402 _OP_REQP = ["instances", "static"]
4405 def ExpandNames(self):
4406 self.needed_locks = {}
4407 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4409 if not isinstance(self.op.instances, list):
4410 raise errors.OpPrereqError("Invalid argument type 'instances'")
4412 if self.op.instances:
4413 self.wanted_names = []
4414 for name in self.op.instances:
4415 full_name = self.cfg.ExpandInstanceName(name)
4416 if full_name is None:
4417 raise errors.OpPrereqError("Instance '%s' not known" %
4418 name)
4419 self.wanted_names.append(full_name)
4420 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4422 self.wanted_names = None
4423 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4425 self.needed_locks[locking.LEVEL_NODE] = []
4426 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4428 def DeclareLocks(self, level):
4429 if level == locking.LEVEL_NODE:
4430 self._LockInstancesNodes()
4432 def CheckPrereq(self):
4433 """Check prerequisites.
4435 This only checks the optional instance list against the existing names.
4438 if self.wanted_names is None:
4439 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4441 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4442 in self.wanted_names]
4445 def _ComputeDiskStatus(self, instance, snode, dev):
4446 """Compute block device status.
4449 static = self.op.static
4450 if not static:
4451 self.cfg.SetDiskID(dev, instance.primary_node)
4452 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4453 else:
4454 dev_pstatus = None
4456 if dev.dev_type in constants.LDS_DRBD:
4457 # we change the snode then (otherwise we use the one passed in)
4458 if dev.logical_id[0] == instance.primary_node:
4459 snode = dev.logical_id[1]
4461 snode = dev.logical_id[0]
4463 if snode and not static:
4464 self.cfg.SetDiskID(dev, snode)
4465 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4466 else:
4467 dev_sstatus = None
4469 if dev.children:
4470 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4471 for child in dev.children]
4472 else:
4473 dev_children = []
4475 data = {
4476 "iv_name": dev.iv_name,
4477 "dev_type": dev.dev_type,
4478 "logical_id": dev.logical_id,
4479 "physical_id": dev.physical_id,
4480 "pstatus": dev_pstatus,
4481 "sstatus": dev_sstatus,
4482 "children": dev_children,
4487 def Exec(self, feedback_fn):
4488 """Gather and return data"""
4489 result = {}
4491 cluster = self.cfg.GetClusterInfo()
4493 for instance in self.wanted_instances:
4494 if not self.op.static:
4495 remote_info = self.rpc.call_instance_info(instance.primary_node,
4497 instance.hypervisor)
4498 if remote_info and "state" in remote_info:
4501 remote_state = "down"
4504 if instance.status == "down":
4505 config_state = "down"
4509 disks = [self._ComputeDiskStatus(instance, None, device)
4510 for device in instance.disks]
4513 "name": instance.name,
4514 "config_state": config_state,
4515 "run_state": remote_state,
4516 "pnode": instance.primary_node,
4517 "snodes": instance.secondary_nodes,
4519 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4521 "hypervisor": instance.hypervisor,
4522 "network_port": instance.network_port,
4523 "hv_instance": instance.hvparams,
4524 "hv_actual": cluster.FillHV(instance),
4525 "be_instance": instance.beparams,
4526 "be_actual": cluster.FillBE(instance),
4529 result[instance.name] = idict
4531 return result
4534 class LUSetInstanceParams(LogicalUnit):
4535 """Modifies an instances's parameters.
4538 HPATH = "instance-modify"
4539 HTYPE = constants.HTYPE_INSTANCE
4540 _OP_REQP = ["instance_name", "hvparams"]
4543 def ExpandNames(self):
4544 self._ExpandAndLockInstance()
4545 self.needed_locks[locking.LEVEL_NODE] = []
4546 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4549 def DeclareLocks(self, level):
4550 if level == locking.LEVEL_NODE:
4551 self._LockInstancesNodes()
4553 def BuildHooksEnv(self):
4556 This runs on the master, primary and secondaries.
4560 if constants.BE_MEMORY in self.be_new:
4561 args['memory'] = self.be_new[constants.BE_MEMORY]
4562 if constants.BE_VCPUS in self.be_new:
4563 args['vcpus'] = self.be_new[constants.BE_VCPUS]
4564 if self.do_ip or self.do_bridge or self.mac:
4568 ip = self.instance.nics[0].ip
4570 bridge = self.bridge
4572 bridge = self.instance.nics[0].bridge
4576 mac = self.instance.nics[0].mac
4577 args['nics'] = [(ip, bridge, mac)]
4578 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4579 nl = [self.cfg.GetMasterNode(),
4580 self.instance.primary_node] + list(self.instance.secondary_nodes)
4583 def CheckPrereq(self):
4584 """Check prerequisites.
4586 This only checks the instance list against the existing names.
4589 # FIXME: all the parameters could be checked before, in ExpandNames, or in
4590 # a separate CheckArguments function, if we implement one, so the operation
4591 # can be aborted without waiting for any lock, should it have an error...
4592 self.ip = getattr(self.op, "ip", None)
4593 self.mac = getattr(self.op, "mac", None)
4594 self.bridge = getattr(self.op, "bridge", None)
4595 self.kernel_path = getattr(self.op, "kernel_path", None)
4596 self.initrd_path = getattr(self.op, "initrd_path", None)
4597 self.force = getattr(self.op, "force", None)
4598 all_parms = [self.ip, self.bridge, self.mac]
4599 if (all_parms.count(None) == len(all_parms) and
4600 not self.op.hvparams and
4601 not self.op.beparams):
4602 raise errors.OpPrereqError("No changes submitted")
4603 for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4604 val = self.op.beparams.get(item, None)
4605 if val is not None:
4606 try:
4607 val = int(val)
4608 except ValueError, err:
4609 raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4610 self.op.beparams[item] = val
4611 if self.ip is not None:
4613 if self.ip.lower() == "none":
4616 if not utils.IsValidIP(self.ip):
4617 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4620 self.do_bridge = (self.bridge is not None)
4621 if self.mac is not None:
4622 if self.cfg.IsMacInUse(self.mac):
4623 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4624 self.mac)
4625 if not utils.IsValidMac(self.mac):
4626 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4628 # checking the new params on the primary/secondary nodes
4630 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4631 assert self.instance is not None, \
4632 "Cannot retrieve locked instance %s" % self.op.instance_name
4633 pnode = self.instance.primary_node
4634 nodelist = [pnode]
4635 nodelist.extend(instance.secondary_nodes)
4637 # hvparams processing
4638 if self.op.hvparams:
4639 i_hvdict = copy.deepcopy(instance.hvparams)
4640 for key, val in self.op.hvparams.iteritems():
4648 cluster = self.cfg.GetClusterInfo()
4649 hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4652 hypervisor.GetHypervisor(
4653 instance.hypervisor).CheckParameterSyntax(hv_new)
4654 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4655 self.hv_new = hv_new # the new actual values
4656 self.hv_inst = i_hvdict # the new dict (without defaults)
4658 self.hv_new = self.hv_inst = {}
4660 # beparams processing
4661 if self.op.beparams:
4662 i_bedict = copy.deepcopy(instance.beparams)
4663 for key, val in self.op.beparams.iteritems():
4671 cluster = self.cfg.GetClusterInfo()
4672 be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4673 i_bedict)
4674 self.be_new = be_new # the new actual values
4675 self.be_inst = i_bedict # the new dict (without defaults)
4676 else:
4677 self.be_new = self.be_inst = {}
4679 self.warn = []
4681 if constants.BE_MEMORY in self.op.beparams and not self.force:
4682 mem_check_list = [pnode]
4683 if be_new[constants.BE_AUTO_BALANCE]:
4684 # either we changed auto_balance to yes or it was from before
4685 mem_check_list.extend(instance.secondary_nodes)
4686 instance_info = self.rpc.call_instance_info(pnode, instance.name,
4687 instance.hypervisor)
4688 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4689 instance.hypervisor)
4691 if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4692 # Assume the primary node is unreachable and go ahead
4693 self.warn.append("Can't get info from primary node %s" % pnode)
4696 current_mem = instance_info['memory']
4698 # Assume instance not running
4699 # (there is a slight race condition here, but it's not very probable,
4700 # and we have no other way to check)
4702 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4703 nodeinfo[pnode]['memory_free'])
4705 raise errors.OpPrereqError("This change will prevent the instance"
4706 " from starting, due to %d MB of memory"
4707 " missing on its primary node" % miss_mem)
4709 if be_new[constants.BE_AUTO_BALANCE]:
4710 for node in instance.secondary_nodes:
4711 if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4712 self.warn.append("Can't get info from secondary node %s" % node)
4713 elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4714 self.warn.append("Not enough memory to failover instance to"
4715 " secondary node %s" % node)
4719 def Exec(self, feedback_fn):
4720 """Modifies an instance.
4722 All parameters take effect only at the next restart of the instance.
4724 # Process here the warnings from CheckPrereq, as we don't have a
4725 # feedback_fn there.
4726 for warn in self.warn:
4727 feedback_fn("WARNING: %s" % warn)
4729 result = []
4730 instance = self.instance
4731 if self.do_ip:
4732 instance.nics[0].ip = self.ip
4733 result.append(("ip", self.ip))
4734 if self.bridge:
4735 instance.nics[0].bridge = self.bridge
4736 result.append(("bridge", self.bridge))
4737 if self.mac:
4738 instance.nics[0].mac = self.mac
4739 result.append(("mac", self.mac))
4740 if self.op.hvparams:
4741 instance.hvparams = self.hv_new
4742 for key, val in self.op.hvparams.iteritems():
4743 result.append(("hv/%s" % key, val))
4744 if self.op.beparams:
4745 instance.beparams = self.be_inst
4746 for key, val in self.op.beparams.iteritems():
4747 result.append(("be/%s" % key, val))
4749 self.cfg.Update(instance)
4751 return result
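# The list returned above is a simple change log, one (parameter, new value)
# pair per modified setting, e.g. (hypothetical values):
#   [("ip", "192.0.2.10"), ("bridge", "xen-br0"), ("be/memory", 512)]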
4754 class LUQueryExports(NoHooksLU):
4755 """Query the exports list
4758 _OP_REQP = ['nodes']
4761 def ExpandNames(self):
4762 self.needed_locks = {}
4763 self.share_locks[locking.LEVEL_NODE] = 1
4764 if not self.op.nodes:
4765 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4767 self.needed_locks[locking.LEVEL_NODE] = \
4768 _GetWantedNodes(self, self.op.nodes)
4770 def CheckPrereq(self):
4771 """Check prerequisites.
4774 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4776 def Exec(self, feedback_fn):
4777 """Compute the list of all the exported system images.
4780 a dictionary with the structure node->(export-list)
4781 where export-list is a list of the instances exported on
4782 that node.
4784 """
4785 return self.rpc.call_export_list(self.nodes)
4788 class LUExportInstance(LogicalUnit):
4789 """Export an instance to an image in the cluster.
4792 HPATH = "instance-export"
4793 HTYPE = constants.HTYPE_INSTANCE
4794 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4797 def ExpandNames(self):
4798 self._ExpandAndLockInstance()
4799 # FIXME: lock only instance primary and destination node
4801 # Sad but true, for now we have to lock all nodes, as we don't know where
4802 # the previous export might be, and in this LU we search for it and
4803 # remove it from its current node. In the future we could fix this by:
4804 # - making a tasklet to search (share-lock all), then create the new one,
4805 # then one to remove, after
4806 # - removing the removal operation altogether
4807 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4809 def DeclareLocks(self, level):
4810 """Last minute lock declaration."""
4811 # All nodes are locked anyway, so nothing to do here.
4813 def BuildHooksEnv(self):
4816 This will run on the master, primary node and target node.
4820 "EXPORT_NODE": self.op.target_node,
4821 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4823 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4824 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4825 self.op.target_node]
4826 return env, nl, nl
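# The env entries above end up, together with the generic instance hook
# variables from _BuildInstanceHookEnvByObject, as environment variables for
# the hook scripts, e.g. a hook could read EXPORT_NODE / EXPORT_DO_SHUTDOWN
# (possibly with a Ganeti-specific prefix added by the hooks runner -- that
# detail is handled outside this module).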
4828 def CheckPrereq(self):
4829 """Check prerequisites.
4831 This checks that the instance and node names are valid.
4834 instance_name = self.op.instance_name
4835 self.instance = self.cfg.GetInstanceInfo(instance_name)
4836 assert self.instance is not None, \
4837 "Cannot retrieve locked instance %s" % self.op.instance_name
4839 self.dst_node = self.cfg.GetNodeInfo(
4840 self.cfg.ExpandNodeName(self.op.target_node))
4842 assert self.dst_node is not None, \
4843 "Cannot retrieve locked node %s" % self.op.target_node
4845 # instance disk type verification
4846 for disk in self.instance.disks:
4847 if disk.dev_type == constants.LD_FILE:
4848 raise errors.OpPrereqError("Export not supported for instances with"
4849 " file-based disks")
4851 def Exec(self, feedback_fn):
4852 """Export an instance to an image in the cluster.
4855 instance = self.instance
4856 dst_node = self.dst_node
4857 src_node = instance.primary_node
4858 if self.op.shutdown:
4859 # shutdown the instance, but not the disks
4860 if not self.rpc.call_instance_shutdown(src_node, instance):
4861 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4862 (instance.name, src_node))
4864 vgname = self.cfg.GetVGName()
4866 snap_disks = []
4868 try:
4869 for disk in instance.disks:
4870 if disk.iv_name == "sda":
4871 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4872 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4874 if not new_dev_name:
4875 logging.error("Could not snapshot block device %s on node %s",
4876 disk.logical_id[1], src_node)
4878 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4879 logical_id=(vgname, new_dev_name),
4880 physical_id=(vgname, new_dev_name),
4881 iv_name=disk.iv_name)
4882 snap_disks.append(new_dev)
4884 finally:
4885 if self.op.shutdown and instance.status == "up":
4886 if not self.rpc.call_instance_start(src_node, instance, None):
4887 _ShutdownInstanceDisks(self, instance)
4888 raise errors.OpExecError("Could not start instance")
4890 # TODO: check for size
4892 cluster_name = self.cfg.GetClusterName()
4893 for dev in snap_disks:
4894 if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4895 instance, cluster_name):
4896 logging.error("Could not export block device %s from node %s to"
4897 " node %s", dev.logical_id[1], src_node, dst_node.name)
4898 if not self.rpc.call_blockdev_remove(src_node, dev):
4899 logging.error("Could not remove snapshot block device %s from node"
4900 " %s", dev.logical_id[1], src_node)
4902 if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4903 logging.error("Could not finalize export for instance %s on node %s",
4904 instance.name, dst_node.name)
4906 nodelist = self.cfg.GetNodeList()
4907 nodelist.remove(dst_node.name)
4909 # on one-node clusters nodelist will be empty after the removal
4910 # if we proceed the backup would be removed because OpQueryExports
4911 # substitutes an empty list with the full cluster node list.
4912 if nodelist:
4913 exportlist = self.rpc.call_export_list(nodelist)
4914 for node in exportlist:
4915 if instance.name in exportlist[node]:
4916 if not self.rpc.call_export_remove(node, instance.name):
4917 logging.error("Could not remove older export for instance %s"
4918 " on node %s", instance.name, node)
4921 class LURemoveExport(NoHooksLU):
4922 """Remove exports related to the named instance.
4925 _OP_REQP = ["instance_name"]
4928 def ExpandNames(self):
4929 self.needed_locks = {}
4930 # We need all nodes to be locked in order for RemoveExport to work, but we
4931 # don't need to lock the instance itself, as nothing will happen to it (and
4932 # we can remove exports also for a removed instance)
4933 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4935 def CheckPrereq(self):
4936 """Check prerequisites.
4940 def Exec(self, feedback_fn):
4941 """Remove any export.
4944 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4945 # If the instance was not found we'll try with the name that was passed in.
4946 # This will only work if it was an FQDN, though.
4947 fqdn_warn = False
4948 if not instance_name:
4949 fqdn_warn = True
4950 instance_name = self.op.instance_name
4952 exportlist = self.rpc.call_export_list(self.acquired_locks[
4953 locking.LEVEL_NODE])
4954 found = False
4955 for node in exportlist:
4956 if instance_name in exportlist[node]:
4957 found = True
4958 if not self.rpc.call_export_remove(node, instance_name):
4959 logging.error("Could not remove export for instance %s"
4960 " on node %s", instance_name, node)
4962 if fqdn_warn and not found:
4963 feedback_fn("Export not found. If trying to remove an export belonging"
4964 " to a deleted instance please use its Fully Qualified"
4968 class TagsLU(NoHooksLU):
4971 This is an abstract class which is the parent of all the other tags LUs.
4975 def ExpandNames(self):
4976 self.needed_locks = {}
4977 if self.op.kind == constants.TAG_NODE:
4978 name = self.cfg.ExpandNodeName(self.op.name)
4979 if name is None:
4980 raise errors.OpPrereqError("Invalid node name (%s)" %
4981 self.op.name)
4982 self.op.name = name
4983 self.needed_locks[locking.LEVEL_NODE] = name
4984 elif self.op.kind == constants.TAG_INSTANCE:
4985 name = self.cfg.ExpandInstanceName(self.op.name)
4986 if name is None:
4987 raise errors.OpPrereqError("Invalid instance name (%s)" %
4988 self.op.name)
4989 self.op.name = name
4990 self.needed_locks[locking.LEVEL_INSTANCE] = name
4992 def CheckPrereq(self):
4993 """Check prerequisites.
4996 if self.op.kind == constants.TAG_CLUSTER:
4997 self.target = self.cfg.GetClusterInfo()
4998 elif self.op.kind == constants.TAG_NODE:
4999 self.target = self.cfg.GetNodeInfo(self.op.name)
5000 elif self.op.kind == constants.TAG_INSTANCE:
5001 self.target = self.cfg.GetInstanceInfo(self.op.name)
5003 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5007 class LUGetTags(TagsLU):
5008 """Returns the tags of a given object.
5011 _OP_REQP = ["kind", "name"]
5014 def Exec(self, feedback_fn):
5015 """Returns the tag list.
5018 return list(self.target.GetTags())
5021 class LUSearchTags(NoHooksLU):
5022 """Searches the tags for a given pattern.
5025 _OP_REQP = ["pattern"]
5028 def ExpandNames(self):
5029 self.needed_locks = {}
5031 def CheckPrereq(self):
5032 """Check prerequisites.
5034 This checks the pattern passed for validity by compiling it.
5037 try:
5038 self.re = re.compile(self.op.pattern)
5039 except re.error, err:
5040 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5041 (self.op.pattern, err))
5043 def Exec(self, feedback_fn):
5044 """Returns the tag list.
5047 cfg = self.cfg
5048 tgts = [("/cluster", cfg.GetClusterInfo())]
5049 ilist = cfg.GetAllInstancesInfo().values()
5050 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5051 nlist = cfg.GetAllNodesInfo().values()
5052 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5053 results = []
5054 for path, target in tgts:
5055 for tag in target.GetTags():
5056 if self.re.search(tag):
5057 results.append((path, tag))
5059 return results
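# Example of the result shape (hypothetical tags): a search for the pattern
# "^web" could return
#   [("/cluster", "webfarm"), ("/instances/instance1.example.com", "webserver")]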
5061 class LUAddTags(TagsLU):
5062 """Sets a tag on a given object.
5065 _OP_REQP = ["kind", "name", "tags"]
5068 def CheckPrereq(self):
5069 """Check prerequisites.
5071 This checks the type and length of the tag name and value.
5074 TagsLU.CheckPrereq(self)
5075 for tag in self.op.tags:
5076 objects.TaggableObject.ValidateTag(tag)
5078 def Exec(self, feedback_fn):
5082 try:
5083 for tag in self.op.tags:
5084 self.target.AddTag(tag)
5085 except errors.TagError, err:
5086 raise errors.OpExecError("Error while setting tag: %s" % str(err))
5087 try:
5088 self.cfg.Update(self.target)
5089 except errors.ConfigurationError:
5090 raise errors.OpRetryError("There has been a modification to the"
5091 " config file and the operation has been"
5092 " aborted. Please retry.")
5095 class LUDelTags(TagsLU):
5096 """Delete a list of tags from a given object.
5099 _OP_REQP = ["kind", "name", "tags"]
5102 def CheckPrereq(self):
5103 """Check prerequisites.
5105 This checks that we have the given tag.
5108 TagsLU.CheckPrereq(self)
5109 for tag in self.op.tags:
5110 objects.TaggableObject.ValidateTag(tag)
5111 del_tags = frozenset(self.op.tags)
5112 cur_tags = self.target.GetTags()
5113 if not del_tags <= cur_tags:
5114 diff_tags = del_tags - cur_tags
5115 diff_names = ["'%s'" % tag for tag in diff_tags]
5117 raise errors.OpPrereqError("Tag(s) %s not found" %
5118 (",".join(diff_names)))
5120 def Exec(self, feedback_fn):
5121 """Remove the tag from the object.
5124 for tag in self.op.tags:
5125 self.target.RemoveTag(tag)
5126 try:
5127 self.cfg.Update(self.target)
5128 except errors.ConfigurationError:
5129 raise errors.OpRetryError("There has been a modification to the"
5130 " config file and the operation has been"
5131 " aborted. Please retry.")
5134 class LUTestDelay(NoHooksLU):
5135 """Sleep for a specified amount of time.
5137 This LU sleeps on the master and/or nodes for a specified amount of
5138 time.
5141 _OP_REQP = ["duration", "on_master", "on_nodes"]
5144 def ExpandNames(self):
5145 """Expand names and set required locks.
5147 This expands the node list, if any.
5150 self.needed_locks = {}
5151 if self.op.on_nodes:
5152 # _GetWantedNodes can be used here, but is not always appropriate to use
5153 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5154 # more information.
5155 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5156 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5158 def CheckPrereq(self):
5159 """Check prerequisites.
5163 def Exec(self, feedback_fn):
5164 """Do the actual sleep.
5167 if self.op.on_master:
5168 if not utils.TestDelay(self.op.duration):
5169 raise errors.OpExecError("Error during master delay test")
5170 if self.op.on_nodes:
5171 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5172 if not result:
5173 raise errors.OpExecError("Complete failure from rpc call")
5174 for node, node_result in result.items():
5175 if not node_result:
5176 raise errors.OpExecError("Failure during rpc call to node %s,"
5177 " result: %s" % (node, node_result))
5180 class IAllocator(object):
5181 """IAllocator framework.
5183 An IAllocator instance has three sets of attributes:
5184 - cfg that is needed to query the cluster
5185 - input data (all members of the _KEYS class attribute are required)
5186 - four buffer attributes (in|out_data|text), that represent the
5187 input (to the external script) in text and data structure format,
5188 and the output from it, again in two formats
5189 - the result variables from the script (success, info, nodes) for
5190 easy usage
5192 """
5193 _ALLO_KEYS = [
5194 "mem_size", "disks", "disk_template",
5195 "os", "tags", "nics", "vcpus",
5196 ]
5197 _RELO_KEYS = [
5198 "relocate_from",
5199 ]
5201 def __init__(self, lu, mode, name, **kwargs):
5202 self.lu = lu
5203 # init buffer variables
5204 self.in_text = self.out_text = self.in_data = self.out_data = None
5205 # init all input fields so that pylint is happy
5206 self.mode = mode
5207 self.name = name
5208 self.mem_size = self.disks = self.disk_template = None
5209 self.os = self.tags = self.nics = self.vcpus = None
5210 self.relocate_from = None
5212 self.required_nodes = None
5213 # init result fields
5214 self.success = self.info = self.nodes = None
5215 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5216 keyset = self._ALLO_KEYS
5217 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5218 keyset = self._RELO_KEYS
5220 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5221 " IAllocator" % self.mode)
5222 for key in kwargs:
5223 if key not in keyset:
5224 raise errors.ProgrammerError("Invalid input parameter '%s' to"
5225 " IAllocator" % key)
5226 setattr(self, key, kwargs[key])
5227 for key in keyset:
5228 if key not in kwargs:
5229 raise errors.ProgrammerError("Missing input parameter '%s' to"
5230 " IAllocator" % key)
5231 self._BuildInputData()
5233 def _ComputeClusterData(self):
5234 """Compute the generic allocator input data.
5236 This is the data that is independent of the actual operation.
5239 cfg = self.lu.cfg
5240 cluster_info = cfg.GetClusterInfo()
5242 data = {
5244 "cluster_name": cfg.GetClusterName(),
5245 "cluster_tags": list(cluster_info.GetTags()),
5246 "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5247 # we don't have job IDs
5248 }
5250 i_list = []
5251 cluster = self.cfg.GetClusterInfo()
5252 for iname in cfg.GetInstanceList():
5253 i_obj = cfg.GetInstanceInfo(iname)
5254 i_list.append((i_obj, cluster.FillBE(i_obj)))
5257 node_results = {}
5258 node_list = cfg.GetNodeList()
5259 # FIXME: here we have only one hypervisor information, but
5260 # instance can belong to different hypervisors
5261 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5262 cfg.GetHypervisorType())
5263 for nname in node_list:
5264 ninfo = cfg.GetNodeInfo(nname)
5265 if nname not in node_data or not isinstance(node_data[nname], dict):
5266 raise errors.OpExecError("Can't get data for node %s" % nname)
5267 remote_info = node_data[nname]
5268 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5269 'vg_size', 'vg_free', 'cpu_total']:
5270 if attr not in remote_info:
5271 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5274 remote_info[attr] = int(remote_info[attr])
5275 except ValueError, err:
5276 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5277 " %s" % (nname, attr, str(err)))
5278 # compute memory used by primary instances
5279 i_p_mem = i_p_up_mem = 0
5280 for iinfo, beinfo in i_list:
5281 if iinfo.primary_node == nname:
5282 i_p_mem += beinfo[constants.BE_MEMORY]
5283 if iinfo.status == "up":
5284 i_p_up_mem += beinfo[constants.BE_MEMORY]
5286 # compute memory used by instances
5288 "tags": list(ninfo.GetTags()),
5289 "total_memory": remote_info['memory_total'],
5290 "reserved_memory": remote_info['memory_dom0'],
5291 "free_memory": remote_info['memory_free'],
5292 "i_pri_memory": i_p_mem,
5293 "i_pri_up_memory": i_p_up_mem,
5294 "total_disk": remote_info['vg_size'],
5295 "free_disk": remote_info['vg_free'],
5296 "primary_ip": ninfo.primary_ip,
5297 "secondary_ip": ninfo.secondary_ip,
5298 "total_cpus": remote_info['cpu_total'],
5299 }
5300 node_results[nname] = pnr
5301 data["nodes"] = node_results
5304 instance_data = {}
5305 for iinfo, beinfo in i_list:
5306 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5307 for n in iinfo.nics]
5309 "tags": list(iinfo.GetTags()),
5310 "should_run": iinfo.status == "up",
5311 "vcpus": beinfo[constants.BE_VCPUS],
5312 "memory": beinfo[constants.BE_MEMORY],
5314 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5316 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5317 "disk_template": iinfo.disk_template,
5318 "hypervisor": iinfo.hypervisor,
5319 }
5320 instance_data[iinfo.name] = pir
5322 data["instances"] = instance_data
5326 def _AddNewInstance(self):
5327 """Add new instance data to allocator structure.
5329 This in combination with _AllocatorGetClusterData will create the
5330 correct structure needed as input for the allocator.
5332 The checks for the completeness of the opcode must have already been
5333 done.
5335 """
5336 data = self.in_data
5337 if len(self.disks) != 2:
5338 raise errors.OpExecError("Only two-disk configurations supported")
5340 disk_space = _ComputeDiskSize(self.disk_template,
5341 self.disks[0]["size"], self.disks[1]["size"])
5343 if self.disk_template in constants.DTS_NET_MIRROR:
5344 self.required_nodes = 2
5346 self.required_nodes = 1
5350 "disk_template": self.disk_template,
5353 "vcpus": self.vcpus,
5354 "memory": self.mem_size,
5355 "disks": self.disks,
5356 "disk_space_total": disk_space,
5358 "required_nodes": self.required_nodes,
5360 data["request"] = request
5362 def _AddRelocateInstance(self):
5363 """Add relocate instance data to allocator structure.
5365 This in combination with _IAllocatorGetClusterData will create the
5366 correct structure needed as input for the allocator.
5368 The checks for the completeness of the opcode must have already been
5372 instance = self.lu.cfg.GetInstanceInfo(self.name)
5373 if instance is None:
5374 raise errors.ProgrammerError("Unknown instance '%s' passed to"
5375 " IAllocator" % self.name)
5377 if instance.disk_template not in constants.DTS_NET_MIRROR:
5378 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5380 if len(instance.secondary_nodes) != 1:
5381 raise errors.OpPrereqError("Instance has not exactly one secondary node")
5383 self.required_nodes = 1
5385 disk_space = _ComputeDiskSize(instance.disk_template,
5386 instance.disks[0].size,
5387 instance.disks[1].size)
5392 "disk_space_total": disk_space,
5393 "required_nodes": self.required_nodes,
5394 "relocate_from": self.relocate_from,
5396 self.in_data["request"] = request
5398 def _BuildInputData(self):
5399 """Build input data structures.
5402 self._ComputeClusterData()
5404 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5405 self._AddNewInstance()
5407 self._AddRelocateInstance()
5409 self.in_text = serializer.Dump(self.in_data)
5411 def Run(self, name, validate=True, call_fn=None):
5412 """Run an instance allocator and return the results.
5415 if call_fn is None:
5416 call_fn = self.lu.rpc.call_iallocator_runner
5419 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5421 if not isinstance(result, (list, tuple)) or len(result) != 4:
5422 raise errors.OpExecError("Invalid result from master iallocator runner")
5424 rcode, stdout, stderr, fail = result
5426 if rcode == constants.IARUN_NOTFOUND:
5427 raise errors.OpExecError("Can't find allocator '%s'" % name)
5428 elif rcode == constants.IARUN_FAILURE:
5429 raise errors.OpExecError("Instance allocator call failed: %s,"
5430 " output: %s" % (fail, stdout+stderr))
5431 self.out_text = stdout
5432 if validate:
5433 self._ValidateResult()
5435 def _ValidateResult(self):
5436 """Process the allocator results.
5438 This will process and if successful save the result in
5439 self.out_data and the other parameters.
5442 try:
5443 rdict = serializer.Load(self.out_text)
5444 except Exception, err:
5445 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5447 if not isinstance(rdict, dict):
5448 raise errors.OpExecError("Can't parse iallocator results: not a dict")
5450 for key in "success", "info", "nodes":
5451 if key not in rdict:
5452 raise errors.OpExecError("Can't parse iallocator results:"
5453 " missing key '%s'" % key)
5454 setattr(self, key, rdict[key])
5456 if not isinstance(rdict["nodes"], list):
5457 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5459 self.out_data = rdict
5462 class LUTestAllocator(NoHooksLU):
5463 """Run allocator tests.
5465 This LU runs the allocator tests
5468 _OP_REQP = ["direction", "mode", "name"]
5470 def CheckPrereq(self):
5471 """Check prerequisites.
5473 This checks the opcode parameters depending on the direction and mode of the test.
5476 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5477 for attr in ["name", "mem_size", "disks", "disk_template",
5478 "os", "tags", "nics", "vcpus"]:
5479 if not hasattr(self.op, attr):
5480 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5482 iname = self.cfg.ExpandInstanceName(self.op.name)
5483 if iname is not None:
5484 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5486 if not isinstance(self.op.nics, list):
5487 raise errors.OpPrereqError("Invalid parameter 'nics'")
5488 for row in self.op.nics:
5489 if (not isinstance(row, dict) or
5492 "bridge" not in row):
5493 raise errors.OpPrereqError("Invalid contents of the"
5494 " 'nics' parameter")
5495 if not isinstance(self.op.disks, list):
5496 raise errors.OpPrereqError("Invalid parameter 'disks'")
5497 if len(self.op.disks) != 2:
5498 raise errors.OpPrereqError("Only two-disk configurations supported")
5499 for row in self.op.disks:
5500 if (not isinstance(row, dict) or
5501 "size" not in row or
5502 not isinstance(row["size"], int) or
5503 "mode" not in row or
5504 row["mode"] not in ['r', 'w']):
5505 raise errors.OpPrereqError("Invalid contents of the"
5506 " 'disks' parameter")
5507 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5508 if not hasattr(self.op, "name"):
5509 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5510 fname = self.cfg.ExpandInstanceName(self.op.name)
5511 if fname is None:
5512 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5513 self.op.name)
5514 self.op.name = fname
5515 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5517 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5520 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5521 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5522 raise errors.OpPrereqError("Missing allocator name")
5523 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5524 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5527 def Exec(self, feedback_fn):
5528 """Run the allocator test.
5531 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5532 ial = IAllocator(self,
5535 mem_size=self.op.mem_size,
5536 disks=self.op.disks,
5537 disk_template=self.op.disk_template,
5541 vcpus=self.op.vcpus,
5544 ial = IAllocator(self,
5547 relocate_from=list(self.relocate_from),
5550 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5551 result = ial.in_text
5552 else:
5553 ial.Run(self.op.allocator, validate=False)
5554 result = ial.out_text
5556 return result