4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
47 class LogicalUnit(object):
48 """Logical Unit base class.
50 Subclasses must follow these rules:
51 - implement ExpandNames
52 - implement CheckPrereq
54 - implement BuildHooksEnv
55 - redefine HPATH and HTYPE
56 - optionally redefine their run requirements:
57 REQ_MASTER: the LU needs to run on the master node
58 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60 Note that all commands require root permissions.
69 def __init__(self, processor, op, context, rpc):
70 """Constructor for LogicalUnit.
72 This needs to be overriden in derived classes in order to check op
78 self.cfg = context.cfg
79 self.context = context
81 # Dicts used to declare locking needs to mcpu
82 self.needed_locks = None
83 self.acquired_locks = {}
84 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86 self.remove_locks = {}
87 # Used to force good behavior when calling helper functions
88 self.recalculate_locks = {}
91 self.LogWarning = processor.LogWarning
92 self.LogInfo = processor.LogInfo
94 for attr_name in self._OP_REQP:
95 attr_val = getattr(op, attr_name, None)
97 raise errors.OpPrereqError("Required parameter '%s' missing" %
100 if not self.cfg.IsCluster():
101 raise errors.OpPrereqError("Cluster not initialized yet,"
102 " use 'gnt-cluster init' first.")
104 master = self.cfg.GetMasterNode()
105 if master != utils.HostInfo().name:
106 raise errors.OpPrereqError("Commands must be run on the master"
110 """Returns the SshRunner object
114 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
117 ssh = property(fget=__GetSSH)
119 def ExpandNames(self):
120 """Expand names for this LU.
122 This method is called before starting to execute the opcode, and it should
123 update all the parameters of the opcode to their canonical form (e.g. a
124 short node name must be fully expanded after this method has successfully
125 completed). This way locking, hooks, logging, ecc. can work correctly.
127 LUs which implement this method must also populate the self.needed_locks
128 member, as a dict with lock levels as keys, and a list of needed lock names
130 - Use an empty dict if you don't need any lock
131 - If you don't need any lock at a particular level omit that level
132 - Don't put anything for the BGL level
133 - If you want all locks at a level use locking.ALL_SET as a value
135 If you need to share locks (rather than acquire them exclusively) at one
136 level you can modify self.share_locks, setting a true value (usually 1) for
137 that level. By default locks are not shared.
140 # Acquire all nodes and one instance
141 self.needed_locks = {
142 locking.LEVEL_NODE: locking.ALL_SET,
143 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
145 # Acquire just two nodes
146 self.needed_locks = {
147 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
150 self.needed_locks = {} # No, you can't leave it to the default value None
153 # The implementation of this method is mandatory only if the new LU is
154 # concurrent, so that old LUs don't need to be changed all at the same
157 self.needed_locks = {} # Exclusive LUs don't need locks.
159 raise NotImplementedError
161 def DeclareLocks(self, level):
162 """Declare LU locking needs for a level
164 While most LUs can just declare their locking needs at ExpandNames time,
165 sometimes there's the need to calculate some locks after having acquired
166 the ones before. This function is called just before acquiring locks at a
167 particular level, but after acquiring the ones at lower levels, and permits
168 such calculations. It can be used to modify self.needed_locks, and by
169 default it does nothing.
171 This function is only called if you have something already set in
172 self.needed_locks for the level.
174 @param level: Locking level which is going to be locked
175 @type level: member of ganeti.locking.LEVELS
179 def CheckPrereq(self):
180 """Check prerequisites for this LU.
182 This method should check that the prerequisites for the execution
183 of this LU are fulfilled. It can do internode communication, but
184 it should be idempotent - no cluster or system changes are
187 The method should raise errors.OpPrereqError in case something is
188 not fulfilled. Its return value is ignored.
190 This method should also update all the parameters of the opcode to
191 their canonical form if it hasn't been done by ExpandNames before.
194 raise NotImplementedError
196 def Exec(self, feedback_fn):
199 This method should implement the actual work. It should raise
200 errors.OpExecError for failures that are somewhat dealt with in
204 raise NotImplementedError
206 def BuildHooksEnv(self):
207 """Build hooks environment for this LU.
209 This method should return a three-node tuple consisting of: a dict
210 containing the environment that will be used for running the
211 specific hook for this LU, a list of node names on which the hook
212 should run before the execution, and a list of node names on which
213 the hook should run after the execution.
215 The keys of the dict must not have 'GANETI_' prefixed as this will
216 be handled in the hooks runner. Also note additional keys will be
217 added by the hooks runner. If the LU doesn't define any
218 environment, an empty dict (and not None) should be returned.
220 No nodes should be returned as an empty list (and not None).
222 Note that if the HPATH for a LU class is None, this function will
226 raise NotImplementedError
228 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
229 """Notify the LU about the results of its hooks.
231 This method is called every time a hooks phase is executed, and notifies
232 the Logical Unit about the hooks' result. The LU can then use it to alter
233 its result based on the hooks. By default the method does nothing and the
234 previous result is passed back unchanged but any LU can define it if it
235 wants to use the local cluster hook-scripts somehow.
238 phase: the hooks phase that has just been run
239 hooks_results: the results of the multi-node hooks rpc call
240 feedback_fn: function to send feedback back to the caller
241 lu_result: the previous result this LU had, or None in the PRE phase.
246 def _ExpandAndLockInstance(self):
247 """Helper function to expand and lock an instance.
249 Many LUs that work on an instance take its name in self.op.instance_name
250 and need to expand it and then declare the expanded name for locking. This
251 function does it, and then updates self.op.instance_name to the expanded
252 name. It also initializes needed_locks as a dict, if this hasn't been done
256 if self.needed_locks is None:
257 self.needed_locks = {}
259 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
260 "_ExpandAndLockInstance called with instance-level locks set"
261 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
262 if expanded_name is None:
263 raise errors.OpPrereqError("Instance '%s' not known" %
264 self.op.instance_name)
265 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
266 self.op.instance_name = expanded_name
268 def _LockInstancesNodes(self, primary_only=False):
269 """Helper function to declare instances' nodes for locking.
271 This function should be called after locking one or more instances to lock
272 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
273 with all primary or secondary nodes for instances already locked and
274 present in self.needed_locks[locking.LEVEL_INSTANCE].
276 It should be called from DeclareLocks, and for safety only works if
277 self.recalculate_locks[locking.LEVEL_NODE] is set.
279 In the future it may grow parameters to just lock some instance's nodes, or
280 to just lock primaries or secondary nodes, if needed.
282 If should be called in DeclareLocks in a way similar to:
284 if level == locking.LEVEL_NODE:
285 self._LockInstancesNodes()
287 @type primary_only: boolean
288 @param primary_only: only lock primary nodes of locked instances
291 assert locking.LEVEL_NODE in self.recalculate_locks, \
292 "_LockInstancesNodes helper function called with no nodes to recalculate"
294 # TODO: check if we're really been called with the instance locks held
296 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
297 # future we might want to have different behaviors depending on the value
298 # of self.recalculate_locks[locking.LEVEL_NODE]
300 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
301 instance = self.context.cfg.GetInstanceInfo(instance_name)
302 wanted_nodes.append(instance.primary_node)
304 wanted_nodes.extend(instance.secondary_nodes)
306 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
307 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
308 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
309 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
311 del self.recalculate_locks[locking.LEVEL_NODE]
314 class NoHooksLU(LogicalUnit):
315 """Simple LU which runs no hooks.
317 This LU is intended as a parent for other LogicalUnits which will
318 run no hooks, in order to reduce duplicate code.
325 def _GetWantedNodes(lu, nodes):
326 """Returns list of checked and expanded node names.
329 nodes: List of nodes (strings) or None for all
332 if not isinstance(nodes, list):
333 raise errors.OpPrereqError("Invalid argument type 'nodes'")
336 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
337 " non-empty list of nodes whose name is to be expanded.")
341 node = lu.cfg.ExpandNodeName(name)
343 raise errors.OpPrereqError("No such node name '%s'" % name)
346 return utils.NiceSort(wanted)
349 def _GetWantedInstances(lu, instances):
350 """Returns list of checked and expanded instance names.
353 instances: List of instances (strings) or None for all
356 if not isinstance(instances, list):
357 raise errors.OpPrereqError("Invalid argument type 'instances'")
362 for name in instances:
363 instance = lu.cfg.ExpandInstanceName(name)
365 raise errors.OpPrereqError("No such instance name '%s'" % name)
366 wanted.append(instance)
369 wanted = lu.cfg.GetInstanceList()
370 return utils.NiceSort(wanted)
373 def _CheckOutputFields(static, dynamic, selected):
374 """Checks whether all selected fields are valid.
377 static: Static fields
378 dynamic: Dynamic fields
381 static_fields = frozenset(static)
382 dynamic_fields = frozenset(dynamic)
384 all_fields = static_fields | dynamic_fields
386 if not all_fields.issuperset(selected):
387 raise errors.OpPrereqError("Unknown output fields selected: %s"
388 % ",".join(frozenset(selected).
389 difference(all_fields)))
392 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
393 memory, vcpus, nics):
394 """Builds instance related env variables for hooks from single variables.
397 secondary_nodes: List of secondary nodes as strings
401 "INSTANCE_NAME": name,
402 "INSTANCE_PRIMARY": primary_node,
403 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
404 "INSTANCE_OS_TYPE": os_type,
405 "INSTANCE_STATUS": status,
406 "INSTANCE_MEMORY": memory,
407 "INSTANCE_VCPUS": vcpus,
411 nic_count = len(nics)
412 for idx, (ip, bridge, mac) in enumerate(nics):
415 env["INSTANCE_NIC%d_IP" % idx] = ip
416 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
417 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
421 env["INSTANCE_NIC_COUNT"] = nic_count
426 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
427 """Builds instance related env variables for hooks from an object.
430 instance: objects.Instance object of instance
431 override: dict of values to override
433 bep = lu.cfg.GetClusterInfo().FillBE(instance)
435 'name': instance.name,
436 'primary_node': instance.primary_node,
437 'secondary_nodes': instance.secondary_nodes,
438 'os_type': instance.os,
439 'status': instance.os,
440 'memory': bep[constants.BE_MEMORY],
441 'vcpus': bep[constants.BE_VCPUS],
442 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
445 args.update(override)
446 return _BuildInstanceHookEnv(**args)
449 def _CheckInstanceBridgesExist(lu, instance):
450 """Check that the brigdes needed by an instance exist.
453 # check bridges existance
454 brlist = [nic.bridge for nic in instance.nics]
455 if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
456 raise errors.OpPrereqError("one or more target bridges %s does not"
457 " exist on destination node '%s'" %
458 (brlist, instance.primary_node))
461 class LUDestroyCluster(NoHooksLU):
462 """Logical unit for destroying the cluster.
467 def CheckPrereq(self):
468 """Check prerequisites.
470 This checks whether the cluster is empty.
472 Any errors are signalled by raising errors.OpPrereqError.
475 master = self.cfg.GetMasterNode()
477 nodelist = self.cfg.GetNodeList()
478 if len(nodelist) != 1 or nodelist[0] != master:
479 raise errors.OpPrereqError("There are still %d node(s) in"
480 " this cluster." % (len(nodelist) - 1))
481 instancelist = self.cfg.GetInstanceList()
483 raise errors.OpPrereqError("There are still %d instance(s) in"
484 " this cluster." % len(instancelist))
486 def Exec(self, feedback_fn):
487 """Destroys the cluster.
490 master = self.cfg.GetMasterNode()
491 if not self.rpc.call_node_stop_master(master, False):
492 raise errors.OpExecError("Could not disable the master role")
493 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
494 utils.CreateBackup(priv_key)
495 utils.CreateBackup(pub_key)
499 class LUVerifyCluster(LogicalUnit):
500 """Verifies the cluster status.
503 HPATH = "cluster-verify"
504 HTYPE = constants.HTYPE_CLUSTER
505 _OP_REQP = ["skip_checks"]
508 def ExpandNames(self):
509 self.needed_locks = {
510 locking.LEVEL_NODE: locking.ALL_SET,
511 locking.LEVEL_INSTANCE: locking.ALL_SET,
513 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
515 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
516 remote_version, feedback_fn):
517 """Run multiple tests against a node.
520 - compares ganeti version
521 - checks vg existance and size > 20G
522 - checks config file checksum
523 - checks ssh to other nodes
526 node: name of the node to check
527 file_list: required list of files
528 local_cksum: dictionary of local files and their checksums
531 # compares ganeti version
532 local_version = constants.PROTOCOL_VERSION
533 if not remote_version:
534 feedback_fn(" - ERROR: connection to %s failed" % (node))
537 if local_version != remote_version:
538 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
539 (local_version, node, remote_version))
542 # checks vg existance and size > 20G
546 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
550 vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
551 constants.MIN_VG_SIZE)
553 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
557 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
560 # checks config file checksum
563 if 'filelist' not in node_result:
565 feedback_fn(" - ERROR: node hasn't returned file checksum data")
567 remote_cksum = node_result['filelist']
568 for file_name in file_list:
569 if file_name not in remote_cksum:
571 feedback_fn(" - ERROR: file '%s' missing" % file_name)
572 elif remote_cksum[file_name] != local_cksum[file_name]:
574 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
576 if 'nodelist' not in node_result:
578 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
580 if node_result['nodelist']:
582 for node in node_result['nodelist']:
583 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
584 (node, node_result['nodelist'][node]))
585 if 'node-net-test' not in node_result:
587 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
589 if node_result['node-net-test']:
591 nlist = utils.NiceSort(node_result['node-net-test'].keys())
593 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
594 (node, node_result['node-net-test'][node]))
596 hyp_result = node_result.get('hypervisor', None)
597 if isinstance(hyp_result, dict):
598 for hv_name, hv_result in hyp_result.iteritems():
599 if hv_result is not None:
600 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
601 (hv_name, hv_result))
604 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
605 node_instance, feedback_fn):
606 """Verify an instance.
608 This function checks to see if the required block devices are
609 available on the instance's node.
614 node_current = instanceconfig.primary_node
617 instanceconfig.MapLVsByNode(node_vol_should)
619 for node in node_vol_should:
620 for volume in node_vol_should[node]:
621 if node not in node_vol_is or volume not in node_vol_is[node]:
622 feedback_fn(" - ERROR: volume %s missing on node %s" %
626 if not instanceconfig.status == 'down':
627 if (node_current not in node_instance or
628 not instance in node_instance[node_current]):
629 feedback_fn(" - ERROR: instance %s not running on node %s" %
630 (instance, node_current))
633 for node in node_instance:
634 if (not node == node_current):
635 if instance in node_instance[node]:
636 feedback_fn(" - ERROR: instance %s should not run on node %s" %
642 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
643 """Verify if there are any unknown volumes in the cluster.
645 The .os, .swap and backup volumes are ignored. All other volumes are
651 for node in node_vol_is:
652 for volume in node_vol_is[node]:
653 if node not in node_vol_should or volume not in node_vol_should[node]:
654 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
659 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
660 """Verify the list of running instances.
662 This checks what instances are running but unknown to the cluster.
666 for node in node_instance:
667 for runninginstance in node_instance[node]:
668 if runninginstance not in instancelist:
669 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
670 (runninginstance, node))
674 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
675 """Verify N+1 Memory Resilience.
677 Check that if one single node dies we can still start all the instances it
683 for node, nodeinfo in node_info.iteritems():
684 # This code checks that every node which is now listed as secondary has
685 # enough memory to host all instances it is supposed to should a single
686 # other node in the cluster fail.
687 # FIXME: not ready for failover to an arbitrary node
688 # FIXME: does not support file-backed instances
689 # WARNING: we currently take into account down instances as well as up
690 # ones, considering that even if they're down someone might want to start
691 # them even in the event of a node failure.
692 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
694 for instance in instances:
695 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
696 if bep[constants.BE_AUTO_BALANCE]:
697 needed_mem += bep[constants.BE_MEMORY]
698 if nodeinfo['mfree'] < needed_mem:
699 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
700 " failovers should node %s fail" % (node, prinode))
704 def CheckPrereq(self):
705 """Check prerequisites.
707 Transform the list of checks we're going to skip into a set and check that
708 all its members are valid.
711 self.skip_set = frozenset(self.op.skip_checks)
712 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
713 raise errors.OpPrereqError("Invalid checks to be skipped specified")
715 def BuildHooksEnv(self):
718 Cluster-Verify hooks just rone in the post phase and their failure makes
719 the output be logged in the verify output and the verification to fail.
722 all_nodes = self.cfg.GetNodeList()
723 # TODO: populate the environment with useful information for verify hooks
725 return env, [], all_nodes
727 def Exec(self, feedback_fn):
728 """Verify integrity of cluster, performing various test on nodes.
732 feedback_fn("* Verifying global settings")
733 for msg in self.cfg.VerifyConfig():
734 feedback_fn(" - ERROR: %s" % msg)
736 vg_name = self.cfg.GetVGName()
737 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
738 nodelist = utils.NiceSort(self.cfg.GetNodeList())
739 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
740 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
741 i_non_redundant = [] # Non redundant instances
742 i_non_a_balanced = [] # Non auto-balanced instances
748 # FIXME: verify OS list
751 file_names.append(constants.SSL_CERT_FILE)
752 file_names.append(constants.CLUSTER_CONF_FILE)
753 local_checksums = utils.FingerprintFiles(file_names)
755 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
756 all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
757 all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
758 all_vglist = self.rpc.call_vg_list(nodelist)
759 node_verify_param = {
760 'filelist': file_names,
761 'nodelist': nodelist,
762 'hypervisor': hypervisors,
763 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
764 for node in nodeinfo]
766 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
767 self.cfg.GetClusterName())
768 all_rversion = self.rpc.call_version(nodelist)
769 all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
770 self.cfg.GetHypervisorType())
772 cluster = self.cfg.GetClusterInfo()
773 for node in nodelist:
774 feedback_fn("* Verifying node %s" % node)
775 result = self._VerifyNode(node, file_names, local_checksums,
776 all_vglist[node], all_nvinfo[node],
777 all_rversion[node], feedback_fn)
781 volumeinfo = all_volumeinfo[node]
783 if isinstance(volumeinfo, basestring):
784 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
785 (node, volumeinfo[-400:].encode('string_escape')))
787 node_volume[node] = {}
788 elif not isinstance(volumeinfo, dict):
789 feedback_fn(" - ERROR: connection to %s failed" % (node,))
793 node_volume[node] = volumeinfo
796 nodeinstance = all_instanceinfo[node]
797 if type(nodeinstance) != list:
798 feedback_fn(" - ERROR: connection to %s failed" % (node,))
802 node_instance[node] = nodeinstance
805 nodeinfo = all_ninfo[node]
806 if not isinstance(nodeinfo, dict):
807 feedback_fn(" - ERROR: connection to %s failed" % (node,))
813 "mfree": int(nodeinfo['memory_free']),
814 "dfree": int(nodeinfo['vg_free']),
817 # dictionary holding all instances this node is secondary for,
818 # grouped by their primary node. Each key is a cluster node, and each
819 # value is a list of instances which have the key as primary and the
820 # current node as secondary. this is handy to calculate N+1 memory
821 # availability if you can only failover from a primary to its
823 "sinst-by-pnode": {},
826 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
832 for instance in instancelist:
833 feedback_fn("* Verifying instance %s" % instance)
834 inst_config = self.cfg.GetInstanceInfo(instance)
835 result = self._VerifyInstance(instance, inst_config, node_volume,
836 node_instance, feedback_fn)
839 inst_config.MapLVsByNode(node_vol_should)
841 instance_cfg[instance] = inst_config
843 pnode = inst_config.primary_node
844 if pnode in node_info:
845 node_info[pnode]['pinst'].append(instance)
847 feedback_fn(" - ERROR: instance %s, connection to primary node"
848 " %s failed" % (instance, pnode))
851 # If the instance is non-redundant we cannot survive losing its primary
852 # node, so we are not N+1 compliant. On the other hand we have no disk
853 # templates with more than one secondary so that situation is not well
855 # FIXME: does not support file-backed instances
856 if len(inst_config.secondary_nodes) == 0:
857 i_non_redundant.append(instance)
858 elif len(inst_config.secondary_nodes) > 1:
859 feedback_fn(" - WARNING: multiple secondaries for instance %s"
862 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
863 i_non_a_balanced.append(instance)
865 for snode in inst_config.secondary_nodes:
866 if snode in node_info:
867 node_info[snode]['sinst'].append(instance)
868 if pnode not in node_info[snode]['sinst-by-pnode']:
869 node_info[snode]['sinst-by-pnode'][pnode] = []
870 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
872 feedback_fn(" - ERROR: instance %s, connection to secondary node"
873 " %s failed" % (instance, snode))
875 feedback_fn("* Verifying orphan volumes")
876 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
880 feedback_fn("* Verifying remaining instances")
881 result = self._VerifyOrphanInstances(instancelist, node_instance,
885 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
886 feedback_fn("* Verifying N+1 Memory redundancy")
887 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
890 feedback_fn("* Other Notes")
892 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
893 % len(i_non_redundant))
896 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
897 % len(i_non_a_balanced))
901 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
902 """Analize the post-hooks' result, handle it, and send some
903 nicely-formatted feedback back to the user.
906 phase: the hooks phase that has just been run
907 hooks_results: the results of the multi-node hooks rpc call
908 feedback_fn: function to send feedback back to the caller
909 lu_result: previous Exec result
912 # We only really run POST phase hooks, and are only interested in
914 if phase == constants.HOOKS_PHASE_POST:
915 # Used to change hooks' output to proper indentation
916 indent_re = re.compile('^', re.M)
917 feedback_fn("* Hooks Results")
918 if not hooks_results:
919 feedback_fn(" - ERROR: general communication failure")
922 for node_name in hooks_results:
923 show_node_header = True
924 res = hooks_results[node_name]
925 if res is False or not isinstance(res, list):
926 feedback_fn(" Communication failure")
929 for script, hkr, output in res:
930 if hkr == constants.HKR_FAIL:
931 # The node header is only shown once, if there are
932 # failing hooks on that node
934 feedback_fn(" Node %s:" % node_name)
935 show_node_header = False
936 feedback_fn(" ERROR: Script %s failed, output:" % script)
937 output = indent_re.sub(' ', output)
938 feedback_fn("%s" % output)
944 class LUVerifyDisks(NoHooksLU):
945 """Verifies the cluster disks status.
951 def ExpandNames(self):
952 self.needed_locks = {
953 locking.LEVEL_NODE: locking.ALL_SET,
954 locking.LEVEL_INSTANCE: locking.ALL_SET,
956 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
958 def CheckPrereq(self):
959 """Check prerequisites.
961 This has no prerequisites.
966 def Exec(self, feedback_fn):
967 """Verify integrity of cluster disks.
970 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
972 vg_name = self.cfg.GetVGName()
973 nodes = utils.NiceSort(self.cfg.GetNodeList())
974 instances = [self.cfg.GetInstanceInfo(name)
975 for name in self.cfg.GetInstanceList()]
978 for inst in instances:
980 if (inst.status != "up" or
981 inst.disk_template not in constants.DTS_NET_MIRROR):
983 inst.MapLVsByNode(inst_lvs)
984 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
985 for node, vol_list in inst_lvs.iteritems():
987 nv_dict[(node, vol)] = inst
992 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
999 if isinstance(lvs, basestring):
1000 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1001 res_nlvm[node] = lvs
1002 elif not isinstance(lvs, dict):
1003 logging.warning("Connection to node %s failed or invalid data"
1005 res_nodes.append(node)
1008 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1009 inst = nv_dict.pop((node, lv_name), None)
1010 if (not lv_online and inst is not None
1011 and inst.name not in res_instances):
1012 res_instances.append(inst.name)
1014 # any leftover items in nv_dict are missing LVs, let's arrange the
1016 for key, inst in nv_dict.iteritems():
1017 if inst.name not in res_missing:
1018 res_missing[inst.name] = []
1019 res_missing[inst.name].append(key)
1024 class LURenameCluster(LogicalUnit):
1025 """Rename the cluster.
1028 HPATH = "cluster-rename"
1029 HTYPE = constants.HTYPE_CLUSTER
1032 def BuildHooksEnv(self):
1037 "OP_TARGET": self.cfg.GetClusterName(),
1038 "NEW_NAME": self.op.name,
1040 mn = self.cfg.GetMasterNode()
1041 return env, [mn], [mn]
1043 def CheckPrereq(self):
1044 """Verify that the passed name is a valid one.
1047 hostname = utils.HostInfo(self.op.name)
1049 new_name = hostname.name
1050 self.ip = new_ip = hostname.ip
1051 old_name = self.cfg.GetClusterName()
1052 old_ip = self.cfg.GetMasterIP()
1053 if new_name == old_name and new_ip == old_ip:
1054 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1055 " cluster has changed")
1056 if new_ip != old_ip:
1057 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1058 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1059 " reachable on the network. Aborting." %
1062 self.op.name = new_name
1064 def Exec(self, feedback_fn):
1065 """Rename the cluster.
1068 clustername = self.op.name
1071 # shutdown the master IP
1072 master = self.cfg.GetMasterNode()
1073 if not self.rpc.call_node_stop_master(master, False):
1074 raise errors.OpExecError("Could not disable the master role")
1079 ss.SetKey(ss.SS_MASTER_IP, ip)
1080 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1082 # Distribute updated ss config to all nodes
1083 myself = self.cfg.GetNodeInfo(master)
1084 dist_nodes = self.cfg.GetNodeList()
1085 if myself.name in dist_nodes:
1086 dist_nodes.remove(myself.name)
1088 logging.debug("Copying updated ssconf data to all nodes")
1089 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1090 fname = ss.KeyToFilename(keyname)
1091 result = self.rpc.call_upload_file(dist_nodes, fname)
1092 for to_node in dist_nodes:
1093 if not result[to_node]:
1094 self.LogWarning("Copy of file %s to node %s failed",
1097 if not self.rpc.call_node_start_master(master, False):
1098 self.LogWarning("Could not re-enable the master role on"
1099 " the master, please restart manually.")
1102 def _RecursiveCheckIfLVMBased(disk):
1103 """Check if the given disk or its children are lvm-based.
1106 disk: ganeti.objects.Disk object
1109 boolean indicating whether a LD_LV dev_type was found or not
1113 for chdisk in disk.children:
1114 if _RecursiveCheckIfLVMBased(chdisk):
1116 return disk.dev_type == constants.LD_LV
1119 class LUSetClusterParams(LogicalUnit):
1120 """Change the parameters of the cluster.
1123 HPATH = "cluster-modify"
1124 HTYPE = constants.HTYPE_CLUSTER
1128 def ExpandNames(self):
1129 # FIXME: in the future maybe other cluster params won't require checking on
1130 # all nodes to be modified.
1131 self.needed_locks = {
1132 locking.LEVEL_NODE: locking.ALL_SET,
1134 self.share_locks[locking.LEVEL_NODE] = 1
1136 def BuildHooksEnv(self):
1141 "OP_TARGET": self.cfg.GetClusterName(),
1142 "NEW_VG_NAME": self.op.vg_name,
1144 mn = self.cfg.GetMasterNode()
1145 return env, [mn], [mn]
1147 def CheckPrereq(self):
1148 """Check prerequisites.
1150 This checks whether the given params don't conflict and
1151 if the given volume group is valid.
1154 # FIXME: This only works because there is only one parameter that can be
1155 # changed or removed.
1156 if self.op.vg_name is not None and not self.op.vg_name:
1157 instances = self.cfg.GetAllInstancesInfo().values()
1158 for inst in instances:
1159 for disk in inst.disks:
1160 if _RecursiveCheckIfLVMBased(disk):
1161 raise errors.OpPrereqError("Cannot disable lvm storage while"
1162 " lvm-based instances exist")
1164 node_list = self.acquired_locks[locking.LEVEL_NODE]
1166 # if vg_name not None, checks given volume group on all nodes
1168 vglist = self.rpc.call_vg_list(node_list)
1169 for node in node_list:
1170 vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1171 constants.MIN_VG_SIZE)
1173 raise errors.OpPrereqError("Error on node '%s': %s" %
1176 self.cluster = cluster = self.cfg.GetClusterInfo()
1177 # beparams changes do not need validation (we can't validate?),
1178 # but we still process here
1179 if self.op.beparams:
1180 self.new_beparams = cluster.FillDict(
1181 cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1183 # hypervisor list/parameters
1184 self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1185 if self.op.hvparams:
1186 if not isinstance(self.op.hvparams, dict):
1187 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1188 for hv_name, hv_dict in self.op.hvparams.items():
1189 if hv_name not in self.new_hvparams:
1190 self.new_hvparams[hv_name] = hv_dict
1192 self.new_hvparams[hv_name].update(hv_dict)
1194 if self.op.enabled_hypervisors is not None:
1195 self.hv_list = self.op.enabled_hypervisors
1197 self.hv_list = cluster.enabled_hypervisors
1199 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1200 # either the enabled list has changed, or the parameters have, validate
1201 for hv_name, hv_params in self.new_hvparams.items():
1202 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1203 (self.op.enabled_hypervisors and
1204 hv_name in self.op.enabled_hypervisors)):
1205 # either this is a new hypervisor, or its parameters have changed
1206 hv_class = hypervisor.GetHypervisor(hv_name)
1207 hv_class.CheckParameterSyntax(hv_params)
1208 _CheckHVParams(self, node_list, hv_name, hv_params)
1210 def Exec(self, feedback_fn):
1211 """Change the parameters of the cluster.
1214 if self.op.vg_name is not None:
1215 if self.op.vg_name != self.cfg.GetVGName():
1216 self.cfg.SetVGName(self.op.vg_name)
1218 feedback_fn("Cluster LVM configuration already in desired"
1219 " state, not changing")
1220 if self.op.hvparams:
1221 self.cluster.hvparams = self.new_hvparams
1222 if self.op.enabled_hypervisors is not None:
1223 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1224 if self.op.beparams:
1225 self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1226 self.cfg.Update(self.cluster)
1229 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1230 """Sleep and poll for an instance's disk to sync.
1233 if not instance.disks:
1237 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1239 node = instance.primary_node
1241 for dev in instance.disks:
1242 lu.cfg.SetDiskID(dev, node)
1248 cumul_degraded = False
1249 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1251 lu.LogWarning("Can't get any data from node %s", node)
1254 raise errors.RemoteError("Can't contact node %s for mirror data,"
1255 " aborting." % node)
1259 for i in range(len(rstats)):
1262 lu.LogWarning("Can't compute data for node %s/%s",
1263 node, instance.disks[i].iv_name)
1265 # we ignore the ldisk parameter
1266 perc_done, est_time, is_degraded, _ = mstat
1267 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1268 if perc_done is not None:
1270 if est_time is not None:
1271 rem_time = "%d estimated seconds remaining" % est_time
1274 rem_time = "no time estimate"
1275 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1276 (instance.disks[i].iv_name, perc_done, rem_time))
1280 time.sleep(min(60, max_time))
1283 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1284 return not cumul_degraded
1287 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1288 """Check that mirrors are not degraded.
1290 The ldisk parameter, if True, will change the test from the
1291 is_degraded attribute (which represents overall non-ok status for
1292 the device(s)) to the ldisk (representing the local storage status).
1295 lu.cfg.SetDiskID(dev, node)
1302 if on_primary or dev.AssembleOnSecondary():
1303 rstats = lu.rpc.call_blockdev_find(node, dev)
1305 logging.warning("Node %s: disk degraded, not found or node down", node)
1308 result = result and (not rstats[idx])
1310 for child in dev.children:
1311 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1316 class LUDiagnoseOS(NoHooksLU):
1317 """Logical unit for OS diagnose/query.
1320 _OP_REQP = ["output_fields", "names"]
1323 def ExpandNames(self):
1325 raise errors.OpPrereqError("Selective OS query not supported")
1327 self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1328 _CheckOutputFields(static=[],
1329 dynamic=self.dynamic_fields,
1330 selected=self.op.output_fields)
1332 # Lock all nodes, in shared mode
1333 self.needed_locks = {}
1334 self.share_locks[locking.LEVEL_NODE] = 1
1335 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1337 def CheckPrereq(self):
1338 """Check prerequisites.
1343 def _DiagnoseByOS(node_list, rlist):
1344 """Remaps a per-node return list into an a per-os per-node dictionary
1347 node_list: a list with the names of all nodes
1348 rlist: a map with node names as keys and OS objects as values
1351 map: a map with osnames as keys and as value another map, with
1353 keys and list of OS objects as values
1354 e.g. {"debian-etch": {"node1": [<object>,...],
1355 "node2": [<object>,]}
1360 for node_name, nr in rlist.iteritems():
1364 if os_obj.name not in all_os:
1365 # build a list of nodes for this os containing empty lists
1366 # for each node in node_list
1367 all_os[os_obj.name] = {}
1368 for nname in node_list:
1369 all_os[os_obj.name][nname] = []
1370 all_os[os_obj.name][node_name].append(os_obj)
1373 def Exec(self, feedback_fn):
1374 """Compute the list of OSes.
1377 node_list = self.acquired_locks[locking.LEVEL_NODE]
1378 node_data = self.rpc.call_os_diagnose(node_list)
1379 if node_data == False:
1380 raise errors.OpExecError("Can't gather the list of OSes")
1381 pol = self._DiagnoseByOS(node_list, node_data)
1383 for os_name, os_data in pol.iteritems():
1385 for field in self.op.output_fields:
1388 elif field == "valid":
1389 val = utils.all([osl and osl[0] for osl in os_data.values()])
1390 elif field == "node_status":
1392 for node_name, nos_list in os_data.iteritems():
1393 val[node_name] = [(v.status, v.path) for v in nos_list]
1395 raise errors.ParameterError(field)
1402 class LURemoveNode(LogicalUnit):
1403 """Logical unit for removing a node.
1406 HPATH = "node-remove"
1407 HTYPE = constants.HTYPE_NODE
1408 _OP_REQP = ["node_name"]
1410 def BuildHooksEnv(self):
1413 This doesn't run on the target node in the pre phase as a failed
1414 node would then be impossible to remove.
1418 "OP_TARGET": self.op.node_name,
1419 "NODE_NAME": self.op.node_name,
1421 all_nodes = self.cfg.GetNodeList()
1422 all_nodes.remove(self.op.node_name)
1423 return env, all_nodes, all_nodes
1425 def CheckPrereq(self):
1426 """Check prerequisites.
1429 - the node exists in the configuration
1430 - it does not have primary or secondary instances
1431 - it's not the master
1433 Any errors are signalled by raising errors.OpPrereqError.
1436 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1438 raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1440 instance_list = self.cfg.GetInstanceList()
1442 masternode = self.cfg.GetMasterNode()
1443 if node.name == masternode:
1444 raise errors.OpPrereqError("Node is the master node,"
1445 " you need to failover first.")
1447 for instance_name in instance_list:
1448 instance = self.cfg.GetInstanceInfo(instance_name)
1449 if node.name == instance.primary_node:
1450 raise errors.OpPrereqError("Instance %s still running on the node,"
1451 " please remove first." % instance_name)
1452 if node.name in instance.secondary_nodes:
1453 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1454 " please remove first." % instance_name)
1455 self.op.node_name = node.name
1458 def Exec(self, feedback_fn):
1459 """Removes the node from the cluster.
1463 logging.info("Stopping the node daemon and removing configs from node %s",
1466 self.context.RemoveNode(node.name)
1468 self.rpc.call_node_leave_cluster(node.name)
1471 class LUQueryNodes(NoHooksLU):
1472 """Logical unit for querying nodes.
1475 _OP_REQP = ["output_fields", "names"]
1478 def ExpandNames(self):
1479 self.dynamic_fields = frozenset([
1481 "mtotal", "mnode", "mfree",
1486 self.static_fields = frozenset([
1487 "name", "pinst_cnt", "sinst_cnt",
1488 "pinst_list", "sinst_list",
1489 "pip", "sip", "tags",
1493 _CheckOutputFields(static=self.static_fields,
1494 dynamic=self.dynamic_fields,
1495 selected=self.op.output_fields)
1497 self.needed_locks = {}
1498 self.share_locks[locking.LEVEL_NODE] = 1
1501 self.wanted = _GetWantedNodes(self, self.op.names)
1503 self.wanted = locking.ALL_SET
1505 self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1507 # if we don't request only static fields, we need to lock the nodes
1508 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1511 def CheckPrereq(self):
1512 """Check prerequisites.
1515 # The validation of the node list is done in the _GetWantedNodes,
1516 # if non empty, and if empty, there's no validation to do
1519 def Exec(self, feedback_fn):
1520 """Computes the list of nodes and their attributes.
1523 all_info = self.cfg.GetAllNodesInfo()
1525 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1526 elif self.wanted != locking.ALL_SET:
1527 nodenames = self.wanted
1528 missing = set(nodenames).difference(all_info.keys())
1530 raise errors.OpExecError(
1531 "Some nodes were removed before retrieving their data: %s" % missing)
1533 nodenames = all_info.keys()
1535 nodenames = utils.NiceSort(nodenames)
1536 nodelist = [all_info[name] for name in nodenames]
1538 # begin data gathering
1540 if self.dynamic_fields.intersection(self.op.output_fields):
1542 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1543 self.cfg.GetHypervisorType())
1544 for name in nodenames:
1545 nodeinfo = node_data.get(name, None)
1548 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1549 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1550 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1551 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1552 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1553 "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1554 "bootid": nodeinfo['bootid'],
1557 live_data[name] = {}
1559 live_data = dict.fromkeys(nodenames, {})
1561 node_to_primary = dict([(name, set()) for name in nodenames])
1562 node_to_secondary = dict([(name, set()) for name in nodenames])
1564 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1565 "sinst_cnt", "sinst_list"))
1566 if inst_fields & frozenset(self.op.output_fields):
1567 instancelist = self.cfg.GetInstanceList()
1569 for instance_name in instancelist:
1570 inst = self.cfg.GetInstanceInfo(instance_name)
1571 if inst.primary_node in node_to_primary:
1572 node_to_primary[inst.primary_node].add(inst.name)
1573 for secnode in inst.secondary_nodes:
1574 if secnode in node_to_secondary:
1575 node_to_secondary[secnode].add(inst.name)
1577 # end data gathering
1580 for node in nodelist:
1582 for field in self.op.output_fields:
1585 elif field == "pinst_list":
1586 val = list(node_to_primary[node.name])
1587 elif field == "sinst_list":
1588 val = list(node_to_secondary[node.name])
1589 elif field == "pinst_cnt":
1590 val = len(node_to_primary[node.name])
1591 elif field == "sinst_cnt":
1592 val = len(node_to_secondary[node.name])
1593 elif field == "pip":
1594 val = node.primary_ip
1595 elif field == "sip":
1596 val = node.secondary_ip
1597 elif field == "tags":
1598 val = list(node.GetTags())
1599 elif field == "serial_no":
1600 val = node.serial_no
1601 elif field in self.dynamic_fields:
1602 val = live_data[node.name].get(field, None)
1604 raise errors.ParameterError(field)
1605 node_output.append(val)
1606 output.append(node_output)
1611 class LUQueryNodeVolumes(NoHooksLU):
1612 """Logical unit for getting volumes on node(s).
1615 _OP_REQP = ["nodes", "output_fields"]
1618 def ExpandNames(self):
1619 _CheckOutputFields(static=["node"],
1620 dynamic=["phys", "vg", "name", "size", "instance"],
1621 selected=self.op.output_fields)
1623 self.needed_locks = {}
1624 self.share_locks[locking.LEVEL_NODE] = 1
1625 if not self.op.nodes:
1626 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1628 self.needed_locks[locking.LEVEL_NODE] = \
1629 _GetWantedNodes(self, self.op.nodes)
1631 def CheckPrereq(self):
1632 """Check prerequisites.
1634 This checks that the fields required are valid output fields.
1637 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1639 def Exec(self, feedback_fn):
1640 """Computes the list of nodes and their attributes.
1643 nodenames = self.nodes
1644 volumes = self.rpc.call_node_volumes(nodenames)
1646 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1647 in self.cfg.GetInstanceList()]
1649 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1652 for node in nodenames:
1653 if node not in volumes or not volumes[node]:
1656 node_vols = volumes[node][:]
1657 node_vols.sort(key=lambda vol: vol['dev'])
1659 for vol in node_vols:
1661 for field in self.op.output_fields:
1664 elif field == "phys":
1668 elif field == "name":
1670 elif field == "size":
1671 val = int(float(vol['size']))
1672 elif field == "instance":
1674 if node not in lv_by_node[inst]:
1676 if vol['name'] in lv_by_node[inst][node]:
1682 raise errors.ParameterError(field)
1683 node_output.append(str(val))
1685 output.append(node_output)
1690 class LUAddNode(LogicalUnit):
1691 """Logical unit for adding node to the cluster.
1695 HTYPE = constants.HTYPE_NODE
1696 _OP_REQP = ["node_name"]
1698 def BuildHooksEnv(self):
1701 This will run on all nodes before, and on all nodes + the new node after.
1705 "OP_TARGET": self.op.node_name,
1706 "NODE_NAME": self.op.node_name,
1707 "NODE_PIP": self.op.primary_ip,
1708 "NODE_SIP": self.op.secondary_ip,
1710 nodes_0 = self.cfg.GetNodeList()
1711 nodes_1 = nodes_0 + [self.op.node_name, ]
1712 return env, nodes_0, nodes_1
1714 def CheckPrereq(self):
1715 """Check prerequisites.
1718 - the new node is not already in the config
1720 - its parameters (single/dual homed) matches the cluster
1722 Any errors are signalled by raising errors.OpPrereqError.
1725 node_name = self.op.node_name
1728 dns_data = utils.HostInfo(node_name)
1730 node = dns_data.name
1731 primary_ip = self.op.primary_ip = dns_data.ip
1732 secondary_ip = getattr(self.op, "secondary_ip", None)
1733 if secondary_ip is None:
1734 secondary_ip = primary_ip
1735 if not utils.IsValidIP(secondary_ip):
1736 raise errors.OpPrereqError("Invalid secondary IP given")
1737 self.op.secondary_ip = secondary_ip
1739 node_list = cfg.GetNodeList()
1740 if not self.op.readd and node in node_list:
1741 raise errors.OpPrereqError("Node %s is already in the configuration" %
1743 elif self.op.readd and node not in node_list:
1744 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1746 for existing_node_name in node_list:
1747 existing_node = cfg.GetNodeInfo(existing_node_name)
1749 if self.op.readd and node == existing_node_name:
1750 if (existing_node.primary_ip != primary_ip or
1751 existing_node.secondary_ip != secondary_ip):
1752 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1753 " address configuration as before")
1756 if (existing_node.primary_ip == primary_ip or
1757 existing_node.secondary_ip == primary_ip or
1758 existing_node.primary_ip == secondary_ip or
1759 existing_node.secondary_ip == secondary_ip):
1760 raise errors.OpPrereqError("New node ip address(es) conflict with"
1761 " existing node %s" % existing_node.name)
1763 # check that the type of the node (single versus dual homed) is the
1764 # same as for the master
1765 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1766 master_singlehomed = myself.secondary_ip == myself.primary_ip
1767 newbie_singlehomed = secondary_ip == primary_ip
1768 if master_singlehomed != newbie_singlehomed:
1769 if master_singlehomed:
1770 raise errors.OpPrereqError("The master has no private ip but the"
1771 " new node has one")
1773 raise errors.OpPrereqError("The master has a private ip but the"
1774 " new node doesn't have one")
1776 # checks reachablity
1777 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1778 raise errors.OpPrereqError("Node not reachable by ping")
1780 if not newbie_singlehomed:
1781 # check reachability from my secondary ip to newbie's secondary ip
1782 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1783 source=myself.secondary_ip):
1784 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1785 " based ping to noded port")
1787 self.new_node = objects.Node(name=node,
1788 primary_ip=primary_ip,
1789 secondary_ip=secondary_ip)
1791 def Exec(self, feedback_fn):
1792 """Adds the new node to the cluster.
1795 new_node = self.new_node
1796 node = new_node.name
1798 # check connectivity
1799 result = self.rpc.call_version([node])[node]
1801 if constants.PROTOCOL_VERSION == result:
1802 logging.info("Communication to node %s fine, sw version %s match",
1805 raise errors.OpExecError("Version mismatch master version %s,"
1806 " node version %s" %
1807 (constants.PROTOCOL_VERSION, result))
1809 raise errors.OpExecError("Cannot get version from the new node")
1812 logging.info("Copy ssh key to node %s", node)
1813 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1815 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1816 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1822 keyarray.append(f.read())
1826 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1828 keyarray[3], keyarray[4], keyarray[5])
1831 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1833 # Add node to our /etc/hosts, and add key to known_hosts
1834 utils.AddHostToEtcHosts(new_node.name)
1836 if new_node.secondary_ip != new_node.primary_ip:
1837 if not self.rpc.call_node_has_ip_address(new_node.name,
1838 new_node.secondary_ip):
1839 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1840 " you gave (%s). Please fix and re-run this"
1841 " command." % new_node.secondary_ip)
1843 node_verify_list = [self.cfg.GetMasterNode()]
1844 node_verify_param = {
1846 # TODO: do a node-net-test as well?
1849 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1850 self.cfg.GetClusterName())
1851 for verifier in node_verify_list:
1852 if not result[verifier]:
1853 raise errors.OpExecError("Cannot communicate with %s's node daemon"
1854 " for remote verification" % verifier)
1855 if result[verifier]['nodelist']:
1856 for failed in result[verifier]['nodelist']:
1857 feedback_fn("ssh/hostname verification failed %s -> %s" %
1858 (verifier, result[verifier]['nodelist'][failed]))
1859 raise errors.OpExecError("ssh/hostname verification failed.")
1861 # Distribute updated /etc/hosts and known_hosts to all nodes,
1862 # including the node just added
1863 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1864 dist_nodes = self.cfg.GetNodeList()
1865 if not self.op.readd:
1866 dist_nodes.append(node)
1867 if myself.name in dist_nodes:
1868 dist_nodes.remove(myself.name)
1870 logging.debug("Copying hosts and known_hosts to all nodes")
1871 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1872 result = self.rpc.call_upload_file(dist_nodes, fname)
1873 for to_node in dist_nodes:
1874 if not result[to_node]:
1875 logging.error("Copy of file %s to node %s failed", fname, to_node)
1878 if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1879 to_copy.append(constants.VNC_PASSWORD_FILE)
1880 for fname in to_copy:
1881 result = self.rpc.call_upload_file([node], fname)
1882 if not result[node]:
1883 logging.error("Could not copy file %s to node %s", fname, node)
1886 self.context.ReaddNode(new_node)
1888 self.context.AddNode(new_node)
1891 class LUQueryClusterInfo(NoHooksLU):
1892 """Query cluster configuration.
1899 def ExpandNames(self):
1900 self.needed_locks = {}
1902 def CheckPrereq(self):
1903 """No prerequsites needed for this LU.
1908 def Exec(self, feedback_fn):
1909 """Return cluster config.
1912 cluster = self.cfg.GetClusterInfo()
1914 "software_version": constants.RELEASE_VERSION,
1915 "protocol_version": constants.PROTOCOL_VERSION,
1916 "config_version": constants.CONFIG_VERSION,
1917 "os_api_version": constants.OS_API_VERSION,
1918 "export_version": constants.EXPORT_VERSION,
1919 "architecture": (platform.architecture()[0], platform.machine()),
1920 "name": cluster.cluster_name,
1921 "master": cluster.master_node,
1922 "default_hypervisor": cluster.default_hypervisor,
1923 "enabled_hypervisors": cluster.enabled_hypervisors,
1924 "hvparams": cluster.hvparams,
1925 "beparams": cluster.beparams,
1931 class LUQueryConfigValues(NoHooksLU):
1932 """Return configuration values.
1938 def ExpandNames(self):
1939 self.needed_locks = {}
1941 static_fields = ["cluster_name", "master_node", "drain_flag"]
1942 _CheckOutputFields(static=static_fields,
1944 selected=self.op.output_fields)
1946 def CheckPrereq(self):
1947 """No prerequisites.
1952 def Exec(self, feedback_fn):
1953 """Dump a representation of the cluster config to the standard output.
1957 for field in self.op.output_fields:
1958 if field == "cluster_name":
1959 entry = self.cfg.GetClusterName()
1960 elif field == "master_node":
1961 entry = self.cfg.GetMasterNode()
1962 elif field == "drain_flag":
1963 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1965 raise errors.ParameterError(field)
1966 values.append(entry)
1970 class LUActivateInstanceDisks(NoHooksLU):
1971 """Bring up an instance's disks.
1974 _OP_REQP = ["instance_name"]
1977 def ExpandNames(self):
1978 self._ExpandAndLockInstance()
1979 self.needed_locks[locking.LEVEL_NODE] = []
1980 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1982 def DeclareLocks(self, level):
1983 if level == locking.LEVEL_NODE:
1984 self._LockInstancesNodes()
1986 def CheckPrereq(self):
1987 """Check prerequisites.
1989 This checks that the instance is in the cluster.
1992 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1993 assert self.instance is not None, \
1994 "Cannot retrieve locked instance %s" % self.op.instance_name
1996 def Exec(self, feedback_fn):
1997 """Activate the disks.
2000 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2002 raise errors.OpExecError("Cannot activate block devices")
2007 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2008 """Prepare the block devices for an instance.
2010 This sets up the block devices on all nodes.
2013 instance: a ganeti.objects.Instance object
2014 ignore_secondaries: if true, errors on secondary nodes won't result
2015 in an error return from the function
2018 false if the operation failed
2019 list of (host, instance_visible_name, node_visible_name) if the operation
2020 suceeded with the mapping from node devices to instance devices
2024 iname = instance.name
2025 # With the two passes mechanism we try to reduce the window of
2026 # opportunity for the race condition of switching DRBD to primary
2027 # before handshaking occured, but we do not eliminate it
2029 # The proper fix would be to wait (with some limits) until the
2030 # connection has been made and drbd transitions from WFConnection
2031 # into any other network-connected state (Connected, SyncTarget,
2034 # 1st pass, assemble on all nodes in secondary mode
2035 for inst_disk in instance.disks:
2036 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2037 lu.cfg.SetDiskID(node_disk, node)
2038 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2040 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2041 " (is_primary=False, pass=1)",
2042 inst_disk.iv_name, node)
2043 if not ignore_secondaries:
2046 # FIXME: race condition on drbd migration to primary
2048 # 2nd pass, do only the primary node
2049 for inst_disk in instance.disks:
2050 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2051 if node != instance.primary_node:
2053 lu.cfg.SetDiskID(node_disk, node)
2054 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2056 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2057 " (is_primary=True, pass=2)",
2058 inst_disk.iv_name, node)
2060 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2062 # leave the disks configured for the primary node
2063 # this is a workaround that would be fixed better by
2064 # improving the logical/physical id handling
2065 for disk in instance.disks:
2066 lu.cfg.SetDiskID(disk, instance.primary_node)
2068 return disks_ok, device_info
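# Illustrative example (not in the original source): for a one-disk DRBD
# instance whose primary is node1.example.com (a hypothetical node name),
#
#   disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
#
# would, on success, return disks_ok == True and device_info looking like
#   [("node1.example.com", "sda", "/dev/drbd0")]
# i.e. (host, instance_visible_name, node_visible_name) tuples, the last
# element being whatever call_blockdev_assemble returned on the primary.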
2071 def _StartInstanceDisks(lu, instance, force):
2072 """Start the disks of an instance.
2075 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2076 ignore_secondaries=force)
2077 if not disks_ok:
2078 _ShutdownInstanceDisks(lu, instance)
2079 if force is not None and not force:
2080 lu.proc.LogWarning("", hint="If the message above refers to a"
2082 " you can retry the operation using '--force'.")
2083 raise errors.OpExecError("Disk consistency error")
2086 class LUDeactivateInstanceDisks(NoHooksLU):
2087 """Shutdown an instance's disks.
2090 _OP_REQP = ["instance_name"]
2093 def ExpandNames(self):
2094 self._ExpandAndLockInstance()
2095 self.needed_locks[locking.LEVEL_NODE] = []
2096 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2098 def DeclareLocks(self, level):
2099 if level == locking.LEVEL_NODE:
2100 self._LockInstancesNodes()
2102 def CheckPrereq(self):
2103 """Check prerequisites.
2105 This checks that the instance is in the cluster.
2108 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2109 assert self.instance is not None, \
2110 "Cannot retrieve locked instance %s" % self.op.instance_name
2112 def Exec(self, feedback_fn):
2113 """Deactivate the disks
2116 instance = self.instance
2117 _SafeShutdownInstanceDisks(self, instance)
2120 def _SafeShutdownInstanceDisks(lu, instance):
2121 """Shutdown block devices of an instance.
2123 This function checks if an instance is running, before calling
2124 _ShutdownInstanceDisks.
2127 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2128 [instance.hypervisor])
2129 ins_l = ins_l[instance.primary_node]
2130 if not isinstance(ins_l, list):
2131 raise errors.OpExecError("Can't contact node '%s'" %
2132 instance.primary_node)
2134 if instance.name in ins_l:
2135 raise errors.OpExecError("Instance is running, can't shutdown"
2138 _ShutdownInstanceDisks(lu, instance)
2141 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2142 """Shutdown block devices of an instance.
2144 This does the shutdown on all nodes of the instance.
2146 If ignore_primary is false, errors on the primary node are
2151 for disk in instance.disks:
2152 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2153 lu.cfg.SetDiskID(top_disk, node)
2154 if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2155 logging.error("Could not shutdown block device %s on node %s",
2157 if not ignore_primary or node != instance.primary_node:
2162 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2163 """Checks if a node has enough free memory.
2165 This function checks if a given node has the needed amount of free
2166 memory. In case the node has less memory or we cannot get the
2167 information from the node, this function raises an OpPrereqError
2170 @type lu: C{LogicalUnit}
2171 @param lu: a logical unit from which we get configuration data
2173 @param node: the node to check
2174 @type reason: C{str}
2175 @param reason: string to use in the error message
2176 @type requested: C{int}
2177 @param requested: the amount of memory in MiB to check for
2178 @type hypervisor: C{str}
2179 @param hypervisor: the hypervisor to ask for memory stats
2180 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2181 we cannot check the node
2184 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2185 if not nodeinfo or not isinstance(nodeinfo, dict):
2186 raise errors.OpPrereqError("Could not contact node %s for resource"
2187 " information" % (node,))
2189 free_mem = nodeinfo[node].get('memory_free')
2190 if not isinstance(free_mem, int):
2191 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2192 " was '%s'" % (node, free_mem))
2193 if requested > free_mem:
2194 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2195 " needed %s MiB, available %s MiB" %
2196 (node, reason, requested, free_mem))
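# Illustrative sketch: CheckPrereq methods call this helper with the memory
# requirement taken from the instance's filled beparams, as
# LUStartupInstance below does:
#
#   bep = self.cfg.GetClusterInfo().FillBE(instance)
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)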
2199 class LUStartupInstance(LogicalUnit):
2200 """Starts an instance.
2203 HPATH = "instance-start"
2204 HTYPE = constants.HTYPE_INSTANCE
2205 _OP_REQP = ["instance_name", "force"]
2208 def ExpandNames(self):
2209 self._ExpandAndLockInstance()
2210 self.needed_locks[locking.LEVEL_NODE] = []
2211 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2213 def DeclareLocks(self, level):
2214 if level == locking.LEVEL_NODE:
2215 self._LockInstancesNodes()
2217 def BuildHooksEnv(self):
2220 This runs on master, primary and secondary nodes of the instance.
2224 "FORCE": self.op.force,
2226 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2227 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2228 list(self.instance.secondary_nodes))
2231 def CheckPrereq(self):
2232 """Check prerequisites.
2234 This checks that the instance is in the cluster.
2237 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2238 assert self.instance is not None, \
2239 "Cannot retrieve locked instance %s" % self.op.instance_name
2241 bep = self.cfg.GetClusterInfo().FillBE(instance)
2242 # check bridge existence
2243 _CheckInstanceBridgesExist(self, instance)
2245 _CheckNodeFreeMemory(self, instance.primary_node,
2246 "starting instance %s" % instance.name,
2247 bep[constants.BE_MEMORY], instance.hypervisor)
2249 def Exec(self, feedback_fn):
2250 """Start the instance.
2253 instance = self.instance
2254 force = self.op.force
2255 extra_args = getattr(self.op, "extra_args", "")
2257 self.cfg.MarkInstanceUp(instance.name)
2259 node_current = instance.primary_node
2261 _StartInstanceDisks(self, instance, force)
2263 if not self.rpc.call_instance_start(node_current, instance, extra_args):
2264 _ShutdownInstanceDisks(self, instance)
2265 raise errors.OpExecError("Could not start instance")
2268 class LURebootInstance(LogicalUnit):
2269 """Reboot an instance.
2272 HPATH = "instance-reboot"
2273 HTYPE = constants.HTYPE_INSTANCE
2274 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2277 def ExpandNames(self):
2278 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2279 constants.INSTANCE_REBOOT_HARD,
2280 constants.INSTANCE_REBOOT_FULL]:
2281 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2282 (constants.INSTANCE_REBOOT_SOFT,
2283 constants.INSTANCE_REBOOT_HARD,
2284 constants.INSTANCE_REBOOT_FULL))
2285 self._ExpandAndLockInstance()
2286 self.needed_locks[locking.LEVEL_NODE] = []
2287 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2289 def DeclareLocks(self, level):
2290 if level == locking.LEVEL_NODE:
2291 primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2292 self._LockInstancesNodes(primary_only=primary_only)
2294 def BuildHooksEnv(self):
2297 This runs on master, primary and secondary nodes of the instance.
2301 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2303 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2304 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2305 list(self.instance.secondary_nodes))
2308 def CheckPrereq(self):
2309 """Check prerequisites.
2311 This checks that the instance is in the cluster.
2314 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2315 assert self.instance is not None, \
2316 "Cannot retrieve locked instance %s" % self.op.instance_name
2318 # check bridge existence
2319 _CheckInstanceBridgesExist(self, instance)
2321 def Exec(self, feedback_fn):
2322 """Reboot the instance.
2325 instance = self.instance
2326 ignore_secondaries = self.op.ignore_secondaries
2327 reboot_type = self.op.reboot_type
2328 extra_args = getattr(self.op, "extra_args", "")
2330 node_current = instance.primary_node
2332 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2333 constants.INSTANCE_REBOOT_HARD]:
2334 if not self.rpc.call_instance_reboot(node_current, instance,
2335 reboot_type, extra_args):
2336 raise errors.OpExecError("Could not reboot instance")
2338 if not self.rpc.call_instance_shutdown(node_current, instance):
2339 raise errors.OpExecError("Could not shutdown instance for full reboot")
2340 _ShutdownInstanceDisks(self, instance)
2341 _StartInstanceDisks(self, instance, ignore_secondaries)
2342 if not self.rpc.call_instance_start(node_current, instance, extra_args):
2343 _ShutdownInstanceDisks(self, instance)
2344 raise errors.OpExecError("Could not start instance for full reboot")
2346 self.cfg.MarkInstanceUp(instance.name)
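# Summary (added note): the three reboot modes map to different mechanisms:
#   INSTANCE_REBOOT_SOFT/HARD -> a single call_instance_reboot RPC on the
#                                primary node
#   INSTANCE_REBOOT_FULL      -> explicit shutdown, disk deactivation and
#                                re-activation, then start, i.e. a complete
#                                stop/start cycle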
2349 class LUShutdownInstance(LogicalUnit):
2350 """Shutdown an instance.
2353 HPATH = "instance-stop"
2354 HTYPE = constants.HTYPE_INSTANCE
2355 _OP_REQP = ["instance_name"]
2358 def ExpandNames(self):
2359 self._ExpandAndLockInstance()
2360 self.needed_locks[locking.LEVEL_NODE] = []
2361 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2363 def DeclareLocks(self, level):
2364 if level == locking.LEVEL_NODE:
2365 self._LockInstancesNodes()
2367 def BuildHooksEnv(self):
2370 This runs on master, primary and secondary nodes of the instance.
2373 env = _BuildInstanceHookEnvByObject(self, self.instance)
2374 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2375 list(self.instance.secondary_nodes))
2378 def CheckPrereq(self):
2379 """Check prerequisites.
2381 This checks that the instance is in the cluster.
2384 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2385 assert self.instance is not None, \
2386 "Cannot retrieve locked instance %s" % self.op.instance_name
2388 def Exec(self, feedback_fn):
2389 """Shutdown the instance.
2392 instance = self.instance
2393 node_current = instance.primary_node
2394 self.cfg.MarkInstanceDown(instance.name)
2395 if not self.rpc.call_instance_shutdown(node_current, instance):
2396 self.proc.LogWarning("Could not shutdown instance")
2398 _ShutdownInstanceDisks(self, instance)
2401 class LUReinstallInstance(LogicalUnit):
2402 """Reinstall an instance.
2405 HPATH = "instance-reinstall"
2406 HTYPE = constants.HTYPE_INSTANCE
2407 _OP_REQP = ["instance_name"]
2410 def ExpandNames(self):
2411 self._ExpandAndLockInstance()
2412 self.needed_locks[locking.LEVEL_NODE] = []
2413 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2415 def DeclareLocks(self, level):
2416 if level == locking.LEVEL_NODE:
2417 self._LockInstancesNodes()
2419 def BuildHooksEnv(self):
2422 This runs on master, primary and secondary nodes of the instance.
2425 env = _BuildInstanceHookEnvByObject(self, self.instance)
2426 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2427 list(self.instance.secondary_nodes))
2430 def CheckPrereq(self):
2431 """Check prerequisites.
2433 This checks that the instance is in the cluster and is not running.
2436 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2437 assert instance is not None, \
2438 "Cannot retrieve locked instance %s" % self.op.instance_name
2440 if instance.disk_template == constants.DT_DISKLESS:
2441 raise errors.OpPrereqError("Instance '%s' has no disks" %
2442 self.op.instance_name)
2443 if instance.status != "down":
2444 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2445 self.op.instance_name)
2446 remote_info = self.rpc.call_instance_info(instance.primary_node,
2448 instance.hypervisor)
2450 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2451 (self.op.instance_name,
2452 instance.primary_node))
2454 self.op.os_type = getattr(self.op, "os_type", None)
2455 if self.op.os_type is not None:
2457 pnode = self.cfg.GetNodeInfo(
2458 self.cfg.ExpandNodeName(instance.primary_node))
2460 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2462 os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2464 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2465 " primary node" % self.op.os_type)
2467 self.instance = instance
2469 def Exec(self, feedback_fn):
2470 """Reinstall the instance.
2473 inst = self.instance
2475 if self.op.os_type is not None:
2476 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2477 inst.os = self.op.os_type
2478 self.cfg.Update(inst)
2480 _StartInstanceDisks(self, inst, None)
2482 feedback_fn("Running the instance OS create scripts...")
2483 if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2485 raise errors.OpExecError("Could not install OS for instance %s"
2487 (inst.name, inst.primary_node))
2489 _ShutdownInstanceDisks(self, inst)
2492 class LURenameInstance(LogicalUnit):
2493 """Rename an instance.
2496 HPATH = "instance-rename"
2497 HTYPE = constants.HTYPE_INSTANCE
2498 _OP_REQP = ["instance_name", "new_name"]
2500 def BuildHooksEnv(self):
2503 This runs on master, primary and secondary nodes of the instance.
2506 env = _BuildInstanceHookEnvByObject(self, self.instance)
2507 env["INSTANCE_NEW_NAME"] = self.op.new_name
2508 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2509 list(self.instance.secondary_nodes))
2512 def CheckPrereq(self):
2513 """Check prerequisites.
2515 This checks that the instance is in the cluster and is not running.
2518 instance = self.cfg.GetInstanceInfo(
2519 self.cfg.ExpandInstanceName(self.op.instance_name))
2520 if instance is None:
2521 raise errors.OpPrereqError("Instance '%s' not known" %
2522 self.op.instance_name)
2523 if instance.status != "down":
2524 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2525 self.op.instance_name)
2526 remote_info = self.rpc.call_instance_info(instance.primary_node,
2528 instance.hypervisor)
2530 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2531 (self.op.instance_name,
2532 instance.primary_node))
2533 self.instance = instance
2535 # new name verification
2536 name_info = utils.HostInfo(self.op.new_name)
2538 self.op.new_name = new_name = name_info.name
2539 instance_list = self.cfg.GetInstanceList()
2540 if new_name in instance_list:
2541 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2544 if not getattr(self.op, "ignore_ip", False):
2545 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2546 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2547 (name_info.ip, new_name))
2550 def Exec(self, feedback_fn):
2551 """Reinstall the instance.
2554 inst = self.instance
2555 old_name = inst.name
2557 if inst.disk_template == constants.DT_FILE:
2558 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2560 self.cfg.RenameInstance(inst.name, self.op.new_name)
2561 # Change the instance lock. This is definitely safe while we hold the BGL
2562 self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2563 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2565 # re-read the instance from the configuration after rename
2566 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2568 if inst.disk_template == constants.DT_FILE:
2569 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2570 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2571 old_file_storage_dir,
2572 new_file_storage_dir)
2575 raise errors.OpExecError("Could not connect to node '%s' to rename"
2576 " directory '%s' to '%s' (but the instance"
2577 " has been renamed in Ganeti)" % (
2578 inst.primary_node, old_file_storage_dir,
2579 new_file_storage_dir))
2582 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2583 " (but the instance has been renamed in"
2584 " Ganeti)" % (old_file_storage_dir,
2585 new_file_storage_dir))
2587 _StartInstanceDisks(self, inst, None)
2589 if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2591 msg = ("Could not run OS rename script for instance %s on node %s"
2592 " (but the instance has been renamed in Ganeti)" %
2593 (inst.name, inst.primary_node))
2594 self.proc.LogWarning(msg)
2596 _ShutdownInstanceDisks(self, inst)
2599 class LURemoveInstance(LogicalUnit):
2600 """Remove an instance.
2603 HPATH = "instance-remove"
2604 HTYPE = constants.HTYPE_INSTANCE
2605 _OP_REQP = ["instance_name", "ignore_failures"]
2608 def ExpandNames(self):
2609 self._ExpandAndLockInstance()
2610 self.needed_locks[locking.LEVEL_NODE] = []
2611 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2613 def DeclareLocks(self, level):
2614 if level == locking.LEVEL_NODE:
2615 self._LockInstancesNodes()
2617 def BuildHooksEnv(self):
2620 This runs on master, primary and secondary nodes of the instance.
2623 env = _BuildInstanceHookEnvByObject(self, self.instance)
2624 nl = [self.cfg.GetMasterNode()]
2627 def CheckPrereq(self):
2628 """Check prerequisites.
2630 This checks that the instance is in the cluster.
2633 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2634 assert self.instance is not None, \
2635 "Cannot retrieve locked instance %s" % self.op.instance_name
2637 def Exec(self, feedback_fn):
2638 """Remove the instance.
2641 instance = self.instance
2642 logging.info("Shutting down instance %s on node %s",
2643 instance.name, instance.primary_node)
2645 if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2646 if self.op.ignore_failures:
2647 feedback_fn("Warning: can't shutdown instance")
2649 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2650 (instance.name, instance.primary_node))
2652 logging.info("Removing block devices for instance %s", instance.name)
2654 if not _RemoveDisks(self, instance):
2655 if self.op.ignore_failures:
2656 feedback_fn("Warning: can't remove instance's disks")
2658 raise errors.OpExecError("Can't remove instance's disks")
2660 logging.info("Removing instance %s out of cluster config", instance.name)
2662 self.cfg.RemoveInstance(instance.name)
2663 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2666 class LUQueryInstances(NoHooksLU):
2667 """Logical unit for querying instances.
2670 _OP_REQP = ["output_fields", "names"]
2673 def ExpandNames(self):
2674 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2675 hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
2676 bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
2677 self.static_fields = frozenset([
2678 "name", "os", "pnode", "snodes",
2679 "admin_state", "admin_ram",
2680 "disk_template", "ip", "mac", "bridge",
2681 "sda_size", "sdb_size", "vcpus", "tags",
2682 "network_port", "beparams",
2683 "serial_no", "hypervisor", "hvparams",
2686 _CheckOutputFields(static=self.static_fields,
2687 dynamic=self.dynamic_fields,
2688 selected=self.op.output_fields)
2690 self.needed_locks = {}
2691 self.share_locks[locking.LEVEL_INSTANCE] = 1
2692 self.share_locks[locking.LEVEL_NODE] = 1
2695 self.wanted = _GetWantedInstances(self, self.op.names)
2697 self.wanted = locking.ALL_SET
2699 self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2701 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2702 self.needed_locks[locking.LEVEL_NODE] = []
2703 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2705 def DeclareLocks(self, level):
2706 if level == locking.LEVEL_NODE and self.do_locking:
2707 self._LockInstancesNodes()
2709 def CheckPrereq(self):
2710 """Check prerequisites.
2715 def Exec(self, feedback_fn):
2716 """Computes the list of nodes and their attributes.
2719 all_info = self.cfg.GetAllInstancesInfo()
2721 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2722 elif self.wanted != locking.ALL_SET:
2723 instance_names = self.wanted
2724 missing = set(instance_names).difference(all_info.keys())
2726 raise errors.OpExecError(
2727 "Some instances were removed before retrieving their data: %s"
2730 instance_names = all_info.keys()
2732 instance_names = utils.NiceSort(instance_names)
2733 instance_list = [all_info[iname] for iname in instance_names]
2735 # begin data gathering
2737 nodes = frozenset([inst.primary_node for inst in instance_list])
2738 hv_list = list(set([inst.hypervisor for inst in instance_list]))
2741 if self.dynamic_fields.intersection(self.op.output_fields):
2743 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2745 result = node_data[name]
2747 live_data.update(result)
2748 elif result is False:
2749 bad_nodes.append(name)
2750 # else no instance is alive
2752 live_data = dict([(name, {}) for name in instance_names])
2754 # end data gathering
2759 for instance in instance_list:
2761 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2762 i_be = self.cfg.GetClusterInfo().FillBE(instance)
2763 for field in self.op.output_fields:
2768 elif field == "pnode":
2769 val = instance.primary_node
2770 elif field == "snodes":
2771 val = list(instance.secondary_nodes)
2772 elif field == "admin_state":
2773 val = (instance.status != "down")
2774 elif field == "oper_state":
2775 if instance.primary_node in bad_nodes:
2778 val = bool(live_data.get(instance.name))
2779 elif field == "status":
2780 if instance.primary_node in bad_nodes:
2781 val = "ERROR_nodedown"
2783 running = bool(live_data.get(instance.name))
2785 if instance.status != "down":
2790 if instance.status != "down":
2794 elif field == "oper_ram":
2795 if instance.primary_node in bad_nodes:
2797 elif instance.name in live_data:
2798 val = live_data[instance.name].get("memory", "?")
2801 elif field == "disk_template":
2802 val = instance.disk_template
2804 val = instance.nics[0].ip
2805 elif field == "bridge":
2806 val = instance.nics[0].bridge
2807 elif field == "mac":
2808 val = instance.nics[0].mac
2809 elif field == "sda_size" or field == "sdb_size":
2810 disk = instance.FindDisk(field[:3])
2815 elif field == "tags":
2816 val = list(instance.GetTags())
2817 elif field == "serial_no":
2818 val = instance.serial_no
2819 elif field == "network_port":
2820 val = instance.network_port
2821 elif field == "hypervisor":
2822 val = instance.hypervisor
2823 elif field == "hvparams":
2825 elif (field.startswith(HVPREFIX) and
2826 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2827 val = i_hv.get(field[len(HVPREFIX):], None)
2828 elif field == "beparams":
2830 elif (field.startswith(BEPREFIX) and
2831 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2832 val = i_be.get(field[len(BEPREFIX):], None)
2834 raise errors.ParameterError(field)
2841 class LUFailoverInstance(LogicalUnit):
2842 """Failover an instance.
2845 HPATH = "instance-failover"
2846 HTYPE = constants.HTYPE_INSTANCE
2847 _OP_REQP = ["instance_name", "ignore_consistency"]
2850 def ExpandNames(self):
2851 self._ExpandAndLockInstance()
2852 self.needed_locks[locking.LEVEL_NODE] = []
2853 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2855 def DeclareLocks(self, level):
2856 if level == locking.LEVEL_NODE:
2857 self._LockInstancesNodes()
2859 def BuildHooksEnv(self):
2862 This runs on master, primary and secondary nodes of the instance.
2866 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2868 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2869 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2872 def CheckPrereq(self):
2873 """Check prerequisites.
2875 This checks that the instance is in the cluster.
2878 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2879 assert self.instance is not None, \
2880 "Cannot retrieve locked instance %s" % self.op.instance_name
2882 bep = self.cfg.GetClusterInfo().FillBE(instance)
2883 if instance.disk_template not in constants.DTS_NET_MIRROR:
2884 raise errors.OpPrereqError("Instance's disk layout is not"
2885 " network mirrored, cannot failover.")
2887 secondary_nodes = instance.secondary_nodes
2888 if not secondary_nodes:
2889 raise errors.ProgrammerError("no secondary node but using "
2890 "a mirrored disk template")
2892 target_node = secondary_nodes[0]
2893 # check memory requirements on the secondary node
2894 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2895 instance.name, bep[constants.BE_MEMORY],
2896 instance.hypervisor)
2898 # check bridge existence
2899 brlist = [nic.bridge for nic in instance.nics]
2900 if not self.rpc.call_bridges_exist(target_node, brlist):
2901 raise errors.OpPrereqError("One or more target bridges %s do not"
2902 " exist on destination node '%s'" %
2903 (brlist, target_node))
2905 def Exec(self, feedback_fn):
2906 """Failover an instance.
2908 The failover is done by shutting it down on its present node and
2909 starting it on the secondary.
2912 instance = self.instance
2914 source_node = instance.primary_node
2915 target_node = instance.secondary_nodes[0]
2917 feedback_fn("* checking disk consistency between source and target")
2918 for dev in instance.disks:
2919 # for drbd, these are drbd over lvm
2920 if not _CheckDiskConsistency(self, dev, target_node, False):
2921 if instance.status == "up" and not self.op.ignore_consistency:
2922 raise errors.OpExecError("Disk %s is degraded on target node,"
2923 " aborting failover." % dev.iv_name)
2925 feedback_fn("* shutting down instance on source node")
2926 logging.info("Shutting down instance %s on node %s",
2927 instance.name, source_node)
2929 if not self.rpc.call_instance_shutdown(source_node, instance):
2930 if self.op.ignore_consistency:
2931 self.proc.LogWarning("Could not shutdown instance %s on node %s."
2933 " anyway. Please make sure node %s is down",
2934 instance.name, source_node, source_node)
2936 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2937 (instance.name, source_node))
2939 feedback_fn("* deactivating the instance's disks on source node")
2940 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2941 raise errors.OpExecError("Can't shut down the instance's disks.")
2943 instance.primary_node = target_node
2944 # distribute new instance config to the other nodes
2945 self.cfg.Update(instance)
2947 # Only start the instance if it's marked as up
2948 if instance.status == "up":
2949 feedback_fn("* activating the instance's disks on target node")
2950 logging.info("Starting instance %s on node %s",
2951 instance.name, target_node)
2953 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2954 ignore_secondaries=True)
2955 if not disks_ok:
2956 _ShutdownInstanceDisks(self, instance)
2957 raise errors.OpExecError("Can't activate the instance's disks")
2959 feedback_fn("* starting the instance on the target node")
2960 if not self.rpc.call_instance_start(target_node, instance, None):
2961 _ShutdownInstanceDisks(self, instance)
2962 raise errors.OpExecError("Could not start instance %s on node %s." %
2963 (instance.name, target_node))
2966 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2967 """Create a tree of block devices on the primary node.
2969 This always creates all devices.
2973 for child in device.children:
2974 if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2977 lu.cfg.SetDiskID(device, node)
2978 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2979 instance.name, True, info)
2982 if device.physical_id is None:
2983 device.physical_id = new_id
2987 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2988 """Create a tree of block devices on a secondary node.
2990 If this device type has to be created on secondaries, create it and
2993 If not, just recurse to children keeping the same 'force' value.
2996 if device.CreateOnSecondary():
2999 for child in device.children:
3000 if not _CreateBlockDevOnSecondary(lu, node, instance,
3001 child, force, info):
3006 lu.cfg.SetDiskID(device, node)
3007 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3008 instance.name, False, info)
3011 if device.physical_id is None:
3012 device.physical_id = new_id
3016 def _GenerateUniqueNames(lu, exts):
3017 """Generate a suitable LV name.
3019 This will generate a logical volume name for the given instance.
3024 new_id = lu.cfg.GenerateUniqueID()
3025 results.append("%s%s" % (new_id, val))
3026 return results
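# Example (illustrative output): _GenerateUniqueNames(lu, [".sda", ".sdb"])
# returns names such as
#   ["d2a8c2f2.sda", "4f6ab13e.sdb"]
# where each entry carries its own freshly generated unique ID, since
# GenerateUniqueID() is called once per extension.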
3029 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3031 """Generate a drbd8 device complete with its children.
3034 port = lu.cfg.AllocatePort()
3035 vgname = lu.cfg.GetVGName()
3036 shared_secret = lu.cfg.GenerateDRBDSecret()
3037 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3038 logical_id=(vgname, names[0]))
3039 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3040 logical_id=(vgname, names[1]))
3041 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3042 logical_id=(primary, secondary, port,
3045 children=[dev_data, dev_meta],
3050 def _GenerateDiskTemplate(lu, template_name,
3051 instance_name, primary_node,
3052 secondary_nodes, disk_sz, swap_sz,
3053 file_storage_dir, file_driver):
3054 """Generate the entire disk layout for a given template type.
3057 #TODO: compute space requirements
3059 vgname = lu.cfg.GetVGName()
3060 if template_name == constants.DT_DISKLESS:
3062 elif template_name == constants.DT_PLAIN:
3063 if len(secondary_nodes) != 0:
3064 raise errors.ProgrammerError("Wrong template configuration")
3066 names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
3067 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3068 logical_id=(vgname, names[0]),
3070 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3071 logical_id=(vgname, names[1]),
3073 disks = [sda_dev, sdb_dev]
3074 elif template_name == constants.DT_DRBD8:
3075 if len(secondary_nodes) != 1:
3076 raise errors.ProgrammerError("Wrong template configuration")
3077 remote_node = secondary_nodes[0]
3078 (minor_pa, minor_pb,
3079 minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3080 [primary_node, primary_node, remote_node, remote_node], instance_name)
3082 names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3083 ".sdb_data", ".sdb_meta"])
3084 drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3085 disk_sz, names[0:2], "sda",
3087 drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3088 swap_sz, names[2:4], "sdb",
3090 disks = [drbd_sda_dev, drbd_sdb_dev]
3091 elif template_name == constants.DT_FILE:
3092 if len(secondary_nodes) != 0:
3093 raise errors.ProgrammerError("Wrong template configuration")
3095 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3096 iv_name="sda", logical_id=(file_driver,
3097 "%s/sda" % file_storage_dir))
3098 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3099 iv_name="sdb", logical_id=(file_driver,
3100 "%s/sdb" % file_storage_dir))
3101 disks = [file_sda_dev, file_sdb_dev]
3103 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3105 return disks
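# Illustrative sketch of the plain-LVM result: with disk_sz=1024 and
# swap_sz=512 the function returns two LV-backed disks, roughly
#
#   [objects.Disk(dev_type=constants.LD_LV, size=1024,
#                 logical_id=(vgname, "<uuid>.sda"), ...),  # iv_name "sda"
#    objects.Disk(dev_type=constants.LD_LV, size=512,
#                 logical_id=(vgname, "<uuid>.sdb"), ...)]  # iv_name "sdb"
#
# while the drbd8 branch wraps such data/metadata LV pairs in LD_DRBD8
# devices via _GenerateDRBD8Branch.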
3107 def _GetInstanceInfoText(instance):
3108 Compute the text that should be added to the disk's metadata.
3111 return "originstname+%s" % instance.name
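# Example: for an instance named "web1.example.com" (hypothetical) this
# returns the metadata string "originstname+web1.example.com".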
3114 def _CreateDisks(lu, instance):
3115 """Create all disks for an instance.
3117 This abstracts away some work from AddInstance.
3120 instance: the instance object
3123 True or False showing the success of the creation process
3126 info = _GetInstanceInfoText(instance)
3128 if instance.disk_template == constants.DT_FILE:
3129 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3130 result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3134 logging.error("Could not connect to node '%s'", instance.primary_node)
3138 logging.error("Failed to create directory '%s'", file_storage_dir)
3141 for device in instance.disks:
3142 logging.info("Creating volume %s for instance %s",
3143 device.iv_name, instance.name)
3145 for secondary_node in instance.secondary_nodes:
3146 if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3147 device, False, info):
3148 logging.error("Failed to create volume %s (%s) on secondary node %s!",
3149 device.iv_name, device, secondary_node)
3152 if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3153 instance, device, info):
3154 logging.error("Failed to create volume %s on primary!", device.iv_name)
3160 def _RemoveDisks(lu, instance):
3161 """Remove all disks for an instance.
3163 This abstracts away some work from `AddInstance()` and
3164 `RemoveInstance()`. Note that in case some of the devices couldn't
3165 be removed, the removal will continue with the other ones (compare
3166 with `_CreateDisks()`).
3169 instance: the instance object
3172 True or False showing the success of the removal process
3175 logging.info("Removing block devices for instance %s", instance.name)
3178 for device in instance.disks:
3179 for node, disk in device.ComputeNodeTree(instance.primary_node):
3180 lu.cfg.SetDiskID(disk, node)
3181 if not lu.rpc.call_blockdev_remove(node, disk):
3182 lu.proc.LogWarning("Could not remove block device %s on node %s,"
3183 " continuing anyway", device.iv_name, node)
3186 if instance.disk_template == constants.DT_FILE:
3187 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3188 if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3190 logging.error("Could not remove directory '%s'", file_storage_dir)
3196 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3197 """Compute disk size requirements in the volume group
3199 This is currently hard-coded for the two-drive layout.
3202 # Required free disk space as a function of disk and swap space
3204 constants.DT_DISKLESS: None,
3205 constants.DT_PLAIN: disk_size + swap_size,
3206 # 256 MB are added for drbd metadata, 128 MB for each drbd device
3207 constants.DT_DRBD8: disk_size + swap_size + 256,
3208 constants.DT_FILE: None,
3211 if disk_template not in req_size_dict:
3212 raise errors.ProgrammerError("Disk template '%s' size requirement"
3213 " is unknown" % disk_template)
3215 return req_size_dict[disk_template]
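# Worked example: for the drbd8 template with a 1024 MB disk and 512 MB
# swap, the volume group must provide 1024 + 512 + 256 = 1792 MB, while
# diskless and file-backed instances need no VG space at all:
#
#   _ComputeDiskSize(constants.DT_DRBD8, 1024, 512)  # -> 1792
#   _ComputeDiskSize(constants.DT_FILE, 1024, 512)   # -> None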
3218 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3219 """Hypervisor parameter validation.
3221 This function abstracts the hypervisor parameter validation to be
3222 used in both instance create and instance modify.
3224 @type lu: L{LogicalUnit}
3225 @param lu: the logical unit for which we check
3226 @type nodenames: list
3227 @param nodenames: the list of nodes on which we should check
3228 @type hvname: string
3229 @param hvname: the name of the hypervisor we should use
3230 @type hvparams: dict
3231 @param hvparams: the parameters which we need to check
3232 @raise errors.OpPrereqError: if the parameters are not valid
3235 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3238 for node in nodenames:
3239 info = hvinfo.get(node, None)
3240 if not info or not isinstance(info, (tuple, list)):
3241 raise errors.OpPrereqError("Cannot get current information"
3242 " from node '%s' (%s)" % (node, info))
3244 raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3248 class LUCreateInstance(LogicalUnit):
3249 """Create an instance.
3252 HPATH = "instance-add"
3253 HTYPE = constants.HTYPE_INSTANCE
3254 _OP_REQP = ["instance_name", "disk_size",
3255 "disk_template", "swap_size", "mode", "start",
3256 "wait_for_sync", "ip_check", "mac",
3257 "hvparams", "beparams"]
3260 def _ExpandNode(self, node):
3261 """Expands and checks one node name.
3264 node_full = self.cfg.ExpandNodeName(node)
3265 if node_full is None:
3266 raise errors.OpPrereqError("Unknown node %s" % node)
3269 def ExpandNames(self):
3270 """ExpandNames for CreateInstance.
3272 Figure out the right locks for instance creation.
3275 self.needed_locks = {}
3277 # set optional parameters to none if they don't exist
3278 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3279 if not hasattr(self.op, attr):
3280 setattr(self.op, attr, None)
3282 # cheap checks, mostly valid constants given
3284 # verify creation mode
3285 if self.op.mode not in (constants.INSTANCE_CREATE,
3286 constants.INSTANCE_IMPORT):
3287 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3290 # disk template and mirror node verification
3291 if self.op.disk_template not in constants.DISK_TEMPLATES:
3292 raise errors.OpPrereqError("Invalid disk template name")
3294 if self.op.hypervisor is None:
3295 self.op.hypervisor = self.cfg.GetHypervisorType()
3297 cluster = self.cfg.GetClusterInfo()
3298 enabled_hvs = cluster.enabled_hypervisors
3299 if self.op.hypervisor not in enabled_hvs:
3300 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3301 " cluster (%s)" % (self.op.hypervisor,
3302 ",".join(enabled_hvs)))
3304 # check hypervisor parameter syntax (locally)
3306 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3308 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3309 hv_type.CheckParameterSyntax(filled_hvp)
3311 # fill and remember the beparams dict
3312 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3315 #### instance parameters check
3317 # instance name verification
3318 hostname1 = utils.HostInfo(self.op.instance_name)
3319 self.op.instance_name = instance_name = hostname1.name
3321 # this is just a preventive check, but someone might still add this
3322 # instance in the meantime, and creation will fail at lock-add time
3323 if instance_name in self.cfg.GetInstanceList():
3324 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3327 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3329 # ip validity checks
3330 ip = getattr(self.op, "ip", None)
3331 if ip is None or ip.lower() == "none":
3333 elif ip.lower() == constants.VALUE_AUTO:
3334 inst_ip = hostname1.ip
3336 if not utils.IsValidIP(ip):
3337 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3338 " like a valid IP" % ip)
3340 self.inst_ip = self.op.ip = inst_ip
3341 # used in CheckPrereq for ip ping check
3342 self.check_ip = hostname1.ip
3344 # MAC address verification
3345 if self.op.mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3346 if not utils.IsValidMac(self.op.mac.lower()):
3347 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3350 # file storage checks
3351 if (self.op.file_driver and
3352 not self.op.file_driver in constants.FILE_DRIVER):
3353 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3354 self.op.file_driver)
3356 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3357 raise errors.OpPrereqError("File storage directory path not absolute")
3359 ### Node/iallocator related checks
3360 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3361 raise errors.OpPrereqError("One and only one of iallocator and primary"
3362 " node must be given")
3364 if self.op.iallocator:
3365 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3367 self.op.pnode = self._ExpandNode(self.op.pnode)
3368 nodelist = [self.op.pnode]
3369 if self.op.snode is not None:
3370 self.op.snode = self._ExpandNode(self.op.snode)
3371 nodelist.append(self.op.snode)
3372 self.needed_locks[locking.LEVEL_NODE] = nodelist
3374 # in case of import lock the source node too
3375 if self.op.mode == constants.INSTANCE_IMPORT:
3376 src_node = getattr(self.op, "src_node", None)
3377 src_path = getattr(self.op, "src_path", None)
3379 if src_node is None or src_path is None:
3380 raise errors.OpPrereqError("Importing an instance requires source"
3381 " node and path options")
3383 if not os.path.isabs(src_path):
3384 raise errors.OpPrereqError("The source path must be absolute")
3386 self.op.src_node = src_node = self._ExpandNode(src_node)
3387 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3388 self.needed_locks[locking.LEVEL_NODE].append(src_node)
3390 else: # INSTANCE_CREATE
3391 if getattr(self.op, "os_type", None) is None:
3392 raise errors.OpPrereqError("No guest OS specified")
3394 def _RunAllocator(self):
3395 """Run the allocator based on input opcode.
3398 disks = [{"size": self.op.disk_size, "mode": "w"},
3399 {"size": self.op.swap_size, "mode": "w"}]
3400 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3401 "bridge": self.op.bridge}]
3402 ial = IAllocator(self,
3403 mode=constants.IALLOCATOR_MODE_ALLOC,
3404 name=self.op.instance_name,
3405 disk_template=self.op.disk_template,
3408 vcpus=self.be_full[constants.BE_VCPUS],
3409 mem_size=self.be_full[constants.BE_MEMORY],
3414 ial.Run(self.op.iallocator)
3417 raise errors.OpPrereqError("Can't compute nodes using"
3418 " iallocator '%s': %s" % (self.op.iallocator,
3420 if len(ial.nodes) != ial.required_nodes:
3421 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3422 " of nodes (%s), required %s" %
3423 (self.op.iallocator, len(ial.nodes),
3424 ial.required_nodes))
3425 self.op.pnode = ial.nodes[0]
3426 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3427 self.op.instance_name, self.op.iallocator,
3428 ", ".join(ial.nodes))
3429 if ial.required_nodes == 2:
3430 self.op.snode = ial.nodes[1]
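# Note (added): the allocator contract relied upon above is that after a
# successful ial.Run() the ial.nodes list holds exactly ial.required_nodes
# entries; the first becomes the primary node and, for mirrored disk
# templates, the second becomes the secondary.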
3432 def BuildHooksEnv(self):
3435 This runs on master, primary and secondary nodes of the instance.
3439 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3440 "INSTANCE_DISK_SIZE": self.op.disk_size,
3441 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3442 "INSTANCE_ADD_MODE": self.op.mode,
3444 if self.op.mode == constants.INSTANCE_IMPORT:
3445 env["INSTANCE_SRC_NODE"] = self.op.src_node
3446 env["INSTANCE_SRC_PATH"] = self.op.src_path
3447 env["INSTANCE_SRC_IMAGES"] = self.src_images
3449 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3450 primary_node=self.op.pnode,
3451 secondary_nodes=self.secondaries,
3452 status=self.instance_status,
3453 os_type=self.op.os_type,
3454 memory=self.be_full[constants.BE_MEMORY],
3455 vcpus=self.be_full[constants.BE_VCPUS],
3456 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3459 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3464 def CheckPrereq(self):
3465 """Check prerequisites.
3468 if (not self.cfg.GetVGName() and
3469 self.op.disk_template not in constants.DTS_NOT_LVM):
3470 raise errors.OpPrereqError("Cluster does not support lvm-based"
3474 if self.op.mode == constants.INSTANCE_IMPORT:
3475 src_node = self.op.src_node
3476 src_path = self.op.src_path
3478 export_info = self.rpc.call_export_info(src_node, src_path)
3481 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3483 if not export_info.has_section(constants.INISECT_EXP):
3484 raise errors.ProgrammerError("Corrupted export config")
3486 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3487 if int(ei_version) != constants.EXPORT_VERSION:
3488 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3489 (ei_version, constants.EXPORT_VERSION))
3491 # Check that the new instance doesn't have less disks than the export
3492 # TODO: substitute "2" with the actual number of disks requested
3494 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3495 if instance_disks < export_disks:
3496 raise errors.OpPrereqError("Not enough disks to import."
3497 " (instance: %d, export: %d)" %
3500 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3502 for idx in range(export_disks):
3503 option = 'disk%d_dump' % idx
3504 if export_info.has_option(constants.INISECT_INS, option):
3505 # FIXME: are the old os-es, disk sizes, etc. useful?
3506 export_name = export_info.get(constants.INISECT_INS, option)
3507 image = os.path.join(src_path, export_name)
3508 disk_images.append(image)
3510 disk_images.append(False)
3512 self.src_images = disk_images
3514 if self.op.mac == constants.VALUE_AUTO:
3515 old_name = export_info.get(constants.INISECT_INS, 'name')
3516 if self.op.instance_name == old_name:
3517 # FIXME: adjust every nic, when we'll be able to create instances
3518 # with more than one
3519 if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
3520 self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')
3522 # ip ping checks (we use the same ip that was resolved in ExpandNames)
3524 if self.op.start and not self.op.ip_check:
3525 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3526 " adding an instance in start mode")
3528 if self.op.ip_check:
3529 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3530 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3531 (self.check_ip, self.op.instance_name))
3533 # bridge verification
3534 bridge = getattr(self.op, "bridge", None)
3536 self.op.bridge = self.cfg.GetDefBridge()
3538 self.op.bridge = bridge
3542 if self.op.iallocator is not None:
3543 self._RunAllocator()
3545 #### node related checks
3547 # check primary node
3548 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3549 assert self.pnode is not None, \
3550 "Cannot retrieve locked node %s" % self.op.pnode
3551 self.secondaries = []
3553 # mirror node verification
3554 if self.op.disk_template in constants.DTS_NET_MIRROR:
3555 if self.op.snode is None:
3556 raise errors.OpPrereqError("The networked disk templates need"
3558 if self.op.snode == pnode.name:
3559 raise errors.OpPrereqError("The secondary node cannot be"
3560 " the primary node.")
3561 self.secondaries.append(self.op.snode)
3563 nodenames = [pnode.name] + self.secondaries
3565 req_size = _ComputeDiskSize(self.op.disk_template,
3566 self.op.disk_size, self.op.swap_size)
3568 # Check lv size requirements
3569 if req_size is not None:
3570 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3572 for node in nodenames:
3573 info = nodeinfo.get(node, None)
3575 raise errors.OpPrereqError("Cannot get current information"
3576 " from node '%s'" % node)
3577 vg_free = info.get('vg_free', None)
3578 if not isinstance(vg_free, int):
3579 raise errors.OpPrereqError("Can't compute free disk space on"
3581 if req_size > vg_free:
3582 raise errors.OpPrereqError("Not enough disk space on target node %s."
3583 " %d MB available, %d MB required" %
3584 (node, info['vg_free'], req_size))
3586 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3589 os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3591 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3592 " primary node" % self.op.os_type)
3594 # bridge check on primary node
3595 if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3596 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3597 " destination node '%s'" %
3598 (self.op.bridge, pnode.name))
3600 # memory check on primary node
3602 _CheckNodeFreeMemory(self, self.pnode.name,
3603 "creating instance %s" % self.op.instance_name,
3604 self.be_full[constants.BE_MEMORY],
3608 self.instance_status = 'up'
3610 self.instance_status = 'down'
3612 def Exec(self, feedback_fn):
3613 """Create and add the instance to the cluster.
3616 instance = self.op.instance_name
3617 pnode_name = self.pnode.name
3619 if self.op.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3620 mac_address = self.cfg.GenerateMAC()
3622 mac_address = self.op.mac
3624 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3625 if self.inst_ip is not None:
3626 nic.ip = self.inst_ip
3628 ht_kind = self.op.hypervisor
3629 if ht_kind in constants.HTS_REQ_PORT:
3630 network_port = self.cfg.AllocatePort()
3634 ##if self.op.vnc_bind_address is None:
3635 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3637 # this is needed because os.path.join does not accept None arguments
3638 if self.op.file_storage_dir is None:
3639 string_file_storage_dir = ""
3641 string_file_storage_dir = self.op.file_storage_dir
3643 # build the full file storage dir path
3644 file_storage_dir = os.path.normpath(os.path.join(
3645 self.cfg.GetFileStorageDir(),
3646 string_file_storage_dir, instance))
3649 disks = _GenerateDiskTemplate(self,
3650 self.op.disk_template,
3651 instance, pnode_name,
3652 self.secondaries, self.op.disk_size,
3655 self.op.file_driver)
3657 iobj = objects.Instance(name=instance, os=self.op.os_type,
3658 primary_node=pnode_name,
3659 nics=[nic], disks=disks,
3660 disk_template=self.op.disk_template,
3661 status=self.instance_status,
3662 network_port=network_port,
3663 beparams=self.op.beparams,
3664 hvparams=self.op.hvparams,
3665 hypervisor=self.op.hypervisor,
3668 feedback_fn("* creating instance disks...")
3669 if not _CreateDisks(self, iobj):
3670 _RemoveDisks(self, iobj)
3671 self.cfg.ReleaseDRBDMinors(instance)
3672 raise errors.OpExecError("Device creation failed, reverting...")
3674 feedback_fn("adding instance %s to cluster config" % instance)
3676 self.cfg.AddInstance(iobj)
3677 # Declare that we don't want to remove the instance lock anymore, as we've
3678 # added the instance to the config
3679 del self.remove_locks[locking.LEVEL_INSTANCE]
3680 # Remove the temporary assignments for the instance's drbds
3681 self.cfg.ReleaseDRBDMinors(instance)
3683 if self.op.wait_for_sync:
3684 disk_abort = not _WaitForSync(self, iobj)
3685 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3686 # make sure the disks are not degraded (still sync-ing is ok)
3688 feedback_fn("* checking mirrors status")
3689 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3694 _RemoveDisks(self, iobj)
3695 self.cfg.RemoveInstance(iobj.name)
3696 # Make sure the instance lock gets removed
3697 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3698 raise errors.OpExecError("There are some degraded disks for"
3701 feedback_fn("creating os for instance %s on node %s" %
3702 (instance, pnode_name))
3704 if iobj.disk_template != constants.DT_DISKLESS:
3705 if self.op.mode == constants.INSTANCE_CREATE:
3706 feedback_fn("* running the instance OS create scripts...")
3707 if not self.rpc.call_instance_os_add(pnode_name, iobj):
3708 raise errors.OpExecError("could not add os for instance %s"
3710 (instance, pnode_name))
3712 elif self.op.mode == constants.INSTANCE_IMPORT:
3713 feedback_fn("* running the instance OS import scripts...")
3714 src_node = self.op.src_node
3715 src_images = self.src_images
3716 cluster_name = self.cfg.GetClusterName()
3717 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
3718 src_node, src_images,
3720 for idx, result in enumerate(import_result):
3722 self.LogWarning("Could not import image %s for instance %s, disk %d,"
3723 " on node %s" % (src_images[idx], instance, idx,
3726 # also checked in the prereq part
3727 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3731 logging.info("Starting instance %s on node %s", instance, pnode_name)
3732 feedback_fn("* starting instance...")
3733 if not self.rpc.call_instance_start(pnode_name, iobj, None):
3734 raise errors.OpExecError("Could not start instance")
3737 class LUConnectConsole(NoHooksLU):
3738 """Connect to an instance's console.
3740 This is somewhat special in that it returns the command line that
3741 you need to run on the master node in order to connect to the
3745 _OP_REQP = ["instance_name"]
3748 def ExpandNames(self):
3749 self._ExpandAndLockInstance()
3751 def CheckPrereq(self):
3752 """Check prerequisites.
3754 This checks that the instance is in the cluster.
3757 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3758 assert self.instance is not None, \
3759 "Cannot retrieve locked instance %s" % self.op.instance_name
3761 def Exec(self, feedback_fn):
3762 """Connect to the console of an instance
3765 instance = self.instance
3766 node = instance.primary_node
3768 node_insts = self.rpc.call_instance_list([node],
3769 [instance.hypervisor])[node]
3770 if node_insts is False:
3771 raise errors.OpExecError("Can't connect to node %s." % node)
3773 if instance.name not in node_insts:
3774 raise errors.OpExecError("Instance %s is not running." % instance.name)
3776 logging.debug("Connecting to console of %s on %s", instance.name, node)
3778 hyper = hypervisor.GetHypervisor(instance.hypervisor)
3779 console_cmd = hyper.GetShellCommandForConsole(instance)
3782 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
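# Illustrative note: the returned value is a complete SSH command line; for
# the Xen hypervisor the per-instance console_cmd is typically of the form
# "xm console <instance>", so the caller ends up running roughly
#   ssh -t root@<primary node> 'xm console <instance>'
# from the master node.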
3785 class LUReplaceDisks(LogicalUnit):
3786 """Replace the disks of an instance.
3789 HPATH = "mirrors-replace"
3790 HTYPE = constants.HTYPE_INSTANCE
3791 _OP_REQP = ["instance_name", "mode", "disks"]
3794 def ExpandNames(self):
3795 self._ExpandAndLockInstance()
3797 if not hasattr(self.op, "remote_node"):
3798 self.op.remote_node = None
3800 ia_name = getattr(self.op, "iallocator", None)
3801 if ia_name is not None:
3802 if self.op.remote_node is not None:
3803 raise errors.OpPrereqError("Give either the iallocator or the new"
3804 " secondary, not both")
3805 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3806 elif self.op.remote_node is not None:
3807 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3808 if remote_node is None:
3809 raise errors.OpPrereqError("Node '%s' not known" %
3810 self.op.remote_node)
3811 self.op.remote_node = remote_node
3812 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3813 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3815 self.needed_locks[locking.LEVEL_NODE] = []
3816 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3818 def DeclareLocks(self, level):
3819 # If we're not already locking all nodes in the set we have to declare the
3820 # instance's primary/secondary nodes.
3821 if (level == locking.LEVEL_NODE and
3822 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3823 self._LockInstancesNodes()
3825 def _RunAllocator(self):
3826 """Compute a new secondary node using an IAllocator.
3829 ial = IAllocator(self,
3830 mode=constants.IALLOCATOR_MODE_RELOC,
3831 name=self.op.instance_name,
3832 relocate_from=[self.sec_node])
3834 ial.Run(self.op.iallocator)
3837 raise errors.OpPrereqError("Can't compute nodes using"
3838 " iallocator '%s': %s" % (self.op.iallocator,
3840 if len(ial.nodes) != ial.required_nodes:
3841 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3842 " of nodes (%s), required %s" %
3843 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3844 self.op.remote_node = ial.nodes[0]
3845 self.LogInfo("Selected new secondary for the instance: %s",
3846 self.op.remote_node)
3848 def BuildHooksEnv(self):
3851 This runs on the master, the primary and all the secondaries.
3855 "MODE": self.op.mode,
3856 "NEW_SECONDARY": self.op.remote_node,
3857 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3859 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3861 self.cfg.GetMasterNode(),
3862 self.instance.primary_node,
3864 if self.op.remote_node is not None:
3865 nl.append(self.op.remote_node)
3868 def CheckPrereq(self):
3869 """Check prerequisites.
3871 This checks that the instance is in the cluster.
3874 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3875 assert instance is not None, \
3876 "Cannot retrieve locked instance %s" % self.op.instance_name
3877 self.instance = instance
3879 if instance.disk_template not in constants.DTS_NET_MIRROR:
3880 raise errors.OpPrereqError("Instance's disk layout is not"
3881 " network mirrored.")
3883 if len(instance.secondary_nodes) != 1:
3884 raise errors.OpPrereqError("The instance has a strange layout,"
3885 " expected one secondary but found %d" %
3886 len(instance.secondary_nodes))
3888 self.sec_node = instance.secondary_nodes[0]
3890 ia_name = getattr(self.op, "iallocator", None)
3891 if ia_name is not None:
3892 self._RunAllocator()
3894 remote_node = self.op.remote_node
3895 if remote_node is not None:
3896 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3897 assert self.remote_node_info is not None, \
3898 "Cannot retrieve locked node %s" % remote_node
3900 self.remote_node_info = None
3901 if remote_node == instance.primary_node:
3902 raise errors.OpPrereqError("The specified node is the primary node of"
3904 elif remote_node == self.sec_node:
3905 if self.op.mode == constants.REPLACE_DISK_SEC:
3906 # this is for DRBD8, where we can't execute the same mode of
3907 # replacement as for drbd7 (no different port allocated)
3908 raise errors.OpPrereqError("Same secondary given, cannot execute"
3910 if instance.disk_template == constants.DT_DRBD8:
3911 if (self.op.mode == constants.REPLACE_DISK_ALL and
3912 remote_node is not None):
3913 # switch to replace secondary mode
3914 self.op.mode = constants.REPLACE_DISK_SEC
3916 if self.op.mode == constants.REPLACE_DISK_ALL:
3917 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3918 " secondary disk replacement, not"
3920 elif self.op.mode == constants.REPLACE_DISK_PRI:
3921 if remote_node is not None:
3922 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3923 " the secondary while doing a primary"
3924 " node disk replacement")
3925 self.tgt_node = instance.primary_node
3926 self.oth_node = instance.secondary_nodes[0]
3927 elif self.op.mode == constants.REPLACE_DISK_SEC:
3928 self.new_node = remote_node # this can be None, in which case
3929 # we don't change the secondary
3930 self.tgt_node = instance.secondary_nodes[0]
3931 self.oth_node = instance.primary_node
3933 raise errors.ProgrammerError("Unhandled disk replace mode")
3935 for name in self.op.disks:
3936 if instance.FindDisk(name) is None:
3937 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3938 (name, instance.name))
3940 def _ExecD8DiskOnly(self, feedback_fn):
3941 """Replace a disk on the primary or secondary for dbrd8.
3943 The algorithm for replace is quite complicated:
3944 - for each disk to be replaced:
3945 - create new LVs on the target node with unique names
3946 - detach old LVs from the drbd device
3947 - rename old LVs to name_replaced.<time_t>
3948 - rename new LVs to old LVs
3949 - attach the new LVs (with the old names now) to the drbd device
3950 - wait for sync across all devices
3951 - for each modified disk:
3952 - remove old LVs (which have the name name_replaces.<time_t>)
3954 Failures are not very well handled.
3958 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3959 instance = self.instance
3961 vgname = self.cfg.GetVGName()
3964 tgt_node = self.tgt_node
3965 oth_node = self.oth_node
3967 # Step: check device activation
3968 self.proc.LogStep(1, steps_total, "check device existence")
3969 info("checking volume groups")
3970 my_vg = cfg.GetVGName()
3971 results = self.rpc.call_vg_list([oth_node, tgt_node])
3973 raise errors.OpExecError("Can't list volume groups on the nodes")
3974 for node in oth_node, tgt_node:
3975 res = results.get(node, False)
3976 if not res or my_vg not in res:
3977 raise errors.OpExecError("Volume group '%s' not found on %s" %
3979 for dev in instance.disks:
3980 if not dev.iv_name in self.op.disks:
3982 for node in tgt_node, oth_node:
3983 info("checking %s on %s" % (dev.iv_name, node))
3984 cfg.SetDiskID(dev, node)
3985 if not self.rpc.call_blockdev_find(node, dev):
3986 raise errors.OpExecError("Can't find device %s on node %s" %
3987 (dev.iv_name, node))
3989 # Step: check other node consistency
3990 self.proc.LogStep(2, steps_total, "check peer consistency")
3991 for dev in instance.disks:
3992 if not dev.iv_name in self.op.disks:
3994 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3995 if not _CheckDiskConsistency(self, dev, oth_node,
3996 oth_node==instance.primary_node):
3997 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3998 " to replace disks on this node (%s)" %
3999 (oth_node, tgt_node))
4001 # Step: create new storage
4002 self.proc.LogStep(3, steps_total, "allocate new storage")
4003 for dev in instance.disks:
4004 if not dev.iv_name in self.op.disks:
4007 cfg.SetDiskID(dev, tgt_node)
4008 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4009 names = _GenerateUniqueNames(self, lv_names)
4010 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4011 logical_id=(vgname, names[0]))
4012 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4013 logical_id=(vgname, names[1]))
4014 new_lvs = [lv_data, lv_meta]
4015 old_lvs = dev.children
4016 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4017 info("creating new local storage on %s for %s" %
4018 (tgt_node, dev.iv_name))
4019 # since we *always* want to create this LV, we use the
4020 # _Create...OnPrimary (which forces the creation), even if we
4021 # are talking about the secondary node
4022 for new_lv in new_lvs:
4023 if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4024 _GetInstanceInfoText(instance)):
4025 raise errors.OpExecError("Failed to create new LV named '%s' on"
4027 (new_lv.logical_id[1], tgt_node))
4029 # Step: for each lv, detach+rename*2+attach
4030 self.proc.LogStep(4, steps_total, "change drbd configuration")
4031 for dev, old_lvs, new_lvs in iv_names.itervalues():
4032 info("detaching %s drbd from local storage" % dev.iv_name)
4033 if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4034 raise errors.OpExecError("Can't detach drbd from local storage on node"
4035 " %s for device %s" % (tgt_node, dev.iv_name))
4037 #cfg.Update(instance)
4039 # ok, we created the new LVs, so now we know we have the needed
4040 # storage; as such, we proceed on the target node to rename
4041 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4042 # using the assumption that logical_id == physical_id (which in
4043 # turn is the unique_id on that node)
4045 # FIXME(iustin): use a better name for the replaced LVs
4046 temp_suffix = int(time.time())
4047 ren_fn = lambda d, suff: (d.physical_id[0],
4048 d.physical_id[1] + "_replaced-%s" % suff)
4049 # build the rename list based on what LVs exist on the node
4051 for to_ren in old_lvs:
4052 find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4053 if find_res is not None: # device exists
4054 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4056 info("renaming the old LVs on the target node")
4057 if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4058 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4059 # now we rename the new LVs to the old LVs
4060 info("renaming the new LVs on the target node")
4061 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4062 if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4063 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4065 for old, new in zip(old_lvs, new_lvs):
4066 new.logical_id = old.logical_id
4067 cfg.SetDiskID(new, tgt_node)
4069 for disk in old_lvs:
4070 disk.logical_id = ren_fn(disk, temp_suffix)
4071 cfg.SetDiskID(disk, tgt_node)
4073 # now that the new lvs have the old name, we can add them to the device
4074 info("adding new mirror component on %s" % tgt_node)
4075 if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4076 for new_lv in new_lvs:
4077 if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4078 warning("Can't rollback device %s", hint="manually cleanup unused"
4080 raise errors.OpExecError("Can't add local storage to drbd")
4082 dev.children = new_lvs
4083 cfg.Update(instance)
4085 # Step: wait for sync
4087 # this can fail as the old devices are degraded and _WaitForSync
4088 # does a combined result over all disks, so we don't check its
4090 self.proc.LogStep(5, steps_total, "sync devices")
4091 _WaitForSync(self, instance, unlock=True)
4093 # so check manually all the devices
4094 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4095 cfg.SetDiskID(dev, instance.primary_node)
4096 is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4098 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4100 # Step: remove old storage
4101 self.proc.LogStep(6, steps_total, "removing old storage")
4102 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4103 info("remove logical volumes for %s" % name)
4105 cfg.SetDiskID(lv, tgt_node)
4106 if not self.rpc.call_blockdev_remove(tgt_node, lv):
4107 warning("Can't remove old LV", hint="manually remove unused LVs")
4110 def _ExecD8Secondary(self, feedback_fn):
4111 """Replace the secondary node for drbd8.
4113 The algorithm for replace is quite complicated:
4114 - for all disks of the instance:
4115 - create new LVs on the new node with same names
4116 - shutdown the drbd device on the old secondary
4117 - disconnect the drbd network on the primary
4118 - create the drbd device on the new secondary
4119 - network attach the drbd on the primary, using an artifice:
4120 the drbd code for Attach() will connect to the network if it
4121 finds a device which is connected to the good local disks but
4123 - wait for sync across all devices
4124 - remove all disks from the old secondary
4126 Failures are not very well handled.
4130 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4131 instance = self.instance
4133 vgname = self.cfg.GetVGName()
4136 old_node = self.tgt_node
4137 new_node = self.new_node
4138 pri_node = instance.primary_node
4140 # Step: check device activation
4141 self.proc.LogStep(1, steps_total, "check device existence")
4142 info("checking volume groups")
4143 my_vg = cfg.GetVGName()
4144 results = self.rpc.call_vg_list([pri_node, new_node])
4146 raise errors.OpExecError("Can't list volume groups on the nodes")
4147 for node in pri_node, new_node:
4148 res = results.get(node, False)
4149 if not res or my_vg not in res:
4150 raise errors.OpExecError("Volume group '%s' not found on %s" %
4152 for dev in instance.disks:
4153 if not dev.iv_name in self.op.disks:
4155 info("checking %s on %s" % (dev.iv_name, pri_node))
4156 cfg.SetDiskID(dev, pri_node)
4157 if not self.rpc.call_blockdev_find(pri_node, dev):
4158 raise errors.OpExecError("Can't find device %s on node %s" %
4159 (dev.iv_name, pri_node))
4161 # Step: check other node consistency
4162 self.proc.LogStep(2, steps_total, "check peer consistency")
4163 for dev in instance.disks:
4164 if not dev.iv_name in self.op.disks:
4166 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4167 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4168 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4169 " unsafe to replace the secondary" %
4172 # Step: create new storage
4173 self.proc.LogStep(3, steps_total, "allocate new storage")
4174 for dev in instance.disks:
4176 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4177 # since we *always* want to create this LV, we use the
4178 # _Create...OnPrimary (which forces the creation), even if we
4179 # are talking about the secondary node
4180 for new_lv in dev.children:
4181 if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4182 _GetInstanceInfoText(instance)):
4183 raise errors.OpExecError("Failed to create new LV named '%s' on"
4185 (new_lv.logical_id[1], new_node))
4188 # Step 4: dbrd minors and drbd setups changes
4189 # after this, we must manually remove the drbd minors on both the
4190 # error and the success paths
4191 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4193 logging.debug("Allocated minors %s" % (minors,))
4194 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4195 for dev, new_minor in zip(instance.disks, minors):
4197 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4198 # create new devices on new_node
4199 if pri_node == dev.logical_id[0]:
4200 new_logical_id = (pri_node, new_node,
4201 dev.logical_id[2], dev.logical_id[3], new_minor,
4204 new_logical_id = (new_node, pri_node,
4205 dev.logical_id[2], new_minor, dev.logical_id[4],
4207 iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4208 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4210 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4211 logical_id=new_logical_id,
4212 children=dev.children)
4213 if not _CreateBlockDevOnSecondary(self, new_node, instance,
4215 _GetInstanceInfoText(instance)):
4216 self.cfg.ReleaseDRBDMinors(instance.name)
4217 raise errors.OpExecError("Failed to create new DRBD on"
4218 " node '%s'" % new_node)
4220 for dev in instance.disks:
4221 # we have new devices, shutdown the drbd on the old secondary
4222 info("shutting down drbd for %s on old node" % dev.iv_name)
4223 cfg.SetDiskID(dev, old_node)
4224 if not self.rpc.call_blockdev_shutdown(old_node, dev):
4225 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4226 hint="Please cleanup this device manually as soon as possible")
4228 info("detaching primary drbds from the network (=> standalone)")
4230 for dev in instance.disks:
4231 cfg.SetDiskID(dev, pri_node)
4232 # set the network part of the physical (unique in bdev terms) id
4233 # to None, meaning detach from network
4234 dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4235 # and 'find' the device, which will 'fix' it to match the
4237 if self.rpc.call_blockdev_find(pri_node, dev):
4240 warning("Failed to detach drbd %s from network, unusual case" %
4244 # no detaches succeeded (very unlikely)
4245 self.cfg.ReleaseDRBDMinors(instance.name)
4246 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4248 # if we managed to detach at least one, we update all the disks of
4249 # the instance to point to the new secondary
4250 info("updating instance configuration")
4251 for dev, _, new_logical_id in iv_names.itervalues():
4252 dev.logical_id = new_logical_id
4253 cfg.SetDiskID(dev, pri_node)
4254 cfg.Update(instance)
4255 # we can remove now the temp minors as now the new values are
4256 # written to the config file (and therefore stable)
4257 self.cfg.ReleaseDRBDMinors(instance.name)
4259 # and now perform the drbd attach
4260 info("attaching primary drbds to new secondary (standalone => connected)")
4262 for dev in instance.disks:
4263 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4264 # since the attach is smart, it's enough to 'find' the device,
4265 # it will automatically activate the network, if the physical_id
4267 cfg.SetDiskID(dev, pri_node)
4268 logging.debug("Disk to attach: %s", dev)
4269 if not self.rpc.call_blockdev_find(pri_node, dev):
4270 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4271 "please do a gnt-instance info to see the status of disks")
4273 # this can fail as the old devices are degraded and _WaitForSync
4274 # does a combined result over all disks, so we don't check its
4276 self.proc.LogStep(5, steps_total, "sync devices")
4277 _WaitForSync(self, instance, unlock=True)
4279 # so check manually all the devices
4280 for name, (dev, old_lvs, _) in iv_names.iteritems():
4281 cfg.SetDiskID(dev, pri_node)
4282 is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4284 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4286 self.proc.LogStep(6, steps_total, "removing old storage")
4287 for name, (dev, old_lvs, _) in iv_names.iteritems():
4288 info("remove logical volumes for %s" % name)
4290 cfg.SetDiskID(lv, old_node)
4291 if not self.rpc.call_blockdev_remove(old_node, lv):
4292 warning("Can't remove LV on old secondary",
4293 hint="Cleanup stale volumes by hand")
4295 def Exec(self, feedback_fn):
4296 """Execute disk replacement.
4298 This dispatches the disk replacement to the appropriate handler.
4301 instance = self.instance
4303 # Activate the instance disks if we're replacing them on a down instance
4304 if instance.status == "down":
4305 _StartInstanceDisks(self, instance, True)
4307 if instance.disk_template == constants.DT_DRBD8:
4308 if self.op.remote_node is None:
4309 fn = self._ExecD8DiskOnly
4311 fn = self._ExecD8Secondary
4313 raise errors.ProgrammerError("Unhandled disk replacement case")
4315 ret = fn(feedback_fn)
4317 # Deactivate the instance disks if we're replacing them on a down instance
4318 if instance.status == "down":
4319 _SafeShutdownInstanceDisks(self, instance)
4324 class LUGrowDisk(LogicalUnit):
4325 """Grow a disk of an instance.
4329 HTYPE = constants.HTYPE_INSTANCE
4330 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4333 def ExpandNames(self):
4334 self._ExpandAndLockInstance()
4335 self.needed_locks[locking.LEVEL_NODE] = []
4336 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4338 def DeclareLocks(self, level):
4339 if level == locking.LEVEL_NODE:
4340 self._LockInstancesNodes()
4342 def BuildHooksEnv(self):
4345 This runs on the master, the primary and all the secondaries.
4349 "DISK": self.op.disk,
4350 "AMOUNT": self.op.amount,
4352 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4354 self.cfg.GetMasterNode(),
4355 self.instance.primary_node,
4359 def CheckPrereq(self):
4360 """Check prerequisites.
4362 This checks that the instance is in the cluster.
4365 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4366 assert instance is not None, \
4367 "Cannot retrieve locked instance %s" % self.op.instance_name
4369 self.instance = instance
4371 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4372 raise errors.OpPrereqError("Instance's disk layout does not support"
4375 if instance.FindDisk(self.op.disk) is None:
4376 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4377 (self.op.disk, instance.name))
4379 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4380 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4381 instance.hypervisor)
4382 for node in nodenames:
4383 info = nodeinfo.get(node, None)
4385 raise errors.OpPrereqError("Cannot get current information"
4386 " from node '%s'" % node)
4387 vg_free = info.get('vg_free', None)
4388 if not isinstance(vg_free, int):
4389 raise errors.OpPrereqError("Can't compute free disk space on"
4391 if self.op.amount > info['vg_free']:
4392 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4393 " %d MiB available, %d MiB required" %
4394 (node, info['vg_free'], self.op.amount))
4396 def Exec(self, feedback_fn):
4397 """Execute disk grow.
4400 instance = self.instance
4401 disk = instance.FindDisk(self.op.disk)
4402 for node in (instance.secondary_nodes + (instance.primary_node,)):
4403 self.cfg.SetDiskID(disk, node)
4404 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4405 if (not result or not isinstance(result, (list, tuple)) or
4407 raise errors.OpExecError("grow request failed to node %s" % node)
4409 raise errors.OpExecError("grow request failed to node %s: %s" %
4411 disk.RecordGrow(self.op.amount)
4412 self.cfg.Update(instance)
4413 if self.op.wait_for_sync:
4414 disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
4416 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4417 " status.\nPlease check the instance.")
4420 class LUQueryInstanceData(NoHooksLU):
4421 """Query runtime instance data.
4424 _OP_REQP = ["instances", "static"]
4427 def ExpandNames(self):
4428 self.needed_locks = {}
4429 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4431 if not isinstance(self.op.instances, list):
4432 raise errors.OpPrereqError("Invalid argument type 'instances'")
4434 if self.op.instances:
4435 self.wanted_names = []
4436 for name in self.op.instances:
4437 full_name = self.cfg.ExpandInstanceName(name)
4438 if full_name is None:
4439 raise errors.OpPrereqError("Instance '%s' not known" %
4440 self.op.instance_name)
4441 self.wanted_names.append(full_name)
4442 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4444 self.wanted_names = None
4445 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4447 self.needed_locks[locking.LEVEL_NODE] = []
4448 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4450 def DeclareLocks(self, level):
4451 if level == locking.LEVEL_NODE:
4452 self._LockInstancesNodes()
4454 def CheckPrereq(self):
4455 """Check prerequisites.
4457 This only checks the optional instance list against the existing names.
4460 if self.wanted_names is None:
4461 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4463 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4464 in self.wanted_names]
4467 def _ComputeDiskStatus(self, instance, snode, dev):
4468 """Compute block device status.
4471 static = self.op.static
4473 self.cfg.SetDiskID(dev, instance.primary_node)
4474 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4478 if dev.dev_type in constants.LDS_DRBD:
4479 # we change the snode then (otherwise we use the one passed in)
4480 if dev.logical_id[0] == instance.primary_node:
4481 snode = dev.logical_id[1]
4483 snode = dev.logical_id[0]
4485 if snode and not static:
4486 self.cfg.SetDiskID(dev, snode)
4487 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4492 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4493 for child in dev.children]
4498 "iv_name": dev.iv_name,
4499 "dev_type": dev.dev_type,
4500 "logical_id": dev.logical_id,
4501 "physical_id": dev.physical_id,
4502 "pstatus": dev_pstatus,
4503 "sstatus": dev_sstatus,
4504 "children": dev_children,
4509 def Exec(self, feedback_fn):
4510 """Gather and return data"""
4513 cluster = self.cfg.GetClusterInfo()
4515 for instance in self.wanted_instances:
4516 if not self.op.static:
4517 remote_info = self.rpc.call_instance_info(instance.primary_node,
4519 instance.hypervisor)
4520 if remote_info and "state" in remote_info:
4523 remote_state = "down"
4526 if instance.status == "down":
4527 config_state = "down"
4531 disks = [self._ComputeDiskStatus(instance, None, device)
4532 for device in instance.disks]
4535 "name": instance.name,
4536 "config_state": config_state,
4537 "run_state": remote_state,
4538 "pnode": instance.primary_node,
4539 "snodes": instance.secondary_nodes,
4541 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4543 "hypervisor": instance.hypervisor,
4544 "network_port": instance.network_port,
4545 "hv_instance": instance.hvparams,
4546 "hv_actual": cluster.FillHV(instance),
4547 "be_instance": instance.beparams,
4548 "be_actual": cluster.FillBE(instance),
4551 result[instance.name] = idict
4556 class LUSetInstanceParams(LogicalUnit):
4557 """Modifies an instances's parameters.
4560 HPATH = "instance-modify"
4561 HTYPE = constants.HTYPE_INSTANCE
4562 _OP_REQP = ["instance_name", "hvparams"]
4565 def ExpandNames(self):
4566 self._ExpandAndLockInstance()
4567 self.needed_locks[locking.LEVEL_NODE] = []
4568 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4571 def DeclareLocks(self, level):
4572 if level == locking.LEVEL_NODE:
4573 self._LockInstancesNodes()
4575 def BuildHooksEnv(self):
4578 This runs on the master, primary and secondaries.
4582 if constants.BE_MEMORY in self.be_new:
4583 args['memory'] = self.be_new[constants.BE_MEMORY]
4584 if constants.BE_VCPUS in self.be_new:
4585 args['vcpus'] = self.be_new[constants.BE_VCPUS]
4586 if self.do_ip or self.do_bridge or self.mac:
4590 ip = self.instance.nics[0].ip
4592 bridge = self.bridge
4594 bridge = self.instance.nics[0].bridge
4598 mac = self.instance.nics[0].mac
4599 args['nics'] = [(ip, bridge, mac)]
4600 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4601 nl = [self.cfg.GetMasterNode(),
4602 self.instance.primary_node] + list(self.instance.secondary_nodes)
4605 def CheckPrereq(self):
4606 """Check prerequisites.
4608 This only checks the instance list against the existing names.
4611 # FIXME: all the parameters could be checked before, in ExpandNames, or in
4612 # a separate CheckArguments function, if we implement one, so the operation
4613 # can be aborted without waiting for any lock, should it have an error...
4614 self.ip = getattr(self.op, "ip", None)
4615 self.mac = getattr(self.op, "mac", None)
4616 self.bridge = getattr(self.op, "bridge", None)
4617 self.kernel_path = getattr(self.op, "kernel_path", None)
4618 self.initrd_path = getattr(self.op, "initrd_path", None)
4619 self.force = getattr(self.op, "force", None)
4620 all_parms = [self.ip, self.bridge, self.mac]
4621 if (all_parms.count(None) == len(all_parms) and
4622 not self.op.hvparams and
4623 not self.op.beparams):
4624 raise errors.OpPrereqError("No changes submitted")
4625 for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4626 val = self.op.beparams.get(item, None)
4630 except ValueError, err:
4631 raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4632 self.op.beparams[item] = val
4633 if self.ip is not None:
4635 if self.ip.lower() == "none":
4638 if not utils.IsValidIP(self.ip):
4639 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4642 self.do_bridge = (self.bridge is not None)
4643 if self.mac is not None:
4644 if self.cfg.IsMacInUse(self.mac):
4645 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4647 if not utils.IsValidMac(self.mac):
4648 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4650 # checking the new params on the primary/secondary nodes
4652 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4653 assert self.instance is not None, \
4654 "Cannot retrieve locked instance %s" % self.op.instance_name
4655 pnode = self.instance.primary_node
4657 nodelist.extend(instance.secondary_nodes)
4659 # hvparams processing
4660 if self.op.hvparams:
4661 i_hvdict = copy.deepcopy(instance.hvparams)
4662 for key, val in self.op.hvparams.iteritems():
4670 cluster = self.cfg.GetClusterInfo()
4671 hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4674 hypervisor.GetHypervisor(
4675 instance.hypervisor).CheckParameterSyntax(hv_new)
4676 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4677 self.hv_new = hv_new # the new actual values
4678 self.hv_inst = i_hvdict # the new dict (without defaults)
4680 self.hv_new = self.hv_inst = {}
4682 # beparams processing
4683 if self.op.beparams:
4684 i_bedict = copy.deepcopy(instance.beparams)
4685 for key, val in self.op.beparams.iteritems():
4693 cluster = self.cfg.GetClusterInfo()
4694 be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4696 self.be_new = be_new # the new actual values
4697 self.be_inst = i_bedict # the new dict (without defaults)
4699 self.hv_new = self.hv_inst = {}
4703 if constants.BE_MEMORY in self.op.beparams and not self.force:
4704 mem_check_list = [pnode]
4705 if be_new[constants.BE_AUTO_BALANCE]:
4706 # either we changed auto_balance to yes or it was from before
4707 mem_check_list.extend(instance.secondary_nodes)
4708 instance_info = self.rpc.call_instance_info(pnode, instance.name,
4709 instance.hypervisor)
4710 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4711 instance.hypervisor)
4713 if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4714 # Assume the primary node is unreachable and go ahead
4715 self.warn.append("Can't get info from primary node %s" % pnode)
4718 current_mem = instance_info['memory']
4720 # Assume instance not running
4721 # (there is a slight race condition here, but it's not very probable,
4722 # and we have no other way to check)
4724 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4725 nodeinfo[pnode]['memory_free'])
4727 raise errors.OpPrereqError("This change will prevent the instance"
4728 " from starting, due to %d MB of memory"
4729 " missing on its primary node" % miss_mem)
4731 if be_new[constants.BE_AUTO_BALANCE]:
4732 for node in instance.secondary_nodes:
4733 if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4734 self.warn.append("Can't get info from secondary node %s" % node)
4735 elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4736 self.warn.append("Not enough memory to failover instance to"
4737 " secondary node %s" % node)
4741 def Exec(self, feedback_fn):
4742 """Modifies an instance.
4744 All parameters take effect only at the next restart of the instance.
4746 # Process here the warnings from CheckPrereq, as we don't have a
4747 # feedback_fn there.
4748 for warn in self.warn:
4749 feedback_fn("WARNING: %s" % warn)
4752 instance = self.instance
4754 instance.nics[0].ip = self.ip
4755 result.append(("ip", self.ip))
4757 instance.nics[0].bridge = self.bridge
4758 result.append(("bridge", self.bridge))
4760 instance.nics[0].mac = self.mac
4761 result.append(("mac", self.mac))
4762 if self.op.hvparams:
4763 instance.hvparams = self.hv_new
4764 for key, val in self.op.hvparams.iteritems():
4765 result.append(("hv/%s" % key, val))
4766 if self.op.beparams:
4767 instance.beparams = self.be_inst
4768 for key, val in self.op.beparams.iteritems():
4769 result.append(("be/%s" % key, val))
4771 self.cfg.Update(instance)
4776 class LUQueryExports(NoHooksLU):
4777 """Query the exports list
4780 _OP_REQP = ['nodes']
4783 def ExpandNames(self):
4784 self.needed_locks = {}
4785 self.share_locks[locking.LEVEL_NODE] = 1
4786 if not self.op.nodes:
4787 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4789 self.needed_locks[locking.LEVEL_NODE] = \
4790 _GetWantedNodes(self, self.op.nodes)
4792 def CheckPrereq(self):
4793 """Check prerequisites.
4796 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4798 def Exec(self, feedback_fn):
4799 """Compute the list of all the exported system images.
4802 a dictionary with the structure node->(export-list)
4803 where export-list is a list of the instances exported on
4807 return self.rpc.call_export_list(self.nodes)
4810 class LUExportInstance(LogicalUnit):
4811 """Export an instance to an image in the cluster.
4814 HPATH = "instance-export"
4815 HTYPE = constants.HTYPE_INSTANCE
4816 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4819 def ExpandNames(self):
4820 self._ExpandAndLockInstance()
4821 # FIXME: lock only instance primary and destination node
4823 # Sad but true, for now we have do lock all nodes, as we don't know where
4824 # the previous export might be, and and in this LU we search for it and
4825 # remove it from its current node. In the future we could fix this by:
4826 # - making a tasklet to search (share-lock all), then create the new one,
4827 # then one to remove, after
4828 # - removing the removal operation altoghether
4829 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4831 def DeclareLocks(self, level):
4832 """Last minute lock declaration."""
4833 # All nodes are locked anyway, so nothing to do here.
4835 def BuildHooksEnv(self):
4838 This will run on the master, primary node and target node.
4842 "EXPORT_NODE": self.op.target_node,
4843 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4845 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4846 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4847 self.op.target_node]
4850 def CheckPrereq(self):
4851 """Check prerequisites.
4853 This checks that the instance and node names are valid.
4856 instance_name = self.op.instance_name
4857 self.instance = self.cfg.GetInstanceInfo(instance_name)
4858 assert self.instance is not None, \
4859 "Cannot retrieve locked instance %s" % self.op.instance_name
4861 self.dst_node = self.cfg.GetNodeInfo(
4862 self.cfg.ExpandNodeName(self.op.target_node))
4864 assert self.dst_node is not None, \
4865 "Cannot retrieve locked node %s" % self.op.target_node
4867 # instance disk type verification
4868 for disk in self.instance.disks:
4869 if disk.dev_type == constants.LD_FILE:
4870 raise errors.OpPrereqError("Export not supported for instances with"
4871 " file-based disks")
4873 def Exec(self, feedback_fn):
4874 """Export an instance to an image in the cluster.
4877 instance = self.instance
4878 dst_node = self.dst_node
4879 src_node = instance.primary_node
4880 if self.op.shutdown:
4881 # shutdown the instance, but not the disks
4882 if not self.rpc.call_instance_shutdown(src_node, instance):
4883 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4884 (instance.name, src_node))
4886 vgname = self.cfg.GetVGName()
4891 for disk in instance.disks:
4892 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4893 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4895 if not new_dev_name:
4896 self.LogWarning("Could not snapshot block device %s on node %s",
4897 disk.logical_id[1], src_node)
4898 snap_disks.append(False)
4900 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4901 logical_id=(vgname, new_dev_name),
4902 physical_id=(vgname, new_dev_name),
4903 iv_name=disk.iv_name)
4904 snap_disks.append(new_dev)
4907 if self.op.shutdown and instance.status == "up":
4908 if not self.rpc.call_instance_start(src_node, instance, None):
4909 _ShutdownInstanceDisks(self, instance)
4910 raise errors.OpExecError("Could not start instance")
4912 # TODO: check for size
4914 cluster_name = self.cfg.GetClusterName()
4915 for dev in snap_disks:
4917 if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4918 instance, cluster_name):
4919 self.LogWarning("Could not export block device %s from node %s to"
4920 " node %s", dev.logical_id[1], src_node,
4922 if not self.rpc.call_blockdev_remove(src_node, dev):
4923 self.LogWarning("Could not remove snapshot block device %s from node"
4924 " %s", dev.logical_id[1], src_node)
4926 if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4927 self.LogWarning("Could not finalize export for instance %s on node %s",
4928 instance.name, dst_node.name)
4930 nodelist = self.cfg.GetNodeList()
4931 nodelist.remove(dst_node.name)
4933 # on one-node clusters nodelist will be empty after the removal
4934 # if we proceed the backup would be removed because OpQueryExports
4935 # substitutes an empty list with the full cluster node list.
4937 exportlist = self.rpc.call_export_list(nodelist)
4938 for node in exportlist:
4939 if instance.name in exportlist[node]:
4940 if not self.rpc.call_export_remove(node, instance.name):
4941 self.LogWarning("Could not remove older export for instance %s"
4942 " on node %s", instance.name, node)
4945 class LURemoveExport(NoHooksLU):
4946 """Remove exports related to the named instance.
4949 _OP_REQP = ["instance_name"]
4952 def ExpandNames(self):
4953 self.needed_locks = {}
4954 # We need all nodes to be locked in order for RemoveExport to work, but we
4955 # don't need to lock the instance itself, as nothing will happen to it (and
4956 # we can remove exports also for a removed instance)
4957 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4959 def CheckPrereq(self):
4960 """Check prerequisites.
4964 def Exec(self, feedback_fn):
4965 """Remove any export.
4968 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4969 # If the instance was not found we'll try with the name that was passed in.
4970 # This will only work if it was an FQDN, though.
4972 if not instance_name:
4974 instance_name = self.op.instance_name
4976 exportlist = self.rpc.call_export_list(self.acquired_locks[
4977 locking.LEVEL_NODE])
4979 for node in exportlist:
4980 if instance_name in exportlist[node]:
4982 if not self.rpc.call_export_remove(node, instance_name):
4983 logging.error("Could not remove export for instance %s"
4984 " on node %s", instance_name, node)
4986 if fqdn_warn and not found:
4987 feedback_fn("Export not found. If trying to remove an export belonging"
4988 " to a deleted instance please use its Fully Qualified"
4992 class TagsLU(NoHooksLU):
4995 This is an abstract class which is the parent of all the other tags LUs.
4999 def ExpandNames(self):
5000 self.needed_locks = {}
5001 if self.op.kind == constants.TAG_NODE:
5002 name = self.cfg.ExpandNodeName(self.op.name)
5004 raise errors.OpPrereqError("Invalid node name (%s)" %
5007 self.needed_locks[locking.LEVEL_NODE] = name
5008 elif self.op.kind == constants.TAG_INSTANCE:
5009 name = self.cfg.ExpandInstanceName(self.op.name)
5011 raise errors.OpPrereqError("Invalid instance name (%s)" %
5014 self.needed_locks[locking.LEVEL_INSTANCE] = name
5016 def CheckPrereq(self):
5017 """Check prerequisites.
5020 if self.op.kind == constants.TAG_CLUSTER:
5021 self.target = self.cfg.GetClusterInfo()
5022 elif self.op.kind == constants.TAG_NODE:
5023 self.target = self.cfg.GetNodeInfo(self.op.name)
5024 elif self.op.kind == constants.TAG_INSTANCE:
5025 self.target = self.cfg.GetInstanceInfo(self.op.name)
5027 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5031 class LUGetTags(TagsLU):
5032 """Returns the tags of a given object.
5035 _OP_REQP = ["kind", "name"]
5038 def Exec(self, feedback_fn):
5039 """Returns the tag list.
5042 return list(self.target.GetTags())
5045 class LUSearchTags(NoHooksLU):
5046 """Searches the tags for a given pattern.
5049 _OP_REQP = ["pattern"]
5052 def ExpandNames(self):
5053 self.needed_locks = {}
5055 def CheckPrereq(self):
5056 """Check prerequisites.
5058 This checks the pattern passed for validity by compiling it.
5062 self.re = re.compile(self.op.pattern)
5063 except re.error, err:
5064 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5065 (self.op.pattern, err))
5067 def Exec(self, feedback_fn):
5068 """Returns the tag list.
5072 tgts = [("/cluster", cfg.GetClusterInfo())]
5073 ilist = cfg.GetAllInstancesInfo().values()
5074 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5075 nlist = cfg.GetAllNodesInfo().values()
5076 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5078 for path, target in tgts:
5079 for tag in target.GetTags():
5080 if self.re.search(tag):
5081 results.append((path, tag))
5085 class LUAddTags(TagsLU):
5086 """Sets a tag on a given object.
5089 _OP_REQP = ["kind", "name", "tags"]
5092 def CheckPrereq(self):
5093 """Check prerequisites.
5095 This checks the type and length of the tag name and value.
5098 TagsLU.CheckPrereq(self)
5099 for tag in self.op.tags:
5100 objects.TaggableObject.ValidateTag(tag)
5102 def Exec(self, feedback_fn):
5107 for tag in self.op.tags:
5108 self.target.AddTag(tag)
5109 except errors.TagError, err:
5110 raise errors.OpExecError("Error while setting tag: %s" % str(err))
5112 self.cfg.Update(self.target)
5113 except errors.ConfigurationError:
5114 raise errors.OpRetryError("There has been a modification to the"
5115 " config file and the operation has been"
5116 " aborted. Please retry.")
5119 class LUDelTags(TagsLU):
5120 """Delete a list of tags from a given object.
5123 _OP_REQP = ["kind", "name", "tags"]
5126 def CheckPrereq(self):
5127 """Check prerequisites.
5129 This checks that we have the given tag.
5132 TagsLU.CheckPrereq(self)
5133 for tag in self.op.tags:
5134 objects.TaggableObject.ValidateTag(tag)
5135 del_tags = frozenset(self.op.tags)
5136 cur_tags = self.target.GetTags()
5137 if not del_tags <= cur_tags:
5138 diff_tags = del_tags - cur_tags
5139 diff_names = ["'%s'" % tag for tag in diff_tags]
5141 raise errors.OpPrereqError("Tag(s) %s not found" %
5142 (",".join(diff_names)))
5144 def Exec(self, feedback_fn):
5145 """Remove the tag from the object.
5148 for tag in self.op.tags:
5149 self.target.RemoveTag(tag)
5151 self.cfg.Update(self.target)
5152 except errors.ConfigurationError:
5153 raise errors.OpRetryError("There has been a modification to the"
5154 " config file and the operation has been"
5155 " aborted. Please retry.")
5158 class LUTestDelay(NoHooksLU):
5159 """Sleep for a specified amount of time.
5161 This LU sleeps on the master and/or nodes for a specified amount of
5165 _OP_REQP = ["duration", "on_master", "on_nodes"]
5168 def ExpandNames(self):
5169 """Expand names and set required locks.
5171 This expands the node list, if any.
5174 self.needed_locks = {}
5175 if self.op.on_nodes:
5176 # _GetWantedNodes can be used here, but is not always appropriate to use
5177 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5179 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5180 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5182 def CheckPrereq(self):
5183 """Check prerequisites.
5187 def Exec(self, feedback_fn):
5188 """Do the actual sleep.
5191 if self.op.on_master:
5192 if not utils.TestDelay(self.op.duration):
5193 raise errors.OpExecError("Error during master delay test")
5194 if self.op.on_nodes:
5195 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5197 raise errors.OpExecError("Complete failure from rpc call")
5198 for node, node_result in result.items():
5200 raise errors.OpExecError("Failure during rpc call to node %s,"
5201 " result: %s" % (node, node_result))
5204 class IAllocator(object):
5205 """IAllocator framework.
5207 An IAllocator instance has three sets of attributes:
5208 - cfg that is needed to query the cluster
5209 - input data (all members of the _KEYS class attribute are required)
5210 - four buffer attributes (in|out_data|text), that represent the
5211 input (to the external script) in text and data structure format,
5212 and the output from it, again in two formats
5213 - the result variables from the script (success, info, nodes) for
5218 "mem_size", "disks", "disk_template",
5219 "os", "tags", "nics", "vcpus",
5225 def __init__(self, lu, mode, name, **kwargs):
5227 # init buffer variables
5228 self.in_text = self.out_text = self.in_data = self.out_data = None
5229 # init all input fields so that pylint is happy
5232 self.mem_size = self.disks = self.disk_template = None
5233 self.os = self.tags = self.nics = self.vcpus = None
5234 self.relocate_from = None
5236 self.required_nodes = None
5237 # init result fields
5238 self.success = self.info = self.nodes = None
5239 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5240 keyset = self._ALLO_KEYS
5241 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5242 keyset = self._RELO_KEYS
5244 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5245 " IAllocator" % self.mode)
5247 if key not in keyset:
5248 raise errors.ProgrammerError("Invalid input parameter '%s' to"
5249 " IAllocator" % key)
5250 setattr(self, key, kwargs[key])
5252 if key not in kwargs:
5253 raise errors.ProgrammerError("Missing input parameter '%s' to"
5254 " IAllocator" % key)
5255 self._BuildInputData()
5257 def _ComputeClusterData(self):
5258 """Compute the generic allocator input data.
5260 This is the data that is independent of the actual operation.
5264 cluster_info = cfg.GetClusterInfo()
5268 "cluster_name": cfg.GetClusterName(),
5269 "cluster_tags": list(cluster_info.GetTags()),
5270 "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5271 # we don't have job IDs
5275 cluster = self.cfg.GetClusterInfo()
5276 for iname in cfg.GetInstanceList():
5277 i_obj = cfg.GetInstanceInfo(iname)
5278 i_list.append((i_obj, cluster.FillBE(i_obj)))
5282 node_list = cfg.GetNodeList()
5283 # FIXME: here we have only one hypervisor information, but
5284 # instance can belong to different hypervisors
5285 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5286 cfg.GetHypervisorType())
5287 for nname in node_list:
5288 ninfo = cfg.GetNodeInfo(nname)
5289 if nname not in node_data or not isinstance(node_data[nname], dict):
5290 raise errors.OpExecError("Can't get data for node %s" % nname)
5291 remote_info = node_data[nname]
5292 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5293 'vg_size', 'vg_free', 'cpu_total']:
5294 if attr not in remote_info:
5295 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5298 remote_info[attr] = int(remote_info[attr])
5299 except ValueError, err:
5300 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5301 " %s" % (nname, attr, str(err)))
5302 # compute memory used by primary instances
5303 i_p_mem = i_p_up_mem = 0
5304 for iinfo, beinfo in i_list:
5305 if iinfo.primary_node == nname:
5306 i_p_mem += beinfo[constants.BE_MEMORY]
5307 if iinfo.status == "up":
5308 i_p_up_mem += beinfo[constants.BE_MEMORY]
5310 # compute memory used by instances
5312 "tags": list(ninfo.GetTags()),
5313 "total_memory": remote_info['memory_total'],
5314 "reserved_memory": remote_info['memory_dom0'],
5315 "free_memory": remote_info['memory_free'],
5316 "i_pri_memory": i_p_mem,
5317 "i_pri_up_memory": i_p_up_mem,
5318 "total_disk": remote_info['vg_size'],
5319 "free_disk": remote_info['vg_free'],
5320 "primary_ip": ninfo.primary_ip,
5321 "secondary_ip": ninfo.secondary_ip,
5322 "total_cpus": remote_info['cpu_total'],
5324 node_results[nname] = pnr
5325 data["nodes"] = node_results
5329 for iinfo, beinfo in i_list:
5330 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5331 for n in iinfo.nics]
5333 "tags": list(iinfo.GetTags()),
5334 "should_run": iinfo.status == "up",
5335 "vcpus": beinfo[constants.BE_VCPUS],
5336 "memory": beinfo[constants.BE_MEMORY],
5338 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5340 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5341 "disk_template": iinfo.disk_template,
5342 "hypervisor": iinfo.hypervisor,
5344 instance_data[iinfo.name] = pir
5346 data["instances"] = instance_data
5350 def _AddNewInstance(self):
5351 """Add new instance data to allocator structure.
5353 This in combination with _AllocatorGetClusterData will create the
5354 correct structure needed as input for the allocator.
5356 The checks for the completeness of the opcode must have already been
5361 if len(self.disks) != 2:
5362 raise errors.OpExecError("Only two-disk configurations supported")
5364 disk_space = _ComputeDiskSize(self.disk_template,
5365 self.disks[0]["size"], self.disks[1]["size"])
5367 if self.disk_template in constants.DTS_NET_MIRROR:
5368 self.required_nodes = 2
5370 self.required_nodes = 1
5374 "disk_template": self.disk_template,
5377 "vcpus": self.vcpus,
5378 "memory": self.mem_size,
5379 "disks": self.disks,
5380 "disk_space_total": disk_space,
5382 "required_nodes": self.required_nodes,
5384 data["request"] = request
5386 def _AddRelocateInstance(self):
5387 """Add relocate instance data to allocator structure.
5389 This in combination with _IAllocatorGetClusterData will create the
5390 correct structure needed as input for the allocator.
5392 The checks for the completeness of the opcode must have already been
5396 instance = self.lu.cfg.GetInstanceInfo(self.name)
5397 if instance is None:
5398 raise errors.ProgrammerError("Unknown instance '%s' passed to"
5399 " IAllocator" % self.name)
5401 if instance.disk_template not in constants.DTS_NET_MIRROR:
5402 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5404 if len(instance.secondary_nodes) != 1:
5405 raise errors.OpPrereqError("Instance has not exactly one secondary node")
5407 self.required_nodes = 1
5409 disk_space = _ComputeDiskSize(instance.disk_template,
5410 instance.disks[0].size,
5411 instance.disks[1].size)
5416 "disk_space_total": disk_space,
5417 "required_nodes": self.required_nodes,
5418 "relocate_from": self.relocate_from,
5420 self.in_data["request"] = request
5422 def _BuildInputData(self):
5423 """Build input data structures.
5426 self._ComputeClusterData()
5428 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5429 self._AddNewInstance()
5431 self._AddRelocateInstance()
5433 self.in_text = serializer.Dump(self.in_data)
5435 def Run(self, name, validate=True, call_fn=None):
5436 """Run an instance allocator and return the results.
5440 call_fn = self.lu.rpc.call_iallocator_runner
5443 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5445 if not isinstance(result, (list, tuple)) or len(result) != 4:
5446 raise errors.OpExecError("Invalid result from master iallocator runner")
5448 rcode, stdout, stderr, fail = result
5450 if rcode == constants.IARUN_NOTFOUND:
5451 raise errors.OpExecError("Can't find allocator '%s'" % name)
5452 elif rcode == constants.IARUN_FAILURE:
5453 raise errors.OpExecError("Instance allocator call failed: %s,"
5454 " output: %s" % (fail, stdout+stderr))
5455 self.out_text = stdout
5457 self._ValidateResult()
5459 def _ValidateResult(self):
5460 """Process the allocator results.
5462 This will process and if successful save the result in
5463 self.out_data and the other parameters.
5467 rdict = serializer.Load(self.out_text)
5468 except Exception, err:
5469 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5471 if not isinstance(rdict, dict):
5472 raise errors.OpExecError("Can't parse iallocator results: not a dict")
5474 for key in "success", "info", "nodes":
5475 if key not in rdict:
5476 raise errors.OpExecError("Can't parse iallocator results:"
5477 " missing key '%s'" % key)
5478 setattr(self, key, rdict[key])
5480 if not isinstance(rdict["nodes"], list):
5481 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5483 self.out_data = rdict
5486 class LUTestAllocator(NoHooksLU):
5487 """Run allocator tests.
5489 This LU runs the allocator tests
5492 _OP_REQP = ["direction", "mode", "name"]
5494 def CheckPrereq(self):
5495 """Check prerequisites.
5497 This checks the opcode parameters depending on the director and mode test.
5500 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5501 for attr in ["name", "mem_size", "disks", "disk_template",
5502 "os", "tags", "nics", "vcpus"]:
5503 if not hasattr(self.op, attr):
5504 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5506 iname = self.cfg.ExpandInstanceName(self.op.name)
5507 if iname is not None:
5508 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5510 if not isinstance(self.op.nics, list):
5511 raise errors.OpPrereqError("Invalid parameter 'nics'")
5512 for row in self.op.nics:
5513 if (not isinstance(row, dict) or
5516 "bridge" not in row):
5517 raise errors.OpPrereqError("Invalid contents of the"
5518 " 'nics' parameter")
5519 if not isinstance(self.op.disks, list):
5520 raise errors.OpPrereqError("Invalid parameter 'disks'")
5521 if len(self.op.disks) != 2:
5522 raise errors.OpPrereqError("Only two-disk configurations supported")
5523 for row in self.op.disks:
5524 if (not isinstance(row, dict) or
5525 "size" not in row or
5526 not isinstance(row["size"], int) or
5527 "mode" not in row or
5528 row["mode"] not in ['r', 'w']):
5529 raise errors.OpPrereqError("Invalid contents of the"
5530 " 'disks' parameter")
5531 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5532 if not hasattr(self.op, "name"):
5533 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5534 fname = self.cfg.ExpandInstanceName(self.op.name)
5536 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5538 self.op.name = fname
5539 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5541 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5544 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5545 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5546 raise errors.OpPrereqError("Missing allocator name")
5547 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5548 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5551 def Exec(self, feedback_fn):
5552 """Run the allocator test.
5555 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5556 ial = IAllocator(self,
5559 mem_size=self.op.mem_size,
5560 disks=self.op.disks,
5561 disk_template=self.op.disk_template,
5565 vcpus=self.op.vcpus,
5568 ial = IAllocator(self,
5571 relocate_from=list(self.relocate_from),
5574 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5575 result = ial.in_text
5577 ial.Run(self.op.allocator, validate=False)
5578 result = ial.out_text