#
# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201

import os
import re
import time
import platform
import logging
import itertools

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = self.cfg.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)
111 """Returns the SshRunner object
115 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
118 ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError
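
  # As an illustrative sketch only (LUHypotheticalQuery is not a real LU in
  # this module): a concurrent LU would override ExpandNames along these
  # lines, taking shared locks on two nodes instead of relying on the BGL:
  #
  #   class LUHypotheticalQuery(NoHooksLU):
  #     REQ_BGL = False
  #
  #     def ExpandNames(self):
  #       self.needed_locks = {
  #         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
  #         }
  #       self.share_locks[locking.LEVEL_NODE] = 1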

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name
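
  # Illustrative sketch only: a typical instance-level LU wires this helper
  # into its ExpandNames, e.g.
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  # (this is the pattern used by e.g. LUActivateInstanceDisks below).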

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
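
  # Illustrative sketch only: together with the LOCKS_REPLACE marker set in
  # ExpandNames, the canonical way to use this helper is
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()
  #
  # which is how LUActivateInstanceDisks and LUDeactivateInstanceDisks below
  # pick up the node locks for the instance they locked in ExpandNames.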


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


class _FieldSet(object):
  """A simple field set.

  Among the features are:
    - checking if a string is among a list of static strings or regex objects
    - checking if a whole list of strings matches
    - returning the matching groups from a regex match

  Internally, all fields are held as regular expression objects.

  """
  def __init__(self, *items):
    self.items = [re.compile("^%s$" % value) for value in items]

  def Extend(self, other_set):
    """Extend the field set with the items from another one"""
    self.items.extend(other_set.items)

  def Matches(self, field):
    """Checks if a field matches the current set

    @type field: str
    @param field: the string to match
    @return: either False or a regular expression match object

    """
    for m in itertools.ifilter(None, (val.match(field) for val in self.items)):
      return m
    return False

  def NonMatching(self, items):
    """Returns the list of fields not matching the current set

    @type items: list
    @param items: the list of fields to check
    @rtype: list
    @return: list of non-matching fields

    """
    return [val for val in items if not self.Matches(val)]
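
# Illustrative sketch only: a field set mixing static names and a regex,
# as a query LU could use it:
#
#   fields = _FieldSet("name", "size", r"nic\.ip/([0-9]+)")
#   fields.Matches("nic.ip/0")           # -> match object (group(1) == "0")
#   fields.NonMatching(["name", "foo"])  # -> ["foo"]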


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of node names (strings); must not be empty

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
                                 " non-empty list of nodes whose name is to be"
                                 " expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instance names (strings); an empty list means all
               instances

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
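
# Illustrative sketch only: the query LUs below call this from ExpandNames,
# e.g.
#
#   _CheckOutputFields(static=["node"],
#                      dynamic=["phys", "vg", "name", "size", "instance"],
#                      selected=self.op.output_fields)
#
# which raises OpPrereqError if self.op.output_fields names an unknown field.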


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    if not node_result:
      feedback_fn(" - ERROR: unable to verify node %s." % (node,))
      return True

    # checks config file checksum
    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    # checks ssh to other nodes
    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))

    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          instance not in node_instance[node_current]):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = []
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
    all_vglist = self.rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': hypervisors,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    all_rversion = self.rpc.call_version(nodelist)
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                        self.cfg.GetHypervisorType())

    cluster = self.cfg.GetClusterInfo()
    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
          }
      except ValueError:
        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn(" - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      # TODO: sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logging.debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            self.LogWarning("Copy of file %s to node %s failed",
                            fname, to_node)
    finally:
      if not self.rpc.call_node_start_master(master, False):
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
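
# Illustrative sketch only (hypothetical disk layout): for a DRBD disk whose
# children are two LVM volumes,
#
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8,
#                       children=[objects.Disk(dev_type=constants.LD_LV,
#                                              children=[]),
#                                 objects.Disk(dev_type=constants.LD_LV,
#                                              children=[])])
#   _RecursiveCheckIfLVMBased(drbd)  # -> True, via the LV children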


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # beparams changes do not need validation (we can't validate?),
    # but we still process them here
    if self.op.beparams:
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    self.cfg.Update(self.cluster)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if not rstats:
      logging.warning("Node %s: disk degraded, not found or node down", node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    Args:
      node_list: a list with the names of all nodes
      rlist: a map with node names as keys and OS objects as values

    Returns:
      map: a map with osnames as keys and as value another map, with
           nodes as keys and list of OS objects as values
           e.g. {"debian-etch": {"node1": [<object>,...],
                                 "node2": [<object>,]}
                }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    self.static_fields = frozenset([
      "name", "pinst_cnt", "sinst_cnt",
      "pinst_list", "sinst_list",
      "pip", "sip", "tags",
      "serial_no",
      ])

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logging.info("Communication to node %s fine, sw version %s match",
                     node, result)
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not self.rpc.call_node_has_ip_address(new_node.name,
                                               new_node.secondary_ip):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
      }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logging.debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logging.error("Copy of file %s to node %s failed", fname, to_node)

    to_copy = []
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = self.rpc.call_upload_file([node], fname)
      if not result[node]:
        logging.error("Could not copy file %s to node %s", fname, node)

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.default_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": cluster.hvparams,
      "beparams": cluster.beparams,
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

    static_fields = ["cluster_name", "master_node", "drain_flag"]
    _CheckOutputFields(static=static_fields,
                       dynamic=[],
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info
2050 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2051 """Prepare the block devices for an instance.
2053 This sets up the block devices on all nodes.
2056 instance: a ganeti.objects.Instance object
2057 ignore_secondaries: if true, errors on secondary nodes won't result
2058 in an error return from the function
2061 False if the operation failed
2062 list of (host, instance_visible_name, node_visible_name) if the operation
2063 succeeded with the mapping from node devices to instance devices
2067 iname = instance.name
2068 # With the two-pass mechanism we try to reduce the window of
2069 # opportunity for the race condition of switching DRBD to primary
2070 # before handshaking occurred, but we do not eliminate it
2072 # The proper fix would be to wait (with some limits) until the
2073 # connection has been made and drbd transitions from WFConnection
2074 # into any other network-connected state (Connected, SyncTarget,
2077 # 1st pass, assemble on all nodes in secondary mode
2078 for inst_disk in instance.disks:
2079 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2080 lu.cfg.SetDiskID(node_disk, node)
2081 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2083 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2084 " (is_primary=False, pass=1)",
2085 inst_disk.iv_name, node)
2086 if not ignore_secondaries:
2089 # FIXME: race condition on drbd migration to primary
2091 # 2nd pass, do only the primary node
2092 for inst_disk in instance.disks:
2093 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2094 if node != instance.primary_node:
2096 lu.cfg.SetDiskID(node_disk, node)
2097 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2099 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2100 " (is_primary=True, pass=2)",
2101 inst_disk.iv_name, node)
2103 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2105 # leave the disks configured for the primary node
2106 # this is a workaround that would be fixed better by
2107 # improving the logical/physical id handling
2108 for disk in instance.disks:
2109 lu.cfg.SetDiskID(disk, instance.primary_node)
2111 return disks_ok, device_info
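# Illustrative sketch (not part of the original module): the two-pass
# assembly pattern used above, shown standalone. `nodes_of` and `assemble`
# are hypothetical callbacks standing in for Disk.ComputeNodeTree and
# rpc.call_blockdev_assemble.
def _TwoPassAssembleSketch(disks, primary, nodes_of, assemble):
  """Assemble every disk as secondary first, then promote on the primary."""
  ok = True
  for disk in disks:  # pass 1: all nodes, secondary (non-primary) mode
    for node, node_disk in nodes_of(disk):
      if not assemble(node, node_disk, False):
        ok = False
  for disk in disks:  # pass 2: only the primary node, primary mode
    for node, node_disk in nodes_of(disk):
      if node == primary and not assemble(node, node_disk, True):
        ok = False
  return ok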
2114 def _StartInstanceDisks(lu, instance, force):
2115 """Start the disks of an instance.
2118 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2119 ignore_secondaries=force)
2121 _ShutdownInstanceDisks(lu, instance)
2122 if force is not None and not force:
2123 lu.proc.LogWarning("", hint="If the message above refers to a"
2125 " you can retry the operation using '--force'.")
2126 raise errors.OpExecError("Disk consistency error")
2129 class LUDeactivateInstanceDisks(NoHooksLU):
2130 """Shutdown an instance's disks.
2133 _OP_REQP = ["instance_name"]
2136 def ExpandNames(self):
2137 self._ExpandAndLockInstance()
2138 self.needed_locks[locking.LEVEL_NODE] = []
2139 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2141 def DeclareLocks(self, level):
2142 if level == locking.LEVEL_NODE:
2143 self._LockInstancesNodes()
2145 def CheckPrereq(self):
2146 """Check prerequisites.
2148 This checks that the instance is in the cluster.
2151 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2152 assert self.instance is not None, \
2153 "Cannot retrieve locked instance %s" % self.op.instance_name
2155 def Exec(self, feedback_fn):
2156 """Deactivate the disks
2159 instance = self.instance
2160 _SafeShutdownInstanceDisks(self, instance)
2163 def _SafeShutdownInstanceDisks(lu, instance):
2164 """Shutdown block devices of an instance.
2166 This function checks if an instance is running, before calling
2167 _ShutdownInstanceDisks.
2170 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2171 [instance.hypervisor])
2172 ins_l = ins_l[instance.primary_node]
2173 if not isinstance(ins_l, list):
2174 raise errors.OpExecError("Can't contact node '%s'" %
2175 instance.primary_node)
2177 if instance.name in ins_l:
2178 raise errors.OpExecError("Instance is running, can't shutdown"
2181 _ShutdownInstanceDisks(lu, instance)
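# Illustrative sketch (not part of the original module): the guard above,
# decoupled from the RPC layer. `list_instances` and `shutdown_disks` are
# hypothetical callbacks standing in for rpc.call_instance_list and
# _ShutdownInstanceDisks.
def _SafeDiskShutdownSketch(instance_name, primary_node, list_instances,
                            shutdown_disks):
  running = list_instances(primary_node)
  if not isinstance(running, list):
    raise RuntimeError("Can't contact node '%s'" % primary_node)
  if instance_name in running:
    raise RuntimeError("Instance is running, can't shut down its disks")
  return shutdown_disks()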
2184 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2185 """Shutdown block devices of an instance.
2187 This does the shutdown on all nodes of the instance.
2189 If ignore_primary is false, errors on the primary node are
2194 for disk in instance.disks:
2195 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2196 lu.cfg.SetDiskID(top_disk, node)
2197 if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2198 logging.error("Could not shutdown block device %s on node %s",
2200 if not ignore_primary or node != instance.primary_node:
2205 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2206 """Checks if a node has enough free memory.
2208 This function checks if a given node has the needed amount of free
2209 memory. In case the node has less memory or we cannot get the
2210 information from the node, this function raises an OpPrereqError
2213 @type lu: C{LogicalUnit}
2214 @param lu: a logical unit from which we get configuration data
2216 @param node: the node to check
2217 @type reason: C{str}
2218 @param reason: string to use in the error message
2219 @type requested: C{int}
2220 @param requested: the amount of memory in MiB to check for
2221 @type hypervisor: C{str}
2222 @param hypervisor: the hypervisor to ask for memory stats
2223 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2224 we cannot check the node
2227 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2228 if not nodeinfo or not isinstance(nodeinfo, dict):
2229 raise errors.OpPrereqError("Could not contact node %s for resource"
2230 " information" % (node,))
2232 free_mem = nodeinfo[node].get('memory_free')
2233 if not isinstance(free_mem, int):
2234 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2235 " was '%s'" % (node, free_mem))
2236 if requested > free_mem:
2237 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2238 " needed %s MiB, available %s MiB" %
2239 (node, reason, requested, free_mem))
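# Illustrative sketch (not part of the original module): the shape of the
# check above without the RPC layer. `node_info` is a hypothetical per-node
# result mapping such as {"memory_free": <MiB>}.
def _CheckFreeMemorySketch(node, node_info, requested_mib, reason):
  free = node_info.get("memory_free")
  if not isinstance(free, int):
    raise ValueError("Can't compute free memory on node %s, result was '%s'" %
                     (node, free))
  if requested_mib > free:
    raise ValueError("Not enough memory on node %s for %s: needed %s MiB,"
                     " available %s MiB" % (node, reason, requested_mib, free))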
2242 class LUStartupInstance(LogicalUnit):
2243 """Starts an instance.
2246 HPATH = "instance-start"
2247 HTYPE = constants.HTYPE_INSTANCE
2248 _OP_REQP = ["instance_name", "force"]
2251 def ExpandNames(self):
2252 self._ExpandAndLockInstance()
2253 self.needed_locks[locking.LEVEL_NODE] = []
2254 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2256 def DeclareLocks(self, level):
2257 if level == locking.LEVEL_NODE:
2258 self._LockInstancesNodes()
2260 def BuildHooksEnv(self):
2263 This runs on master, primary and secondary nodes of the instance.
2267 "FORCE": self.op.force,
2269 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2270 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2271 list(self.instance.secondary_nodes))
2274 def CheckPrereq(self):
2275 """Check prerequisites.
2277 This checks that the instance is in the cluster.
2280 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2281 assert self.instance is not None, \
2282 "Cannot retrieve locked instance %s" % self.op.instance_name
2284 bep = self.cfg.GetClusterInfo().FillBE(instance)
2285 # check bridge existence
2286 _CheckInstanceBridgesExist(self, instance)
2288 _CheckNodeFreeMemory(self, instance.primary_node,
2289 "starting instance %s" % instance.name,
2290 bep[constants.BE_MEMORY], instance.hypervisor)
2292 def Exec(self, feedback_fn):
2293 """Start the instance.
2296 instance = self.instance
2297 force = self.op.force
2298 extra_args = getattr(self.op, "extra_args", "")
2300 self.cfg.MarkInstanceUp(instance.name)
2302 node_current = instance.primary_node
2304 _StartInstanceDisks(self, instance, force)
2306 if not self.rpc.call_instance_start(node_current, instance, extra_args):
2307 _ShutdownInstanceDisks(self, instance)
2308 raise errors.OpExecError("Could not start instance")
2311 class LURebootInstance(LogicalUnit):
2312 """Reboot an instance.
2315 HPATH = "instance-reboot"
2316 HTYPE = constants.HTYPE_INSTANCE
2317 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2320 def ExpandNames(self):
2321 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2322 constants.INSTANCE_REBOOT_HARD,
2323 constants.INSTANCE_REBOOT_FULL]:
2324 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2325 (constants.INSTANCE_REBOOT_SOFT,
2326 constants.INSTANCE_REBOOT_HARD,
2327 constants.INSTANCE_REBOOT_FULL))
2328 self._ExpandAndLockInstance()
2329 self.needed_locks[locking.LEVEL_NODE] = []
2330 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2332 def DeclareLocks(self, level):
2333 if level == locking.LEVEL_NODE:
2334 primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2335 self._LockInstancesNodes(primary_only=primary_only)
2337 def BuildHooksEnv(self):
2340 This runs on master, primary and secondary nodes of the instance.
2344 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2346 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2347 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2348 list(self.instance.secondary_nodes))
2351 def CheckPrereq(self):
2352 """Check prerequisites.
2354 This checks that the instance is in the cluster.
2357 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2358 assert self.instance is not None, \
2359 "Cannot retrieve locked instance %s" % self.op.instance_name
2361 # check bridge existence
2362 _CheckInstanceBridgesExist(self, instance)
2364 def Exec(self, feedback_fn):
2365 """Reboot the instance.
2368 instance = self.instance
2369 ignore_secondaries = self.op.ignore_secondaries
2370 reboot_type = self.op.reboot_type
2371 extra_args = getattr(self.op, "extra_args", "")
2373 node_current = instance.primary_node
2375 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2376 constants.INSTANCE_REBOOT_HARD]:
2377 if not self.rpc.call_instance_reboot(node_current, instance,
2378 reboot_type, extra_args):
2379 raise errors.OpExecError("Could not reboot instance")
2381 if not self.rpc.call_instance_shutdown(node_current, instance):
2382 raise errors.OpExecError("could not shutdown instance for full reboot")
2383 _ShutdownInstanceDisks(self, instance)
2384 _StartInstanceDisks(self, instance, ignore_secondaries)
2385 if not self.rpc.call_instance_start(node_current, instance, extra_args):
2386 _ShutdownInstanceDisks(self, instance)
2387 raise errors.OpExecError("Could not start instance for full reboot")
2389 self.cfg.MarkInstanceUp(instance.name)
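# Illustrative sketch (not part of the original module): the reboot-type
# dispatch performed by Exec above. SOFT/HARD reboots are delegated to the
# hypervisor, while FULL is emulated as shutdown + disk restart + start.
# All callbacks are hypothetical stand-ins for the rpc calls.
def _RebootSketch(reboot_type, hv_reboot, shutdown, restart_disks, start):
  if reboot_type in ("soft", "hard"):
    if not hv_reboot(reboot_type):
      raise RuntimeError("Could not reboot instance")
  elif reboot_type == "full":
    if not shutdown():
      raise RuntimeError("Could not shutdown instance for full reboot")
    restart_disks()
    if not start():
      raise RuntimeError("Could not start instance for full reboot")
  else:
    raise ValueError("unknown reboot type %r" % reboot_type)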
2392 class LUShutdownInstance(LogicalUnit):
2393 """Shutdown an instance.
2396 HPATH = "instance-stop"
2397 HTYPE = constants.HTYPE_INSTANCE
2398 _OP_REQP = ["instance_name"]
2401 def ExpandNames(self):
2402 self._ExpandAndLockInstance()
2403 self.needed_locks[locking.LEVEL_NODE] = []
2404 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2406 def DeclareLocks(self, level):
2407 if level == locking.LEVEL_NODE:
2408 self._LockInstancesNodes()
2410 def BuildHooksEnv(self):
2413 This runs on master, primary and secondary nodes of the instance.
2416 env = _BuildInstanceHookEnvByObject(self, self.instance)
2417 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2418 list(self.instance.secondary_nodes))
2421 def CheckPrereq(self):
2422 """Check prerequisites.
2424 This checks that the instance is in the cluster.
2427 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2428 assert self.instance is not None, \
2429 "Cannot retrieve locked instance %s" % self.op.instance_name
2431 def Exec(self, feedback_fn):
2432 """Shutdown the instance.
2435 instance = self.instance
2436 node_current = instance.primary_node
2437 self.cfg.MarkInstanceDown(instance.name)
2438 if not self.rpc.call_instance_shutdown(node_current, instance):
2439 self.proc.LogWarning("Could not shutdown instance")
2441 _ShutdownInstanceDisks(self, instance)
2444 class LUReinstallInstance(LogicalUnit):
2445 """Reinstall an instance.
2448 HPATH = "instance-reinstall"
2449 HTYPE = constants.HTYPE_INSTANCE
2450 _OP_REQP = ["instance_name"]
2453 def ExpandNames(self):
2454 self._ExpandAndLockInstance()
2455 self.needed_locks[locking.LEVEL_NODE] = []
2456 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2458 def DeclareLocks(self, level):
2459 if level == locking.LEVEL_NODE:
2460 self._LockInstancesNodes()
2462 def BuildHooksEnv(self):
2465 This runs on master, primary and secondary nodes of the instance.
2468 env = _BuildInstanceHookEnvByObject(self, self.instance)
2469 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2470 list(self.instance.secondary_nodes))
2473 def CheckPrereq(self):
2474 """Check prerequisites.
2476 This checks that the instance is in the cluster and is not running.
2479 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2480 assert instance is not None, \
2481 "Cannot retrieve locked instance %s" % self.op.instance_name
2483 if instance.disk_template == constants.DT_DISKLESS:
2484 raise errors.OpPrereqError("Instance '%s' has no disks" %
2485 self.op.instance_name)
2486 if instance.status != "down":
2487 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2488 self.op.instance_name)
2489 remote_info = self.rpc.call_instance_info(instance.primary_node,
2491 instance.hypervisor)
2493 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2494 (self.op.instance_name,
2495 instance.primary_node))
2497 self.op.os_type = getattr(self.op, "os_type", None)
2498 if self.op.os_type is not None:
2500 pnode = self.cfg.GetNodeInfo(
2501 self.cfg.ExpandNodeName(instance.primary_node))
2503 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2505 os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2507 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2508 " primary node" % self.op.os_type)
2510 self.instance = instance
2512 def Exec(self, feedback_fn):
2513 """Reinstall the instance.
2516 inst = self.instance
2518 if self.op.os_type is not None:
2519 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2520 inst.os = self.op.os_type
2521 self.cfg.Update(inst)
2523 _StartInstanceDisks(self, inst, None)
2525 feedback_fn("Running the instance OS create scripts...")
2526 if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2527 raise errors.OpExecError("Could not install OS for instance %s"
2529 (inst.name, inst.primary_node))
2531 _ShutdownInstanceDisks(self, inst)
2534 class LURenameInstance(LogicalUnit):
2535 """Rename an instance.
2538 HPATH = "instance-rename"
2539 HTYPE = constants.HTYPE_INSTANCE
2540 _OP_REQP = ["instance_name", "new_name"]
2542 def BuildHooksEnv(self):
2545 This runs on master, primary and secondary nodes of the instance.
2548 env = _BuildInstanceHookEnvByObject(self, self.instance)
2549 env["INSTANCE_NEW_NAME"] = self.op.new_name
2550 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2551 list(self.instance.secondary_nodes))
2554 def CheckPrereq(self):
2555 """Check prerequisites.
2557 This checks that the instance is in the cluster and is not running.
2560 instance = self.cfg.GetInstanceInfo(
2561 self.cfg.ExpandInstanceName(self.op.instance_name))
2562 if instance is None:
2563 raise errors.OpPrereqError("Instance '%s' not known" %
2564 self.op.instance_name)
2565 if instance.status != "down":
2566 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2567 self.op.instance_name)
2568 remote_info = self.rpc.call_instance_info(instance.primary_node,
2570 instance.hypervisor)
2572 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2573 (self.op.instance_name,
2574 instance.primary_node))
2575 self.instance = instance
2577 # new name verification
2578 name_info = utils.HostInfo(self.op.new_name)
2580 self.op.new_name = new_name = name_info.name
2581 instance_list = self.cfg.GetInstanceList()
2582 if new_name in instance_list:
2583 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2586 if not getattr(self.op, "ignore_ip", False):
2587 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2588 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2589 (name_info.ip, new_name))
2592 def Exec(self, feedback_fn):
2593 """Reinstall the instance.
2596 inst = self.instance
2597 old_name = inst.name
2599 if inst.disk_template == constants.DT_FILE:
2600 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2602 self.cfg.RenameInstance(inst.name, self.op.new_name)
2603 # Change the instance lock. This is definitely safe while we hold the BGL
2604 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2605 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2607 # re-read the instance from the configuration after rename
2608 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2610 if inst.disk_template == constants.DT_FILE:
2611 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2612 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2613 old_file_storage_dir,
2614 new_file_storage_dir)
2617 raise errors.OpExecError("Could not connect to node '%s' to rename"
2618 " directory '%s' to '%s' (but the instance"
2619 " has been renamed in Ganeti)" % (
2620 inst.primary_node, old_file_storage_dir,
2621 new_file_storage_dir))
2624 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2625 " (but the instance has been renamed in"
2626 " Ganeti)" % (old_file_storage_dir,
2627 new_file_storage_dir))
2629 _StartInstanceDisks(self, inst, None)
2631 if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2633 msg = ("Could not run OS rename script for instance %s on node %s"
2634 " (but the instance has been renamed in Ganeti)" %
2635 (inst.name, inst.primary_node))
2636 self.proc.LogWarning(msg)
2638 _ShutdownInstanceDisks(self, inst)
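# Illustrative sketch (not part of the original module): the ordering that
# Exec above relies on. All callbacks are hypothetical; the key points are
# that the config rename happens first, the instance object is re-read
# afterwards, and later failures only warn because the rename has already
# been committed in Ganeti.
def _RenameSketch(old_name, new_name, cfg_rename, reread, rename_storage,
                  run_os_rename, warn):
  cfg_rename(old_name, new_name)
  inst = reread(new_name)  # pick up the renamed configuration object
  if not rename_storage(inst):
    raise RuntimeError("Could not rename storage (but the instance has"
                       " been renamed in Ganeti)")
  if not run_os_rename(inst, old_name):
    warn("Could not run OS rename script (but the instance has been"
         " renamed in Ganeti)")
  return inst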
2641 class LURemoveInstance(LogicalUnit):
2642 """Remove an instance.
2645 HPATH = "instance-remove"
2646 HTYPE = constants.HTYPE_INSTANCE
2647 _OP_REQP = ["instance_name", "ignore_failures"]
2650 def ExpandNames(self):
2651 self._ExpandAndLockInstance()
2652 self.needed_locks[locking.LEVEL_NODE] = []
2653 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2655 def DeclareLocks(self, level):
2656 if level == locking.LEVEL_NODE:
2657 self._LockInstancesNodes()
2659 def BuildHooksEnv(self):
2662 This runs on master, primary and secondary nodes of the instance.
2665 env = _BuildInstanceHookEnvByObject(self, self.instance)
2666 nl = [self.cfg.GetMasterNode()]
2669 def CheckPrereq(self):
2670 """Check prerequisites.
2672 This checks that the instance is in the cluster.
2675 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2676 assert self.instance is not None, \
2677 "Cannot retrieve locked instance %s" % self.op.instance_name
2679 def Exec(self, feedback_fn):
2680 """Remove the instance.
2683 instance = self.instance
2684 logging.info("Shutting down instance %s on node %s",
2685 instance.name, instance.primary_node)
2687 if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2688 if self.op.ignore_failures:
2689 feedback_fn("Warning: can't shutdown instance")
2691 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2692 (instance.name, instance.primary_node))
2694 logging.info("Removing block devices for instance %s", instance.name)
2696 if not _RemoveDisks(self, instance):
2697 if self.op.ignore_failures:
2698 feedback_fn("Warning: can't remove instance's disks")
2700 raise errors.OpExecError("Can't remove instance's disks")
2702 logging.info("Removing instance %s out of cluster config", instance.name)
2704 self.cfg.RemoveInstance(instance.name)
2705 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2708 class LUQueryInstances(NoHooksLU):
2709 """Logical unit for querying instances.
2712 _OP_REQP = ["output_fields", "names"]
2715 def ExpandNames(self):
2716 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2717 hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
2718 bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
2719 self.static_fields = frozenset([
2720 "name", "os", "pnode", "snodes",
2721 "admin_state", "admin_ram",
2722 "disk_template", "ip", "mac", "bridge",
2723 "sda_size", "sdb_size", "vcpus", "tags",
2724 "network_port", "beparams",
2725 "serial_no", "hypervisor", "hvparams",
2728 _CheckOutputFields(static=self.static_fields,
2729 dynamic=self.dynamic_fields,
2730 selected=self.op.output_fields)
2732 self.needed_locks = {}
2733 self.share_locks[locking.LEVEL_INSTANCE] = 1
2734 self.share_locks[locking.LEVEL_NODE] = 1
2737 self.wanted = _GetWantedInstances(self, self.op.names)
2739 self.wanted = locking.ALL_SET
2741 self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2743 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2744 self.needed_locks[locking.LEVEL_NODE] = []
2745 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2747 def DeclareLocks(self, level):
2748 if level == locking.LEVEL_NODE and self.do_locking:
2749 self._LockInstancesNodes()
2751 def CheckPrereq(self):
2752 """Check prerequisites.
2757 def Exec(self, feedback_fn):
2758 """Computes the list of nodes and their attributes.
2761 all_info = self.cfg.GetAllInstancesInfo()
2763 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2764 elif self.wanted != locking.ALL_SET:
2765 instance_names = self.wanted
2766 missing = set(instance_names).difference(all_info.keys())
2768 raise errors.OpExecError(
2769 "Some instances were removed before retrieving their data: %s"
2772 instance_names = all_info.keys()
2774 instance_names = utils.NiceSort(instance_names)
2775 instance_list = [all_info[iname] for iname in instance_names]
2777 # begin data gathering
2779 nodes = frozenset([inst.primary_node for inst in instance_list])
2780 hv_list = list(set([inst.hypervisor for inst in instance_list]))
2783 if self.dynamic_fields.intersection(self.op.output_fields):
2785 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2787 result = node_data[name]
2789 live_data.update(result)
2790 elif result is False:
2791 bad_nodes.append(name)
2792 # else no instance is alive
2794 live_data = dict([(name, {}) for name in instance_names])
2796 # end data gathering
2801 for instance in instance_list:
2803 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2804 i_be = self.cfg.GetClusterInfo().FillBE(instance)
2805 for field in self.op.output_fields:
2810 elif field == "pnode":
2811 val = instance.primary_node
2812 elif field == "snodes":
2813 val = list(instance.secondary_nodes)
2814 elif field == "admin_state":
2815 val = (instance.status != "down")
2816 elif field == "oper_state":
2817 if instance.primary_node in bad_nodes:
2820 val = bool(live_data.get(instance.name))
2821 elif field == "status":
2822 if instance.primary_node in bad_nodes:
2823 val = "ERROR_nodedown"
2825 running = bool(live_data.get(instance.name))
2827 if instance.status != "down":
2832 if instance.status != "down":
2836 elif field == "oper_ram":
2837 if instance.primary_node in bad_nodes:
2839 elif instance.name in live_data:
2840 val = live_data[instance.name].get("memory", "?")
2843 elif field == "disk_template":
2844 val = instance.disk_template
2846 val = instance.nics[0].ip
2847 elif field == "bridge":
2848 val = instance.nics[0].bridge
2849 elif field == "mac":
2850 val = instance.nics[0].mac
2851 elif field == "sda_size" or field == "sdb_size":
2852 disk = instance.FindDisk(field[:3])
2857 elif field == "tags":
2858 val = list(instance.GetTags())
2859 elif field == "serial_no":
2860 val = instance.serial_no
2861 elif field == "network_port":
2862 val = instance.network_port
2863 elif field == "hypervisor":
2864 val = instance.hypervisor
2865 elif field == "hvparams":
2867 elif (field.startswith(HVPREFIX) and
2868 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2869 val = i_hv.get(field[len(HVPREFIX):], None)
2870 elif field == "beparams":
2872 elif (field.startswith(BEPREFIX) and
2873 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2874 val = i_be.get(field[len(BEPREFIX):], None)
2876 raise errors.ParameterError(field)
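# Illustrative sketch (not part of the original module): the locking rule
# used in ExpandNames above. Locks (and live RPC data) are only needed when
# at least one requested field is dynamic; purely static queries are served
# straight from the config. The field sets here are hypothetical subsets.
_STATIC_SKETCH = frozenset(["name", "os", "pnode"])

def _NeedsLockingSketch(output_fields):
  return not _STATIC_SKETCH.issuperset(output_fields)

assert not _NeedsLockingSketch(["name", "os"])
assert _NeedsLockingSketch(["name", "oper_state"])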
2883 class LUFailoverInstance(LogicalUnit):
2884 """Failover an instance.
2887 HPATH = "instance-failover"
2888 HTYPE = constants.HTYPE_INSTANCE
2889 _OP_REQP = ["instance_name", "ignore_consistency"]
2892 def ExpandNames(self):
2893 self._ExpandAndLockInstance()
2894 self.needed_locks[locking.LEVEL_NODE] = []
2895 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2897 def DeclareLocks(self, level):
2898 if level == locking.LEVEL_NODE:
2899 self._LockInstancesNodes()
2901 def BuildHooksEnv(self):
2904 This runs on master, primary and secondary nodes of the instance.
2908 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2910 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2911 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2914 def CheckPrereq(self):
2915 """Check prerequisites.
2917 This checks that the instance is in the cluster.
2920 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2921 assert self.instance is not None, \
2922 "Cannot retrieve locked instance %s" % self.op.instance_name
2924 bep = self.cfg.GetClusterInfo().FillBE(instance)
2925 if instance.disk_template not in constants.DTS_NET_MIRROR:
2926 raise errors.OpPrereqError("Instance's disk layout is not"
2927 " network mirrored, cannot failover.")
2929 secondary_nodes = instance.secondary_nodes
2930 if not secondary_nodes:
2931 raise errors.ProgrammerError("no secondary node but using "
2932 "a mirrored disk template")
2934 target_node = secondary_nodes[0]
2935 # check memory requirements on the secondary node
2936 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2937 instance.name, bep[constants.BE_MEMORY],
2938 instance.hypervisor)
2940 # check bridge existance
2941 brlist = [nic.bridge for nic in instance.nics]
2942 if not self.rpc.call_bridges_exist(target_node, brlist):
2943 raise errors.OpPrereqError("One or more target bridges %s does not"
2944 " exist on destination node '%s'" %
2945 (brlist, target_node))
2947 def Exec(self, feedback_fn):
2948 """Failover an instance.
2950 The failover is done by shutting it down on its present node and
2951 starting it on the secondary.
2954 instance = self.instance
2956 source_node = instance.primary_node
2957 target_node = instance.secondary_nodes[0]
2959 feedback_fn("* checking disk consistency between source and target")
2960 for dev in instance.disks:
2961 # for drbd, these are drbd over lvm
2962 if not _CheckDiskConsistency(self, dev, target_node, False):
2963 if instance.status == "up" and not self.op.ignore_consistency:
2964 raise errors.OpExecError("Disk %s is degraded on target node,"
2965 " aborting failover." % dev.iv_name)
2967 feedback_fn("* shutting down instance on source node")
2968 logging.info("Shutting down instance %s on node %s",
2969 instance.name, source_node)
2971 if not self.rpc.call_instance_shutdown(source_node, instance):
2972 if self.op.ignore_consistency:
2973 self.proc.LogWarning("Could not shutdown instance %s on node %s."
2975 " anyway. Please make sure node %s is down",
2976 instance.name, source_node, source_node)
2978 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2979 (instance.name, source_node))
2981 feedback_fn("* deactivating the instance's disks on source node")
2982 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2983 raise errors.OpExecError("Can't shut down the instance's disks.")
2985 instance.primary_node = target_node
2986 # distribute new instance config to the other nodes
2987 self.cfg.Update(instance)
2989 # Only start the instance if it's marked as up
2990 if instance.status == "up":
2991 feedback_fn("* activating the instance's disks on target node")
2992 logging.info("Starting instance %s on node %s",
2993 instance.name, target_node)
2995 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2996 ignore_secondaries=True)
2998 _ShutdownInstanceDisks(self, instance)
2999 raise errors.OpExecError("Can't activate the instance's disks")
3001 feedback_fn("* starting the instance on the target node")
3002 if not self.rpc.call_instance_start(target_node, instance, None):
3003 _ShutdownInstanceDisks(self, instance)
3004 raise errors.OpExecError("Could not start instance %s on node %s." %
3005 (instance.name, target_node))
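# Illustrative sketch (not part of the original module): the order of
# operations in Exec above. Callbacks are hypothetical; the invariants are
# that the source node is fully shut down before the primary pointer flips,
# and the instance is only restarted if it was marked up.
def _FailoverSketch(check_consistency, shutdown_source, flip_primary,
                    assemble_on_target, start_on_target, was_up):
  check_consistency()  # degraded disks on the target abort early
  shutdown_source()    # instance and its disks on the source node
  flip_primary()       # persist the new primary node in the config
  if was_up:
    if not assemble_on_target():
      raise RuntimeError("Can't activate the instance's disks")
    if not start_on_target():
      raise RuntimeError("Could not start instance on target node")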
3008 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3009 """Create a tree of block devices on the primary node.
3011 This always creates all devices.
3015 for child in device.children:
3016 if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3019 lu.cfg.SetDiskID(device, node)
3020 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3021 instance.name, True, info)
3024 if device.physical_id is None:
3025 device.physical_id = new_id
3029 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3030 """Create a tree of block devices on a secondary node.
3032 If this device type has to be created on secondaries, create it and
3035 If not, just recurse to children keeping the same 'force' value.
3038 if device.CreateOnSecondary():
3041 for child in device.children:
3042 if not _CreateBlockDevOnSecondary(lu, node, instance,
3043 child, force, info):
3048 lu.cfg.SetDiskID(device, node)
3049 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3050 instance.name, False, info)
3053 if device.physical_id is None:
3054 device.physical_id = new_id
3058 def _GenerateUniqueNames(lu, exts):
3059 """Generate a suitable LV name.
3061 This will generate a logical volume name for the given instance.
3066 new_id = lu.cfg.GenerateUniqueID()
3067 results.append("%s%s" % (new_id, val))
3071 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3073 """Generate a drbd8 device complete with its children.
3076 port = lu.cfg.AllocatePort()
3077 vgname = lu.cfg.GetVGName()
3078 shared_secret = lu.cfg.GenerateDRBDSecret()
3079 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3080 logical_id=(vgname, names[0]))
3081 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3082 logical_id=(vgname, names[1]))
3083 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3084 logical_id=(primary, secondary, port,
3087 children=[dev_data, dev_meta],
3092 def _GenerateDiskTemplate(lu, template_name,
3093 instance_name, primary_node,
3094 secondary_nodes, disk_sz, swap_sz,
3095 file_storage_dir, file_driver):
3096 """Generate the entire disk layout for a given template type.
3099 #TODO: compute space requirements
3101 vgname = lu.cfg.GetVGName()
3102 if template_name == constants.DT_DISKLESS:
3104 elif template_name == constants.DT_PLAIN:
3105 if len(secondary_nodes) != 0:
3106 raise errors.ProgrammerError("Wrong template configuration")
3108 names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
3109 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3110 logical_id=(vgname, names[0]),
3112 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3113 logical_id=(vgname, names[1]),
3115 disks = [sda_dev, sdb_dev]
3116 elif template_name == constants.DT_DRBD8:
3117 if len(secondary_nodes) != 1:
3118 raise errors.ProgrammerError("Wrong template configuration")
3119 remote_node = secondary_nodes[0]
3120 (minor_pa, minor_pb,
3121 minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3122 [primary_node, primary_node, remote_node, remote_node], instance_name)
3124 names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3125 ".sdb_data", ".sdb_meta"])
3126 drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3127 disk_sz, names[0:2], "sda",
3129 drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3130 swap_sz, names[2:4], "sdb",
3132 disks = [drbd_sda_dev, drbd_sdb_dev]
3133 elif template_name == constants.DT_FILE:
3134 if len(secondary_nodes) != 0:
3135 raise errors.ProgrammerError("Wrong template configuration")
3137 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3138 iv_name="sda", logical_id=(file_driver,
3139 "%s/sda" % file_storage_dir))
3140 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3141 iv_name="sdb", logical_id=(file_driver,
3142 "%s/sdb" % file_storage_dir))
3143 disks = [file_sda_dev, file_sdb_dev]
3145 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
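# Illustrative sketch (not part of the original module): the secondary-node
# arity rule that the template branching above enforces. The mapping values
# are taken from the checks above; names are hypothetical.
_TEMPLATE_SECONDARIES_SKETCH = {
  "diskless": 0,
  "plain": 0,
  "file": 0,
  "drbd8": 1,
}

def _CheckTemplateAritySketch(template_name, secondary_nodes):
  want = _TEMPLATE_SECONDARIES_SKETCH.get(template_name)
  if want is None:
    raise ValueError("Invalid disk template '%s'" % template_name)
  if len(secondary_nodes) != want:
    raise AssertionError("Wrong template configuration")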
3149 def _GetInstanceInfoText(instance):
3150 """Compute that text that should be added to the disk's metadata.
3153 return "originstname+%s" % instance.name
3156 def _CreateDisks(lu, instance):
3157 """Create all disks for an instance.
3159 This abstracts away some work from AddInstance.
3162 instance: the instance object
3165 True or False showing the success of the creation process
3168 info = _GetInstanceInfoText(instance)
3170 if instance.disk_template == constants.DT_FILE:
3171 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3172 result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3176 logging.error("Could not connect to node '%s'", instance.primary_node)
3180 logging.error("Failed to create directory '%s'", file_storage_dir)
3183 for device in instance.disks:
3184 logging.info("Creating volume %s for instance %s",
3185 device.iv_name, instance.name)
3187 for secondary_node in instance.secondary_nodes:
3188 if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3189 device, False, info):
3190 logging.error("Failed to create volume %s (%s) on secondary node %s!",
3191 device.iv_name, device, secondary_node)
3194 if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3195 instance, device, info):
3196 logging.error("Failed to create volume %s on primary!", device.iv_name)
3202 def _RemoveDisks(lu, instance):
3203 """Remove all disks for an instance.
3205 This abstracts away some work from `AddInstance()` and
3206 `RemoveInstance()`. Note that in case some of the devices couldn't
3207 be removed, the removal will continue with the other ones (compare
3208 with `_CreateDisks()`).
3211 instance: the instance object
3214 True or False showing the success of the removal process
3217 logging.info("Removing block devices for instance %s", instance.name)
3220 for device in instance.disks:
3221 for node, disk in device.ComputeNodeTree(instance.primary_node):
3222 lu.cfg.SetDiskID(disk, node)
3223 if not lu.rpc.call_blockdev_remove(node, disk):
3224 lu.proc.LogWarning("Could not remove block device %s on node %s,"
3225 " continuing anyway", device.iv_name, node)
3228 if instance.disk_template == constants.DT_FILE:
3229 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3230 if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3232 logging.error("Could not remove directory '%s'", file_storage_dir)
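# Illustrative sketch (not part of the original module): the create/remove
# asymmetry documented above. Creation stops at the first failure (callers
# then roll back with _RemoveDisks), while removal keeps going so one bad
# device does not strand the others. Callbacks are hypothetical.
def _CreateAllSketch(devices, create):
  for dev in devices:
    if not create(dev):
      return False  # caller is expected to remove what was created
  return True

def _RemoveAllSketch(devices, remove, warn):
  ok = True
  for dev in devices:
    if not remove(dev):
      warn("Could not remove %s, continuing anyway" % (dev,))
      ok = False
  return ok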
3238 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3239 """Compute disk size requirements in the volume group
3241 This is currently hard-coded for the two-drive layout.
3244 # Required free disk space as a function of disk and swap space
3246 constants.DT_DISKLESS: None,
3247 constants.DT_PLAIN: disk_size + swap_size,
3248 # 256 MB are added for drbd metadata, 128 MB for each drbd device
3249 constants.DT_DRBD8: disk_size + swap_size + 256,
3250 constants.DT_FILE: None,
3253 if disk_template not in req_size_dict:
3254 raise errors.ProgrammerError("Disk template '%s' size requirement"
3255 " is unknown" % disk_template)
3257 return req_size_dict[disk_template]
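# Illustrative sketch (not part of the original module): exercising the same
# table-driven computation with hypothetical sizes (all values in MiB). The
# drbd8 figure includes the 256 MB of metadata noted above.
def _ComputeDiskSizeSketch(template, disk_size, swap_size):
  req = {
    "diskless": None,
    "plain": disk_size + swap_size,
    "drbd8": disk_size + swap_size + 256,
    "file": None,
  }
  if template not in req:
    raise ValueError("unknown disk template %r" % template)
  return req[template]

assert _ComputeDiskSizeSketch("plain", 10240, 1024) == 11264
assert _ComputeDiskSizeSketch("drbd8", 10240, 1024) == 11520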
3260 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3261 """Hypervisor parameter validation.
3263 This function abstracts the hypervisor parameter validation to be
3264 used in both instance create and instance modify.
3266 @type lu: L{LogicalUnit}
3267 @param lu: the logical unit for which we check
3268 @type nodenames: list
3269 @param nodenames: the list of nodes on which we should check
3270 @type hvname: string
3271 @param hvname: the name of the hypervisor we should use
3272 @type hvparams: dict
3273 @param hvparams: the parameters which we need to check
3274 @raise errors.OpPrereqError: if the parameters are not valid
3277 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3280 for node in nodenames:
3281 info = hvinfo.get(node, None)
3282 if not info or not isinstance(info, (tuple, list)):
3283 raise errors.OpPrereqError("Cannot get current information"
3284 " from node '%s' (%s)" % (node, info))
3286 raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3290 class LUCreateInstance(LogicalUnit):
3291 """Create an instance.
3294 HPATH = "instance-add"
3295 HTYPE = constants.HTYPE_INSTANCE
3296 _OP_REQP = ["instance_name", "disk_size",
3297 "disk_template", "swap_size", "mode", "start",
3298 "wait_for_sync", "ip_check", "mac",
3299 "hvparams", "beparams"]
3302 def _ExpandNode(self, node):
3303 """Expands and checks one node name.
3306 node_full = self.cfg.ExpandNodeName(node)
3307 if node_full is None:
3308 raise errors.OpPrereqError("Unknown node %s" % node)
3311 def ExpandNames(self):
3312 """ExpandNames for CreateInstance.
3314 Figure out the right locks for instance creation.
3317 self.needed_locks = {}
3319 # set optional parameters to none if they don't exist
3320 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3321 if not hasattr(self.op, attr):
3322 setattr(self.op, attr, None)
3324 # cheap checks, mostly valid constants given
3326 # verify creation mode
3327 if self.op.mode not in (constants.INSTANCE_CREATE,
3328 constants.INSTANCE_IMPORT):
3329 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3332 # disk template and mirror node verification
3333 if self.op.disk_template not in constants.DISK_TEMPLATES:
3334 raise errors.OpPrereqError("Invalid disk template name")
3336 if self.op.hypervisor is None:
3337 self.op.hypervisor = self.cfg.GetHypervisorType()
3339 cluster = self.cfg.GetClusterInfo()
3340 enabled_hvs = cluster.enabled_hypervisors
3341 if self.op.hypervisor not in enabled_hvs:
3342 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3343 " cluster (%s)" % (self.op.hypervisor,
3344 ",".join(enabled_hvs)))
3346 # check hypervisor parameter syntax (locally)
3348 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3350 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3351 hv_type.CheckParameterSyntax(filled_hvp)
3353 # fill and remember the beparams dict
3354 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3357 #### instance parameters check
3359 # instance name verification
3360 hostname1 = utils.HostInfo(self.op.instance_name)
3361 self.op.instance_name = instance_name = hostname1.name
3363 # this is just a preventive check, but someone might still add this
3364 # instance in the meantime, and creation will fail at lock-add time
3365 if instance_name in self.cfg.GetInstanceList():
3366 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3369 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3371 # ip validity checks
3372 ip = getattr(self.op, "ip", None)
3373 if ip is None or ip.lower() == "none":
3375 elif ip.lower() == constants.VALUE_AUTO:
3376 inst_ip = hostname1.ip
3378 if not utils.IsValidIP(ip):
3379 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3380 " like a valid IP" % ip)
3382 self.inst_ip = self.op.ip = inst_ip
3383 # used in CheckPrereq for ip ping check
3384 self.check_ip = hostname1.ip
3386 # MAC address verification
3387 if self.op.mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3388 if not utils.IsValidMac(self.op.mac.lower()):
3389 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3392 # file storage checks
3393 if (self.op.file_driver and
3394 not self.op.file_driver in constants.FILE_DRIVER):
3395 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3396 self.op.file_driver)
3398 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3399 raise errors.OpPrereqError("File storage directory path not absolute")
3401 ### Node/iallocator related checks
3402 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3403 raise errors.OpPrereqError("One and only one of iallocator and primary"
3404 " node must be given")
3406 if self.op.iallocator:
3407 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3409 self.op.pnode = self._ExpandNode(self.op.pnode)
3410 nodelist = [self.op.pnode]
3411 if self.op.snode is not None:
3412 self.op.snode = self._ExpandNode(self.op.snode)
3413 nodelist.append(self.op.snode)
3414 self.needed_locks[locking.LEVEL_NODE] = nodelist
3416 # in case of import lock the source node too
3417 if self.op.mode == constants.INSTANCE_IMPORT:
3418 src_node = getattr(self.op, "src_node", None)
3419 src_path = getattr(self.op, "src_path", None)
3421 if src_node is None or src_path is None:
3422 raise errors.OpPrereqError("Importing an instance requires source"
3423 " node and path options")
3425 if not os.path.isabs(src_path):
3426 raise errors.OpPrereqError("The source path must be absolute")
3428 self.op.src_node = src_node = self._ExpandNode(src_node)
3429 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3430 self.needed_locks[locking.LEVEL_NODE].append(src_node)
3432 else: # INSTANCE_CREATE
3433 if getattr(self.op, "os_type", None) is None:
3434 raise errors.OpPrereqError("No guest OS specified")
3436 def _RunAllocator(self):
3437 """Run the allocator based on input opcode.
3440 disks = [{"size": self.op.disk_size, "mode": "w"},
3441 {"size": self.op.swap_size, "mode": "w"}]
3442 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3443 "bridge": self.op.bridge}]
3444 ial = IAllocator(self,
3445 mode=constants.IALLOCATOR_MODE_ALLOC,
3446 name=self.op.instance_name,
3447 disk_template=self.op.disk_template,
3450 vcpus=self.be_full[constants.BE_VCPUS],
3451 mem_size=self.be_full[constants.BE_MEMORY],
3456 ial.Run(self.op.iallocator)
3459 raise errors.OpPrereqError("Can't compute nodes using"
3460 " iallocator '%s': %s" % (self.op.iallocator,
3462 if len(ial.nodes) != ial.required_nodes:
3463 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3464 " of nodes (%s), required %s" %
3465 (self.op.iallocator, len(ial.nodes),
3466 ial.required_nodes))
3467 self.op.pnode = ial.nodes[0]
3468 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3469 self.op.instance_name, self.op.iallocator,
3470 ", ".join(ial.nodes))
3471 if ial.required_nodes == 2:
3472 self.op.snode = ial.nodes[1]
3474 def BuildHooksEnv(self):
3477 This runs on master, primary and secondary nodes of the instance.
3481 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3482 "INSTANCE_DISK_SIZE": self.op.disk_size,
3483 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3484 "INSTANCE_ADD_MODE": self.op.mode,
3486 if self.op.mode == constants.INSTANCE_IMPORT:
3487 env["INSTANCE_SRC_NODE"] = self.op.src_node
3488 env["INSTANCE_SRC_PATH"] = self.op.src_path
3489 env["INSTANCE_SRC_IMAGES"] = self.src_images
3491 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3492 primary_node=self.op.pnode,
3493 secondary_nodes=self.secondaries,
3494 status=self.instance_status,
3495 os_type=self.op.os_type,
3496 memory=self.be_full[constants.BE_MEMORY],
3497 vcpus=self.be_full[constants.BE_VCPUS],
3498 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3501 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3506 def CheckPrereq(self):
3507 """Check prerequisites.
3510 if (not self.cfg.GetVGName() and
3511 self.op.disk_template not in constants.DTS_NOT_LVM):
3512 raise errors.OpPrereqError("Cluster does not support lvm-based"
3516 if self.op.mode == constants.INSTANCE_IMPORT:
3517 src_node = self.op.src_node
3518 src_path = self.op.src_path
3520 export_info = self.rpc.call_export_info(src_node, src_path)
3523 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3525 if not export_info.has_section(constants.INISECT_EXP):
3526 raise errors.ProgrammerError("Corrupted export config")
3528 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3529 if (int(ei_version) != constants.EXPORT_VERSION):
3530 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3531 (ei_version, constants.EXPORT_VERSION))
3533 # Check that the new instance doesn't have less disks than the export
3534 # TODO: substitute "2" with the actual number of disks requested
3536 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3537 if instance_disks < export_disks:
3538 raise errors.OpPrereqError("Not enough disks to import."
3539 " (instance: %d, export: %d)" %
3542 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3544 for idx in range(export_disks):
3545 option = 'disk%d_dump' % idx
3546 if export_info.has_option(constants.INISECT_INS, option):
3547 # FIXME: are the old os-es, disk sizes, etc. useful?
3548 export_name = export_info.get(constants.INISECT_INS, option)
3549 image = os.path.join(src_path, export_name)
3550 disk_images.append(image)
3552 disk_images.append(False)
3554 self.src_images = disk_images
3556 if self.op.mac == constants.VALUE_AUTO:
3557 old_name = export_info.get(constants.INISECT_INS, 'name')
3558 if self.op.instance_name == old_name:
3559 # FIXME: adjust every nic, when we'll be able to create instances
3560 # with more than one
3561 if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
3562 self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')
3564 # ip ping checks (we use the same ip that was resolved in ExpandNames)
3566 if self.op.start and not self.op.ip_check:
3567 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3568 " adding an instance in start mode")
3570 if self.op.ip_check:
3571 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3572 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3573 (self.check_ip, self.op.instance_name))
3575 # bridge verification
3576 bridge = getattr(self.op, "bridge", None)
3578 self.op.bridge = self.cfg.GetDefBridge()
3580 self.op.bridge = bridge
3584 if self.op.iallocator is not None:
3585 self._RunAllocator()
3587 #### node related checks
3589 # check primary node
3590 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3591 assert self.pnode is not None, \
3592 "Cannot retrieve locked node %s" % self.op.pnode
3593 self.secondaries = []
3595 # mirror node verification
3596 if self.op.disk_template in constants.DTS_NET_MIRROR:
3597 if self.op.snode is None:
3598 raise errors.OpPrereqError("The networked disk templates need"
3600 if self.op.snode == pnode.name:
3601 raise errors.OpPrereqError("The secondary node cannot be"
3602 " the primary node.")
3603 self.secondaries.append(self.op.snode)
3605 nodenames = [pnode.name] + self.secondaries
3607 req_size = _ComputeDiskSize(self.op.disk_template,
3608 self.op.disk_size, self.op.swap_size)
3610 # Check lv size requirements
3611 if req_size is not None:
3612 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3614 for node in nodenames:
3615 info = nodeinfo.get(node, None)
3617 raise errors.OpPrereqError("Cannot get current information"
3618 " from node '%s'" % node)
3619 vg_free = info.get('vg_free', None)
3620 if not isinstance(vg_free, int):
3621 raise errors.OpPrereqError("Can't compute free disk space on"
3623 if req_size > info['vg_free']:
3624 raise errors.OpPrereqError("Not enough disk space on target node %s."
3625 " %d MB available, %d MB required" %
3626 (node, info['vg_free'], req_size))
3628 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3631 os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3633 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3634 " primary node" % self.op.os_type)
3636 # bridge check on primary node
3637 if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3638 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3639 " destination node '%s'" %
3640 (self.op.bridge, pnode.name))
3642 # memory check on primary node
3644 _CheckNodeFreeMemory(self, self.pnode.name,
3645 "creating instance %s" % self.op.instance_name,
3646 self.be_full[constants.BE_MEMORY],
3650 self.instance_status = 'up'
3652 self.instance_status = 'down'
3654 def Exec(self, feedback_fn):
3655 """Create and add the instance to the cluster.
3658 instance = self.op.instance_name
3659 pnode_name = self.pnode.name
3661 if self.op.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3662 mac_address = self.cfg.GenerateMAC()
3664 mac_address = self.op.mac
3666 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3667 if self.inst_ip is not None:
3668 nic.ip = self.inst_ip
3670 ht_kind = self.op.hypervisor
3671 if ht_kind in constants.HTS_REQ_PORT:
3672 network_port = self.cfg.AllocatePort()
3676 ##if self.op.vnc_bind_address is None:
3677 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3679 # this is needed because os.path.join does not accept None arguments
3680 if self.op.file_storage_dir is None:
3681 string_file_storage_dir = ""
3683 string_file_storage_dir = self.op.file_storage_dir
3685 # build the full file storage dir path
3686 file_storage_dir = os.path.normpath(os.path.join(
3687 self.cfg.GetFileStorageDir(),
3688 string_file_storage_dir, instance))
3691 disks = _GenerateDiskTemplate(self,
3692 self.op.disk_template,
3693 instance, pnode_name,
3694 self.secondaries, self.op.disk_size,
3697 self.op.file_driver)
3699 iobj = objects.Instance(name=instance, os=self.op.os_type,
3700 primary_node=pnode_name,
3701 nics=[nic], disks=disks,
3702 disk_template=self.op.disk_template,
3703 status=self.instance_status,
3704 network_port=network_port,
3705 beparams=self.op.beparams,
3706 hvparams=self.op.hvparams,
3707 hypervisor=self.op.hypervisor,
3710 feedback_fn("* creating instance disks...")
3711 if not _CreateDisks(self, iobj):
3712 _RemoveDisks(self, iobj)
3713 self.cfg.ReleaseDRBDMinors(instance)
3714 raise errors.OpExecError("Device creation failed, reverting...")
3716 feedback_fn("adding instance %s to cluster config" % instance)
3718 self.cfg.AddInstance(iobj)
3719 # Declare that we don't want to remove the instance lock anymore, as we've
3720 # added the instance to the config
3721 del self.remove_locks[locking.LEVEL_INSTANCE]
3722 # Remove the temp. assignments for the instance's drbds
3723 self.cfg.ReleaseDRBDMinors(instance)
3725 if self.op.wait_for_sync:
3726 disk_abort = not _WaitForSync(self, iobj)
3727 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3728 # make sure the disks are not degraded (still sync-ing is ok)
3730 feedback_fn("* checking mirrors status")
3731 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3736 _RemoveDisks(self, iobj)
3737 self.cfg.RemoveInstance(iobj.name)
3738 # Make sure the instance lock gets removed
3739 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3740 raise errors.OpExecError("There are some degraded disks for"
3743 feedback_fn("creating os for instance %s on node %s" %
3744 (instance, pnode_name))
3746 if iobj.disk_template != constants.DT_DISKLESS:
3747 if self.op.mode == constants.INSTANCE_CREATE:
3748 feedback_fn("* running the instance OS create scripts...")
3749 if not self.rpc.call_instance_os_add(pnode_name, iobj):
3750 raise errors.OpExecError("could not add os for instance %s"
3752 (instance, pnode_name))
3754 elif self.op.mode == constants.INSTANCE_IMPORT:
3755 feedback_fn("* running the instance OS import scripts...")
3756 src_node = self.op.src_node
3757 src_images = self.src_images
3758 cluster_name = self.cfg.GetClusterName()
3759 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
3760 src_node, src_images,
3762 for idx, result in enumerate(import_result):
3764 self.LogWarning("Could not image %s for on instance %s, disk %d,"
3765 " on node %s" % (src_images[idx], instance, idx,
3768 # also checked in the prereq part
3769 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3773 logging.info("Starting instance %s on node %s", instance, pnode_name)
3774 feedback_fn("* starting instance...")
3775 if not self.rpc.call_instance_start(pnode_name, iobj, None):
3776 raise errors.OpExecError("Could not start instance")
3779 class LUConnectConsole(NoHooksLU):
3780 """Connect to an instance's console.
3782 This is somewhat special in that it returns the command line that
3783 you need to run on the master node in order to connect to the
3787 _OP_REQP = ["instance_name"]
3790 def ExpandNames(self):
3791 self._ExpandAndLockInstance()
3793 def CheckPrereq(self):
3794 """Check prerequisites.
3796 This checks that the instance is in the cluster.
3799 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3800 assert self.instance is not None, \
3801 "Cannot retrieve locked instance %s" % self.op.instance_name
3803 def Exec(self, feedback_fn):
3804 """Connect to the console of an instance
3807 instance = self.instance
3808 node = instance.primary_node
3810 node_insts = self.rpc.call_instance_list([node],
3811 [instance.hypervisor])[node]
3812 if node_insts is False:
3813 raise errors.OpExecError("Can't connect to node %s." % node)
3815 if instance.name not in node_insts:
3816 raise errors.OpExecError("Instance %s is not running." % instance.name)
3818 logging.debug("Connecting to console of %s on %s", instance.name, node)
3820 hyper = hypervisor.GetHypervisor(instance.hypervisor)
3821 console_cmd = hyper.GetShellCommandForConsole(instance)
3824 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
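# Illustrative sketch (not part of the original module): assembling the kind
# of command line returned above, using a plain argv list instead of
# ssh.SshRunner. The console command itself comes from the hypervisor layer;
# "xm console inst1" is just a hypothetical example.
def _BuildConsoleCmdSketch(node, console_cmd):
  return ["ssh", "-t", "root@%s" % node, console_cmd]

assert (_BuildConsoleCmdSketch("node1", "xm console inst1") ==
        ["ssh", "-t", "root@node1", "xm console inst1"])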
3827 class LUReplaceDisks(LogicalUnit):
3828 """Replace the disks of an instance.
3831 HPATH = "mirrors-replace"
3832 HTYPE = constants.HTYPE_INSTANCE
3833 _OP_REQP = ["instance_name", "mode", "disks"]
3836 def ExpandNames(self):
3837 self._ExpandAndLockInstance()
3839 if not hasattr(self.op, "remote_node"):
3840 self.op.remote_node = None
3842 ia_name = getattr(self.op, "iallocator", None)
3843 if ia_name is not None:
3844 if self.op.remote_node is not None:
3845 raise errors.OpPrereqError("Give either the iallocator or the new"
3846 " secondary, not both")
3847 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3848 elif self.op.remote_node is not None:
3849 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3850 if remote_node is None:
3851 raise errors.OpPrereqError("Node '%s' not known" %
3852 self.op.remote_node)
3853 self.op.remote_node = remote_node
3854 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3855 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3857 self.needed_locks[locking.LEVEL_NODE] = []
3858 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3860 def DeclareLocks(self, level):
3861 # If we're not already locking all nodes in the set we have to declare the
3862 # instance's primary/secondary nodes.
3863 if (level == locking.LEVEL_NODE and
3864 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3865 self._LockInstancesNodes()
3867 def _RunAllocator(self):
3868 """Compute a new secondary node using an IAllocator.
3871 ial = IAllocator(self,
3872 mode=constants.IALLOCATOR_MODE_RELOC,
3873 name=self.op.instance_name,
3874 relocate_from=[self.sec_node])
3876 ial.Run(self.op.iallocator)
3879 raise errors.OpPrereqError("Can't compute nodes using"
3880 " iallocator '%s': %s" % (self.op.iallocator,
3882 if len(ial.nodes) != ial.required_nodes:
3883 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3884 " of nodes (%s), required %s" %
3885 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3886 self.op.remote_node = ial.nodes[0]
3887 self.LogInfo("Selected new secondary for the instance: %s",
3888 self.op.remote_node)
3890 def BuildHooksEnv(self):
3893 This runs on the master, the primary and all the secondaries.
3897 "MODE": self.op.mode,
3898 "NEW_SECONDARY": self.op.remote_node,
3899 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3901 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3903 self.cfg.GetMasterNode(),
3904 self.instance.primary_node,
3906 if self.op.remote_node is not None:
3907 nl.append(self.op.remote_node)
3910 def CheckPrereq(self):
3911 """Check prerequisites.
3913 This checks that the instance is in the cluster.
3916 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3917 assert instance is not None, \
3918 "Cannot retrieve locked instance %s" % self.op.instance_name
3919 self.instance = instance
3921 if instance.disk_template not in constants.DTS_NET_MIRROR:
3922 raise errors.OpPrereqError("Instance's disk layout is not"
3923 " network mirrored.")
3925 if len(instance.secondary_nodes) != 1:
3926 raise errors.OpPrereqError("The instance has a strange layout,"
3927 " expected one secondary but found %d" %
3928 len(instance.secondary_nodes))
3930 self.sec_node = instance.secondary_nodes[0]
3932 ia_name = getattr(self.op, "iallocator", None)
3933 if ia_name is not None:
3934 self._RunAllocator()
3936 remote_node = self.op.remote_node
3937 if remote_node is not None:
3938 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3939 assert self.remote_node_info is not None, \
3940 "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
3943 if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
3946 elif remote_node == self.sec_node:
3947 if self.op.mode == constants.REPLACE_DISK_SEC:
3948 # this is for DRBD8, where we can't execute the same mode of
3949 # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
3952 if instance.disk_template == constants.DT_DRBD8:
3953 if (self.op.mode == constants.REPLACE_DISK_ALL and
3954 remote_node is not None):
3955 # switch to replace secondary mode
3956 self.op.mode = constants.REPLACE_DISK_SEC
3958 if self.op.mode == constants.REPLACE_DISK_ALL:
3959 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
3962 elif self.op.mode == constants.REPLACE_DISK_PRI:
3963 if remote_node is not None:
3964 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3965 " the secondary while doing a primary"
3966 " node disk replacement")
3967 self.tgt_node = instance.primary_node
3968 self.oth_node = instance.secondary_nodes[0]
3969 elif self.op.mode == constants.REPLACE_DISK_SEC:
3970 self.new_node = remote_node # this can be None, in which case
3971 # we don't change the secondary
3972 self.tgt_node = instance.secondary_nodes[0]
3973 self.oth_node = instance.primary_node
    else:
      raise errors.ProgrammerError("Unhandled disk replace mode")
3977 for name in self.op.disks:
3978 if instance.FindDisk(name) is None:
3979 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3980 (name, instance.name))
3982 def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.
3985 The algorithm for replace is quite complicated:
3986 - for each disk to be replaced:
3987 - create new LVs on the target node with unique names
3988 - detach old LVs from the drbd device
3989 - rename old LVs to name_replaced.<time_t>
3990 - rename new LVs to old LVs
3991 - attach the new LVs (with the old names now) to the drbd device
3992 - wait for sync across all devices
3993 - for each modified disk:
      - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
4000 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    cfg = self.cfg
4006 tgt_node = self.tgt_node
4007 oth_node = self.oth_node
4009 # Step: check device activation
4010 self.proc.LogStep(1, steps_total, "check device existence")
4011 info("checking volume groups")
4012 my_vg = cfg.GetVGName()
4013 results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
4016 for node in oth_node, tgt_node:
4017 res = results.get(node, False)
4018 if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
4021 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
4024 for node in tgt_node, oth_node:
4025 info("checking %s on %s" % (dev.iv_name, node))
4026 cfg.SetDiskID(dev, node)
4027 if not self.rpc.call_blockdev_find(node, dev):
4028 raise errors.OpExecError("Can't find device %s on node %s" %
4029 (dev.iv_name, node))
4031 # Step: check other node consistency
4032 self.proc.LogStep(2, steps_total, "check peer consistency")
4033 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
4036 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4037 if not _CheckDiskConsistency(self, dev, oth_node,
4038 oth_node==instance.primary_node):
4039 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4040 " to replace disks on this node (%s)" %
4041 (oth_node, tgt_node))
4043 # Step: create new storage
4044 self.proc.LogStep(3, steps_total, "allocate new storage")
4045 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
4049 cfg.SetDiskID(dev, tgt_node)
4050 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4051 names = _GenerateUniqueNames(self, lv_names)
4052 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4053 logical_id=(vgname, names[0]))
4054 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4055 logical_id=(vgname, names[1]))
4056 new_lvs = [lv_data, lv_meta]
4057 old_lvs = dev.children
4058 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4059 info("creating new local storage on %s for %s" %
4060 (tgt_node, dev.iv_name))
4061 # since we *always* want to create this LV, we use the
4062 # _Create...OnPrimary (which forces the creation), even if we
4063 # are talking about the secondary node
4064 for new_lv in new_lvs:
4065 if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4066 _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))
4071 # Step: for each lv, detach+rename*2+attach
4072 self.proc.LogStep(4, steps_total, "change drbd configuration")
4073 for dev, old_lvs, new_lvs in iv_names.itervalues():
4074 info("detaching %s drbd from local storage" % dev.iv_name)
4075 if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4076 raise errors.OpExecError("Can't detach drbd from local storage on node"
4077 " %s for device %s" % (tgt_node, dev.iv_name))
4079 #cfg.Update(instance)
4081 # ok, we created the new LVs, so now we know we have the needed
4082 # storage; as such, we proceed on the target node to rename
4083 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4084 # using the assumption that logical_id == physical_id (which in
4085 # turn is the unique_id on that node)
4087 # FIXME(iustin): use a better name for the replaced LVs
4088 temp_suffix = int(time.time())
4089 ren_fn = lambda d, suff: (d.physical_id[0],
4090 d.physical_id[1] + "_replaced-%s" % suff)
4091 # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
4094 find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4095 if find_res is not None: # device exists
4096 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4098 info("renaming the old LVs on the target node")
4099 if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4100 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4101 # now we rename the new LVs to the old LVs
4102 info("renaming the new LVs on the target node")
4103 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4104 if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4105 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4107 for old, new in zip(old_lvs, new_lvs):
4108 new.logical_id = old.logical_id
4109 cfg.SetDiskID(new, tgt_node)
4111 for disk in old_lvs:
4112 disk.logical_id = ren_fn(disk, temp_suffix)
4113 cfg.SetDiskID(disk, tgt_node)
4115 # now that the new lvs have the old name, we can add them to the device
4116 info("adding new mirror component on %s" % tgt_node)
4117 if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4118 for new_lv in new_lvs:
4119 if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
4122 raise errors.OpExecError("Can't add local storage to drbd")
4124 dev.children = new_lvs
4125 cfg.Update(instance)
4127 # Step: wait for sync
4129 # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
4132 self.proc.LogStep(5, steps_total, "sync devices")
4133 _WaitForSync(self, instance, unlock=True)
4135 # so check manually all the devices
4136 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4137 cfg.SetDiskID(dev, instance.primary_node)
4138 is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4142 # Step: remove old storage
4143 self.proc.LogStep(6, steps_total, "removing old storage")
4144 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4145 info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
4152 def _ExecD8Secondary(self, feedback_fn):
4153 """Replace the secondary node for drbd8.
4155 The algorithm for replace is quite complicated:
4156 - for all disks of the instance:
4157 - create new LVs on the new node with same names
4158 - shutdown the drbd device on the old secondary
4159 - disconnect the drbd network on the primary
4160 - create the drbd device on the new secondary
4161 - network attach the drbd on the primary, using an artifice:
4162 the drbd code for Attach() will connect to the network if it
        finds a device which is connected to the good local disks but
        not network enabled
4165 - wait for sync across all devices
4166 - remove all disks from the old secondary
    Failures are not very well handled.

    """
    steps_total = 6
4172 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    cfg = self.cfg
4178 old_node = self.tgt_node
4179 new_node = self.new_node
4180 pri_node = instance.primary_node
4182 # Step: check device activation
4183 self.proc.LogStep(1, steps_total, "check device existence")
4184 info("checking volume groups")
4185 my_vg = cfg.GetVGName()
4186 results = self.rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
4189 for node in pri_node, new_node:
4190 res = results.get(node, False)
4191 if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
4194 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
4197 info("checking %s on %s" % (dev.iv_name, pri_node))
4198 cfg.SetDiskID(dev, pri_node)
4199 if not self.rpc.call_blockdev_find(pri_node, dev):
4200 raise errors.OpExecError("Can't find device %s on node %s" %
4201 (dev.iv_name, pri_node))
4203 # Step: check other node consistency
4204 self.proc.LogStep(2, steps_total, "check peer consistency")
4205 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
4208 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4209 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4210 raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)
4214 # Step: create new storage
4215 self.proc.LogStep(3, steps_total, "allocate new storage")
4216 for dev in instance.disks:
4218 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4219 # since we *always* want to create this LV, we use the
4220 # _Create...OnPrimary (which forces the creation), even if we
4221 # are talking about the secondary node
4222 for new_lv in dev.children:
4223 if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4224 _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))
    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
4236 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4237 for dev, new_minor in zip(instance.disks, minors):
4239 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4240 # create new devices on new_node
4241 if pri_node == dev.logical_id[0]:
4242 new_logical_id = (pri_node, new_node,
                          dev.logical_id[2], dev.logical_id[3], new_minor,
                          dev.logical_id[5])
      else:
        new_logical_id = (new_node, pri_node,
                          dev.logical_id[2], new_minor, dev.logical_id[4],
                          dev.logical_id[5])
4249 iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_logical_id)
4252 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4253 logical_id=new_logical_id,
4254 children=dev.children)
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
4258 self.cfg.ReleaseDRBDMinors(instance.name)
4259 raise errors.OpExecError("Failed to create new DRBD on"
4260 " node '%s'" % new_node)
4262 for dev in instance.disks:
4263 # we have new devices, shutdown the drbd on the old secondary
4264 info("shutting down drbd for %s on old node" % dev.iv_name)
4265 cfg.SetDiskID(dev, old_node)
4266 if not self.rpc.call_blockdev_shutdown(old_node, dev):
4267 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4268 hint="Please cleanup this device manually as soon as possible")
    info("detaching primary drbds from the network (=> standalone)")
    done = 0
4272 for dev in instance.disks:
4273 cfg.SetDiskID(dev, pri_node)
4274 # set the network part of the physical (unique in bdev terms) id
4275 # to None, meaning detach from network
4276 dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if self.rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4290 # if we managed to detach at least one, we update all the disks of
4291 # the instance to point to the new secondary
4292 info("updating instance configuration")
4293 for dev, _, new_logical_id in iv_names.itervalues():
4294 dev.logical_id = new_logical_id
4295 cfg.SetDiskID(dev, pri_node)
4296 cfg.Update(instance)
4297 # we can remove now the temp minors as now the new values are
4298 # written to the config file (and therefore stable)
4299 self.cfg.ReleaseDRBDMinors(instance.name)
4301 # and now perform the drbd attach
4302 info("attaching primary drbds to new secondary (standalone => connected)")
4304 for dev in instance.disks:
4305 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4306 # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
4309 cfg.SetDiskID(dev, pri_node)
4310 logging.debug("Disk to attach: %s", dev)
4311 if not self.rpc.call_blockdev_find(pri_node, dev):
4312 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4313 "please do a gnt-instance info to see the status of disks")
4315 # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
4318 self.proc.LogStep(5, steps_total, "sync devices")
4319 _WaitForSync(self, instance, unlock=True)
4321 # so check manually all the devices
4322 for name, (dev, old_lvs, _) in iv_names.iteritems():
4323 cfg.SetDiskID(dev, pri_node)
4324 is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4328 self.proc.LogStep(6, steps_total, "removing old storage")
4329 for name, (dev, old_lvs, _) in iv_names.iteritems():
4330 info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not self.rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")
4337 def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
4343 instance = self.instance
4345 # Activate the instance disks if we're replacing them on a down instance
4346 if instance.status == "down":
4347 _StartInstanceDisks(self, instance, True)
4349 if instance.disk_template == constants.DT_DRBD8:
4350 if self.op.remote_node is None:
4351 fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
4357 ret = fn(feedback_fn)
4359 # Deactivate the instance disks if we're replacing them on a down instance
4360 if instance.status == "down":
      _SafeShutdownInstanceDisks(self, instance)

    return ret
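
# An illustrative, standalone sketch (not used by any LU) of the LV swap
# performed in _ExecD8DiskOnly above. The LV names and the timestamp are
# hypothetical; it only shows the two rename batches that let the new LVs
# take over the old names without ever duplicating a name.
def _DemoLvRenameDance(old_names, new_names, stamp):
  """Return the two rename batches used when swapping LVs.

  """
  # batch 1: move the old LVs out of the way under a unique suffix
  batch1 = [(old, "%s_replaced-%s" % (old, stamp)) for old in old_names]
  # batch 2: the old names are now free, hand them over to the new LVs
  batch2 = zip(new_names, old_names)
  return batch1, batch2

# Example (hypothetical data): _DemoLvRenameDance(["sda_data"],
# [".sda_data_new"], 1199145600) returns
# ([("sda_data", "sda_data_replaced-1199145600")],
#  [(".sda_data_new", "sda_data")])
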
4366 class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
4372 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4375 def ExpandNames(self):
4376 self._ExpandAndLockInstance()
4377 self.needed_locks[locking.LEVEL_NODE] = []
4378 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4380 def DeclareLocks(self, level):
4381 if level == locking.LEVEL_NODE:
4382 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(),
          self.instance.primary_node]
    return env, nl, nl
4401 def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
4407 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4408 assert instance is not None, \
4409 "Cannot retrieve locked instance %s" % self.op.instance_name
4411 self.instance = instance
4413 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")
4417 if instance.FindDisk(self.op.disk) is None:
4418 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4419 (self.op.disk, instance.name))
4421 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4422 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4423 instance.hypervisor)
4424 for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
4429 vg_free = info.get('vg_free', None)
4430 if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
4433 if self.op.amount > info['vg_free']:
4434 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4435 " %d MiB available, %d MiB required" %
4436 (node, info['vg_free'], self.op.amount))
4438 def Exec(self, feedback_fn):
    """Execute disk grow.

    """
4442 instance = self.instance
4443 disk = instance.FindDisk(self.op.disk)
4444 for node in (instance.secondary_nodes + (instance.primary_node,)):
4445 self.cfg.SetDiskID(disk, node)
4446 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      if (not result or not isinstance(result, (list, tuple)) or
          len(result) != 2):
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
4453 disk.RecordGrow(self.op.amount)
4454 self.cfg.Update(instance)
4455 if self.op.wait_for_sync:
4456 disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")
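
# A minimal sketch of the free-space rule enforced in LUGrowDisk.CheckPrereq
# above: the requested amount (MiB) must fit in the volume group of every
# node holding a copy of the disk. Node names and numbers are hypothetical.
def _DemoGrowBlockers(vg_free_by_node, amount):
  """Return the nodes whose volume group cannot fit 'amount' MiB.

  """
  return [node for (node, vg_free) in vg_free_by_node.items()
          if amount > vg_free]

# Example: _DemoGrowBlockers({"node1": 4096, "node2": 1024}, 2048)
# returns ["node2"], which CheckPrereq would report as an OpPrereqError.
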
4462 class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
4469 def ExpandNames(self):
4470 self.needed_locks = {}
4471 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4473 if not isinstance(self.op.instances, list):
4474 raise errors.OpPrereqError("Invalid argument type 'instances'")
4476 if self.op.instances:
4477 self.wanted_names = []
4478 for name in self.op.instances:
4479 full_name = self.cfg.ExpandInstanceName(name)
4480 if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
4483 self.wanted_names.append(full_name)
4484 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
4487 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4489 self.needed_locks[locking.LEVEL_NODE] = []
4490 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4492 def DeclareLocks(self, level):
4493 if level == locking.LEVEL_NODE:
4494 self._LockInstancesNodes()
4496 def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
4502 if self.wanted_names is None:
4503 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4505 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4506 in self.wanted_names]
4509 def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
    else:
      dev_pstatus = None
4520 if dev.dev_type in constants.LDS_DRBD:
4521 # we change the snode then (otherwise we use the one passed in)
4522 if dev.logical_id[0] == instance.primary_node:
4523 snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]
4527 if snode and not static:
4528 self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None
    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []
    data = {
      "iv_name": dev.iv_name,
4541 "dev_type": dev.dev_type,
4542 "logical_id": dev.logical_id,
4543 "physical_id": dev.physical_id,
4544 "pstatus": dev_pstatus,
4545 "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data
4551 def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
4555 cluster = self.cfg.GetClusterInfo()
4557 for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"
4573 disks = [self._ComputeDiskStatus(instance, None, device)
4574 for device in instance.disks]
      idict = {
        "name": instance.name,
4578 "config_state": config_state,
4579 "run_state": remote_state,
4580 "pnode": instance.primary_node,
4581 "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
4585 "hypervisor": instance.hypervisor,
4586 "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result
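
# _ComputeDiskStatus above recurses over dev.children, so the returned
# structure mirrors the device tree (e.g. a DRBD8 device on top of two LVs).
# A data-only sketch of that recursion, using hypothetical device dicts:
def _DemoDiskTree(dev):
  """Return the (iv_name, children) skeleton that _ComputeDiskStatus builds.

  """
  return {
    "iv_name": dev.get("iv_name"),
    "children": [_DemoDiskTree(child) for child in dev.get("children", [])],
    }

# Example:
#   _DemoDiskTree({"iv_name": "sda",
#                  "children": [{"iv_name": "sda_data"},
#                               {"iv_name": "sda_meta"}]})
# nests the two LVs under the "sda" entry, as in the "disks" output above.
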
4598 class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
4602 HPATH = "instance-modify"
4603 HTYPE = constants.HTYPE_INSTANCE
4604 _OP_REQP = ["instance_name", "hvparams"]
4607 def ExpandNames(self):
4608 self._ExpandAndLockInstance()
4609 self.needed_locks[locking.LEVEL_NODE] = []
4610 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4613 def DeclareLocks(self, level):
4614 if level == locking.LEVEL_NODE:
4615 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
4642 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4643 nl = [self.cfg.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl
4647 def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
4653 # FIXME: all the parameters could be checked before, in ExpandNames, or in
4654 # a separate CheckArguments function, if we implement one, so the operation
4655 # can be aborted without waiting for any lock, should it have an error...
4656 self.ip = getattr(self.op, "ip", None)
4657 self.mac = getattr(self.op, "mac", None)
4658 self.bridge = getattr(self.op, "bridge", None)
4659 self.kernel_path = getattr(self.op, "kernel_path", None)
4660 self.initrd_path = getattr(self.op, "initrd_path", None)
4661 self.force = getattr(self.op, "force", None)
4662 all_parms = [self.ip, self.bridge, self.mac]
4663 if (all_parms.count(None) == len(all_parms) and
4664 not self.op.hvparams and
4665 not self.op.beparams):
4666 raise errors.OpPrereqError("No changes submitted")
    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
      val = self.op.beparams.get(item, None)
      if val is not None:
        try:
          val = int(val)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
        self.op.beparams[item] = val
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
4685 if self.mac is not None:
4686 if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
4689 if not utils.IsValidMac(self.mac):
4690 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4692 # checking the new params on the primary/secondary nodes
4694 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4695 assert self.instance is not None, \
4696 "Cannot retrieve locked instance %s" % self.op.instance_name
4697 pnode = self.instance.primary_node
    nodelist = [pnode]
    nodelist.extend(instance.secondary_nodes)
4701 # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val is None:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4719 self.hv_new = hv_new # the new actual values
4720 self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}
4724 # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val is None:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}
    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
4746 mem_check_list = [pnode]
4747 if be_new[constants.BE_AUTO_BALANCE]:
4748 # either we changed auto_balance to yes or it was from before
4749 mem_check_list.extend(instance.secondary_nodes)
4750 instance_info = self.rpc.call_instance_info(pnode, instance.name,
4751 instance.hypervisor)
4752 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4753 instance.hypervisor)
      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode]['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)
4773 if be_new[constants.BE_AUTO_BALANCE]:
4774 for node in instance.secondary_nodes:
4775 if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4776 self.warn.append("Can't get info from secondary node %s" % node)
4777 elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4778 self.warn.append("Not enough memory to failover instance to"
4779 " secondary node %s" % node)
4783 def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
4789 # feedback_fn there.
4790 for warn in self.warn:
4791 feedback_fn("WARNING: %s" % warn)
    result = []
    instance = self.instance
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
4804 if self.op.hvparams:
4805 instance.hvparams = self.hv_new
4806 for key, val in self.op.hvparams.iteritems():
4807 result.append(("hv/%s" % key, val))
4808 if self.op.beparams:
4809 instance.beparams = self.be_inst
4810 for key, val in self.op.beparams.iteritems():
4811 result.append(("be/%s" % key, val))
    self.cfg.Update(instance)

    return result
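
# The hvparams/beparams handling above keeps two dicts per instance: the
# sparse per-instance overrides ("hv_instance"/"be_instance") and the
# effective values after filling in cluster defaults via cluster.FillDict
# ("hv_actual"/"be_actual"). A minimal sketch of that fill semantics, with
# hypothetical keys (this mirrors, but is not, objects.Cluster.FillDict):
def _DemoFillDict(defaults, custom):
  """Return a copy of 'defaults' updated with the 'custom' overrides.

  """
  result = defaults.copy()
  result.update(custom)
  return result

# Example: _DemoFillDict({"memory": 128, "vcpus": 1}, {"memory": 512})
# returns {"memory": 512, "vcpus": 1}.
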
4818 class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
4825 def ExpandNames(self):
4826 self.needed_locks = {}
4827 self.share_locks[locking.LEVEL_NODE] = 1
4828 if not self.op.nodes:
4829 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
4832 _GetWantedNodes(self, self.op.nodes)
4834 def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4840 def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return self.rpc.call_export_list(self.nodes)
4852 class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
4857 HTYPE = constants.HTYPE_INSTANCE
4858 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4861 def ExpandNames(self):
4862 self._ExpandAndLockInstance()
4863 # FIXME: lock only instance primary and destination node
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
4867 # remove it from its current node. In the future we could fix this by:
4868 # - making a tasklet to search (share-lock all), then create the new one,
4869 # then one to remove, after
    # - removing the removal operation altogether
4871 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4873 def DeclareLocks(self, level):
4874 """Last minute lock declaration."""
4875 # All nodes are locked anyway, so nothing to do here.
  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl
4892 def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
4898 instance_name = self.op.instance_name
4899 self.instance = self.cfg.GetInstanceInfo(instance_name)
4900 assert self.instance is not None, \
4901 "Cannot retrieve locked instance %s" % self.op.instance_name
4903 self.dst_node = self.cfg.GetNodeInfo(
4904 self.cfg.ExpandNodeName(self.op.target_node))
4906 assert self.dst_node is not None, \
4907 "Cannot retrieve locked node %s" % self.op.target_node
4909 # instance disk type verification
4910 for disk in self.instance.disks:
4911 if disk.dev_type == constants.LD_FILE:
4912 raise errors.OpPrereqError("Export not supported for instances with"
4913 " file-based disks")
4915 def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
4920 dst_node = self.dst_node
4921 src_node = instance.primary_node
4922 if self.op.shutdown:
4923 # shutdown the instance, but not the disks
4924 if not self.rpc.call_instance_shutdown(src_node, instance):
4925 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4926 (instance.name, src_node))
    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
4933 for disk in instance.disks:
4934 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4935 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4937 if not new_dev_name:
4938 self.LogWarning("Could not snapshot block device %s on node %s",
4939 disk.logical_id[1], src_node)
4940 snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name),
                                 physical_id=(vgname, new_dev_name),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)
    finally:
      if self.op.shutdown and instance.status == "up":
        if not self.rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance")
4954 # TODO: check for size
4956 cluster_name = self.cfg.GetClusterName()
4957 for idx, dev in enumerate(snap_disks):
      if dev:
        if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                             instance, cluster_name, idx):
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        if not self.rpc.call_blockdev_remove(src_node, dev):
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s", dev.logical_id[1], src_node)
4968 if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4969 self.LogWarning("Could not finalize export for instance %s on node %s",
4970 instance.name, dst_node.name)
4972 nodelist = self.cfg.GetNodeList()
4973 nodelist.remove(dst_node.name)
4975 # on one-node clusters nodelist will be empty after the removal
4976 # if we proceed the backup would be removed because OpQueryExports
4977 # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)
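
# In LUExportInstance.Exec above, snap_disks keeps one slot per instance
# disk; a failed snapshot leaves False in the slot so disk indexes stay
# aligned for the later per-index export calls. A data-only sketch with
# hypothetical snapshot results:
def _DemoSnapSlots(snapshot_results):
  """Map per-disk snapshot results to snap_disks slot values.

  """
  return [res or False for res in snapshot_results]

# Example: _DemoSnapSlots(["snap0", None, "snap2"])
# returns ["snap0", False, "snap2"]; index 1 is skipped at export time.
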
4987 class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
4994 def ExpandNames(self):
4995 self.needed_locks = {}
4996 # We need all nodes to be locked in order for RemoveExport to work, but we
4997 # don't need to lock the instance itself, as nothing will happen to it (and
4998 # we can remove exports also for a removed instance)
4999 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5001 def CheckPrereq(self):
    """Check prerequisites.

    """
5006 def Exec(self, feedback_fn):
    """Remove any export.

    """
5010 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5011 # If the instance was not found we'll try with the name that was passed in.
5012 # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name
5018 exportlist = self.rpc.call_export_list(self.acquired_locks[
5019 locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not self.rpc.call_export_remove(node, instance_name):
5025 logging.error("Could not remove export for instance %s"
5026 " on node %s", instance_name, node)
5028 if fqdn_warn and not found:
5029 feedback_fn("Export not found. If trying to remove an export belonging"
                " to a deleted instance please use its Fully Qualified"
                " Domain Name.")
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
5041 def ExpandNames(self):
5042 self.needed_locks = {}
5043 if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
5050 elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name
5058 def CheckPrereq(self):
    """Check prerequisites.

    """
5062 if self.op.kind == constants.TAG_CLUSTER:
5063 self.target = self.cfg.GetClusterInfo()
5064 elif self.op.kind == constants.TAG_NODE:
5065 self.target = self.cfg.GetNodeInfo(self.op.name)
5066 elif self.op.kind == constants.TAG_INSTANCE:
5067 self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
5073 class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
5080 def Exec(self, feedback_fn):
    """Returns the tag list.

    """
5084 return list(self.target.GetTags())
5087 class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
5094 def ExpandNames(self):
5095 self.needed_locks = {}
5097 def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
5105 except re.error, err:
5106 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5107 (self.op.pattern, err))
5109 def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
5115 ilist = cfg.GetAllInstancesInfo().values()
5116 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5117 nlist = cfg.GetAllNodesInfo().values()
5118 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
5121 for tag in target.GetTags():
5122 if self.re.search(tag):
          results.append((path, tag))
    return results
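
# LUSearchTags.Exec walks the cluster, instance and node objects and keeps
# every (path, tag) pair whose tag matches the compiled pattern. The same
# matching on plain data, with hypothetical paths and tags (this relies on
# the module-level re import used by LUSearchTags):
def _DemoSearchTags(pattern, tags_by_path):
  """Return the (path, tag) pairs whose tag matches 'pattern'.

  """
  expr = re.compile(pattern)
  return [(path, tag)
          for path, tags in tags_by_path.items()
          for tag in tags
          if expr.search(tag)]

# Example: _DemoSearchTags("^web", {"/instances/i1": ["web", "db"]})
# returns [("/instances/i1", "web")].
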
5127 class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
5134 def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
5140 TagsLU.CheckPrereq(self)
5141 for tag in self.op.tags:
5142 objects.TaggableObject.ValidateTag(tag)
  def Exec(self, feedback_fn):
    """Sets the tag."""
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
5151 except errors.TagError, err:
5152 raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
5155 except errors.ConfigurationError:
5156 raise errors.OpRetryError("There has been a modification to the"
5157 " config file and the operation has been"
5158 " aborted. Please retry.")
5161 class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
5168 def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
5174 TagsLU.CheckPrereq(self)
5175 for tag in self.op.tags:
5176 objects.TaggableObject.ValidateTag(tag)
5177 del_tags = frozenset(self.op.tags)
5178 cur_tags = self.target.GetTags()
5179 if not del_tags <= cur_tags:
5180 diff_tags = del_tags - cur_tags
5181 diff_names = ["'%s'" % tag for tag in diff_tags]
5183 raise errors.OpPrereqError("Tag(s) %s not found" %
5184 (",".join(diff_names)))
5186 def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
5190 for tag in self.op.tags:
5191 self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
5194 except errors.ConfigurationError:
5195 raise errors.OpRetryError("There has been a modification to the"
5196 " config file and the operation has been"
5197 " aborted. Please retry.")
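
# The prerequisite check in LUDelTags boils down to set arithmetic: every
# tag to be removed must currently be present on the target. A standalone
# sketch of that check, with hypothetical tags:
def _DemoMissingTags(current_tags, tags_to_delete):
  """Return the tags in 'tags_to_delete' absent from 'current_tags'.

  """
  return frozenset(tags_to_delete) - frozenset(current_tags)

# Example: _DemoMissingTags(["web", "db"], ["db", "cache"]) returns a
# frozenset containing only "cache", which CheckPrereq reports as not found.
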
5200 class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
5210 def ExpandNames(self):
5211 """Expand names and set required locks.
    This expands the node list, if any.

    """
5216 self.needed_locks = {}
5217 if self.op.on_nodes:
5218 # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
5221 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5222 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5224 def CheckPrereq(self):
    """Check prerequisites.

    """
5229 def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
5233 if self.op.on_master:
5234 if not utils.TestDelay(self.op.duration):
5235 raise errors.OpExecError("Error during master delay test")
5236 if self.op.on_nodes:
5237 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
5240 for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))
5246 class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]
  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
5275 self.os = self.tags = self.nics = self.vcpus = None
5276 self.relocate_from = None
5278 self.required_nodes = None
5279 # init result fields
5280 self.success = self.info = self.nodes = None
5281 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5282 keyset = self._ALLO_KEYS
5283 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5284 keyset = self._RELO_KEYS
5286 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5287 " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
5290 raise errors.ProgrammerError("Invalid input parameter '%s' to"
5291 " IAllocator" % key)
5292 setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
5295 raise errors.ProgrammerError("Missing input parameter '%s' to"
5296 " IAllocator" % key)
5297 self._BuildInputData()
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    data = {
      "cluster_name": cfg.GetClusterName(),
5311 "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    i_list = []
    cluster = cfg.GetClusterInfo()
5318 for iname in cfg.GetInstanceList():
5319 i_obj = cfg.GetInstanceInfo(iname)
5320 i_list.append((i_obj, cluster.FillBE(i_obj)))
    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
5325 # FIXME: here we have only one hypervisor information, but
5326 # instance can belong to different hypervisors
5327 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5328 cfg.GetHypervisorType())
5329 for nname in node_list:
5330 ninfo = cfg.GetNodeInfo(nname)
5331 if nname not in node_data or not isinstance(node_data[nname], dict):
5332 raise errors.OpExecError("Can't get data for node %s" % nname)
5333 remote_info = node_data[nname]
5334 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5335 'vg_size', 'vg_free', 'cpu_total']:
5336 if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
5341 except ValueError, err:
5342 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5343 " %s" % (nname, attr, str(err)))
5344 # compute memory used by primary instances
5345 i_p_mem = i_p_up_mem = 0
5346 for iinfo, beinfo in i_list:
5347 if iinfo.primary_node == nname:
5348 i_p_mem += beinfo[constants.BE_MEMORY]
5349 if iinfo.status == "up":
5350 i_p_up_mem += beinfo[constants.BE_MEMORY]
      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
5355 "total_memory": remote_info['memory_total'],
5356 "reserved_memory": remote_info['memory_dom0'],
5357 "free_memory": remote_info['memory_free'],
5358 "i_pri_memory": i_p_mem,
5359 "i_pri_up_memory": i_p_up_mem,
5360 "total_disk": remote_info['vg_size'],
5361 "free_disk": remote_info['vg_free'],
5362 "primary_ip": ninfo.primary_ip,
5363 "secondary_ip": ninfo.secondary_ip,
5364 "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
5367 data["nodes"] = node_results
    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
5372 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5373 for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
5376 "should_run": iinfo.status == "up",
5377 "vcpus": beinfo[constants.BE_VCPUS],
5378 "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5383 "disk_template": iinfo.disk_template,
5384 "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir
    data["instances"] = instance_data

    self.in_data = data
5392 def _AddNewInstance(self):
5393 """Add new instance data to allocator structure.
    This in combination with _ComputeClusterData will create the
5396 correct structure needed as input for the allocator.
    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
5403 if len(self.disks) != 2:
5404 raise errors.OpExecError("Only two-disk configurations supported")
5406 disk_space = _ComputeDiskSize(self.disk_template,
5407 self.disks[0]["size"], self.disks[1]["size"])
5409 if self.disk_template in constants.DTS_NET_MIRROR:
5410 self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
5426 data["request"] = request
5428 def _AddRelocateInstance(self):
5429 """Add relocate instance data to allocator structure.
    This in combination with _ComputeClusterData will create the
5432 correct structure needed as input for the allocator.
    The checks for the completeness of the opcode must have already been
    done.

    """
5438 instance = self.lu.cfg.GetInstanceInfo(self.name)
5439 if instance is None:
5440 raise errors.ProgrammerError("Unknown instance '%s' passed to"
5441 " IAllocator" % self.name)
5443 if instance.disk_template not in constants.DTS_NET_MIRROR:
5444 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5446 if len(instance.secondary_nodes) != 1:
5447 raise errors.OpPrereqError("Instance has not exactly one secondary node")
5449 self.required_nodes = 1
5451 disk_space = _ComputeDiskSize(instance.disk_template,
5452 instance.disks[0].size,
5453 instance.disks[1].size)
    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
5464 def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()
5470 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5471 self._AddNewInstance()
    else:
      self._AddRelocateInstance()
5475 self.in_text = serializer.Dump(self.in_data)
5477 def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
5485 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5487 if not isinstance(result, (list, tuple)) or len(result) != 4:
5488 raise errors.OpExecError("Invalid result from master iallocator runner")
5490 rcode, stdout, stderr, fail = result
5492 if rcode == constants.IARUN_NOTFOUND:
5493 raise errors.OpExecError("Can't find allocator '%s'" % name)
5494 elif rcode == constants.IARUN_FAILURE:
5495 raise errors.OpExecError("Instance allocator call failed: %s,"
5496 " output: %s" % (fail, stdout+stderr))
5497 self.out_text = stdout
    if validate:
      self._ValidateResult()
5501 def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
5510 except Exception, err:
5511 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5513 if not isinstance(rdict, dict):
5514 raise errors.OpExecError("Can't parse iallocator results: not a dict")
5516 for key in "success", "info", "nodes":
5517 if key not in rdict:
5518 raise errors.OpExecError("Can't parse iallocator results:"
5519 " missing key '%s'" % key)
5520 setattr(self, key, rdict[key])
5522 if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " not a list")

    self.out_data = rdict
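
# For reference, the "request" part of a relocation document built by
# _AddRelocateInstance above looks roughly as follows (instance and node
# names are hypothetical; the full input document also carries the cluster,
# node and instance data added by _ComputeClusterData):
_DEMO_RELOCATE_REQUEST = {
  "type": "relocate",
  "name": "instance1.example.com",
  "disk_space_total": 1408,
  "required_nodes": 1,
  "relocate_from": ["node2.example.com"],
  }
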
5528 class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]
5536 def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode of
    the test.

    """
5542 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5543 for attr in ["name", "mem_size", "disks", "disk_template",
5544 "os", "tags", "nics", "vcpus"]:
5545 if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
5548 iname = self.cfg.ExpandInstanceName(self.op.name)
5549 if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
5552 if not isinstance(self.op.nics, list):
5553 raise errors.OpPrereqError("Invalid parameter 'nics'")
5554 for row in self.op.nics:
5555 if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
5559 raise errors.OpPrereqError("Invalid contents of the"
5560 " 'nics' parameter")
5561 if not isinstance(self.op.disks, list):
5562 raise errors.OpPrereqError("Invalid parameter 'disks'")
5563 if len(self.op.disks) != 2:
5564 raise errors.OpPrereqError("Only two-disk configurations supported")
5565 for row in self.op.disks:
5566 if (not isinstance(row, dict) or
5567 "size" not in row or
5568 not isinstance(row["size"], int) or
5569 "mode" not in row or
5570 row["mode"] not in ['r', 'w']):
5571 raise errors.OpPrereqError("Invalid contents of the"
5572 " 'disks' parameter")
5573 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5574 if not hasattr(self.op, "name"):
5575 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5576 fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
5580 self.op.name = fname
5581 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)
5586 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5587 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5588 raise errors.OpPrereqError("Missing allocator name")
5589 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)
5593 def Exec(self, feedback_fn):
    """Run the allocator test.

    """
5597 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode, name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os, tags=self.op.tags,
                       nics=self.op.nics, vcpus=self.op.vcpus)
    else:
      ial = IAllocator(self,
                       mode=self.op.mode, name=self.op.name,
                       relocate_from=list(self.relocate_from))
5616 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5617 result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result