# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201
import platform
import re
import time

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_BGL = True

  def __init__(self, processor, op, context):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = self.cfg.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)
107 """Returns the SshRunner object
111 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
114 ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values, following these rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }

      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }

      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None
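
      # Acquire all node locks in shared mode (an illustrative sketch of the
      # share_locks mechanism described above; LUVerifyCluster below uses
      # exactly this pattern)
      self.share_locks = dict(((i, 1) for i in locking.LEVELS))
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
      }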

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """
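    # A typical override (a sketch; it mirrors the usage documented in
    # _LockInstancesNodes below) computes node locks only once the
    # instance locks at the lower level have been acquired:
    #
    #   def DeclareLocks(self, level):
    #     if level == locking.LEVEL_NODE:
    #       self._LockInstancesNodes()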

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If the hook should run on no nodes, an empty list (and not None)
    should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we're really being called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
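

# An illustrative sketch (not one of the real LUs in this module) showing how
# a minimal concurrent LU ties the contract above together; the opcode field
# and return value are made up for the example:
#
#   class LUExampleQuery(NoHooksLU):
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#
#     def Exec(self, feedback_fn):
#       return self.instance.primary_node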


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: a non-empty list of node names to check and expand

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
                                 " non-empty list of nodes whose name is to be"
                                 " expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()

  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
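
# Example call (an illustrative sketch; the query LUs below use the same
# pattern with their own field sets):
#
#   _CheckOutputFields(static=["name", "pip"],
#                      dynamic=["mfree", "dfree"],
#                      selected=self.op.output_fields)
#
# This raises OpPrereqError if output_fields contains anything outside the
# union of the static and dynamic sets.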


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
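
# For example (a sketch): an instance with one NIC yields INSTANCE_NIC_COUNT=1
# plus INSTANCE_NIC0_IP, INSTANCE_NIC0_BRIDGE and INSTANCE_NIC0_HWADDR; the
# hooks runner later prefixes every key with 'GANETI_'.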


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
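
# Typical use (a sketch; the 'status' override shown here is hypothetical):
# an LU that is about to stop an instance could reuse the instance object
# but override the status it reports to the hooks:
#
#   env = _BuildInstanceHookEnvByObject(self.instance,
#                                       override={'status': 'down'})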


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    # checks ssh to other nodes
    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))

    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          bad = True
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = []
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist, hypervisors)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': hypervisors,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param,
                                      self.cfg.GetClusterName())
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                   self.cfg.GetHypervisorType())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if not isinstance(nodeinstance, list):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
          }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyzes the hooks' result, handles it, and sends some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the ssconf entries (the WritableSimpleStore class is an
      # assumption here, based on the ss.SetKey calls below)
      ss = ssconf.WritableSimpleStore()
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
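
# For example (a sketch): a DRBD disk whose children are two LVs yields True
# via the recursive call on its children, while a file-based disk with no LV
# anywhere in its tree yields False.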


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["vg_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.acquired_locks[locking.LEVEL_NODE]
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      lu.proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.proc.LogWarning("Can't compute data for node %s/%s" %
                           (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

    Args:
      node_list: a list with the names of all nodes
      rlist: a map with node names as keys and OS objects as values

    Returns:
      map: a map with osnames as keys and as value another map, with
           nodes as keys and lists of OS objects as values, e.g.:
             {"debian-etch": {"node1": [<object>,...],
                              "node2": [<object>,]}
             }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the node exists in the configuration
    - it does not have primary or secondary instances
    - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    self.context.RemoveNode(node.name)

    rpc.call_node_leave_cluster(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    self.static_fields = frozenset([
      "name", "pinst_cnt", "sinst_cnt",
      "pinst_list", "sinst_list",
      "pip", "sip", "tags",
      "serial_no",
      ])

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                     self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            val = '-'
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the new node is not already in the config
    - it is resolvable
    - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
      }

    result = rpc.call_node_verify(node_verify_list, node_verify_param,
                                  self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = []
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.cfg.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.cfg.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.cfg.GetHypervisorType(),
      "enabled_hypervisors": self.cfg.GetClusterInfo().enabled_hypervisors,
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

    static_fields = ["cluster_name", "master_node"]
    _CheckOutputFields(static=static_fields,
                       dynamic=[],
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        values.append(self.cfg.GetClusterName())
      elif field == "master_node":
        values.append(self.cfg.GetMasterNode())
      else:
        raise errors.ParameterError(field)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a tuple (disks_ok, device_info), where device_info is a list of
    (host, instance_visible_name, node_visible_name) triples with the
    mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = rpc.call_instance_list([instance.primary_node],
                                 [instance.hypervisor])
  ins_l = ins_l[instance.primary_node]
  if not isinstance(ins_l, list):
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are taken
  into account and make this function return False.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
2092 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2093 """Checks if a node has enough free memory.
2095 This function check if a given node has the needed amount of free
2096 memory. In case the node has less memory or we cannot get the
2097 information from the node, this function raise an OpPrereqError
2100 @type lu: C{LogicalUnit}
2101 @param lu: a logical unit from which we get configuration data
2103 @param node: the node to check
2104 @type reason: C{str}
2105 @param reason: string to use in the error message
2106 @type requested: C{int}
2107 @param requested: the amount of memory in MiB to check for
2108 @type hypervisor: C{str}
2109 @param hypervisor: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
2114 nodeinfo = rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2115 if not nodeinfo or not isinstance(nodeinfo, dict):
2116 raise errors.OpPrereqError("Could not contact node %s for resource"
2117 " information" % (node,))
2119 free_mem = nodeinfo[node].get('memory_free')
2120 if not isinstance(free_mem, int):
2121 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2122 " was '%s'" % (node, free_mem))
2123 if requested > free_mem:
2124 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2125 " needed %s MiB, available %s MiB" %
2126 (node, reason, requested, free_mem))
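# Example (hedged sketch): LUStartupInstance below performs exactly this
# check before starting an instance; a caller would do
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory, instance.hypervisor)
#
# and rely on the raised OpPrereqError to abort the operation early.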
2129 class LUStartupInstance(LogicalUnit):
2130 """Starts an instance.
2133 HPATH = "instance-start"
2134 HTYPE = constants.HTYPE_INSTANCE
2135 _OP_REQP = ["instance_name", "force"]
2138 def ExpandNames(self):
2139 self._ExpandAndLockInstance()
2140 self.needed_locks[locking.LEVEL_NODE] = []
2141 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2143 def DeclareLocks(self, level):
2144 if level == locking.LEVEL_NODE:
2145 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl
2161 def CheckPrereq(self):
2162 """Check prerequisites.
2164 This checks that the instance is in the cluster.
2167 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2168 assert self.instance is not None, \
2169 "Cannot retrieve locked instance %s" % self.op.instance_name
    # check bridges existence
2172 _CheckInstanceBridgesExist(self, instance)
2174 _CheckNodeFreeMemory(self, instance.primary_node,
2175 "starting instance %s" % instance.name,
2176 instance.memory, instance.hypervisor)
2178 def Exec(self, feedback_fn):
2179 """Start the instance.
2182 instance = self.instance
2183 force = self.op.force
2184 extra_args = getattr(self.op, "extra_args", "")
2186 self.cfg.MarkInstanceUp(instance.name)
2188 node_current = instance.primary_node
2190 _StartInstanceDisks(self, instance, force)
2192 if not rpc.call_instance_start(node_current, instance, extra_args):
2193 _ShutdownInstanceDisks(self, instance)
2194 raise errors.OpExecError("Could not start instance")
2197 class LURebootInstance(LogicalUnit):
2198 """Reboot an instance.
2201 HPATH = "instance-reboot"
2202 HTYPE = constants.HTYPE_INSTANCE
2203 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2206 def ExpandNames(self):
2207 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2208 constants.INSTANCE_REBOOT_HARD,
2209 constants.INSTANCE_REBOOT_FULL]:
2210 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2211 (constants.INSTANCE_REBOOT_SOFT,
2212 constants.INSTANCE_REBOOT_HARD,
2213 constants.INSTANCE_REBOOT_FULL))
2214 self._ExpandAndLockInstance()
2215 self.needed_locks[locking.LEVEL_NODE] = []
2216 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2218 def DeclareLocks(self, level):
2219 if level == locking.LEVEL_NODE:
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2221 self._LockInstancesNodes(primary_only=primary_only)
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl
2237 def CheckPrereq(self):
2238 """Check prerequisites.
2240 This checks that the instance is in the cluster.
2243 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2244 assert self.instance is not None, \
2245 "Cannot retrieve locked instance %s" % self.op.instance_name
    # check bridges existence
2248 _CheckInstanceBridgesExist(self, instance)
2250 def Exec(self, feedback_fn):
2251 """Reboot the instance.
2254 instance = self.instance
2255 ignore_secondaries = self.op.ignore_secondaries
2256 reboot_type = self.op.reboot_type
2257 extra_args = getattr(self.op, "extra_args", "")
2259 node_current = instance.primary_node
2261 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2262 constants.INSTANCE_REBOOT_HARD]:
2263 if not rpc.call_instance_reboot(node_current, instance,
2264 reboot_type, extra_args):
2265 raise errors.OpExecError("Could not reboot instance")
2267 if not rpc.call_instance_shutdown(node_current, instance):
2268 raise errors.OpExecError("could not shutdown instance for full reboot")
2269 _ShutdownInstanceDisks(self, instance)
2270 _StartInstanceDisks(self, instance, ignore_secondaries)
2271 if not rpc.call_instance_start(node_current, instance, extra_args):
2272 _ShutdownInstanceDisks(self, instance)
2273 raise errors.OpExecError("Could not start instance for full reboot")
2275 self.cfg.MarkInstanceUp(instance.name)
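# Summary sketch (illustrative, inferred from Exec above): the supported
# reboot types map to different amounts of work, roughly:
#
#   constants.INSTANCE_REBOOT_SOFT -> hypervisor reboot call only
#   constants.INSTANCE_REBOOT_HARD -> hypervisor reboot call only (harder)
#   constants.INSTANCE_REBOOT_FULL -> shutdown + disk restart + start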
2278 class LUShutdownInstance(LogicalUnit):
2279 """Shutdown an instance.
2282 HPATH = "instance-stop"
2283 HTYPE = constants.HTYPE_INSTANCE
2284 _OP_REQP = ["instance_name"]
2287 def ExpandNames(self):
2288 self._ExpandAndLockInstance()
2289 self.needed_locks[locking.LEVEL_NODE] = []
2290 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2292 def DeclareLocks(self, level):
2293 if level == locking.LEVEL_NODE:
2294 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl
2307 def CheckPrereq(self):
2308 """Check prerequisites.
2310 This checks that the instance is in the cluster.
2313 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2314 assert self.instance is not None, \
2315 "Cannot retrieve locked instance %s" % self.op.instance_name
2317 def Exec(self, feedback_fn):
2318 """Shutdown the instance.
2321 instance = self.instance
2322 node_current = instance.primary_node
2323 self.cfg.MarkInstanceDown(instance.name)
2324 if not rpc.call_instance_shutdown(node_current, instance):
2325 logger.Error("could not shutdown instance")
2327 _ShutdownInstanceDisks(self, instance)
2330 class LUReinstallInstance(LogicalUnit):
2331 """Reinstall an instance.
2334 HPATH = "instance-reinstall"
2335 HTYPE = constants.HTYPE_INSTANCE
2336 _OP_REQP = ["instance_name"]
2339 def ExpandNames(self):
2340 self._ExpandAndLockInstance()
2341 self.needed_locks[locking.LEVEL_NODE] = []
2342 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2344 def DeclareLocks(self, level):
2345 if level == locking.LEVEL_NODE:
2346 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl
2359 def CheckPrereq(self):
2360 """Check prerequisites.
2362 This checks that the instance is in the cluster and is not running.
2365 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2366 assert instance is not None, \
2367 "Cannot retrieve locked instance %s" % self.op.instance_name
2369 if instance.disk_template == constants.DT_DISKLESS:
2370 raise errors.OpPrereqError("Instance '%s' has no disks" %
2371 self.op.instance_name)
2372 if instance.status != "down":
2373 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2374 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name,
                                         instance.hypervisor)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)
2395 self.instance = instance
2397 def Exec(self, feedback_fn):
2398 """Reinstall the instance.
2401 inst = self.instance
2403 if self.op.os_type is not None:
2404 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2405 inst.os = self.op.os_type
2406 self.cfg.Update(inst)
    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)
2419 class LURenameInstance(LogicalUnit):
2420 """Rename an instance.
2423 HPATH = "instance-rename"
2424 HTYPE = constants.HTYPE_INSTANCE
2425 _OP_REQP = ["instance_name", "new_name"]
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl
2439 def CheckPrereq(self):
2440 """Check prerequisites.
2442 This checks that the instance is in the cluster and is not running.
2445 instance = self.cfg.GetInstanceInfo(
2446 self.cfg.ExpandInstanceName(self.op.instance_name))
2447 if instance is None:
2448 raise errors.OpPrereqError("Instance '%s' not known" %
2449 self.op.instance_name)
2450 if instance.status != "down":
2451 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2452 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name,
                                         instance.hypervisor)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
2459 self.instance = instance
2461 # new name verification
2462 name_info = utils.HostInfo(self.op.new_name)
2464 self.op.new_name = new_name = name_info.name
2465 instance_list = self.cfg.GetInstanceList()
2466 if new_name in instance_list:
2467 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2470 if not getattr(self.op, "ignore_ip", False):
2471 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2472 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2473 (name_info.ip, new_name))
2476 def Exec(self, feedback_fn):
2477 """Reinstall the instance.
2480 inst = self.instance
2481 old_name = inst.name
2483 if inst.disk_template == constants.DT_FILE:
2484 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2486 self.cfg.RenameInstance(inst.name, self.op.new_name)
2487 # Change the instance lock. This is definitely safe while we hold the BGL
2488 self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2489 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2491 # re-read the instance from the configuration after rename
2492 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2494 if inst.disk_template == constants.DT_FILE:
2495 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)
      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))
    _StartInstanceDisks(self, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)
2525 class LURemoveInstance(LogicalUnit):
2526 """Remove an instance.
2529 HPATH = "instance-remove"
2530 HTYPE = constants.HTYPE_INSTANCE
2531 _OP_REQP = ["instance_name", "ignore_failures"]
2534 def ExpandNames(self):
2535 self._ExpandAndLockInstance()
2536 self.needed_locks[locking.LEVEL_NODE] = []
2537 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2539 def DeclareLocks(self, level):
2540 if level == locking.LEVEL_NODE:
2541 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node only.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl
2553 def CheckPrereq(self):
2554 """Check prerequisites.
2556 This checks that the instance is in the cluster.
2559 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2560 assert self.instance is not None, \
2561 "Cannot retrieve locked instance %s" % self.op.instance_name
2563 def Exec(self, feedback_fn):
2564 """Remove the instance.
2567 instance = self.instance
2568 logger.Info("shutting down instance %s on node %s" %
2569 (instance.name, instance.primary_node))
2571 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2572 if self.op.ignore_failures:
2573 feedback_fn("Warning: can't shutdown instance")
2575 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2576 (instance.name, instance.primary_node))
2578 logger.Info("removing block devices for instance %s" % instance.name)
2580 if not _RemoveDisks(self, instance):
2581 if self.op.ignore_failures:
2582 feedback_fn("Warning: can't remove instance's disks")
2584 raise errors.OpExecError("Can't remove instance's disks")
2586 logger.Info("removing instance %s out of cluster config" % instance.name)
2588 self.cfg.RemoveInstance(instance.name)
2589 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
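# Illustrative sketch (assumption): removal is usually requested as
#
#   op = opcodes.OpRemoveInstance(instance_name="instance1.example.com",
#                                 ignore_failures=False)
#
# With ignore_failures=True the shutdown and disk-removal errors above are
# downgraded to warnings and the instance is dropped from the config anyway.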
2592 class LUQueryInstances(NoHooksLU):
2593 """Logical unit for querying instances.
2596 _OP_REQP = ["output_fields", "names"]
2599 def ExpandNames(self):
2600 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2601 self.static_fields = frozenset([
2602 "name", "os", "pnode", "snodes",
2603 "admin_state", "admin_ram",
2604 "disk_template", "ip", "mac", "bridge",
2605 "sda_size", "sdb_size", "vcpus", "tags",
2606 "network_port", "kernel_path", "initrd_path",
2607 "hvm_boot_order", "hvm_acpi", "hvm_pae",
2608 "hvm_cdrom_image_path", "hvm_nic_type",
2609 "hvm_disk_type", "vnc_bind_address",
2610 "serial_no", "hypervisor",
2612 _CheckOutputFields(static=self.static_fields,
2613 dynamic=self.dynamic_fields,
2614 selected=self.op.output_fields)
2616 self.needed_locks = {}
2617 self.share_locks[locking.LEVEL_INSTANCE] = 1
2618 self.share_locks[locking.LEVEL_NODE] = 1
    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2631 def DeclareLocks(self, level):
2632 if level == locking.LEVEL_NODE and self.do_locking:
2633 self._LockInstancesNodes()
2635 def CheckPrereq(self):
2636 """Check prerequisites.
2641 def Exec(self, feedback_fn):
2642 """Computes the list of nodes and their attributes.
2645 all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()
2657 instance_list = [all_info[iname] for iname in instance_names]
2659 # begin data gathering
    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
2688 elif field == "pnode":
2689 val = instance.primary_node
2690 elif field == "snodes":
2691 val = list(instance.secondary_nodes)
2692 elif field == "admin_state":
2693 val = (instance.status != "down")
2694 elif field == "oper_state":
2695 if instance.primary_node in bad_nodes:
2698 val = bool(live_data.get(instance.name))
2699 elif field == "status":
2700 if instance.primary_node in bad_nodes:
2701 val = "ERROR_nodedown"
2703 running = bool(live_data.get(instance.name))
2705 if instance.status != "down":
2710 if instance.status != "down":
2714 elif field == "admin_ram":
2715 val = instance.memory
2716 elif field == "oper_ram":
2717 if instance.primary_node in bad_nodes:
2719 elif instance.name in live_data:
2720 val = live_data[instance.name].get("memory", "?")
2723 elif field == "disk_template":
2724 val = instance.disk_template
2726 val = instance.nics[0].ip
2727 elif field == "bridge":
2728 val = instance.nics[0].bridge
2729 elif field == "mac":
2730 val = instance.nics[0].mac
2731 elif field == "sda_size" or field == "sdb_size":
2732 disk = instance.FindDisk(field[:3])
2737 elif field == "vcpus":
2738 val = instance.vcpus
2739 elif field == "tags":
2740 val = list(instance.GetTags())
2741 elif field == "serial_no":
2742 val = instance.serial_no
2743 elif field in ("network_port", "kernel_path", "initrd_path",
2744 "hvm_boot_order", "hvm_acpi", "hvm_pae",
2745 "hvm_cdrom_image_path", "hvm_nic_type",
2746 "hvm_disk_type", "vnc_bind_address"):
          val = getattr(instance, field, None)
          if val is not None:
            pass
          elif field in ("hvm_nic_type", "hvm_disk_type",
                         "kernel_path", "initrd_path"):
            val = "default"
          else:
            val = "-"
2755 elif field == "hypervisor":
2756 val = instance.hypervisor
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
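# Example (hedged sketch): querying a mix of static and dynamic fields;
# requesting a dynamic field such as "status" is what triggers the locking
# and the RPC-based live data gathering above:
#
#   op = opcodes.OpQueryInstances(output_fields=["name", "pnode", "status"],
#                                 names=[])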
2765 class LUFailoverInstance(LogicalUnit):
2766 """Failover an instance.
2769 HPATH = "instance-failover"
2770 HTYPE = constants.HTYPE_INSTANCE
2771 _OP_REQP = ["instance_name", "ignore_consistency"]
2774 def ExpandNames(self):
2775 self._ExpandAndLockInstance()
2776 self.needed_locks[locking.LEVEL_NODE] = []
2777 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2779 def DeclareLocks(self, level):
2780 if level == locking.LEVEL_NODE:
2781 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl
2796 def CheckPrereq(self):
2797 """Check prerequisites.
2799 This checks that the instance is in the cluster.
2802 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2803 assert self.instance is not None, \
2804 "Cannot retrieve locked instance %s" % self.op.instance_name
2806 if instance.disk_template not in constants.DTS_NET_MIRROR:
2807 raise errors.OpPrereqError("Instance's disk layout is not"
2808 " network mirrored, cannot failover.")
2810 secondary_nodes = instance.secondary_nodes
2811 if not secondary_nodes:
2812 raise errors.ProgrammerError("no secondary node but using "
2813 "a mirrored disk template")
2815 target_node = secondary_nodes[0]
2816 # check memory requirements on the secondary node
2817 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2818 instance.name, instance.memory,
2819 instance.hypervisor)
    # check bridge existence
2822 brlist = [nic.bridge for nic in instance.nics]
2823 if not rpc.call_bridges_exist(target_node, brlist):
2824 raise errors.OpPrereqError("One or more target bridges %s does not"
2825 " exist on destination node '%s'" %
2826 (brlist, target_node))
2828 def Exec(self, feedback_fn):
2829 """Failover an instance.
2831 The failover is done by shutting it down on its present node and
2832 starting it on the secondary.
2835 instance = self.instance
2837 source_node = instance.primary_node
2838 target_node = instance.secondary_nodes[0]
2840 feedback_fn("* checking disk consistency between source and target")
2841 for dev in instance.disks:
2842 # for drbd, these are drbd over lvm
2843 if not _CheckDiskConsistency(self, dev, target_node, False):
2844 if instance.status == "up" and not self.op.ignore_consistency:
2845 raise errors.OpExecError("Disk %s is degraded on target node,"
2846 " aborting failover." % dev.iv_name)
2848 feedback_fn("* shutting down instance on source node")
2849 logger.Info("Shutting down instance %s on node %s" %
2850 (instance.name, source_node))
2852 if not rpc.call_instance_shutdown(source_node, instance):
2853 if self.op.ignore_consistency:
2854 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2855 " anyway. Please make sure node %s is down" %
2856 (instance.name, source_node, source_node))
2858 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2859 (instance.name, source_node))
2861 feedback_fn("* deactivating the instance's disks on source node")
2862 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2863 raise errors.OpExecError("Can't shut down the instance's disks.")
2865 instance.primary_node = target_node
2866 # distribute new instance config to the other nodes
2867 self.cfg.Update(instance)
2869 # Only start the instance if it's marked as up
2870 if instance.status == "up":
2871 feedback_fn("* activating the instance's disks on target node")
2872 logger.Info("Starting instance %s on node %s" %
2873 (instance.name, target_node))
2875 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
2879 raise errors.OpExecError("Can't activate the instance's disks")
2881 feedback_fn("* starting the instance on the target node")
2882 if not rpc.call_instance_start(target_node, instance, None):
2883 _ShutdownInstanceDisks(self, instance)
2884 raise errors.OpExecError("Could not start instance %s on node %s." %
2885 (instance.name, target_node))
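# Illustrative sketch (assumption): a failover is requested as
#
#   op = opcodes.OpFailoverInstance(instance_name="instance1.example.com",
#                                   ignore_consistency=False)
#
# ignore_consistency=True skips the degraded-disk abort and tolerates an
# unreachable source node during the shutdown step above.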
2888 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2889 """Create a tree of block devices on the primary node.
2891 This always creates all devices.
2895 for child in device.children:
2896 if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2899 lu.cfg.SetDiskID(device, node)
2900 new_id = rpc.call_blockdev_create(node, device, device.size,
2901 instance.name, True, info)
2904 if device.physical_id is None:
2905 device.physical_id = new_id
2909 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2910 """Create a tree of block devices on a secondary node.
2912 If this device type has to be created on secondaries, create it and
2915 If not, just recurse to children keeping the same 'force' value.
2918 if device.CreateOnSecondary():
2921 for child in device.children:
2922 if not _CreateBlockDevOnSecondary(lu, node, instance,
2923 child, force, info):
2928 lu.cfg.SetDiskID(device, node)
2929 new_id = rpc.call_blockdev_create(node, device, device.size,
2930 instance.name, False, info)
2933 if device.physical_id is None:
2934 device.physical_id = new_id
2938 def _GenerateUniqueNames(lu, exts):
2939 """Generate a suitable LV name.
2941 This will generate a logical volume name for the given instance.
2946 new_id = lu.cfg.GenerateUniqueID()
2947 results.append("%s%s" % (new_id, val))
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
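# Example (hedged sketch): building one DRBD8 branch for a 1024 MiB "sda";
# the minor numbers would normally come from cfg.AllocateDRBDMinor() as in
# _GenerateDiskTemplate below, and `names` from _GenerateUniqueNames:
#
#   names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta"])
#   disk = _GenerateDRBD8Branch(lu, "node1", "node2", 1024, names, "sda",
#                               minor_pa, minor_sa)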
2972 def _GenerateDiskTemplate(lu, template_name,
2973 instance_name, primary_node,
2974 secondary_nodes, disk_sz, swap_sz,
2975 file_storage_dir, file_driver):
2976 """Generate the entire disk layout for a given template type.
2979 #TODO: compute space requirements
2981 vgname = lu.cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
2985 if len(secondary_nodes) != 0:
2986 raise errors.ProgrammerError("Wrong template configuration")
2988 names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name="sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name="sdb")
    disks = [sda_dev, sdb_dev]
2996 elif template_name == constants.DT_DRBD8:
2997 if len(secondary_nodes) != 1:
2998 raise errors.ProgrammerError("Wrong template configuration")
2999 remote_node = secondary_nodes[0]
3000 (minor_pa, minor_pb,
3001 minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3002 [primary_node, primary_node, remote_node, remote_node], instance_name)
3004 names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3005 ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda",
                                        minor_pa, minor_sa)
    drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb",
                                        minor_pb, minor_sb)
    disks = [drbd_sda_dev, drbd_sdb_dev]
3013 elif template_name == constants.DT_FILE:
3014 if len(secondary_nodes) != 0:
3015 raise errors.ProgrammerError("Wrong template configuration")
3017 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3018 iv_name="sda", logical_id=(file_driver,
3019 "%s/sda" % file_storage_dir))
3020 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3021 iv_name="sdb", logical_id=(file_driver,
3022 "%s/sdb" % file_storage_dir))
3023 disks = [file_sda_dev, file_sdb_dev]
3025 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3029 def _GetInstanceInfoText(instance):
3030 """Compute that text that should be added to the disk's metadata.
3033 return "originstname+%s" % instance.name
3036 def _CreateDisks(lu, instance):
3037 """Create all disks for an instance.
  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)
3050 if instance.disk_template == constants.DT_FILE:
3051 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)
    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False
    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False
3063 for device in instance.disks:
3064 logger.Info("creating volume %s for instance %s" %
3065 (device.iv_name, instance.name))
3067 for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False

    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True
3083 def _RemoveDisks(lu, instance):
3084 """Remove all disks for an instance.
3086 This abstracts away some work from `AddInstance()` and
3087 `RemoveInstance()`. Note that in case some of the devices couldn't
3088 be removed, the removal will continue with the other ones (compare
3089 with `_CreateDisks()`).
3092 instance: the instance object
3095 True or False showing the success of the removal proces
3098 logger.Info("removing block devices for instance %s" % instance.name)
3101 for device in instance.disks:
3102 for node, disk in device.ComputeNodeTree(instance.primary_node):
3103 lu.cfg.SetDiskID(disk, node)
3104 if not rpc.call_blockdev_remove(node, disk):
3105 logger.Error("could not remove block device %s on node %s,"
3106 " continuing anyway" %
3107 (device.iv_name, node))
3110 if instance.disk_template == constants.DT_FILE:
3111 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result
3120 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3121 """Compute disk size requirements in the volume group
3123 This is currently hard-coded for the two-drive layout.
3126 # Required free disk space as a function of disk and swap space
3128 constants.DT_DISKLESS: None,
3129 constants.DT_PLAIN: disk_size + swap_size,
3130 # 256 MB are added for drbd metadata, 128MB for each drbd device
3131 constants.DT_DRBD8: disk_size + swap_size + 256,
3132 constants.DT_FILE: None,
3135 if disk_template not in req_size_dict:
3136 raise errors.ProgrammerError("Disk template '%s' size requirement"
3137 " is unknown" % disk_template)
3139 return req_size_dict[disk_template]
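# Worked example (illustrative): for a 1024 MiB data disk plus a 512 MiB
# swap volume, the DRBD8 template needs 1024 + 512 + 256 = 1792 MiB in the
# volume group (128 MiB of metadata per DRBD device, two devices), while
# the plain template needs only 1536 MiB and file/diskless need no VG
# space at all:
#
#   _ComputeDiskSize(constants.DT_DRBD8, 1024, 512)   # => 1792
#   _ComputeDiskSize(constants.DT_PLAIN, 1024, 512)   # => 1536
#   _ComputeDiskSize(constants.DT_FILE, 1024, 512)    # => None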
3142 class LUCreateInstance(LogicalUnit):
3143 """Create an instance.
3146 HPATH = "instance-add"
3147 HTYPE = constants.HTYPE_INSTANCE
3148 _OP_REQP = ["instance_name", "mem_size", "disk_size",
3149 "disk_template", "swap_size", "mode", "start", "vcpus",
3150 "wait_for_sync", "ip_check", "mac"]
3153 def _ExpandNode(self, node):
3154 """Expands and checks one node name.
3157 node_full = self.cfg.ExpandNodeName(node)
3158 if node_full is None:
3159 raise errors.OpPrereqError("Unknown node %s" % node)
3162 def ExpandNames(self):
3163 """ExpandNames for CreateInstance.
    Figure out the right locks for instance creation.

    """
3168 self.needed_locks = {}
3170 # set optional parameters to none if they don't exist
3171 for attr in ["kernel_path", "initrd_path", "pnode", "snode",
3172 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
3173 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
3174 "vnc_bind_address", "hypervisor"]:
3175 if not hasattr(self.op, attr):
3176 setattr(self.op, attr, None)
3178 # cheap checks, mostly valid constants given
3180 # verify creation mode
3181 if self.op.mode not in (constants.INSTANCE_CREATE,
3182 constants.INSTANCE_IMPORT):
3183 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3186 # disk template and mirror node verification
3187 if self.op.disk_template not in constants.DISK_TEMPLATES:
3188 raise errors.OpPrereqError("Invalid disk template name")
3190 if self.op.hypervisor is None:
3191 self.op.hypervisor = self.cfg.GetHypervisorType()
3193 enabled_hvs = self.cfg.GetClusterInfo().enabled_hypervisors
3194 if self.op.hypervisor not in enabled_hvs:
3195 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3196 " cluster (%s)" % (self.op.hypervisor,
3197 ",".join(enabled_hvs)))
3199 #### instance parameters check
3201 # instance name verification
3202 hostname1 = utils.HostInfo(self.op.instance_name)
3203 self.op.instance_name = instance_name = hostname1.name
3205 # this is just a preventive check, but someone might still add this
3206 # instance in the meantime, and creation will fail at lock-add time
3207 if instance_name in self.cfg.GetInstanceList():
3208 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3211 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3213 # ip validity checks
3214 ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip
3225 # used in CheckPrereq for ip ping check
3226 self.check_ip = hostname1.ip
3228 # MAC address verification
3229 if self.op.mac != "auto":
3230 if not utils.IsValidMac(self.op.mac.lower()):
3231 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3234 # boot order verification
3235 if self.op.hvm_boot_order is not None:
3236 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3237 raise errors.OpPrereqError("invalid boot order specified,"
3238 " must be one or more of [acdn]")
3239 # file storage checks
3240 if (self.op.file_driver and
3241 not self.op.file_driver in constants.FILE_DRIVER):
3242 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3243 self.op.file_driver)
3245 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3246 raise errors.OpPrereqError("File storage directory path not absolute")
3248 ### Node/iallocator related checks
3249 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3250 raise errors.OpPrereqError("One and only one of iallocator and primary"
3251 " node must be given")
3253 if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
3257 nodelist = [self.op.pnode]
3258 if self.op.snode is not None:
3259 self.op.snode = self._ExpandNode(self.op.snode)
3260 nodelist.append(self.op.snode)
3261 self.needed_locks[locking.LEVEL_NODE] = nodelist
3263 # in case of import lock the source node too
3264 if self.op.mode == constants.INSTANCE_IMPORT:
3265 src_node = getattr(self.op, "src_node", None)
3266 src_path = getattr(self.op, "src_path", None)
3268 if src_node is None or src_path is None:
3269 raise errors.OpPrereqError("Importing an instance requires source"
3270 " node and path options")
3272 if not os.path.isabs(src_path):
3273 raise errors.OpPrereqError("The source path must be absolute")
3275 self.op.src_node = src_node = self._ExpandNode(src_node)
3276 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3277 self.needed_locks[locking.LEVEL_NODE].append(src_node)
3279 else: # INSTANCE_CREATE
3280 if getattr(self.op, "os_type", None) is None:
3281 raise errors.OpPrereqError("No guest OS specified")
3283 def _RunAllocator(self):
3284 """Run the allocator based on input opcode.
3287 disks = [{"size": self.op.disk_size, "mode": "w"},
3288 {"size": self.op.swap_size, "mode": "w"}]
3289 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3290 "bridge": self.op.bridge}]
3291 ial = IAllocator(self.cfg,
3292 mode=constants.IALLOCATOR_MODE_ALLOC,
3293 name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
3309 if len(ial.nodes) != ial.required_nodes:
3310 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3311 " of nodes (%s), required %s" %
3312 (self.op.iallocator, len(ial.nodes),
3313 ial.required_nodes))
3314 self.op.pnode = ial.nodes[0]
3315 logger.ToStdout("Selected nodes for the instance: %s" %
3316 (", ".join(ial.nodes),))
3317 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3318 (self.op.instance_name, self.op.iallocator, ial.nodes))
3319 if ial.required_nodes == 2:
3320 self.op.snode = ial.nodes[1]
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
3335 env["INSTANCE_SRC_NODE"] = self.op.src_node
3336 env["INSTANCE_SRC_PATH"] = self.op.src_path
3337 env["INSTANCE_SRC_IMAGE"] = self.src_image
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
                                     primary_node=self.op.pnode,
                                     secondary_nodes=self.secondaries,
                                     status=self.instance_status,
                                     os_type=self.op.os_type,
                                     memory=self.op.mem_size,
                                     vcpus=self.op.vcpus,
                                     nics=[(self.inst_ip, self.op.bridge,
                                            self.op.mac)],
                                     ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl
3354 def CheckPrereq(self):
3355 """Check prerequisites.
3358 if (not self.cfg.GetVGName() and
3359 self.op.disk_template not in constants.DTS_NOT_LVM):
3360 raise errors.OpPrereqError("Cluster does not support lvm-based"
3364 if self.op.mode == constants.INSTANCE_IMPORT:
3365 src_node = self.op.src_node
3366 src_path = self.op.src_path
      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3373 if not export_info.has_section(constants.INISECT_EXP):
3374 raise errors.ProgrammerError("Corrupted export config")
3376 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3377 if (int(ei_version) != constants.EXPORT_VERSION):
3378 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3379 (ei_version, constants.EXPORT_VERSION))
3381 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3382 raise errors.OpPrereqError("Can't import instance with more than"
3385 # FIXME: are the old os-es, disk sizes, etc. useful?
3386 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path,
                               export_info.get(constants.INISECT_INS,
                                               'disk0_dump'))
      self.src_image = diskimage
3391 # ip ping checks (we use the same ip that was resolved in ExpandNames)
3393 if self.op.start and not self.op.ip_check:
3394 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3395 " adding an instance in start mode")
3397 if self.op.ip_check:
3398 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3399 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3400 (self.check_ip, self.op.instance_name))
3402 # bridge verification
3403 bridge = getattr(self.op, "bridge", None)
3405 self.op.bridge = self.cfg.GetDefBridge()
3407 self.op.bridge = bridge
3411 if self.op.iallocator is not None:
3412 self._RunAllocator()
3414 #### node related checks
3416 # check primary node
3417 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3418 assert self.pnode is not None, \
3419 "Cannot retrieve locked node %s" % self.op.pnode
3420 self.secondaries = []
3422 # mirror node verification
3423 if self.op.disk_template in constants.DTS_NET_MIRROR:
3424 if self.op.snode is None:
3425 raise errors.OpPrereqError("The networked disk templates need"
3427 if self.op.snode == pnode.name:
3428 raise errors.OpPrereqError("The secondary node cannot be"
3429 " the primary node.")
3430 self.secondaries.append(self.op.snode)
3432 req_size = _ComputeDiskSize(self.op.disk_template,
3433 self.op.disk_size, self.op.swap_size)
3435 # Check lv size requirements
3436 if req_size is not None:
3437 nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                    self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
3449 if req_size > info['vg_free']:
3450 raise errors.OpPrereqError("Not enough disk space on target node %s."
3451 " %d MB available, %d MB required" %
3452 (node, info['vg_free'], req_size))
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)
3458 " primary node" % self.op.os_type)
3460 if self.op.kernel_path == constants.VALUE_NONE:
3461 raise errors.OpPrereqError("Can't set instance kernel to none")
3463 # bridge check on primary node
3464 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3465 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3466 " destination node '%s'" %
3467 (self.op.bridge, pnode.name))
3469 # memory check on primary node
3471 _CheckNodeFreeMemory(self, self.pnode.name,
3472 "creating instance %s" % self.op.instance_name,
3473 self.op.mem_size, self.op.hypervisor)
3475 # hvm_cdrom_image_path verification
3476 if self.op.hvm_cdrom_image_path is not None:
3477 # FIXME (als): shouldn't these checks happen on the destination node?
3478 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3479 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3480 " be an absolute path or None, not %s" %
3481 self.op.hvm_cdrom_image_path)
3482 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3483 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3484 " regular file or a symlink pointing to"
3485 " an existing regular file, not %s" %
3486 self.op.hvm_cdrom_image_path)
3488 # vnc_bind_address verification
3489 if self.op.vnc_bind_address is not None:
3490 if not utils.IsValidIP(self.op.vnc_bind_address):
3491 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3492 " like a valid IP address" %
3493 self.op.vnc_bind_address)
3495 # Xen HVM device type checks
3496 if self.op.hypervisor == constants.HT_XEN_HVM:
3497 if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3498 raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3499 " hypervisor" % self.op.hvm_nic_type)
3500 if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3501 raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3502 " hypervisor" % self.op.hvm_disk_type)
    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'
3509 def Exec(self, feedback_fn):
3510 """Create and add the instance to the cluster.
3513 instance = self.op.instance_name
3514 pnode_name = self.pnode.name
3516 if self.op.mac == "auto":
3517 mac_address = self.cfg.GenerateMAC()
3519 mac_address = self.op.mac
3521 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3522 if self.inst_ip is not None:
3523 nic.ip = self.inst_ip
3525 ht_kind = self.op.hypervisor
3526 if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None
3531 if self.op.vnc_bind_address is None:
3532 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3534 # this is needed because os.path.join does not accept None arguments
3535 if self.op.file_storage_dir is None:
3536 string_file_storage_dir = ""
3538 string_file_storage_dir = self.op.file_storage_dir
3540 # build the full file storage dir path
3541 file_storage_dir = os.path.normpath(os.path.join(
3542 self.cfg.GetFileStorageDir(),
3543 string_file_storage_dir, instance))
3546 disks = _GenerateDiskTemplate(self,
3547 self.op.disk_template,
3548 instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)
3554 iobj = objects.Instance(name=instance, os=self.op.os_type,
3555 primary_node=pnode_name,
3556 memory=self.op.mem_size,
3557 vcpus=self.op.vcpus,
3558 nics=[nic], disks=disks,
3559 disk_template=self.op.disk_template,
3560 status=self.instance_status,
3561 network_port=network_port,
3562 kernel_path=self.op.kernel_path,
3563 initrd_path=self.op.initrd_path,
3564 hvm_boot_order=self.op.hvm_boot_order,
3565 hvm_acpi=self.op.hvm_acpi,
3566 hvm_pae=self.op.hvm_pae,
3567 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3568 vnc_bind_address=self.op.vnc_bind_address,
3569 hvm_nic_type=self.op.hvm_nic_type,
3570 hvm_disk_type=self.op.hvm_disk_type,
                            hypervisor=self.op.hypervisor,
                            )
3574 feedback_fn("* creating instance disks...")
3575 if not _CreateDisks(self, iobj):
3576 _RemoveDisks(self, iobj)
3577 self.cfg.ReleaseDRBDMinors(instance)
3578 raise errors.OpExecError("Device creation failed, reverting...")
3580 feedback_fn("adding instance %s to cluster config" % instance)
3582 self.cfg.AddInstance(iobj)
3583 # Declare that we don't want to remove the instance lock anymore, as we've
3584 # added the instance to the config
3585 del self.remove_locks[locking.LEVEL_INSTANCE]
3586 # Remove the temp. assignements for the instance's drbds
3587 self.cfg.ReleaseDRBDMinors(instance)
3589 if self.op.wait_for_sync:
3590 disk_abort = not _WaitForSync(self, iobj)
3591 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3592 # make sure the disks are not degraded (still sync-ing is ok)
3594 feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
3601 self.cfg.RemoveInstance(iobj.name)
3602 # Make sure the instance lock gets removed
3603 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3604 raise errors.OpExecError("There are some degraded disks for"
3607 feedback_fn("creating os for instance %s on node %s" %
3608 (instance, pnode_name))
3610 if iobj.disk_template != constants.DT_DISKLESS:
3611 if self.op.mode == constants.INSTANCE_CREATE:
3612 feedback_fn("* running the instance OS create scripts...")
3613 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3614 raise errors.OpExecError("could not add os for instance %s"
3616 (instance, pnode_name))
3618 elif self.op.mode == constants.INSTANCE_IMPORT:
3619 feedback_fn("* running the instance OS import scripts...")
3620 src_node = self.op.src_node
3621 src_image = self.src_image
3622 cluster_name = self.cfg.GetClusterName()
3623 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3624 src_node, src_image, cluster_name):
3625 raise errors.OpExecError("Could not import os for instance"
3627 (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3635 feedback_fn("* starting instance...")
3636 if not rpc.call_instance_start(pnode_name, iobj, None):
3637 raise errors.OpExecError("Could not start instance")
3640 class LUConnectConsole(NoHooksLU):
3641 """Connect to an instance's console.
  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False
3651 def ExpandNames(self):
3652 self._ExpandAndLockInstance()
3654 def CheckPrereq(self):
3655 """Check prerequisites.
3657 This checks that the instance is in the cluster.
3660 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3661 assert self.instance is not None, \
3662 "Cannot retrieve locked instance %s" % self.op.instance_name
3664 def Exec(self, feedback_fn):
3665 """Connect to the console of an instance
3668 instance = self.instance
3669 node = instance.primary_node
3671 node_insts = rpc.call_instance_list([node],
3672 [instance.hypervisor])[node]
3673 if node_insts is False:
3674 raise errors.OpExecError("Can't connect to node %s." % node)
3676 if instance.name not in node_insts:
3677 raise errors.OpExecError("Instance %s is not running." % instance.name)
3679 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3681 hyper = hypervisor.GetHypervisor(instance.hypervisor)
3682 console_cmd = hyper.GetShellCommandForConsole(instance)
3685 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3688 class LUReplaceDisks(LogicalUnit):
3689 """Replace the disks of an instance.
3692 HPATH = "mirrors-replace"
3693 HTYPE = constants.HTYPE_INSTANCE
3694 _OP_REQP = ["instance_name", "mode", "disks"]
3697 def ExpandNames(self):
3698 self._ExpandAndLockInstance()
3700 if not hasattr(self.op, "remote_node"):
3701 self.op.remote_node = None
3703 ia_name = getattr(self.op, "iallocator", None)
3704 if ia_name is not None:
3705 if self.op.remote_node is not None:
3706 raise errors.OpPrereqError("Give either the iallocator or the new"
3707 " secondary, not both")
3708 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3709 elif self.op.remote_node is not None:
3710 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3711 if remote_node is None:
3712 raise errors.OpPrereqError("Node '%s' not known" %
3713 self.op.remote_node)
3714 self.op.remote_node = remote_node
3715 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3716 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
3719 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3721 def DeclareLocks(self, level):
3722 # If we're not already locking all nodes in the set we have to declare the
3723 # instance's primary/secondary nodes.
3724 if (level == locking.LEVEL_NODE and
3725 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3726 self._LockInstancesNodes()
3728 def _RunAllocator(self):
3729 """Compute a new secondary node using an IAllocator.
3732 ial = IAllocator(self.cfg,
3733 mode=constants.IALLOCATOR_MODE_RELOC,
3734 name=self.op.instance_name,
3735 relocate_from=[self.sec_node])
    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3747 self.op.remote_node = ial.nodes[0]
3748 logger.ToStdout("Selected new secondary for the instance: %s" %
3749 self.op.remote_node)
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl
3771 def CheckPrereq(self):
3772 """Check prerequisites.
3774 This checks that the instance is in the cluster.
3777 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3778 assert instance is not None, \
3779 "Cannot retrieve locked instance %s" % self.op.instance_name
3780 self.instance = instance
3782 if instance.disk_template not in constants.DTS_NET_MIRROR:
3783 raise errors.OpPrereqError("Instance's disk layout is not"
3784 " network mirrored.")
3786 if len(instance.secondary_nodes) != 1:
3787 raise errors.OpPrereqError("The instance has a strange layout,"
3788 " expected one secondary but found %d" %
3789 len(instance.secondary_nodes))
3791 self.sec_node = instance.secondary_nodes[0]
3793 ia_name = getattr(self.op, "iallocator", None)
3794 if ia_name is not None:
3795 self._RunAllocator()
3797 remote_node = self.op.remote_node
3798 if remote_node is not None:
3799 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3800 assert self.remote_node_info is not None, \
3801 "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
3804 if remote_node == instance.primary_node:
3805 raise errors.OpPrereqError("The specified node is the primary node of"
3807 elif remote_node == self.sec_node:
3808 if self.op.mode == constants.REPLACE_DISK_SEC:
3809 # this is for DRBD8, where we can't execute the same mode of
3810 # replacement as for drbd7 (no different port allocated)
3811 raise errors.OpPrereqError("Same secondary given, cannot execute"
3813 if instance.disk_template == constants.DT_DRBD8:
3814 if (self.op.mode == constants.REPLACE_DISK_ALL and
3815 remote_node is not None):
3816 # switch to replace secondary mode
3817 self.op.mode = constants.REPLACE_DISK_SEC
3819 if self.op.mode == constants.REPLACE_DISK_ALL:
3820 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3821 " secondary disk replacement, not"
3823 elif self.op.mode == constants.REPLACE_DISK_PRI:
3824 if remote_node is not None:
3825 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3826 " the secondary while doing a primary"
3827 " node disk replacement")
3828 self.tgt_node = instance.primary_node
3829 self.oth_node = instance.secondary_nodes[0]
3830 elif self.op.mode == constants.REPLACE_DISK_SEC:
3831 self.new_node = remote_node # this can be None, in which case
3832 # we don't change the secondary
3833 self.tgt_node = instance.secondary_nodes[0]
3834 self.oth_node = instance.primary_node
3836 raise errors.ProgrammerError("Unhandled disk replace mode")
3838 for name in self.op.disks:
3839 if instance.FindDisk(name) is None:
3840 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3841 (name, instance.name))
3843 def _ExecD8DiskOnly(self, feedback_fn):
3844 """Replace a disk on the primary or secondary for dbrd8.
3846 The algorithm for replace is quite complicated:
3847 - for each disk to be replaced:
3848 - create new LVs on the target node with unique names
3849 - detach old LVs from the drbd device
3850 - rename old LVs to name_replaced.<time_t>
3851 - rename new LVs to old LVs
3852 - attach the new LVs (with the old names now) to the drbd device
3853 - wait for sync across all devices
3854 - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)
    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node
3870 # Step: check device activation
3871 self.proc.LogStep(1, steps_total, "check device existence")
3872 info("checking volume groups")
3873 my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
3877 for node in oth_node, tgt_node:
3878 res = results.get(node, False)
3879 if not res or my_vg not in res:
3880 raise errors.OpExecError("Volume group '%s' not found on %s" %
3882 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
3886 info("checking %s on %s" % (dev.iv_name, node))
3887 cfg.SetDiskID(dev, node)
3888 if not rpc.call_blockdev_find(node, dev):
3889 raise errors.OpExecError("Can't find device %s on node %s" %
3890 (dev.iv_name, node))
3892 # Step: check other node consistency
3893 self.proc.LogStep(2, steps_total, "check peer consistency")
3894 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
3897 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3898 if not _CheckDiskConsistency(self, dev, oth_node,
3899 oth_node==instance.primary_node):
3900 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3901 " to replace disks on this node (%s)" %
3902 (oth_node, tgt_node))
3904 # Step: create new storage
3905 self.proc.LogStep(3, steps_total, "allocate new storage")
3906 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
3911 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3912 names = _GenerateUniqueNames(self, lv_names)
3913 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3914 logical_id=(vgname, names[0]))
3915 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3916 logical_id=(vgname, names[1]))
3917 new_lvs = [lv_data, lv_meta]
3918 old_lvs = dev.children
3919 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3920 info("creating new local storage on %s for %s" %
3921 (tgt_node, dev.iv_name))
3922 # since we *always* want to create this LV, we use the
3923 # _Create...OnPrimary (which forces the creation), even if we
3924 # are talking about the secondary node
3925 for new_lv in new_lvs:
3926 if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
3927 _GetInstanceInfoText(instance)):
3928 raise errors.OpExecError("Failed to create new LV named '%s' on"
3930 (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
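
      # For illustration (hypothetical names): an old LV with physical_id
      # ("xenvg", "sda_data") and a temp_suffix of 1199145600 is mapped by
      # ren_fn to ("xenvg", "sda_data_replaced-1199145600").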
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s" % new_lv,
                    hint="manually cleanup unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev, new_minor in zip(instance.disks, minors):
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      if pri_node == dev.logical_id[0]:
        new_logical_id = (pri_node, new_node,
                          dev.logical_id[2], dev.logical_id[3], new_minor,
                          dev.logical_id[5])
      else:
        new_logical_id = (new_node, pri_node,
                          dev.logical_id[2], new_minor, dev.logical_id[4],
                          dev.logical_id[5])
      iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
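      # The drbd8 logical_id handled above is assumed to be the 6-tuple
      # (nodeA, nodeB, port, minorA, minorB, secret): only the node pair
      # and the minor on the re-created side change; the port and shared
      # secret are carried over unchanged.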
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_logical_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_logical_id,
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the network part of the physical (unique in bdev terms) id
      # to None, meaning detach from network
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)
    # we can remove now the temp minors as now the new values are
    # written to the config file (and therefore stable)
    self.cfg.ReleaseDRBDMinors(instance.name)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      logging.debug("Disk to attach: %s", dev)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _StartInstanceDisks(self, instance, True)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(self, instance)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "grow-disk"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                  instance.hypervisor)
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
      if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name,
                                           instance.hypervisor)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        "hypervisor": instance.hypervisor,
        }

      htkind = instance.hypervisor
      if htkind == constants.HT_XEN_PVM:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
        idict["hvm_nic_type"] = instance.hvm_nic_type
        idict["hvm_disk_type"] = instance.hvm_disk_type

      if htkind in constants.HTS_REQ_PORT:
        if instance.vnc_bind_address is None:
          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
        else:
          vnc_bind_address = instance.vnc_bind_address
        if instance.network_port is None:
          vnc_console_port = None
        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
          vnc_console_port = "%s:%s" % (instance.primary_node,
                                        instance.network_port)
        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
                                                   instance.network_port,
                                                   instance.primary_node)
        else:
          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
                                        instance.network_port)
        idict["vnc_console_port"] = vnc_console_port
        idict["vnc_bind_address"] = vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.cfg.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
    self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    self.force = getattr(self.op, "force", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.warn = []
    if self.mem is not None and not self.force:
      pnode = self.instance.primary_node
      nodelist = [pnode]
      nodelist.extend(instance.secondary_nodes)
      instance_info = rpc.call_instance_info(pnode, instance.name,
                                             instance.hypervisor)
      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                    instance.hypervisor)

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      for node in instance.secondary_nodes:
        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
          self.warn.append("Can't get info from secondary node %s" % node)
        elif self.mem > nodeinfo[node]['memory_free']:
          self.warn.append("Not enough memory to failover instance to secondary"
                           " node %s" % node)

    # Xen HVM device type checks
    if instance.hypervisor == constants.HT_XEN_HVM:
      if self.op.hvm_nic_type is not None:
        if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
          raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type is not None:
        if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
          raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_disk_type)

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi is not None:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae is not None:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_nic_type is not None:
      instance.hvm_nic_type = self.hvm_nic_type
      result.append(("hvm_nic_type", self.hvm_nic_type))
    if self.hvm_disk_type is not None:
      instance.hvm_disk_type = self.hvm_disk_type
      result.append(("hvm_disk_type", self.hvm_disk_type))
    if self.hvm_cdrom_image_path:
      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
        instance.hvm_cdrom_image_path = None
      else:
        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))
    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true: for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    assert self.dst_node is not None, \
      "Cannot retrieve locked node %s" % self.op.target_node

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance, cluster_name):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
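    # e.g. (illustrative): a pattern of "^prod" yields result tuples such
    # as ("/instances/web1.example.com", "production").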
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has three sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, mode, name, **kwargs):
    self.cfg = cfg
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
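
  # A minimal usage sketch (allocator name and values hypothetical):
  #   ial = IAllocator(cfg, mode=constants.IALLOCATOR_MODE_RELOC,
  #                    name="inst1.example.com",
  #                    relocate_from=["node2.example.com"])
  #   ial.Run("my-allocator")
  #   if not ial.success:
  #     ...handle the failure; ial.info holds the reason...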

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    # FIXME: here we have only one hypervisor information, but
    # instance can belong to different hypervisors
    node_data = rpc.call_node_info(node_list, cfg.GetVGName(),
                                   cfg.GetHypervisorType())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " not a list")
    self.out_data = rdict
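
    # A well-formed allocator reply thus looks like (illustrative):
    #   {"success": true, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node3.example.com"]}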


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result