# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import re
import time
import platform

from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")

    master = self.cfg.GetMasterNode()
    if master != utils.HostInfo().name:
      raise errors.OpPrereqError("Commands must be run on the master"
                                 " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes, an empty list (and not None) should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we're really being called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
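
  # Editor's sketch (not part of the original source): how the helpers above
  # are typically wired together in a concurrent LU. This mirrors the pattern
  # used by the instance LUs later in this module:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()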


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
                                 " non-empty list of nodes whose name is to be"
                                 " expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
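
# Editor's note, an illustrative call (mirroring the query LUs below): a LU's
# ExpandNames validates the opcode's requested columns with, e.g.,
#
#   _CheckOutputFields(static=["node"],
#                      dynamic=["phys", "vg", "name", "size", "instance"],
#                      selected=self.op.output_fields)
#
# and any selected field found in neither set raises OpPrereqError.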


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
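
# Editor's note, a hypothetical example of the resulting environment: for an
# instance "inst1.example.tld" on primary node "node1" with one NIC and no
# override, the call would yield something of the form
#
#   {"INSTANCE_NAME": "inst1.example.tld", "INSTANCE_PRIMARY": "node1",
#    "INSTANCE_SECONDARIES": "", "INSTANCE_OS_TYPE": "debian-etch",
#    "INSTANCE_STATUS": "up", "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC_COUNT": 1, "INSTANCE_NIC0_IP": "", ...}
#
# (all values made up for illustration; the hooks runner later prefixes each
# key with GANETI_, as described in BuildHooksEnv above).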


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if vglist is None:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    if not node_result:
      feedback_fn(" - ERROR: unable to verify node %s." % (node,))
      return True

    # checks config file checksum
    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    # checks ssh to other nodes
    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))

    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          instance not in node_instance[node_current]):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad
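
  # Editor's note, a worked example with made-up numbers: if node B reports
  # mfree=1024 MiB and is secondary for instances i1 (memory=512 MiB) and i2
  # (memory=768 MiB) whose primary is node A, then sinst-by-pnode[A] on B is
  # [i1, i2] and needed_mem = 512 + 768 = 1280 MiB > 1024 MiB, so a failure
  # of node A could not be absorbed by B and the N+1 check reports an error.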

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = []
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
    all_vglist = self.rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': hypervisors,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    all_rversion = self.rpc.call_version(nodelist)
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                        self.cfg.GetHypervisorType())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
          }
      except ValueError:
        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn(" - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn(" Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn(" Node %s:" % node_name)
                show_node_header = False
              feedback_fn(" ERROR: Script %s failed, output:" % script)
              output = indent_re.sub(' ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
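
  # Editor's note: the value returned above is the tuple (res_nodes,
  # res_nlvm, res_instances, res_missing), i.e. unreachable nodes, per-node
  # LVM errors, names of instances with offline LVs, and per-instance lists
  # of missing (node, volume) pairs. A hypothetical result could look like
  # ([], {}, ["inst1"], {"inst2": [("node3", "xenvg/disk0")]}).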


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss = ssconf.SimpleStore()
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not self.rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
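
# Editor's note, an illustrative case: for a DRBD-backed disk whose children
# are two logical volumes, the recursion above returns True because an LD_LV
# dev_type is found among the children, even though the top-level dev_type
# is not LD_LV itself.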


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.acquired_locks[locking.LEVEL_NODE]
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      lu.proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.proc.LogWarning("Can't compute data for node %s/%s" %
                           (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
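
# Editor's note, a usage sketch (hypothetical caller): after creating or
# replacing disks, LUs in this module wait for the mirrors with something
# like
#
#   if not _WaitForSync(lu, instance):
#     raise errors.OpExecError("some disks of the instance are degraded")
#
# where a false return value means the mirrors stayed cumulatively degraded.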


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    Args:
      node_list: a list with the names of all nodes
      rlist: a map with node names as keys and OS objects as values

    Returns:
      map: a map with osnames as keys and as value another map, with
           nodes as keys and list of OS objects as values
           e.g. {"debian-etch": {"node1": [<object>,...],
                                 "node2": [<object>,]}
                }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
    }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the node exists in the configuration
    - it does not have primary or secondary instances
    - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def ExpandNames(self):
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    self.static_fields = frozenset([
      "name", "pinst_cnt", "sinst_cnt",
      "pinst_list", "sinst_list",
      "pip", "sip", "tags",
      "serial_no",
      ])

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
    }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the new node is not already in the config
    - it is resolvable
    - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2], keyarray[3], keyarray[4],
                                    keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not self.rpc.call_node_has_ip_address(new_node.name,
                                               new_node.secondary_ip):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
      }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = []
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = self.rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.cfg.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.cfg.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.cfg.GetHypervisorType(),
      "enabled_hypervisors": self.cfg.GetClusterInfo().enabled_hypervisors,
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  def ExpandNames(self):
    self.needed_locks = {}

    static_fields = ["cluster_name", "master_node"]
    _CheckOutputFields(static=static_fields,
                       dynamic=[],
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        values.append(self.cfg.GetClusterName())
      elif field == "master_node":
        values.append(self.cfg.GetMasterNode())
      else:
        raise errors.ParameterError(field)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
    succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
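
# Editor's note: callers receive (disks_ok, device_info); a minimal sketch of
# the intended use, as in _StartInstanceDisks below:
#
#   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
#                                            ignore_secondaries=force)
#   if not disks_ok:
#     _ShutdownInstanceDisks(lu, instance)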


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
                                    [instance.hypervisor])
  ins_l = ins_l[instance.primary_node]
  if not type(ins_l) is list:
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)
2073 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2074 """Shutdown block devices of an instance.
2076 This does the shutdown on all nodes of the instance.
2078 If ignore_primary is false, errors on the primary node are
2083 for disk in instance.disks:
2084 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2085 lu.cfg.SetDiskID(top_disk, node)
2086 if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2087 logger.Error("could not shutdown block device %s on node %s" %
2088 (disk.iv_name, node))
2089 if not ignore_primary or node != instance.primary_node:
2094 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2095 """Checks if a node has enough free memory.
2097 This function checks if a given node has the needed amount of free
2098 memory. If the node has less memory than requested, or if we cannot get
2099 the information from the node, this function raises an OpPrereqError
2102 @type lu: C{LogicalUnit}
2103 @param lu: a logical unit from which we get configuration data
2105 @param node: the node to check
2106 @type reason: C{str}
2107 @param reason: string to use in the error message
2108 @type requested: C{int}
2109 @param requested: the amount of memory in MiB to check for
2110 @type hypervisor: C{str}
2111 @param hypervisor: the hypervisor to ask for memory stats
2112 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2113 we cannot check the node
2116 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2117 if not nodeinfo or not isinstance(nodeinfo, dict):
2118 raise errors.OpPrereqError("Could not contact node %s for resource"
2119 " information" % (node,))
2121 free_mem = nodeinfo[node].get('memory_free')
2122 if not isinstance(free_mem, int):
2123 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2124 " was '%s'" % (node, free_mem))
2125 if requested > free_mem:
2126 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2127 " needed %s MiB, available %s MiB" %
2128 (node, reason, requested, free_mem))
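# Usage sketch (hypothetical values): an LU that wants to fail an instance
# over to 'target_node' and needs 2048 MiB there could call:
#
#   _CheckNodeFreeMemory(self, target_node,
#                        "failing over instance %s" % instance.name,
#                        2048, instance.hypervisor)
#
# which raises OpPrereqError unless the node reports at least 2048 MiB in
# its 'memory_free' field.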
2131 class LUStartupInstance(LogicalUnit):
2132 """Starts an instance.
2135 HPATH = "instance-start"
2136 HTYPE = constants.HTYPE_INSTANCE
2137 _OP_REQP = ["instance_name", "force"]
2140 def ExpandNames(self):
2141 self._ExpandAndLockInstance()
2142 self.needed_locks[locking.LEVEL_NODE] = []
2143 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2145 def DeclareLocks(self, level):
2146 if level == locking.LEVEL_NODE:
2147 self._LockInstancesNodes()
2149 def BuildHooksEnv(self):
2152 This runs on master, primary and secondary nodes of the instance.
2156 "FORCE": self.op.force,
2158 env.update(_BuildInstanceHookEnvByObject(self.instance))
2159 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2160 list(self.instance.secondary_nodes))
2163 def CheckPrereq(self):
2164 """Check prerequisites.
2166 This checks that the instance is in the cluster.
2169 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2170 assert self.instance is not None, \
2171 "Cannot retrieve locked instance %s" % self.op.instance_name
2173 # check bridge existence
2174 _CheckInstanceBridgesExist(self, instance)
2176 _CheckNodeFreeMemory(self, instance.primary_node,
2177 "starting instance %s" % instance.name,
2178 instance.memory, instance.hypervisor)
2180 def Exec(self, feedback_fn):
2181 """Start the instance.
2184 instance = self.instance
2185 force = self.op.force
2186 extra_args = getattr(self.op, "extra_args", "")
2188 self.cfg.MarkInstanceUp(instance.name)
2190 node_current = instance.primary_node
2192 _StartInstanceDisks(self, instance, force)
2194 if not self.rpc.call_instance_start(node_current, instance, extra_args):
2195 _ShutdownInstanceDisks(self, instance)
2196 raise errors.OpExecError("Could not start instance")
2199 class LURebootInstance(LogicalUnit):
2200 """Reboot an instance.
2203 HPATH = "instance-reboot"
2204 HTYPE = constants.HTYPE_INSTANCE
2205 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2208 def ExpandNames(self):
2209 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2210 constants.INSTANCE_REBOOT_HARD,
2211 constants.INSTANCE_REBOOT_FULL]:
2212 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2213 (constants.INSTANCE_REBOOT_SOFT,
2214 constants.INSTANCE_REBOOT_HARD,
2215 constants.INSTANCE_REBOOT_FULL))
2216 self._ExpandAndLockInstance()
2217 self.needed_locks[locking.LEVEL_NODE] = []
2218 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2220 def DeclareLocks(self, level):
2221 if level == locking.LEVEL_NODE:
2222 primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2223 self._LockInstancesNodes(primary_only=primary_only)
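# For soft and hard reboots only the primary node acts, so locking just the
# primary is enough; a full reboot shuts down and restarts the instance's
# disks on all of its nodes (see Exec below) and therefore needs them all.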
2225 def BuildHooksEnv(self):
2228 This runs on master, primary and secondary nodes of the instance.
2232 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2234 env.update(_BuildInstanceHookEnvByObject(self.instance))
2235 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2236 list(self.instance.secondary_nodes))
2239 def CheckPrereq(self):
2240 """Check prerequisites.
2242 This checks that the instance is in the cluster.
2245 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2246 assert self.instance is not None, \
2247 "Cannot retrieve locked instance %s" % self.op.instance_name
2249 # check bridge existence
2250 _CheckInstanceBridgesExist(self, instance)
2252 def Exec(self, feedback_fn):
2253 """Reboot the instance.
2256 instance = self.instance
2257 ignore_secondaries = self.op.ignore_secondaries
2258 reboot_type = self.op.reboot_type
2259 extra_args = getattr(self.op, "extra_args", "")
2261 node_current = instance.primary_node
2263 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2264 constants.INSTANCE_REBOOT_HARD]:
2265 if not self.rpc.call_instance_reboot(node_current, instance,
2266 reboot_type, extra_args):
2267 raise errors.OpExecError("Could not reboot instance")
2269 if not self.rpc.call_instance_shutdown(node_current, instance):
2270 raise errors.OpExecError("could not shutdown instance for full reboot")
2271 _ShutdownInstanceDisks(self, instance)
2272 _StartInstanceDisks(self, instance, ignore_secondaries)
2273 if not self.rpc.call_instance_start(node_current, instance, extra_args):
2274 _ShutdownInstanceDisks(self, instance)
2275 raise errors.OpExecError("Could not start instance for full reboot")
2277 self.cfg.MarkInstanceUp(instance.name)
2280 class LUShutdownInstance(LogicalUnit):
2281 """Shutdown an instance.
2284 HPATH = "instance-stop"
2285 HTYPE = constants.HTYPE_INSTANCE
2286 _OP_REQP = ["instance_name"]
2289 def ExpandNames(self):
2290 self._ExpandAndLockInstance()
2291 self.needed_locks[locking.LEVEL_NODE] = []
2292 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2294 def DeclareLocks(self, level):
2295 if level == locking.LEVEL_NODE:
2296 self._LockInstancesNodes()
2298 def BuildHooksEnv(self):
2301 This runs on master, primary and secondary nodes of the instance.
2304 env = _BuildInstanceHookEnvByObject(self.instance)
2305 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2306 list(self.instance.secondary_nodes))
2309 def CheckPrereq(self):
2310 """Check prerequisites.
2312 This checks that the instance is in the cluster.
2315 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2316 assert self.instance is not None, \
2317 "Cannot retrieve locked instance %s" % self.op.instance_name
2319 def Exec(self, feedback_fn):
2320 """Shutdown the instance.
2323 instance = self.instance
2324 node_current = instance.primary_node
2325 self.cfg.MarkInstanceDown(instance.name)
2326 if not self.rpc.call_instance_shutdown(node_current, instance):
2327 logger.Error("could not shutdown instance")
2329 _ShutdownInstanceDisks(self, instance)
2332 class LUReinstallInstance(LogicalUnit):
2333 """Reinstall an instance.
2336 HPATH = "instance-reinstall"
2337 HTYPE = constants.HTYPE_INSTANCE
2338 _OP_REQP = ["instance_name"]
2341 def ExpandNames(self):
2342 self._ExpandAndLockInstance()
2343 self.needed_locks[locking.LEVEL_NODE] = []
2344 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2346 def DeclareLocks(self, level):
2347 if level == locking.LEVEL_NODE:
2348 self._LockInstancesNodes()
2350 def BuildHooksEnv(self):
2353 This runs on master, primary and secondary nodes of the instance.
2356 env = _BuildInstanceHookEnvByObject(self.instance)
2357 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2358 list(self.instance.secondary_nodes))
2361 def CheckPrereq(self):
2362 """Check prerequisites.
2364 This checks that the instance is in the cluster and is not running.
2367 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2368 assert instance is not None, \
2369 "Cannot retrieve locked instance %s" % self.op.instance_name
2371 if instance.disk_template == constants.DT_DISKLESS:
2372 raise errors.OpPrereqError("Instance '%s' has no disks" %
2373 self.op.instance_name)
2374 if instance.status != "down":
2375 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2376 self.op.instance_name)
2377 remote_info = self.rpc.call_instance_info(instance.primary_node,
2379 instance.hypervisor)
2381 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2382 (self.op.instance_name,
2383 instance.primary_node))
2385 self.op.os_type = getattr(self.op, "os_type", None)
2386 if self.op.os_type is not None:
2388 pnode = self.cfg.GetNodeInfo(
2389 self.cfg.ExpandNodeName(instance.primary_node))
2391 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2393 os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2395 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2396 " primary node" % self.op.os_type)
2398 self.instance = instance
2400 def Exec(self, feedback_fn):
2401 """Reinstall the instance.
2404 inst = self.instance
2406 if self.op.os_type is not None:
2407 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2408 inst.os = self.op.os_type
2409 self.cfg.Update(inst)
2411 _StartInstanceDisks(self, inst, None)
2413 feedback_fn("Running the instance OS create scripts...")
2414 if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2416 raise errors.OpExecError("Could not install OS for instance %s"
2418 (inst.name, inst.primary_node))
2420 _ShutdownInstanceDisks(self, inst)
2423 class LURenameInstance(LogicalUnit):
2424 """Rename an instance.
2427 HPATH = "instance-rename"
2428 HTYPE = constants.HTYPE_INSTANCE
2429 _OP_REQP = ["instance_name", "new_name"]
2431 def BuildHooksEnv(self):
2434 This runs on master, primary and secondary nodes of the instance.
2437 env = _BuildInstanceHookEnvByObject(self.instance)
2438 env["INSTANCE_NEW_NAME"] = self.op.new_name
2439 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2440 list(self.instance.secondary_nodes))
2443 def CheckPrereq(self):
2444 """Check prerequisites.
2446 This checks that the instance is in the cluster and is not running.
2449 instance = self.cfg.GetInstanceInfo(
2450 self.cfg.ExpandInstanceName(self.op.instance_name))
2451 if instance is None:
2452 raise errors.OpPrereqError("Instance '%s' not known" %
2453 self.op.instance_name)
2454 if instance.status != "down":
2455 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2456 self.op.instance_name)
2457 remote_info = self.rpc.call_instance_info(instance.primary_node,
2459 instance.hypervisor)
2461 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2462 (self.op.instance_name,
2463 instance.primary_node))
2464 self.instance = instance
2466 # new name verification
2467 name_info = utils.HostInfo(self.op.new_name)
2469 self.op.new_name = new_name = name_info.name
2470 instance_list = self.cfg.GetInstanceList()
2471 if new_name in instance_list:
2472 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2475 if not getattr(self.op, "ignore_ip", False):
2476 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2477 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2478 (name_info.ip, new_name))
2481 def Exec(self, feedback_fn):
2482 """Reinstall the instance.
2485 inst = self.instance
2486 old_name = inst.name
2488 if inst.disk_template == constants.DT_FILE:
2489 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2491 self.cfg.RenameInstance(inst.name, self.op.new_name)
2492 # Change the instance lock. This is definitely safe while we hold the BGL
2493 self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2494 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2496 # re-read the instance from the configuration after rename
2497 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2499 if inst.disk_template == constants.DT_FILE:
2500 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2501 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2502 old_file_storage_dir,
2503 new_file_storage_dir)
2506 raise errors.OpExecError("Could not connect to node '%s' to rename"
2507 " directory '%s' to '%s' (but the instance"
2508 " has been renamed in Ganeti)" % (
2509 inst.primary_node, old_file_storage_dir,
2510 new_file_storage_dir))
2513 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2514 " (but the instance has been renamed in"
2515 " Ganeti)" % (old_file_storage_dir,
2516 new_file_storage_dir))
2518 _StartInstanceDisks(self, inst, None)
2520 if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2523 msg = ("Could not run OS rename script for instance %s on node %s"
2524 " (but the instance has been renamed in Ganeti)" %
2525 (inst.name, inst.primary_node))
2528 _ShutdownInstanceDisks(self, inst)
2531 class LURemoveInstance(LogicalUnit):
2532 """Remove an instance.
2535 HPATH = "instance-remove"
2536 HTYPE = constants.HTYPE_INSTANCE
2537 _OP_REQP = ["instance_name", "ignore_failures"]
2540 def ExpandNames(self):
2541 self._ExpandAndLockInstance()
2542 self.needed_locks[locking.LEVEL_NODE] = []
2543 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2545 def DeclareLocks(self, level):
2546 if level == locking.LEVEL_NODE:
2547 self._LockInstancesNodes()
2549 def BuildHooksEnv(self):
2552 This runs on master, primary and secondary nodes of the instance.
2555 env = _BuildInstanceHookEnvByObject(self.instance)
2556 nl = [self.cfg.GetMasterNode()]
2559 def CheckPrereq(self):
2560 """Check prerequisites.
2562 This checks that the instance is in the cluster.
2565 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2566 assert self.instance is not None, \
2567 "Cannot retrieve locked instance %s" % self.op.instance_name
2569 def Exec(self, feedback_fn):
2570 """Remove the instance.
2573 instance = self.instance
2574 logger.Info("shutting down instance %s on node %s" %
2575 (instance.name, instance.primary_node))
2577 if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2578 if self.op.ignore_failures:
2579 feedback_fn("Warning: can't shutdown instance")
2581 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2582 (instance.name, instance.primary_node))
2584 logger.Info("removing block devices for instance %s" % instance.name)
2586 if not _RemoveDisks(self, instance):
2587 if self.op.ignore_failures:
2588 feedback_fn("Warning: can't remove instance's disks")
2590 raise errors.OpExecError("Can't remove instance's disks")
2592 logger.Info("removing instance %s out of cluster config" % instance.name)
2594 self.cfg.RemoveInstance(instance.name)
2595 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2598 class LUQueryInstances(NoHooksLU):
2599 """Logical unit for querying instances.
2602 _OP_REQP = ["output_fields", "names"]
2605 def ExpandNames(self):
2606 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2607 self.static_fields = frozenset([
2608 "name", "os", "pnode", "snodes",
2609 "admin_state", "admin_ram",
2610 "disk_template", "ip", "mac", "bridge",
2611 "sda_size", "sdb_size", "vcpus", "tags",
2612 "network_port", "kernel_path", "initrd_path",
2613 "hvm_boot_order", "hvm_acpi", "hvm_pae",
2614 "hvm_cdrom_image_path", "hvm_nic_type",
2615 "hvm_disk_type", "vnc_bind_address",
2616 "serial_no", "hypervisor",
2618 _CheckOutputFields(static=self.static_fields,
2619 dynamic=self.dynamic_fields,
2620 selected=self.op.output_fields)
2622 self.needed_locks = {}
2623 self.share_locks[locking.LEVEL_INSTANCE] = 1
2624 self.share_locks[locking.LEVEL_NODE] = 1
2627 self.wanted = _GetWantedInstances(self, self.op.names)
2629 self.wanted = locking.ALL_SET
2631 self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
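# Live data (and thus locking) is only needed when at least one requested
# field is dynamic, e.g. "oper_state" or "oper_ram"; a query for purely
# static fields such as "name" and "os" runs lock-free from the config.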
2633 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2634 self.needed_locks[locking.LEVEL_NODE] = []
2635 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2637 def DeclareLocks(self, level):
2638 if level == locking.LEVEL_NODE and self.do_locking:
2639 self._LockInstancesNodes()
2641 def CheckPrereq(self):
2642 """Check prerequisites.
2647 def Exec(self, feedback_fn):
2648 """Computes the list of nodes and their attributes.
2651 all_info = self.cfg.GetAllInstancesInfo()
2653 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2654 elif self.wanted != locking.ALL_SET:
2655 instance_names = self.wanted
2656 missing = set(instance_names).difference(all_info.keys())
2658 raise errors.OpExecError(
2659 "Some instances were removed before retrieving their data: %s"
2662 instance_names = all_info.keys()
2663 instance_list = [all_info[iname] for iname in instance_names]
2665 # begin data gathering
2667 nodes = frozenset([inst.primary_node for inst in instance_list])
2668 hv_list = list(set([inst.hypervisor for inst in instance_list]))
2671 if self.dynamic_fields.intersection(self.op.output_fields):
2673 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2675 result = node_data[name]
2677 live_data.update(result)
2678 elif result is False:
2679 bad_nodes.append(name)
2680 # else no instance is alive
2682 live_data = dict([(name, {}) for name in instance_names])
2684 # end data gathering
2687 for instance in instance_list:
2689 for field in self.op.output_fields:
2694 elif field == "pnode":
2695 val = instance.primary_node
2696 elif field == "snodes":
2697 val = list(instance.secondary_nodes)
2698 elif field == "admin_state":
2699 val = (instance.status != "down")
2700 elif field == "oper_state":
2701 if instance.primary_node in bad_nodes:
2704 val = bool(live_data.get(instance.name))
2705 elif field == "status":
2706 if instance.primary_node in bad_nodes:
2707 val = "ERROR_nodedown"
2709 running = bool(live_data.get(instance.name))
2711 if instance.status != "down":
2716 if instance.status != "down":
2720 elif field == "admin_ram":
2721 val = instance.memory
2722 elif field == "oper_ram":
2723 if instance.primary_node in bad_nodes:
2725 elif instance.name in live_data:
2726 val = live_data[instance.name].get("memory", "?")
2729 elif field == "disk_template":
2730 val = instance.disk_template
2732 val = instance.nics[0].ip
2733 elif field == "bridge":
2734 val = instance.nics[0].bridge
2735 elif field == "mac":
2736 val = instance.nics[0].mac
2737 elif field == "sda_size" or field == "sdb_size":
2738 disk = instance.FindDisk(field[:3])
2743 elif field == "vcpus":
2744 val = instance.vcpus
2745 elif field == "tags":
2746 val = list(instance.GetTags())
2747 elif field == "serial_no":
2748 val = instance.serial_no
2749 elif field in ("network_port", "kernel_path", "initrd_path",
2750 "hvm_boot_order", "hvm_acpi", "hvm_pae",
2751 "hvm_cdrom_image_path", "hvm_nic_type",
2752 "hvm_disk_type", "vnc_bind_address"):
2753 val = getattr(instance, field, None)
2756 elif field in ("hvm_nic_type", "hvm_disk_type",
2757 "kernel_path", "initrd_path"):
2761 elif field == "hypervisor":
2762 val = instance.hypervisor
2764 raise errors.ParameterError(field)
2771 class LUFailoverInstance(LogicalUnit):
2772 """Failover an instance.
2775 HPATH = "instance-failover"
2776 HTYPE = constants.HTYPE_INSTANCE
2777 _OP_REQP = ["instance_name", "ignore_consistency"]
2780 def ExpandNames(self):
2781 self._ExpandAndLockInstance()
2782 self.needed_locks[locking.LEVEL_NODE] = []
2783 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2785 def DeclareLocks(self, level):
2786 if level == locking.LEVEL_NODE:
2787 self._LockInstancesNodes()
2789 def BuildHooksEnv(self):
2792 This runs on master, primary and secondary nodes of the instance.
2796 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2798 env.update(_BuildInstanceHookEnvByObject(self.instance))
2799 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2802 def CheckPrereq(self):
2803 """Check prerequisites.
2805 This checks that the instance is in the cluster.
2808 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2809 assert self.instance is not None, \
2810 "Cannot retrieve locked instance %s" % self.op.instance_name
2812 if instance.disk_template not in constants.DTS_NET_MIRROR:
2813 raise errors.OpPrereqError("Instance's disk layout is not"
2814 " network mirrored, cannot failover.")
2816 secondary_nodes = instance.secondary_nodes
2817 if not secondary_nodes:
2818 raise errors.ProgrammerError("no secondary node but using "
2819 "a mirrored disk template")
2821 target_node = secondary_nodes[0]
2822 # check memory requirements on the secondary node
2823 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2824 instance.name, instance.memory,
2825 instance.hypervisor)
2827 # check bridge existence
2828 brlist = [nic.bridge for nic in instance.nics]
2829 if not self.rpc.call_bridges_exist(target_node, brlist):
2830 raise errors.OpPrereqError("One or more target bridges %s does not"
2831 " exist on destination node '%s'" %
2832 (brlist, target_node))
2834 def Exec(self, feedback_fn):
2835 """Failover an instance.
2837 The failover is done by shutting it down on its present node and
2838 starting it on the secondary.
2841 instance = self.instance
2843 source_node = instance.primary_node
2844 target_node = instance.secondary_nodes[0]
2846 feedback_fn("* checking disk consistency between source and target")
2847 for dev in instance.disks:
2848 # for drbd, these are drbd over lvm
2849 if not _CheckDiskConsistency(self, dev, target_node, False):
2850 if instance.status == "up" and not self.op.ignore_consistency:
2851 raise errors.OpExecError("Disk %s is degraded on target node,"
2852 " aborting failover." % dev.iv_name)
2854 feedback_fn("* shutting down instance on source node")
2855 logger.Info("Shutting down instance %s on node %s" %
2856 (instance.name, source_node))
2858 if not self.rpc.call_instance_shutdown(source_node, instance):
2859 if self.op.ignore_consistency:
2860 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2861 " anyway. Please make sure node %s is down" %
2862 (instance.name, source_node, source_node))
2864 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2865 (instance.name, source_node))
2867 feedback_fn("* deactivating the instance's disks on source node")
2868 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2869 raise errors.OpExecError("Can't shut down the instance's disks.")
2871 instance.primary_node = target_node
2872 # distribute new instance config to the other nodes
2873 self.cfg.Update(instance)
2875 # Only start the instance if it's marked as up
2876 if instance.status == "up":
2877 feedback_fn("* activating the instance's disks on target node")
2878 logger.Info("Starting instance %s on node %s" %
2879 (instance.name, target_node))
2881 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2882 ignore_secondaries=True)
2884 _ShutdownInstanceDisks(self, instance)
2885 raise errors.OpExecError("Can't activate the instance's disks")
2887 feedback_fn("* starting the instance on the target node")
2888 if not self.rpc.call_instance_start(target_node, instance, None):
2889 _ShutdownInstanceDisks(self, instance)
2890 raise errors.OpExecError("Could not start instance %s on node %s." %
2891 (instance.name, target_node))
2894 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2895 """Create a tree of block devices on the primary node.
2897 This always creates all devices.
2901 for child in device.children:
2902 if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2905 lu.cfg.SetDiskID(device, node)
2906 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2907 instance.name, True, info)
2910 if device.physical_id is None:
2911 device.physical_id = new_id
2915 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2916 """Create a tree of block devices on a secondary node.
2918 If this device type has to be created on secondaries, create it and
2921 If not, just recurse to children keeping the same 'force' value.
2924 if device.CreateOnSecondary():
2927 for child in device.children:
2928 if not _CreateBlockDevOnSecondary(lu, node, instance,
2929 child, force, info):
2934 lu.cfg.SetDiskID(device, node)
2935 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2936 instance.name, False, info)
2939 if device.physical_id is None:
2940 device.physical_id = new_id
2944 def _GenerateUniqueNames(lu, exts):
2945 """Generate a suitable LV name.
2947 This will generate a logical volume name for the given instance.
2952 new_id = lu.cfg.GenerateUniqueID()
2953 results.append("%s%s" % (new_id, val))
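# For example (with a hypothetical generated id), calling
# _GenerateUniqueNames(lu, [".sda_data", ".sda_meta"]) yields names such as
# ["9f3a...sda_data", "9f3a...sda_meta"]: the unique id with each extension
# appended.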
2957 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
2959 """Generate a drbd8 device complete with its children.
2962 port = lu.cfg.AllocatePort()
2963 vgname = lu.cfg.GetVGName()
2964 shared_secret = lu.cfg.GenerateDRBDSecret()
2965 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2966 logical_id=(vgname, names[0]))
2967 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2968 logical_id=(vgname, names[1]))
2969 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2970 logical_id=(primary, secondary, port,
2973 children=[dev_data, dev_meta],
2978 def _GenerateDiskTemplate(lu, template_name,
2979 instance_name, primary_node,
2980 secondary_nodes, disk_sz, swap_sz,
2981 file_storage_dir, file_driver):
2982 """Generate the entire disk layout for a given template type.
2985 #TODO: compute space requirements
2987 vgname = lu.cfg.GetVGName()
2988 if template_name == constants.DT_DISKLESS:
2990 elif template_name == constants.DT_PLAIN:
2991 if len(secondary_nodes) != 0:
2992 raise errors.ProgrammerError("Wrong template configuration")
2994 names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
2995 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2996 logical_id=(vgname, names[0]),
2998 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2999 logical_id=(vgname, names[1]),
3001 disks = [sda_dev, sdb_dev]
3002 elif template_name == constants.DT_DRBD8:
3003 if len(secondary_nodes) != 1:
3004 raise errors.ProgrammerError("Wrong template configuration")
3005 remote_node = secondary_nodes[0]
3006 (minor_pa, minor_pb,
3007 minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3008 [primary_node, primary_node, remote_node, remote_node], instance_name)
3010 names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3011 ".sdb_data", ".sdb_meta"])
3012 drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3013 disk_sz, names[0:2], "sda",
3015 drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3016 swap_sz, names[2:4], "sdb",
3018 disks = [drbd_sda_dev, drbd_sdb_dev]
3019 elif template_name == constants.DT_FILE:
3020 if len(secondary_nodes) != 0:
3021 raise errors.ProgrammerError("Wrong template configuration")
3023 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3024 iv_name="sda", logical_id=(file_driver,
3025 "%s/sda" % file_storage_dir))
3026 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3027 iv_name="sdb", logical_id=(file_driver,
3028 "%s/sdb" % file_storage_dir))
3029 disks = [file_sda_dev, file_sdb_dev]
3031 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3035 def _GetInstanceInfoText(instance):
3036 """Compute that text that should be added to the disk's metadata.
3039 return "originstname+%s" % instance.name
3042 def _CreateDisks(lu, instance):
3043 """Create all disks for an instance.
3045 This abstracts away some work from AddInstance.
3048 instance: the instance object
3051 True or False showing the success of the creation process
3054 info = _GetInstanceInfoText(instance)
3056 if instance.disk_template == constants.DT_FILE:
3057 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3058 result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3062 logger.Error("Could not connect to node '%s'" % instance.primary_node)
3066 logger.Error("failed to create directory '%s'" % file_storage_dir)
3069 for device in instance.disks:
3070 logger.Info("creating volume %s for instance %s" %
3071 (device.iv_name, instance.name))
3073 for secondary_node in instance.secondary_nodes:
3074 if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3075 device, False, info):
3076 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3077 (device.iv_name, device, secondary_node))
3080 if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3081 instance, device, info):
3082 logger.Error("failed to create volume %s on primary!" %
3089 def _RemoveDisks(lu, instance):
3090 """Remove all disks for an instance.
3092 This abstracts away some work from `AddInstance()` and
3093 `RemoveInstance()`. Note that in case some of the devices couldn't
3094 be removed, the removal will continue with the other ones (compare
3095 with `_CreateDisks()`).
3098 instance: the instance object
3101 True or False showing the success of the removal process
3104 logger.Info("removing block devices for instance %s" % instance.name)
3107 for device in instance.disks:
3108 for node, disk in device.ComputeNodeTree(instance.primary_node):
3109 lu.cfg.SetDiskID(disk, node)
3110 if not lu.rpc.call_blockdev_remove(node, disk):
3111 logger.Error("could not remove block device %s on node %s,"
3112 " continuing anyway" %
3113 (device.iv_name, node))
3116 if instance.disk_template == constants.DT_FILE:
3117 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3118 if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3120 logger.Error("could not remove directory '%s'" % file_storage_dir)
3126 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3127 """Compute disk size requirements in the volume group
3129 This is currently hard-coded for the two-drive layout.
3132 # Required free disk space as a function of disk and swap space
3134 constants.DT_DISKLESS: None,
3135 constants.DT_PLAIN: disk_size + swap_size,
3136 # 256 MB are added for DRBD metadata, 128 MB for each DRBD device
3137 constants.DT_DRBD8: disk_size + swap_size + 256,
3138 constants.DT_FILE: None,
3141 if disk_template not in req_size_dict:
3142 raise errors.ProgrammerError("Disk template '%s' size requirement"
3143 " is unknown" % disk_template)
3145 return req_size_dict[disk_template]
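# Worked example: a DRBD8 instance with a 10240 MB disk and 2048 MB swap
# needs 10240 + 2048 + 256 = 12544 MB of free space in the volume group,
# while a file-based or diskless instance needs none:
#
#   _ComputeDiskSize(constants.DT_DRBD8, 10240, 2048)   # -> 12544
#   _ComputeDiskSize(constants.DT_PLAIN, 10240, 2048)   # -> 12288
#   _ComputeDiskSize(constants.DT_FILE, 10240, 2048)    # -> None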
3148 class LUCreateInstance(LogicalUnit):
3149 """Create an instance.
3152 HPATH = "instance-add"
3153 HTYPE = constants.HTYPE_INSTANCE
3154 _OP_REQP = ["instance_name", "mem_size", "disk_size",
3155 "disk_template", "swap_size", "mode", "start", "vcpus",
3156 "wait_for_sync", "ip_check", "mac"]
3159 def _ExpandNode(self, node):
3160 """Expands and checks one node name.
3163 node_full = self.cfg.ExpandNodeName(node)
3164 if node_full is None:
3165 raise errors.OpPrereqError("Unknown node %s" % node)
3168 def ExpandNames(self):
3169 """ExpandNames for CreateInstance.
3171 Figure out the right locks for instance creation.
3174 self.needed_locks = {}
3176 # set optional parameters to None if they don't exist
3177 for attr in ["kernel_path", "initrd_path", "pnode", "snode",
3178 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
3179 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
3180 "vnc_bind_address", "hypervisor"]:
3181 if not hasattr(self.op, attr):
3182 setattr(self.op, attr, None)
3184 # cheap checks, mostly valid constants given
3186 # verify creation mode
3187 if self.op.mode not in (constants.INSTANCE_CREATE,
3188 constants.INSTANCE_IMPORT):
3189 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3192 # disk template and mirror node verification
3193 if self.op.disk_template not in constants.DISK_TEMPLATES:
3194 raise errors.OpPrereqError("Invalid disk template name")
3196 if self.op.hypervisor is None:
3197 self.op.hypervisor = self.cfg.GetHypervisorType()
3199 enabled_hvs = self.cfg.GetClusterInfo().enabled_hypervisors
3200 if self.op.hypervisor not in enabled_hvs:
3201 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3202 " cluster (%s)" % (self.op.hypervisor,
3203 ",".join(enabled_hvs)))
3205 #### instance parameters check
3207 # instance name verification
3208 hostname1 = utils.HostInfo(self.op.instance_name)
3209 self.op.instance_name = instance_name = hostname1.name
3211 # this is just a preventive check, but someone might still add this
3212 # instance in the meantime, and creation will fail at lock-add time
3213 if instance_name in self.cfg.GetInstanceList():
3214 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3217 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3219 # ip validity checks
3220 ip = getattr(self.op, "ip", None)
3221 if ip is None or ip.lower() == "none":
3223 elif ip.lower() == "auto":
3224 inst_ip = hostname1.ip
3226 if not utils.IsValidIP(ip):
3227 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3228 " like a valid IP" % ip)
3230 self.inst_ip = self.op.ip = inst_ip
3231 # used in CheckPrereq for ip ping check
3232 self.check_ip = hostname1.ip
3234 # MAC address verification
3235 if self.op.mac != "auto":
3236 if not utils.IsValidMac(self.op.mac.lower()):
3237 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3240 # boot order verification
3241 if self.op.hvm_boot_order is not None:
3242 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3243 raise errors.OpPrereqError("invalid boot order specified,"
3244 " must be one or more of [acdn]")
3245 # file storage checks
3246 if (self.op.file_driver and
3247 not self.op.file_driver in constants.FILE_DRIVER):
3248 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3249 self.op.file_driver)
3251 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3252 raise errors.OpPrereqError("File storage directory path not absolute")
3254 ### Node/iallocator related checks
3255 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3256 raise errors.OpPrereqError("One and only one of iallocator and primary"
3257 " node must be given")
3259 if self.op.iallocator:
3260 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3262 self.op.pnode = self._ExpandNode(self.op.pnode)
3263 nodelist = [self.op.pnode]
3264 if self.op.snode is not None:
3265 self.op.snode = self._ExpandNode(self.op.snode)
3266 nodelist.append(self.op.snode)
3267 self.needed_locks[locking.LEVEL_NODE] = nodelist
3269 # in case of import lock the source node too
3270 if self.op.mode == constants.INSTANCE_IMPORT:
3271 src_node = getattr(self.op, "src_node", None)
3272 src_path = getattr(self.op, "src_path", None)
3274 if src_node is None or src_path is None:
3275 raise errors.OpPrereqError("Importing an instance requires source"
3276 " node and path options")
3278 if not os.path.isabs(src_path):
3279 raise errors.OpPrereqError("The source path must be absolute")
3281 self.op.src_node = src_node = self._ExpandNode(src_node)
3282 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3283 self.needed_locks[locking.LEVEL_NODE].append(src_node)
3285 else: # INSTANCE_CREATE
3286 if getattr(self.op, "os_type", None) is None:
3287 raise errors.OpPrereqError("No guest OS specified")
3289 def _RunAllocator(self):
3290 """Run the allocator based on input opcode.
3293 disks = [{"size": self.op.disk_size, "mode": "w"},
3294 {"size": self.op.swap_size, "mode": "w"}]
3295 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3296 "bridge": self.op.bridge}]
3297 ial = IAllocator(self,
3298 mode=constants.IALLOCATOR_MODE_ALLOC,
3299 name=self.op.instance_name,
3300 disk_template=self.op.disk_template,
3303 vcpus=self.op.vcpus,
3304 mem_size=self.op.mem_size,
3309 ial.Run(self.op.iallocator)
3312 raise errors.OpPrereqError("Can't compute nodes using"
3313 " iallocator '%s': %s" % (self.op.iallocator,
3315 if len(ial.nodes) != ial.required_nodes:
3316 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3317 " of nodes (%s), required %s" %
3318 (self.op.iallocator, len(ial.nodes),
3319 ial.required_nodes))
3320 self.op.pnode = ial.nodes[0]
3321 logger.ToStdout("Selected nodes for the instance: %s" %
3322 (", ".join(ial.nodes),))
3323 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3324 (self.op.instance_name, self.op.iallocator, ial.nodes))
3325 if ial.required_nodes == 2:
3326 self.op.snode = ial.nodes[1]
3328 def BuildHooksEnv(self):
3331 This runs on master, primary and secondary nodes of the instance.
3335 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3336 "INSTANCE_DISK_SIZE": self.op.disk_size,
3337 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3338 "INSTANCE_ADD_MODE": self.op.mode,
3340 if self.op.mode == constants.INSTANCE_IMPORT:
3341 env["INSTANCE_SRC_NODE"] = self.op.src_node
3342 env["INSTANCE_SRC_PATH"] = self.op.src_path
3343 env["INSTANCE_SRC_IMAGE"] = self.src_image
3345 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3346 primary_node=self.op.pnode,
3347 secondary_nodes=self.secondaries,
3348 status=self.instance_status,
3349 os_type=self.op.os_type,
3350 memory=self.op.mem_size,
3351 vcpus=self.op.vcpus,
3352 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3355 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3360 def CheckPrereq(self):
3361 """Check prerequisites.
3364 if (not self.cfg.GetVGName() and
3365 self.op.disk_template not in constants.DTS_NOT_LVM):
3366 raise errors.OpPrereqError("Cluster does not support lvm-based"
3370 if self.op.mode == constants.INSTANCE_IMPORT:
3371 src_node = self.op.src_node
3372 src_path = self.op.src_path
3374 export_info = self.rpc.call_export_info(src_node, src_path)
3377 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3379 if not export_info.has_section(constants.INISECT_EXP):
3380 raise errors.ProgrammerError("Corrupted export config")
3382 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3383 if (int(ei_version) != constants.EXPORT_VERSION):
3384 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3385 (ei_version, constants.EXPORT_VERSION))
3387 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3388 raise errors.OpPrereqError("Can't import instance with more than"
3391 # FIXME: are the old OSes, disk sizes, etc. useful?
3392 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3393 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3395 self.src_image = diskimage
3397 # ip ping checks (we use the same ip that was resolved in ExpandNames)
3399 if self.op.start and not self.op.ip_check:
3400 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3401 " adding an instance in start mode")
3403 if self.op.ip_check:
3404 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3405 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3406 (self.check_ip, self.op.instance_name))
3408 # bridge verification
3409 bridge = getattr(self.op, "bridge", None)
3411 self.op.bridge = self.cfg.GetDefBridge()
3413 self.op.bridge = bridge
3417 if self.op.iallocator is not None:
3418 self._RunAllocator()
3420 #### node related checks
3422 # check primary node
3423 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3424 assert self.pnode is not None, \
3425 "Cannot retrieve locked node %s" % self.op.pnode
3426 self.secondaries = []
3428 # mirror node verification
3429 if self.op.disk_template in constants.DTS_NET_MIRROR:
3430 if self.op.snode is None:
3431 raise errors.OpPrereqError("The networked disk templates need"
3433 if self.op.snode == pnode.name:
3434 raise errors.OpPrereqError("The secondary node cannot be"
3435 " the primary node.")
3436 self.secondaries.append(self.op.snode)
3438 req_size = _ComputeDiskSize(self.op.disk_template,
3439 self.op.disk_size, self.op.swap_size)
3441 # Check lv size requirements
3442 if req_size is not None:
3443 nodenames = [pnode.name] + self.secondaries
3444 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3446 for node in nodenames:
3447 info = nodeinfo.get(node, None)
3449 raise errors.OpPrereqError("Cannot get current information"
3450 " from node '%s'" % node)
3451 vg_free = info.get('vg_free', None)
3452 if not isinstance(vg_free, int):
3453 raise errors.OpPrereqError("Can't compute free disk space on"
3455 if req_size > vg_free:
3456 raise errors.OpPrereqError("Not enough disk space on target node %s."
3457 " %d MB available, %d MB required" %
3458 (node, vg_free, req_size))
3461 os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3463 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3464 " primary node" % self.op.os_type)
3466 if self.op.kernel_path == constants.VALUE_NONE:
3467 raise errors.OpPrereqError("Can't set instance kernel to none")
3469 # bridge check on primary node
3470 if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3471 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3472 " destination node '%s'" %
3473 (self.op.bridge, pnode.name))
3475 # memory check on primary node
3477 _CheckNodeFreeMemory(self, self.pnode.name,
3478 "creating instance %s" % self.op.instance_name,
3479 self.op.mem_size, self.op.hypervisor)
3481 # hvm_cdrom_image_path verification
3482 if self.op.hvm_cdrom_image_path is not None:
3483 # FIXME (als): shouldn't these checks happen on the destination node?
3484 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3485 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3486 " be an absolute path or None, not %s" %
3487 self.op.hvm_cdrom_image_path)
3488 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3489 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3490 " regular file or a symlink pointing to"
3491 " an existing regular file, not %s" %
3492 self.op.hvm_cdrom_image_path)
3494 # vnc_bind_address verification
3495 if self.op.vnc_bind_address is not None:
3496 if not utils.IsValidIP(self.op.vnc_bind_address):
3497 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3498 " like a valid IP address" %
3499 self.op.vnc_bind_address)
3501 # Xen HVM device type checks
3502 if self.op.hypervisor == constants.HT_XEN_HVM:
3503 if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3504 raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3505 " hypervisor" % self.op.hvm_nic_type)
3506 if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3507 raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3508 " hypervisor" % self.op.hvm_disk_type)
3511 self.instance_status = 'up'
3513 self.instance_status = 'down'
3515 def Exec(self, feedback_fn):
3516 """Create and add the instance to the cluster.
3519 instance = self.op.instance_name
3520 pnode_name = self.pnode.name
3522 if self.op.mac == "auto":
3523 mac_address = self.cfg.GenerateMAC()
3525 mac_address = self.op.mac
3527 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3528 if self.inst_ip is not None:
3529 nic.ip = self.inst_ip
3531 ht_kind = self.op.hypervisor
3532 if ht_kind in constants.HTS_REQ_PORT:
3533 network_port = self.cfg.AllocatePort()
3537 if self.op.vnc_bind_address is None:
3538 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3540 # this is needed because os.path.join does not accept None arguments
3541 if self.op.file_storage_dir is None:
3542 string_file_storage_dir = ""
3544 string_file_storage_dir = self.op.file_storage_dir
3546 # build the full file storage dir path
3547 file_storage_dir = os.path.normpath(os.path.join(
3548 self.cfg.GetFileStorageDir(),
3549 string_file_storage_dir, instance))
3552 disks = _GenerateDiskTemplate(self,
3553 self.op.disk_template,
3554 instance, pnode_name,
3555 self.secondaries, self.op.disk_size,
3558 self.op.file_driver)
3560 iobj = objects.Instance(name=instance, os=self.op.os_type,
3561 primary_node=pnode_name,
3562 memory=self.op.mem_size,
3563 vcpus=self.op.vcpus,
3564 nics=[nic], disks=disks,
3565 disk_template=self.op.disk_template,
3566 status=self.instance_status,
3567 network_port=network_port,
3568 kernel_path=self.op.kernel_path,
3569 initrd_path=self.op.initrd_path,
3570 hvm_boot_order=self.op.hvm_boot_order,
3571 hvm_acpi=self.op.hvm_acpi,
3572 hvm_pae=self.op.hvm_pae,
3573 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3574 vnc_bind_address=self.op.vnc_bind_address,
3575 hvm_nic_type=self.op.hvm_nic_type,
3576 hvm_disk_type=self.op.hvm_disk_type,
3577 hypervisor=self.op.hypervisor,
3580 feedback_fn("* creating instance disks...")
3581 if not _CreateDisks(self, iobj):
3582 _RemoveDisks(self, iobj)
3583 self.cfg.ReleaseDRBDMinors(instance)
3584 raise errors.OpExecError("Device creation failed, reverting...")
3586 feedback_fn("adding instance %s to cluster config" % instance)
3588 self.cfg.AddInstance(iobj)
3589 # Declare that we don't want to remove the instance lock anymore, as we've
3590 # added the instance to the config
3591 del self.remove_locks[locking.LEVEL_INSTANCE]
3592 # Remove the temporary assignments for the instance's DRBDs
3593 self.cfg.ReleaseDRBDMinors(instance)
3595 if self.op.wait_for_sync:
3596 disk_abort = not _WaitForSync(self, iobj)
3597 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3598 # make sure the disks are not degraded (still sync-ing is ok)
3600 feedback_fn("* checking mirrors status")
3601 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3606 _RemoveDisks(self, iobj)
3607 self.cfg.RemoveInstance(iobj.name)
3608 # Make sure the instance lock gets removed
3609 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3610 raise errors.OpExecError("There are some degraded disks for"
3613 feedback_fn("creating os for instance %s on node %s" %
3614 (instance, pnode_name))
3616 if iobj.disk_template != constants.DT_DISKLESS:
3617 if self.op.mode == constants.INSTANCE_CREATE:
3618 feedback_fn("* running the instance OS create scripts...")
3619 if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3620 raise errors.OpExecError("could not add os for instance %s"
3622 (instance, pnode_name))
3624 elif self.op.mode == constants.INSTANCE_IMPORT:
3625 feedback_fn("* running the instance OS import scripts...")
3626 src_node = self.op.src_node
3627 src_image = self.src_image
3628 cluster_name = self.cfg.GetClusterName()
3629 if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3630 src_node, src_image,
3632 raise errors.OpExecError("Could not import os for instance"
3634 (instance, pnode_name))
3636 # also checked in the prereq part
3637 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3641 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3642 feedback_fn("* starting instance...")
3643 if not self.rpc.call_instance_start(pnode_name, iobj, None):
3644 raise errors.OpExecError("Could not start instance")
3647 class LUConnectConsole(NoHooksLU):
3648 """Connect to an instance's console.
3650 This is somewhat special in that it returns the command line that
3651 you need to run on the master node in order to connect to the
3655 _OP_REQP = ["instance_name"]
3658 def ExpandNames(self):
3659 self._ExpandAndLockInstance()
3661 def CheckPrereq(self):
3662 """Check prerequisites.
3664 This checks that the instance is in the cluster.
3667 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3668 assert self.instance is not None, \
3669 "Cannot retrieve locked instance %s" % self.op.instance_name
3671 def Exec(self, feedback_fn):
3672 """Connect to the console of an instance
3675 instance = self.instance
3676 node = instance.primary_node
3678 node_insts = self.rpc.call_instance_list([node],
3679 [instance.hypervisor])[node]
3680 if node_insts is False:
3681 raise errors.OpExecError("Can't connect to node %s." % node)
3683 if instance.name not in node_insts:
3684 raise errors.OpExecError("Instance %s is not running." % instance.name)
3686 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3688 hyper = hypervisor.GetHypervisor(instance.hypervisor)
3689 console_cmd = hyper.GetShellCommandForConsole(instance)
3692 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
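# The returned value is a complete SSH command line; for a Xen PVM instance
# it would look roughly like
#   ssh -t root@<node> 'xm console <instance>'
# though the actual console command is whatever GetShellCommandForConsole
# returns for the instance's hypervisor.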
3695 class LUReplaceDisks(LogicalUnit):
3696 """Replace the disks of an instance.
3699 HPATH = "mirrors-replace"
3700 HTYPE = constants.HTYPE_INSTANCE
3701 _OP_REQP = ["instance_name", "mode", "disks"]
3704 def ExpandNames(self):
3705 self._ExpandAndLockInstance()
3707 if not hasattr(self.op, "remote_node"):
3708 self.op.remote_node = None
3710 ia_name = getattr(self.op, "iallocator", None)
3711 if ia_name is not None:
3712 if self.op.remote_node is not None:
3713 raise errors.OpPrereqError("Give either the iallocator or the new"
3714 " secondary, not both")
3715 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3716 elif self.op.remote_node is not None:
3717 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3718 if remote_node is None:
3719 raise errors.OpPrereqError("Node '%s' not known" %
3720 self.op.remote_node)
3721 self.op.remote_node = remote_node
3722 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3723 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3725 self.needed_locks[locking.LEVEL_NODE] = []
3726 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3728 def DeclareLocks(self, level):
3729 # If we're not already locking all nodes in the set we have to declare the
3730 # instance's primary/secondary nodes.
3731 if (level == locking.LEVEL_NODE and
3732 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3733 self._LockInstancesNodes()
3735 def _RunAllocator(self):
3736 """Compute a new secondary node using an IAllocator.
3739 ial = IAllocator(self,
3740 mode=constants.IALLOCATOR_MODE_RELOC,
3741 name=self.op.instance_name,
3742 relocate_from=[self.sec_node])
3744 ial.Run(self.op.iallocator)
3747 raise errors.OpPrereqError("Can't compute nodes using"
3748 " iallocator '%s': %s" % (self.op.iallocator,
3750 if len(ial.nodes) != ial.required_nodes:
3751 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3752 " of nodes (%s), required %s" %
3753 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3754 self.op.remote_node = ial.nodes[0]
3755 logger.ToStdout("Selected new secondary for the instance: %s" %
3756 self.op.remote_node)
3758 def BuildHooksEnv(self):
3761 This runs on the master, the primary and all the secondaries.
3765 "MODE": self.op.mode,
3766 "NEW_SECONDARY": self.op.remote_node,
3767 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3769 env.update(_BuildInstanceHookEnvByObject(self.instance))
3771 self.cfg.GetMasterNode(),
3772 self.instance.primary_node,
3774 if self.op.remote_node is not None:
3775 nl.append(self.op.remote_node)
3778 def CheckPrereq(self):
3779 """Check prerequisites.
3781 This checks that the instance is in the cluster.
3784 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3785 assert instance is not None, \
3786 "Cannot retrieve locked instance %s" % self.op.instance_name
3787 self.instance = instance
3789 if instance.disk_template not in constants.DTS_NET_MIRROR:
3790 raise errors.OpPrereqError("Instance's disk layout is not"
3791 " network mirrored.")
3793 if len(instance.secondary_nodes) != 1:
3794 raise errors.OpPrereqError("The instance has a strange layout,"
3795 " expected one secondary but found %d" %
3796 len(instance.secondary_nodes))
3798 self.sec_node = instance.secondary_nodes[0]
3800 ia_name = getattr(self.op, "iallocator", None)
3801 if ia_name is not None:
3802 self._RunAllocator()
3804 remote_node = self.op.remote_node
3805 if remote_node is not None:
3806 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3807 assert self.remote_node_info is not None, \
3808 "Cannot retrieve locked node %s" % remote_node
3810 self.remote_node_info = None
3811 if remote_node == instance.primary_node:
3812 raise errors.OpPrereqError("The specified node is the primary node of"
3814 elif remote_node == self.sec_node:
3815 if self.op.mode == constants.REPLACE_DISK_SEC:
3816 # this is for DRBD8, where we can't execute the same mode of
3817 # replacement as for drbd7 (no different port allocated)
3818 raise errors.OpPrereqError("Same secondary given, cannot execute"
3820 if instance.disk_template == constants.DT_DRBD8:
3821 if (self.op.mode == constants.REPLACE_DISK_ALL and
3822 remote_node is not None):
3823 # switch to replace secondary mode
3824 self.op.mode = constants.REPLACE_DISK_SEC
3826 if self.op.mode == constants.REPLACE_DISK_ALL:
3827 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3828 " secondary disk replacement, not"
3830 elif self.op.mode == constants.REPLACE_DISK_PRI:
3831 if remote_node is not None:
3832 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3833 " the secondary while doing a primary"
3834 " node disk replacement")
3835 self.tgt_node = instance.primary_node
3836 self.oth_node = instance.secondary_nodes[0]
3837 elif self.op.mode == constants.REPLACE_DISK_SEC:
3838 self.new_node = remote_node # this can be None, in which case
3839 # we don't change the secondary
3840 self.tgt_node = instance.secondary_nodes[0]
3841 self.oth_node = instance.primary_node
3843 raise errors.ProgrammerError("Unhandled disk replace mode")
3845 for name in self.op.disks:
3846 if instance.FindDisk(name) is None:
3847 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3848 (name, instance.name))
3850 def _ExecD8DiskOnly(self, feedback_fn):
3851 """Replace a disk on the primary or secondary for dbrd8.
3853 The algorithm for replace is quite complicated:
3854 - for each disk to be replaced:
3855 - create new LVs on the target node with unique names
3856 - detach old LVs from the drbd device
3857 - rename old LVs to name_replaced.<time_t>
3858 - rename new LVs to old LVs
3859 - attach the new LVs (with the old names now) to the drbd device
3860 - wait for sync across all devices
3861 - for each modified disk:
3862 - remove old LVs (which have the name name_replaced.<time_t>)
3864 Failures are not very well handled.
3868 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3869 instance = self.instance
3871 vgname = self.cfg.GetVGName()
3874 tgt_node = self.tgt_node
3875 oth_node = self.oth_node
3877 # Step: check device activation
3878 self.proc.LogStep(1, steps_total, "check device existence")
3879 info("checking volume groups")
3880 my_vg = cfg.GetVGName()
3881 results = self.rpc.call_vg_list([oth_node, tgt_node])
3883 raise errors.OpExecError("Can't list volume groups on the nodes")
3884 for node in oth_node, tgt_node:
3885 res = results.get(node, False)
3886 if not res or my_vg not in res:
3887 raise errors.OpExecError("Volume group '%s' not found on %s" %
3889 for dev in instance.disks:
3890 if not dev.iv_name in self.op.disks:
3892 for node in tgt_node, oth_node:
3893 info("checking %s on %s" % (dev.iv_name, node))
3894 cfg.SetDiskID(dev, node)
3895 if not self.rpc.call_blockdev_find(node, dev):
3896 raise errors.OpExecError("Can't find device %s on node %s" %
3897 (dev.iv_name, node))
3899 # Step: check other node consistency
3900 self.proc.LogStep(2, steps_total, "check peer consistency")
3901 for dev in instance.disks:
3902 if not dev.iv_name in self.op.disks:
3904 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3905 if not _CheckDiskConsistency(self, dev, oth_node,
3906 oth_node==instance.primary_node):
3907 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3908 " to replace disks on this node (%s)" %
3909 (oth_node, tgt_node))
3911 # Step: create new storage
3912 self.proc.LogStep(3, steps_total, "allocate new storage")
3913 for dev in instance.disks:
3914 if not dev.iv_name in self.op.disks:
3917 cfg.SetDiskID(dev, tgt_node)
3918 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3919 names = _GenerateUniqueNames(self, lv_names)
3920 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3921 logical_id=(vgname, names[0]))
3922 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3923 logical_id=(vgname, names[1]))
3924 new_lvs = [lv_data, lv_meta]
3925 old_lvs = dev.children
3926 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3927 info("creating new local storage on %s for %s" %
3928 (tgt_node, dev.iv_name))
3929 # since we *always* want to create this LV, we use the
3930 # _Create...OnPrimary (which forces the creation), even if we
3931 # are talking about the secondary node
3932 for new_lv in new_lvs:
3933 if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
3934 _GetInstanceInfoText(instance)):
3935 raise errors.OpExecError("Failed to create new LV named '%s' on"
3936 " node '%s'" %
3937 (new_lv.logical_id[1], tgt_node))
3939 # Step: for each lv, detach+rename*2+attach
3940 self.proc.LogStep(4, steps_total, "change drbd configuration")
3941 for dev, old_lvs, new_lvs in iv_names.itervalues():
3942 info("detaching %s drbd from local storage" % dev.iv_name)
3943 if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3944 raise errors.OpExecError("Can't detach drbd from local storage on node"
3945 " %s for device %s" % (tgt_node, dev.iv_name))
3947 #cfg.Update(instance)
3949 # ok, we created the new LVs, so now we know we have the needed
3950 # storage; as such, we proceed on the target node to rename
3951 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3952 # using the assumption that logical_id == physical_id (which in
3953 # turn is the unique_id on that node)
3955 # FIXME(iustin): use a better name for the replaced LVs
3956 temp_suffix = int(time.time())
3957 ren_fn = lambda d, suff: (d.physical_id[0],
3958 d.physical_id[1] + "_replaced-%s" % suff)
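# For illustration (assumed values): applied to an LV with physical_id
# ('xenvg', 'sda_data') and suffix 1199145600, ren_fn returns
# ('xenvg', 'sda_data_replaced-1199145600').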
3959 # build the rename list based on what LVs exist on the node
3960 rlist = []
3961 for to_ren in old_lvs:
3962 find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
3963 if find_res is not None: # device exists
3964 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3966 info("renaming the old LVs on the target node")
3967 if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3968 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3969 # now we rename the new LVs to the old LVs
3970 info("renaming the new LVs on the target node")
3971 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3972 if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3973 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3975 for old, new in zip(old_lvs, new_lvs):
3976 new.logical_id = old.logical_id
3977 cfg.SetDiskID(new, tgt_node)
3979 for disk in old_lvs:
3980 disk.logical_id = ren_fn(disk, temp_suffix)
3981 cfg.SetDiskID(disk, tgt_node)
3983 # now that the new lvs have the old name, we can add them to the device
3984 info("adding new mirror component on %s" % tgt_node)
3985 if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3986 for new_lv in new_lvs:
3987 if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
3988 warning("Can't rollback device %s", hint="manually cleanup unused"
3990 raise errors.OpExecError("Can't add local storage to drbd")
3992 dev.children = new_lvs
3993 cfg.Update(instance)
3995 # Step: wait for sync
3997 # this can fail as the old devices are degraded and _WaitForSync
3998 # does a combined result over all disks, so we don't check its
3999 # return value
4000 self.proc.LogStep(5, steps_total, "sync devices")
4001 _WaitForSync(self, instance, unlock=True)
4003 # so check manually all the devices
4004 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4005 cfg.SetDiskID(dev, instance.primary_node)
4006 is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4007 if is_degr:
4008 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4010 # Step: remove old storage
4011 self.proc.LogStep(6, steps_total, "removing old storage")
4012 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4013 info("remove logical volumes for %s" % name)
4014 for lv in old_lvs:
4015 cfg.SetDiskID(lv, tgt_node)
4016 if not self.rpc.call_blockdev_remove(tgt_node, lv):
4017 warning("Can't remove old LV", hint="manually remove unused LVs")
4020 def _ExecD8Secondary(self, feedback_fn):
4021 """Replace the secondary node for drbd8.
4023 The algorithm for replace is quite complicated:
4024 - for all disks of the instance:
4025 - create new LVs on the new node with same names
4026 - shutdown the drbd device on the old secondary
4027 - disconnect the drbd network on the primary
4028 - create the drbd device on the new secondary
4029 - network attach the drbd on the primary, using an artifice:
4030 the drbd code for Attach() will connect to the network if it
4031 finds a device which is connected to the good local disks but
4032 not network enabled
4033 - wait for sync across all devices
4034 - remove all disks from the old secondary
4036 Failures are not very well handled.
4039 steps_total = 6
4040 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4041 instance = self.instance
4042 iv_names = {}
4043 vgname = self.cfg.GetVGName()
4044 cfg = self.cfg
4046 old_node = self.tgt_node
4047 new_node = self.new_node
4048 pri_node = instance.primary_node
4050 # Step: check device activation
4051 self.proc.LogStep(1, steps_total, "check device existence")
4052 info("checking volume groups")
4053 my_vg = cfg.GetVGName()
4054 results = self.rpc.call_vg_list([pri_node, new_node])
4055 if not results:
4056 raise errors.OpExecError("Can't list volume groups on the nodes")
4057 for node in pri_node, new_node:
4058 res = results.get(node, False)
4059 if not res or my_vg not in res:
4060 raise errors.OpExecError("Volume group '%s' not found on %s" %
4061 (my_vg, node))
4062 for dev in instance.disks:
4063 if not dev.iv_name in self.op.disks:
4065 info("checking %s on %s" % (dev.iv_name, pri_node))
4066 cfg.SetDiskID(dev, pri_node)
4067 if not self.rpc.call_blockdev_find(pri_node, dev):
4068 raise errors.OpExecError("Can't find device %s on node %s" %
4069 (dev.iv_name, pri_node))
4071 # Step: check other node consistency
4072 self.proc.LogStep(2, steps_total, "check peer consistency")
4073 for dev in instance.disks:
4074 if not dev.iv_name in self.op.disks:
4076 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4077 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4078 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4079 " unsafe to replace the secondary" %
4082 # Step: create new storage
4083 self.proc.LogStep(3, steps_total, "allocate new storage")
4084 for dev in instance.disks:
4086 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4087 # since we *always* want to create this LV, we use the
4088 # _Create...OnPrimary (which forces the creation), even if we
4089 # are talking about the secondary node
4090 for new_lv in dev.children:
4091 if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4092 _GetInstanceInfoText(instance)):
4093 raise errors.OpExecError("Failed to create new LV named '%s' on"
4095 (new_lv.logical_id[1], new_node))
4098 # Step 4: drbd minors and drbd setup changes
4099 # after this, we must manually remove the drbd minors on both the
4100 # error and the success paths
4101 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4102 instance.name)
4103 logging.debug("Allocated minors %s", minors)
4104 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4105 for dev, new_minor in zip(instance.disks, minors):
4107 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4108 # create new devices on new_node
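# Sketch of the assumed logical_id layout for LD_DRBD8 devices, matching
# the index accesses below: (node_a, node_b, port, minor_a, minor_b,
# secret). The two branches keep the tuple oriented correctly while
# substituting new_node and new_minor for the old secondary's entries.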
4109 if pri_node == dev.logical_id[0]:
4110 new_logical_id = (pri_node, new_node,
4111 dev.logical_id[2], dev.logical_id[3], new_minor,
4112 dev.logical_id[5])
4113 else:
4114 new_logical_id = (new_node, pri_node,
4115 dev.logical_id[2], new_minor, dev.logical_id[4],
4116 dev.logical_id[5])
4117 iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4118 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4119 new_logical_id)
4120 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4121 logical_id=new_logical_id,
4122 children=dev.children)
4123 if not _CreateBlockDevOnSecondary(self, new_node, instance,
4124 new_drbd, False,
4125 _GetInstanceInfoText(instance)):
4126 self.cfg.ReleaseDRBDMinors(instance.name)
4127 raise errors.OpExecError("Failed to create new DRBD on"
4128 " node '%s'" % new_node)
4130 for dev in instance.disks:
4131 # we have new devices, shutdown the drbd on the old secondary
4132 info("shutting down drbd for %s on old node" % dev.iv_name)
4133 cfg.SetDiskID(dev, old_node)
4134 if not self.rpc.call_blockdev_shutdown(old_node, dev):
4135 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4136 hint="Please cleanup this device manually as soon as possible")
4138 info("detaching primary drbds from the network (=> standalone)")
4139 done = 0
4140 for dev in instance.disks:
4141 cfg.SetDiskID(dev, pri_node)
4142 # set the network part of the physical (unique in bdev terms) id
4143 # to None, meaning detach from network
4144 dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4145 # and 'find' the device, which will 'fix' it to match the
4146 # standalone state
4147 if self.rpc.call_blockdev_find(pri_node, dev):
4148 done += 1
4149 else:
4150 warning("Failed to detach drbd %s from network, unusual case" %
4151 dev.iv_name)
4153 if not done:
4154 # no detaches succeeded (very unlikely)
4155 self.cfg.ReleaseDRBDMinors(instance.name)
4156 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4158 # if we managed to detach at least one, we update all the disks of
4159 # the instance to point to the new secondary
4160 info("updating instance configuration")
4161 for dev, _, new_logical_id in iv_names.itervalues():
4162 dev.logical_id = new_logical_id
4163 cfg.SetDiskID(dev, pri_node)
4164 cfg.Update(instance)
4165 # we can remove now the temp minors as now the new values are
4166 # written to the config file (and therefore stable)
4167 self.cfg.ReleaseDRBDMinors(instance.name)
4169 # and now perform the drbd attach
4170 info("attaching primary drbds to new secondary (standalone => connected)")
4172 for dev in instance.disks:
4173 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4174 # since the attach is smart, it's enough to 'find' the device,
4175 # it will automatically activate the network, if the physical_id
4176 # is correct
4177 cfg.SetDiskID(dev, pri_node)
4178 logging.debug("Disk to attach: %s", dev)
4179 if not self.rpc.call_blockdev_find(pri_node, dev):
4180 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4181 "please do a gnt-instance info to see the status of disks")
4183 # this can fail as the old devices are degraded and _WaitForSync
4184 # does a combined result over all disks, so we don't check its
4185 # return value
4186 self.proc.LogStep(5, steps_total, "sync devices")
4187 _WaitForSync(self, instance, unlock=True)
4189 # so check manually all the devices
4190 for name, (dev, old_lvs, _) in iv_names.iteritems():
4191 cfg.SetDiskID(dev, pri_node)
4192 is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4193 if is_degr:
4194 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4196 self.proc.LogStep(6, steps_total, "removing old storage")
4197 for name, (dev, old_lvs, _) in iv_names.iteritems():
4198 info("remove logical volumes for %s" % name)
4199 for lv in old_lvs:
4200 cfg.SetDiskID(lv, old_node)
4201 if not self.rpc.call_blockdev_remove(old_node, lv):
4202 warning("Can't remove LV on old secondary",
4203 hint="Cleanup stale volumes by hand")
4205 def Exec(self, feedback_fn):
4206 """Execute disk replacement.
4208 This dispatches the disk replacement to the appropriate handler.
4211 instance = self.instance
4213 # Activate the instance disks if we're replacing them on a down instance
4214 if instance.status == "down":
4215 _StartInstanceDisks(self, instance, True)
4217 if instance.disk_template == constants.DT_DRBD8:
4218 if self.op.remote_node is None:
4219 fn = self._ExecD8DiskOnly
4220 else:
4221 fn = self._ExecD8Secondary
4222 else:
4223 raise errors.ProgrammerError("Unhandled disk replacement case")
4225 ret = fn(feedback_fn)
4227 # Deactivate the instance disks if we're replacing them on a down instance
4228 if instance.status == "down":
4229 _SafeShutdownInstanceDisks(self, instance)
4231 return ret
4234 class LUGrowDisk(LogicalUnit):
4235 """Grow a disk of an instance.
4238 HPATH = "disk-grow"
4239 HTYPE = constants.HTYPE_INSTANCE
4240 _OP_REQP = ["instance_name", "disk", "amount"]
4242 REQ_BGL = False
4243 def ExpandNames(self):
4244 self._ExpandAndLockInstance()
4245 self.needed_locks[locking.LEVEL_NODE] = []
4246 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4248 def DeclareLocks(self, level):
4249 if level == locking.LEVEL_NODE:
4250 self._LockInstancesNodes()
4252 def BuildHooksEnv(self):
4255 This runs on the master, the primary and all the secondaries.
4259 "DISK": self.op.disk,
4260 "AMOUNT": self.op.amount,
4262 env.update(_BuildInstanceHookEnvByObject(self.instance))
4264 self.cfg.GetMasterNode(),
4265 self.instance.primary_node,
4269 def CheckPrereq(self):
4270 """Check prerequisites.
4272 This checks that the instance is in the cluster.
4275 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4276 assert instance is not None, \
4277 "Cannot retrieve locked instance %s" % self.op.instance_name
4279 self.instance = instance
4281 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4282 raise errors.OpPrereqError("Instance's disk layout does not support"
4283 " growing.")
4285 if instance.FindDisk(self.op.disk) is None:
4286 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4287 (self.op.disk, instance.name))
4289 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4290 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4291 instance.hypervisor)
4292 for node in nodenames:
4293 info = nodeinfo.get(node, None)
4294 if not info:
4295 raise errors.OpPrereqError("Cannot get current information"
4296 " from node '%s'" % node)
4297 vg_free = info.get('vg_free', None)
4298 if not isinstance(vg_free, int):
4299 raise errors.OpPrereqError("Can't compute free disk space on"
4300 " node %s" % node)
4301 if self.op.amount > info['vg_free']:
4302 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4303 " %d MiB available, %d MiB required" %
4304 (node, info['vg_free'], self.op.amount))
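# Worked example (numbers assumed): growing by amount=1024 MiB when a
# node reports vg_free=512 fails here, since every node holding the disk
# must have the full 1024 MiB free in its volume group.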
4306 def Exec(self, feedback_fn):
4307 """Execute disk grow.
4310 instance = self.instance
4311 disk = instance.FindDisk(self.op.disk)
4312 for node in (instance.secondary_nodes + (instance.primary_node,)):
4313 self.cfg.SetDiskID(disk, node)
4314 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4315 if (not result or not isinstance(result, (list, tuple)) or
4316 len(result) != 2):
4317 raise errors.OpExecError("Grow request failed on node %s" % node)
4318 elif not result[0]:
4319 raise errors.OpExecError("Grow request failed on node %s: %s" %
4320 (node, result[1]))
4321 disk.RecordGrow(self.op.amount)
4322 self.cfg.Update(instance)
4326 class LUQueryInstanceData(NoHooksLU):
4327 """Query runtime instance data.
4330 _OP_REQP = ["instances"]
4333 def ExpandNames(self):
4334 self.needed_locks = {}
4335 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4337 if not isinstance(self.op.instances, list):
4338 raise errors.OpPrereqError("Invalid argument type 'instances'")
4340 if self.op.instances:
4341 self.wanted_names = []
4342 for name in self.op.instances:
4343 full_name = self.cfg.ExpandInstanceName(name)
4344 if full_name is None:
4345 raise errors.OpPrereqError("Instance '%s' not known" %
4346 name)
4347 self.wanted_names.append(full_name)
4348 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4350 self.wanted_names = None
4351 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4353 self.needed_locks[locking.LEVEL_NODE] = []
4354 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4356 def DeclareLocks(self, level):
4357 if level == locking.LEVEL_NODE:
4358 self._LockInstancesNodes()
4360 def CheckPrereq(self):
4361 """Check prerequisites.
4363 This only checks the optional instance list against the existing names.
4366 if self.wanted_names is None:
4367 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4369 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4370 in self.wanted_names]
4373 def _ComputeDiskStatus(self, instance, snode, dev):
4374 """Compute block device status.
4377 self.cfg.SetDiskID(dev, instance.primary_node)
4378 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4379 if dev.dev_type in constants.LDS_DRBD:
4380 # we change the snode then (otherwise we use the one passed in)
4381 if dev.logical_id[0] == instance.primary_node:
4382 snode = dev.logical_id[1]
4383 else:
4384 snode = dev.logical_id[0]
4386 if snode:
4387 self.cfg.SetDiskID(dev, snode)
4388 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4389 else:
4390 dev_sstatus = None
4392 if dev.children:
4393 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4394 for child in dev.children]
4399 "iv_name": dev.iv_name,
4400 "dev_type": dev.dev_type,
4401 "logical_id": dev.logical_id,
4402 "physical_id": dev.physical_id,
4403 "pstatus": dev_pstatus,
4404 "sstatus": dev_sstatus,
4405 "children": dev_children,
4410 def Exec(self, feedback_fn):
4411 """Gather and return data"""
4412 result = {}
4413 for instance in self.wanted_instances:
4414 remote_info = self.rpc.call_instance_info(instance.primary_node,
4415 instance.name,
4416 instance.hypervisor)
4417 if remote_info and "state" in remote_info:
4418 remote_state = "up"
4419 else:
4420 remote_state = "down"
4421 if instance.status == "down":
4422 config_state = "down"
4423 else:
4424 config_state = "up"
4426 disks = [self._ComputeDiskStatus(instance, None, device)
4427 for device in instance.disks]
4430 "name": instance.name,
4431 "config_state": config_state,
4432 "run_state": remote_state,
4433 "pnode": instance.primary_node,
4434 "snodes": instance.secondary_nodes,
4436 "memory": instance.memory,
4437 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4439 "vcpus": instance.vcpus,
4440 "hypervisor": instance.hypervisor,
4441 }
4443 htkind = instance.hypervisor
4444 if htkind == constants.HT_XEN_PVM:
4445 idict["kernel_path"] = instance.kernel_path
4446 idict["initrd_path"] = instance.initrd_path
4448 if htkind == constants.HT_XEN_HVM:
4449 idict["hvm_boot_order"] = instance.hvm_boot_order
4450 idict["hvm_acpi"] = instance.hvm_acpi
4451 idict["hvm_pae"] = instance.hvm_pae
4452 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4453 idict["hvm_nic_type"] = instance.hvm_nic_type
4454 idict["hvm_disk_type"] = instance.hvm_disk_type
4456 if htkind in constants.HTS_REQ_PORT:
4457 if instance.vnc_bind_address is None:
4458 vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4460 vnc_bind_address = instance.vnc_bind_address
4461 if instance.network_port is None:
4462 vnc_console_port = None
4463 elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4464 vnc_console_port = "%s:%s" % (instance.primary_node,
4465 instance.network_port)
4466 elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4467 vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4468 instance.network_port,
4469 instance.primary_node)
4471 vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4472 instance.network_port)
4473 idict["vnc_console_port"] = vnc_console_port
4474 idict["vnc_bind_address"] = vnc_bind_address
4475 idict["network_port"] = instance.network_port
4477 result[instance.name] = idict
4479 return result
4482 class LUSetInstanceParams(LogicalUnit):
4483 """Modifies an instances's parameters.
4486 HPATH = "instance-modify"
4487 HTYPE = constants.HTYPE_INSTANCE
4488 _OP_REQP = ["instance_name"]
4491 def ExpandNames(self):
4492 self._ExpandAndLockInstance()
4494 def BuildHooksEnv(self):
4497 This runs on the master, primary and secondaries.
4500 args = dict()
4501 if self.mem:
4502 args['memory'] = self.mem
4503 if self.vcpus:
4504 args['vcpus'] = self.vcpus
4505 if self.do_ip or self.do_bridge or self.mac:
4506 if self.do_ip:
4507 ip = self.ip
4508 else:
4509 ip = self.instance.nics[0].ip
4510 if self.bridge:
4511 bridge = self.bridge
4512 else:
4513 bridge = self.instance.nics[0].bridge
4514 if self.mac:
4515 mac = self.mac
4516 else:
4517 mac = self.instance.nics[0].mac
4518 args['nics'] = [(ip, bridge, mac)]
4519 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4520 nl = [self.cfg.GetMasterNode(),
4521 self.instance.primary_node] + list(self.instance.secondary_nodes)
4522 return env, nl, nl
4524 def CheckPrereq(self):
4525 """Check prerequisites.
4527 This only checks the instance list against the existing names.
4530 # FIXME: all the parameters could be checked before, in ExpandNames, or in
4531 # a separate CheckArguments function, if we implement one, so the operation
4532 # can be aborted without waiting for any lock, should it have an error...
4533 self.mem = getattr(self.op, "mem", None)
4534 self.vcpus = getattr(self.op, "vcpus", None)
4535 self.ip = getattr(self.op, "ip", None)
4536 self.mac = getattr(self.op, "mac", None)
4537 self.bridge = getattr(self.op, "bridge", None)
4538 self.kernel_path = getattr(self.op, "kernel_path", None)
4539 self.initrd_path = getattr(self.op, "initrd_path", None)
4540 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4541 self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4542 self.hvm_pae = getattr(self.op, "hvm_pae", None)
4543 self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
4544 self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
4545 self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4546 self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4547 self.force = getattr(self.op, "force", None)
4548 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4549 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4550 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4551 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
4552 if all_parms.count(None) == len(all_parms):
4553 raise errors.OpPrereqError("No changes submitted")
4554 if self.mem is not None:
4555 try:
4556 self.mem = int(self.mem)
4557 except ValueError, err:
4558 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4559 if self.vcpus is not None:
4560 try:
4561 self.vcpus = int(self.vcpus)
4562 except ValueError, err:
4563 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4564 if self.ip is not None:
4565 self.do_ip = True
4566 if self.ip.lower() == "none":
4567 self.ip = None
4568 else:
4569 if not utils.IsValidIP(self.ip):
4570 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4571 else:
4572 self.do_ip = False
4573 self.do_bridge = (self.bridge is not None)
4574 if self.mac is not None:
4575 if self.cfg.IsMacInUse(self.mac):
4576 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4577 self.mac)
4578 if not utils.IsValidMac(self.mac):
4579 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4581 if self.kernel_path is not None:
4582 self.do_kernel_path = True
4583 if self.kernel_path == constants.VALUE_NONE:
4584 raise errors.OpPrereqError("Can't set instance to no kernel")
4586 if self.kernel_path != constants.VALUE_DEFAULT:
4587 if not os.path.isabs(self.kernel_path):
4588 raise errors.OpPrereqError("The kernel path must be an absolute"
4589 " filename")
4590 else:
4591 self.do_kernel_path = False
4593 if self.initrd_path is not None:
4594 self.do_initrd_path = True
4595 if self.initrd_path not in (constants.VALUE_NONE,
4596 constants.VALUE_DEFAULT):
4597 if not os.path.isabs(self.initrd_path):
4598 raise errors.OpPrereqError("The initrd path must be an absolute"
4599 " filename")
4600 else:
4601 self.do_initrd_path = False
4603 # boot order verification
4604 if self.hvm_boot_order is not None:
4605 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4606 if len(self.hvm_boot_order.strip("acdn")) != 0:
4607 raise errors.OpPrereqError("invalid boot order specified,"
4608 " must be one or more of [acdn]"
4611 # hvm_cdrom_image_path verification
4612 if self.op.hvm_cdrom_image_path is not None:
4613 if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4614 self.op.hvm_cdrom_image_path.lower() == "none"):
4615 raise errors.OpPrereqError("The path to the HVM CDROM image must"
4616 " be an absolute path or None, not %s" %
4617 self.op.hvm_cdrom_image_path)
4618 if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4619 self.op.hvm_cdrom_image_path.lower() == "none"):
4620 raise errors.OpPrereqError("The HVM CDROM image must either be a"
4621 " regular file or a symlink pointing to"
4622 " an existing regular file, not %s" %
4623 self.op.hvm_cdrom_image_path)
4625 # vnc_bind_address verification
4626 if self.op.vnc_bind_address is not None:
4627 if not utils.IsValidIP(self.op.vnc_bind_address):
4628 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4629 " like a valid IP address" %
4630 self.op.vnc_bind_address)
4632 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4633 assert self.instance is not None, \
4634 "Cannot retrieve locked instance %s" % self.op.instance_name
4635 self.warn = []
4636 if self.mem is not None and not self.force:
4637 pnode = self.instance.primary_node
4638 nodelist = [pnode]
4639 nodelist.extend(instance.secondary_nodes)
4640 instance_info = self.rpc.call_instance_info(pnode, instance.name,
4641 instance.hypervisor)
4642 nodeinfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
4643 instance.hypervisor)
4645 if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4646 # Assume the primary node is unreachable and go ahead
4647 self.warn.append("Can't get info from primary node %s" % pnode)
4648 else:
4649 if instance_info:
4650 current_mem = instance_info['memory']
4651 else:
4652 # Assume instance not running
4653 # (there is a slight race condition here, but it's not very probable,
4654 # and we have no other way to check)
4655 current_mem = 0
4656 miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
4657 if miss_mem > 0:
4658 raise errors.OpPrereqError("This change will prevent the instance"
4659 " from starting, due to %d MB of memory"
4660 " missing on its primary node" % miss_mem)
4662 for node in instance.secondary_nodes:
4663 if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4664 self.warn.append("Can't get info from secondary node %s" % node)
4665 elif self.mem > nodeinfo[node]['memory_free']:
4666 self.warn.append("Not enough memory to failover instance to secondary"
4669 # Xen HVM device type checks
4670 if instance.hypervisor == constants.HT_XEN_HVM:
4671 if self.op.hvm_nic_type is not None:
4672 if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
4673 raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
4674 " HVM hypervisor" % self.op.hvm_nic_type)
4675 if self.op.hvm_disk_type is not None:
4676 if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
4677 raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
4678 " HVM hypervisor" % self.op.hvm_disk_type)
4682 def Exec(self, feedback_fn):
4683 """Modifies an instance.
4685 All parameters take effect only at the next restart of the instance.
4687 # Process here the warnings from CheckPrereq, as we don't have a
4688 # feedback_fn there.
4689 for warn in self.warn:
4690 feedback_fn("WARNING: %s" % warn)
4692 result = []
4693 instance = self.instance
4694 if self.mem:
4695 instance.memory = self.mem
4696 result.append(("mem", self.mem))
4697 if self.vcpus:
4698 instance.vcpus = self.vcpus
4699 result.append(("vcpus", self.vcpus))
4700 if self.do_ip:
4701 instance.nics[0].ip = self.ip
4702 result.append(("ip", self.ip))
4703 if self.bridge:
4704 instance.nics[0].bridge = self.bridge
4705 result.append(("bridge", self.bridge))
4706 if self.mac:
4707 instance.nics[0].mac = self.mac
4708 result.append(("mac", self.mac))
4709 if self.do_kernel_path:
4710 instance.kernel_path = self.kernel_path
4711 result.append(("kernel_path", self.kernel_path))
4712 if self.do_initrd_path:
4713 instance.initrd_path = self.initrd_path
4714 result.append(("initrd_path", self.initrd_path))
4715 if self.hvm_boot_order:
4716 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4717 instance.hvm_boot_order = None
4718 else:
4719 instance.hvm_boot_order = self.hvm_boot_order
4720 result.append(("hvm_boot_order", self.hvm_boot_order))
4721 if self.hvm_acpi is not None:
4722 instance.hvm_acpi = self.hvm_acpi
4723 result.append(("hvm_acpi", self.hvm_acpi))
4724 if self.hvm_pae is not None:
4725 instance.hvm_pae = self.hvm_pae
4726 result.append(("hvm_pae", self.hvm_pae))
4727 if self.hvm_nic_type is not None:
4728 instance.hvm_nic_type = self.hvm_nic_type
4729 result.append(("hvm_nic_type", self.hvm_nic_type))
4730 if self.hvm_disk_type is not None:
4731 instance.hvm_disk_type = self.hvm_disk_type
4732 result.append(("hvm_disk_type", self.hvm_disk_type))
4733 if self.hvm_cdrom_image_path:
4734 if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4735 instance.hvm_cdrom_image_path = None
4736 else:
4737 instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4738 result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4739 if self.vnc_bind_address:
4740 instance.vnc_bind_address = self.vnc_bind_address
4741 result.append(("vnc_bind_address", self.vnc_bind_address))
4743 self.cfg.Update(instance)
4745 return result
4748 class LUQueryExports(NoHooksLU):
4749 """Query the exports list
4752 _OP_REQP = ['nodes']
4754 REQ_BGL = False
4755 def ExpandNames(self):
4756 self.needed_locks = {}
4757 self.share_locks[locking.LEVEL_NODE] = 1
4758 if not self.op.nodes:
4759 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4761 self.needed_locks[locking.LEVEL_NODE] = \
4762 _GetWantedNodes(self, self.op.nodes)
4764 def CheckPrereq(self):
4765 """Check prerequisites.
4768 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4770 def Exec(self, feedback_fn):
4771 """Compute the list of all the exported system images.
4773 Returns:
4774 a dictionary with the structure node->(export-list)
4775 where export-list is a list of the instances exported on
4776 that node.
4779 return self.rpc.call_export_list(self.nodes)
4782 class LUExportInstance(LogicalUnit):
4783 """Export an instance to an image in the cluster.
4786 HPATH = "instance-export"
4787 HTYPE = constants.HTYPE_INSTANCE
4788 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4791 def ExpandNames(self):
4792 self._ExpandAndLockInstance()
4793 # FIXME: lock only instance primary and destination node
4795 # Sad but true, for now we have to lock all nodes, as we don't know where
4796 # the previous export might be, and in this LU we search for it and
4797 # remove it from its current node. In the future we could fix this by:
4798 # - making a tasklet to search (share-lock all), then create the new one,
4799 # then one to remove, after
4800 # - removing the removal operation altogether
4801 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4803 def DeclareLocks(self, level):
4804 """Last minute lock declaration."""
4805 # All nodes are locked anyway, so nothing to do here.
4807 def BuildHooksEnv(self):
4810 This will run on the master, primary node and target node.
4814 "EXPORT_NODE": self.op.target_node,
4815 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4817 env.update(_BuildInstanceHookEnvByObject(self.instance))
4818 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4819 self.op.target_node]
4822 def CheckPrereq(self):
4823 """Check prerequisites.
4825 This checks that the instance and node names are valid.
4828 instance_name = self.op.instance_name
4829 self.instance = self.cfg.GetInstanceInfo(instance_name)
4830 assert self.instance is not None, \
4831 "Cannot retrieve locked instance %s" % self.op.instance_name
4833 self.dst_node = self.cfg.GetNodeInfo(
4834 self.cfg.ExpandNodeName(self.op.target_node))
4836 assert self.dst_node is not None, \
4837 "Cannot retrieve locked node %s" % self.op.target_node
4839 # instance disk type verification
4840 for disk in self.instance.disks:
4841 if disk.dev_type == constants.LD_FILE:
4842 raise errors.OpPrereqError("Export not supported for instances with"
4843 " file-based disks")
4845 def Exec(self, feedback_fn):
4846 """Export an instance to an image in the cluster.
4849 instance = self.instance
4850 dst_node = self.dst_node
4851 src_node = instance.primary_node
4852 if self.op.shutdown:
4853 # shutdown the instance, but not the disks
4854 if not self.rpc.call_instance_shutdown(src_node, instance):
4855 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4856 (instance.name, src_node))
4858 vgname = self.cfg.GetVGName()
4860 snap_disks = []
4862 try:
4863 for disk in instance.disks:
4864 if disk.iv_name == "sda":
4865 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4866 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4868 if not new_dev_name:
4869 logger.Error("could not snapshot block device %s on node %s" %
4870 (disk.logical_id[1], src_node))
4871 else:
4872 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4873 logical_id=(vgname, new_dev_name),
4874 physical_id=(vgname, new_dev_name),
4875 iv_name=disk.iv_name)
4876 snap_disks.append(new_dev)
4878 finally:
4879 if self.op.shutdown and instance.status == "up":
4880 if not self.rpc.call_instance_start(src_node, instance, None):
4881 _ShutdownInstanceDisks(self, instance)
4882 raise errors.OpExecError("Could not start instance")
4884 # TODO: check for size
4886 cluster_name = self.cfg.GetClusterName()
4887 for dev in snap_disks:
4888 if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4889 instance, cluster_name):
4890 logger.Error("could not export block device %s from node %s to node %s"
4891 % (dev.logical_id[1], src_node, dst_node.name))
4892 if not self.rpc.call_blockdev_remove(src_node, dev):
4893 logger.Error("could not remove snapshot block device %s from node %s" %
4894 (dev.logical_id[1], src_node))
4896 if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4897 logger.Error("could not finalize export for instance %s on node %s" %
4898 (instance.name, dst_node.name))
4900 nodelist = self.cfg.GetNodeList()
4901 nodelist.remove(dst_node.name)
4903 # on one-node clusters nodelist will be empty after the removal
4904 # if we proceed the backup would be removed because OpQueryExports
4905 # substitutes an empty list with the full cluster node list.
4906 if nodelist:
4907 exportlist = self.rpc.call_export_list(nodelist)
4908 for node in exportlist:
4909 if instance.name in exportlist[node]:
4910 if not self.rpc.call_export_remove(node, instance.name):
4911 logger.Error("could not remove older export for instance %s"
4912 " on node %s" % (instance.name, node))
4915 class LURemoveExport(NoHooksLU):
4916 """Remove exports related to the named instance.
4919 _OP_REQP = ["instance_name"]
4922 def ExpandNames(self):
4923 self.needed_locks = {}
4924 # We need all nodes to be locked in order for RemoveExport to work, but we
4925 # don't need to lock the instance itself, as nothing will happen to it (and
4926 # we can remove exports also for a removed instance)
4927 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4929 def CheckPrereq(self):
4930 """Check prerequisites.
4934 def Exec(self, feedback_fn):
4935 """Remove any export.
4938 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4939 # If the instance was not found we'll try with the name that was passed in.
4940 # This will only work if it was an FQDN, though.
4941 fqdn_warn = False
4942 if not instance_name:
4943 fqdn_warn = True
4944 instance_name = self.op.instance_name
4946 exportlist = self.rpc.call_export_list(self.acquired_locks[
4947 locking.LEVEL_NODE])
4948 found = False
4949 for node in exportlist:
4950 if instance_name in exportlist[node]:
4951 found = True
4952 if not self.rpc.call_export_remove(node, instance_name):
4953 logger.Error("could not remove export for instance %s"
4954 " on node %s" % (instance_name, node))
4956 if fqdn_warn and not found:
4957 feedback_fn("Export not found. If trying to remove an export belonging"
4958 " to a deleted instance please use its Fully Qualified"
4962 class TagsLU(NoHooksLU):
4963 """Generic tags LU.
4965 This is an abstract class which is the parent of all the other tags LUs.
4969 def ExpandNames(self):
4970 self.needed_locks = {}
4971 if self.op.kind == constants.TAG_NODE:
4972 name = self.cfg.ExpandNodeName(self.op.name)
4974 raise errors.OpPrereqError("Invalid node name (%s)" %
4977 self.needed_locks[locking.LEVEL_NODE] = name
4978 elif self.op.kind == constants.TAG_INSTANCE:
4979 name = self.cfg.ExpandInstanceName(self.op.name)
4981 raise errors.OpPrereqError("Invalid instance name (%s)" %
4984 self.needed_locks[locking.LEVEL_INSTANCE] = name
4986 def CheckPrereq(self):
4987 """Check prerequisites.
4990 if self.op.kind == constants.TAG_CLUSTER:
4991 self.target = self.cfg.GetClusterInfo()
4992 elif self.op.kind == constants.TAG_NODE:
4993 self.target = self.cfg.GetNodeInfo(self.op.name)
4994 elif self.op.kind == constants.TAG_INSTANCE:
4995 self.target = self.cfg.GetInstanceInfo(self.op.name)
4997 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5001 class LUGetTags(TagsLU):
5002 """Returns the tags of a given object.
5005 _OP_REQP = ["kind", "name"]
5008 def Exec(self, feedback_fn):
5009 """Returns the tag list.
5012 return list(self.target.GetTags())
5015 class LUSearchTags(NoHooksLU):
5016 """Searches the tags for a given pattern.
5019 _OP_REQP = ["pattern"]
5022 def ExpandNames(self):
5023 self.needed_locks = {}
5025 def CheckPrereq(self):
5026 """Check prerequisites.
5028 This checks the pattern passed for validity by compiling it.
5031 try:
5032 self.re = re.compile(self.op.pattern)
5033 except re.error, err:
5034 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5035 (self.op.pattern, err))
5037 def Exec(self, feedback_fn):
5038 """Returns the tag list.
5042 tgts = [("/cluster", cfg.GetClusterInfo())]
5043 ilist = cfg.GetAllInstancesInfo().values()
5044 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5045 nlist = cfg.GetAllNodesInfo().values()
5046 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5047 results = []
5048 for path, target in tgts:
5049 for tag in target.GetTags():
5050 if self.re.search(tag):
5051 results.append((path, tag))
5053 return results
5055 class LUAddTags(TagsLU):
5056 """Sets a tag on a given object.
5059 _OP_REQP = ["kind", "name", "tags"]
5062 def CheckPrereq(self):
5063 """Check prerequisites.
5065 This checks the type and length of the tag name and value.
5068 TagsLU.CheckPrereq(self)
5069 for tag in self.op.tags:
5070 objects.TaggableObject.ValidateTag(tag)
5072 def Exec(self, feedback_fn):
5073 """Sets the tag.
5076 try:
5077 for tag in self.op.tags:
5078 self.target.AddTag(tag)
5079 except errors.TagError, err:
5080 raise errors.OpExecError("Error while setting tag: %s" % str(err))
5081 try:
5082 self.cfg.Update(self.target)
5083 except errors.ConfigurationError:
5084 raise errors.OpRetryError("There has been a modification to the"
5085 " config file and the operation has been"
5086 " aborted. Please retry.")
5089 class LUDelTags(TagsLU):
5090 """Delete a list of tags from a given object.
5093 _OP_REQP = ["kind", "name", "tags"]
5096 def CheckPrereq(self):
5097 """Check prerequisites.
5099 This checks that we have the given tag.
5102 TagsLU.CheckPrereq(self)
5103 for tag in self.op.tags:
5104 objects.TaggableObject.ValidateTag(tag)
5105 del_tags = frozenset(self.op.tags)
5106 cur_tags = self.target.GetTags()
5107 if not del_tags <= cur_tags:
5108 diff_tags = del_tags - cur_tags
5109 diff_names = ["'%s'" % tag for tag in diff_tags]
5111 raise errors.OpPrereqError("Tag(s) %s not found" %
5112 (",".join(diff_names)))
5114 def Exec(self, feedback_fn):
5115 """Remove the tag from the object.
5118 for tag in self.op.tags:
5119 self.target.RemoveTag(tag)
5120 try:
5121 self.cfg.Update(self.target)
5122 except errors.ConfigurationError:
5123 raise errors.OpRetryError("There has been a modification to the"
5124 " config file and the operation has been"
5125 " aborted. Please retry.")
5128 class LUTestDelay(NoHooksLU):
5129 """Sleep for a specified amount of time.
5131 This LU sleeps on the master and/or nodes for a specified amount of
5132 time.
5135 _OP_REQP = ["duration", "on_master", "on_nodes"]
5137 REQ_BGL = False
5138 def ExpandNames(self):
5139 """Expand names and set required locks.
5141 This expands the node list, if any.
5144 self.needed_locks = {}
5145 if self.op.on_nodes:
5146 # _GetWantedNodes can be used here, but is not always appropriate to use
5147 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5148 # more information.
5149 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5150 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5152 def CheckPrereq(self):
5153 """Check prerequisites.
5157 def Exec(self, feedback_fn):
5158 """Do the actual sleep.
5161 if self.op.on_master:
5162 if not utils.TestDelay(self.op.duration):
5163 raise errors.OpExecError("Error during master delay test")
5164 if self.op.on_nodes:
5165 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5167 raise errors.OpExecError("Complete failure from rpc call")
5168 for node, node_result in result.items():
5170 raise errors.OpExecError("Failure during rpc call to node %s,"
5171 " result: %s" % (node, node_result))
5174 class IAllocator(object):
5175 """IAllocator framework.
5177 An IAllocator instance has four sets of attributes:
5178 - cfg that is needed to query the cluster
5179 - input data (all members of the _KEYS class attribute are required)
5180 - four buffer attributes (in|out)_(data|text), that represent the
5181 input (to the external script) in text and data structure format,
5182 and the output from it, again in two formats
5183 - the result variables from the script (success, info, nodes) for
5184 easy usage
5187 _ALLO_KEYS = [
5188 "mem_size", "disks", "disk_template",
5189 "os", "tags", "nics", "vcpus",
5190 ]
5191 _RELO_KEYS = [
5192 "relocate_from",
5193 ]
5195 def __init__(self, lu, mode, name, **kwargs):
5196 self.lu = lu
5197 # init buffer variables
5198 self.in_text = self.out_text = self.in_data = self.out_data = None
5199 # init all input fields so that pylint is happy
5200 self.mode = mode
5201 self.name = name
5202 self.mem_size = self.disks = self.disk_template = None
5203 self.os = self.tags = self.nics = self.vcpus = None
5204 self.relocate_from = None
5205 # computed fields
5206 self.required_nodes = None
5207 # init result fields
5208 self.success = self.info = self.nodes = None
5209 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5210 keyset = self._ALLO_KEYS
5211 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5212 keyset = self._RELO_KEYS
5214 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5215 " IAllocator" % self.mode)
5216 for key in kwargs:
5217 if key not in keyset:
5218 raise errors.ProgrammerError("Invalid input parameter '%s' to"
5219 " IAllocator" % key)
5220 setattr(self, key, kwargs[key])
5221 for key in keyset:
5222 if key not in kwargs:
5223 raise errors.ProgrammerError("Missing input parameter '%s' to"
5224 " IAllocator" % key)
5225 self._BuildInputData()
5227 def _ComputeClusterData(self):
5228 """Compute the generic allocator input data.
5230 This is the data that is independent of the actual operation.
5233 cfg = self.lu.cfg
5234 cluster_info = cfg.GetClusterInfo()
5238 "cluster_name": cfg.GetClusterName(),
5239 "cluster_tags": list(cluster_info.GetTags()),
5240 "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5241 # we don't have job IDs
5242 }
5244 i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5246 # node data
5247 node_results = {}
5248 node_list = cfg.GetNodeList()
5249 # FIXME: here we have only one hypervisor information, but
5250 # instance can belong to different hypervisors
5251 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5252 cfg.GetHypervisorType())
5253 for nname in node_list:
5254 ninfo = cfg.GetNodeInfo(nname)
5255 if nname not in node_data or not isinstance(node_data[nname], dict):
5256 raise errors.OpExecError("Can't get data for node %s" % nname)
5257 remote_info = node_data[nname]
5258 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5259 'vg_size', 'vg_free', 'cpu_total']:
5260 if attr not in remote_info:
5261 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5262 (nname, attr))
5263 try:
5264 remote_info[attr] = int(remote_info[attr])
5265 except ValueError, err:
5266 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5267 " %s" % (nname, attr, str(err)))
5268 # compute memory used by primary instances
5269 i_p_mem = i_p_up_mem = 0
5270 for iinfo in i_list:
5271 if iinfo.primary_node == nname:
5272 i_p_mem += iinfo.memory
5273 if iinfo.status == "up":
5274 i_p_up_mem += iinfo.memory
5276 # compute memory used by instances
5278 "tags": list(ninfo.GetTags()),
5279 "total_memory": remote_info['memory_total'],
5280 "reserved_memory": remote_info['memory_dom0'],
5281 "free_memory": remote_info['memory_free'],
5282 "i_pri_memory": i_p_mem,
5283 "i_pri_up_memory": i_p_up_mem,
5284 "total_disk": remote_info['vg_size'],
5285 "free_disk": remote_info['vg_free'],
5286 "primary_ip": ninfo.primary_ip,
5287 "secondary_ip": ninfo.secondary_ip,
5288 "total_cpus": remote_info['cpu_total'],
5289 }
5290 node_results[nname] = pnr
5291 data["nodes"] = node_results
5293 # instance data
5294 instance_data = {}
5295 for iinfo in i_list:
5296 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5297 for n in iinfo.nics]
5299 "tags": list(iinfo.GetTags()),
5300 "should_run": iinfo.status == "up",
5301 "vcpus": iinfo.vcpus,
5302 "memory": iinfo.memory,
5304 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5306 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5307 "disk_template": iinfo.disk_template,
5308 "hypervisor": iinfo.hypervisor,
5309 }
5310 instance_data[iinfo.name] = pir
5312 data["instances"] = instance_data
5316 def _AddNewInstance(self):
5317 """Add new instance data to allocator structure.
5319 This in combination with _ComputeClusterData will create the
5320 correct structure needed as input for the allocator.
5322 The checks for the completeness of the opcode must have already been
5323 done.
5326 data = self.in_data
5327 if len(self.disks) != 2:
5328 raise errors.OpExecError("Only two-disk configurations supported")
5330 disk_space = _ComputeDiskSize(self.disk_template,
5331 self.disks[0]["size"], self.disks[1]["size"])
5333 if self.disk_template in constants.DTS_NET_MIRROR:
5334 self.required_nodes = 2
5335 else:
5336 self.required_nodes = 1
5340 "disk_template": self.disk_template,
5343 "vcpus": self.vcpus,
5344 "memory": self.mem_size,
5345 "disks": self.disks,
5346 "disk_space_total": disk_space,
5348 "required_nodes": self.required_nodes,
5350 data["request"] = request
5352 def _AddRelocateInstance(self):
5353 """Add relocate instance data to allocator structure.
5355 This in combination with _ComputeClusterData will create the
5356 correct structure needed as input for the allocator.
5358 The checks for the completeness of the opcode must have already been
5359 done.
5362 instance = self.lu.cfg.GetInstanceInfo(self.name)
5363 if instance is None:
5364 raise errors.ProgrammerError("Unknown instance '%s' passed to"
5365 " IAllocator" % self.name)
5367 if instance.disk_template not in constants.DTS_NET_MIRROR:
5368 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5370 if len(instance.secondary_nodes) != 1:
5371 raise errors.OpPrereqError("Instance has not exactly one secondary node")
5373 self.required_nodes = 1
5375 disk_space = _ComputeDiskSize(instance.disk_template,
5376 instance.disks[0].size,
5377 instance.disks[1].size)
5382 "disk_space_total": disk_space,
5383 "required_nodes": self.required_nodes,
5384 "relocate_from": self.relocate_from,
5386 self.in_data["request"] = request
5388 def _BuildInputData(self):
5389 """Build input data structures.
5392 self._ComputeClusterData()
5394 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5395 self._AddNewInstance()
5396 else:
5397 self._AddRelocateInstance()
5399 self.in_text = serializer.Dump(self.in_data)
5401 def Run(self, name, validate=True, call_fn=None):
5402 """Run an instance allocator and return the results.
5405 if call_fn is None:
5406 call_fn = self.lu.rpc.call_iallocator_runner
5409 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5411 if not isinstance(result, (list, tuple)) or len(result) != 4:
5412 raise errors.OpExecError("Invalid result from master iallocator runner")
5414 rcode, stdout, stderr, fail = result
5416 if rcode == constants.IARUN_NOTFOUND:
5417 raise errors.OpExecError("Can't find allocator '%s'" % name)
5418 elif rcode == constants.IARUN_FAILURE:
5419 raise errors.OpExecError("Instance allocator call failed: %s,"
5420 " output: %s" % (fail, stdout+stderr))
5421 self.out_text = stdout
5422 if validate:
5423 self._ValidateResult()
5425 def _ValidateResult(self):
5426 """Process the allocator results.
5428 This will process and if successful save the result in
5429 self.out_data and the other parameters.
5432 try:
5433 rdict = serializer.Load(self.out_text)
5434 except Exception, err:
5435 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5437 if not isinstance(rdict, dict):
5438 raise errors.OpExecError("Can't parse iallocator results: not a dict")
5440 for key in "success", "info", "nodes":
5441 if key not in rdict:
5442 raise errors.OpExecError("Can't parse iallocator results:"
5443 " missing key '%s'" % key)
5444 setattr(self, key, rdict[key])
5446 if not isinstance(rdict["nodes"], list):
5447 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5449 self.out_data = rdict
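# Sketch of a well-formed allocator reply (values assumed):
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node2.example.com", "node3.example.com"]}
# A reply missing any of the three keys, or whose "nodes" entry is not a
# list, is rejected above.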
5452 class LUTestAllocator(NoHooksLU):
5453 """Run allocator tests.
5455 This LU runs the allocator tests
5458 _OP_REQP = ["direction", "mode", "name"]
5460 def CheckPrereq(self):
5461 """Check prerequisites.
5463 This checks the opcode parameters depending on the direction and mode of the test.
5466 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5467 for attr in ["name", "mem_size", "disks", "disk_template",
5468 "os", "tags", "nics", "vcpus"]:
5469 if not hasattr(self.op, attr):
5470 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5471 attr)
5472 iname = self.cfg.ExpandInstanceName(self.op.name)
5473 if iname is not None:
5474 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5475 iname)
5476 if not isinstance(self.op.nics, list):
5477 raise errors.OpPrereqError("Invalid parameter 'nics'")
5478 for row in self.op.nics:
5479 if (not isinstance(row, dict) or
5482 "bridge" not in row):
5483 raise errors.OpPrereqError("Invalid contents of the"
5484 " 'nics' parameter")
5485 if not isinstance(self.op.disks, list):
5486 raise errors.OpPrereqError("Invalid parameter 'disks'")
5487 if len(self.op.disks) != 2:
5488 raise errors.OpPrereqError("Only two-disk configurations supported")
5489 for row in self.op.disks:
5490 if (not isinstance(row, dict) or
5491 "size" not in row or
5492 not isinstance(row["size"], int) or
5493 "mode" not in row or
5494 row["mode"] not in ['r', 'w']):
5495 raise errors.OpPrereqError("Invalid contents of the"
5496 " 'disks' parameter")
5497 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5498 if not hasattr(self.op, "name"):
5499 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5500 fname = self.cfg.ExpandInstanceName(self.op.name)
5502 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5504 self.op.name = fname
5505 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5507 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5510 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5511 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5512 raise errors.OpPrereqError("Missing allocator name")
5513 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5514 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5517 def Exec(self, feedback_fn):
5518 """Run the allocator test.
5521 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5522 ial = IAllocator(self,
5523 mode=self.op.mode,
5524 name=self.op.name,
5525 mem_size=self.op.mem_size,
5526 disks=self.op.disks,
5527 disk_template=self.op.disk_template,
5528 os=self.op.os,
5529 tags=self.op.tags,
5530 nics=self.op.nics,
5531 vcpus=self.op.vcpus,
5532 )
5533 else:
5534 ial = IAllocator(self,
5535 mode=self.op.mode,
5536 name=self.op.name,
5537 relocate_from=list(self.relocate_from),
5538 )
5540 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5541 result = ial.in_text
5542 else:
5543 ial.Run(self.op.allocator, validate=False)
5544 result = ial.out_text
5545 return result