# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201
import re
import time
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object, creating it on first use.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)
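
  # Illustrative sketch (an assumption, not part of the original module): a
  # minimal concrete LU that follows the rules in the class docstring, based
  # on NoHooksLU which is defined further down. The opcode and its behaviour
  # are hypothetical.
  #
  #   class LUHypotheticalEcho(NoHooksLU):
  #     _OP_REQP = ["message"]        # checked in LogicalUnit.__init__
  #
  #     def ExpandNames(self):
  #       self.needed_locks = {}      # exclusive LU, no locks needed
  #
  #     def CheckPrereq(self):
  #       pass                        # nothing to verify for an echo
  #
  #     def Exec(self, feedback_fn):
  #       feedback_fn(self.op.message)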

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use None as a value
        (this reflects what LockSet does, and will be replaced before
        CheckPrereq with the full list of nodes that have been locked)

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: None,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name
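
  # Typical usage in a subclass (this mirrors the pattern used by the
  # instance LUs later in this module):
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'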

  def _LockInstancesNodes(self):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we really have been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      wanted_nodes.extend(instance.secondary_nodes)
    self.needed_locks[locking.LEVEL_NODE] = wanted_nodes

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)

  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)
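
# Illustrative call (hypothetical names): short names are expanded via the
# config and the result is sorted.
#
#   _GetWantedNodes(lu, ["node2", "node1"])
#   => ["node1.example.com", "node2.example.com"]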


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
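
# Illustrative use: selecting a field that is neither static nor dynamic
# raises OpPrereqError.
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])   # passes
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["bogus"])           # raises OpPrereqError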


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
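
# Illustrative result (hypothetical values) for a single-NIC instance:
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1", ["node2"], "debian",
#                         "up", 512, 1,
#                         [("192.0.2.10", "xen-br0", "aa:00:00:00:00:01")])
#   => {"INSTANCE_NAME": "inst1.example.com",
#       "INSTANCE_PRIMARY": "node1",
#       "INSTANCE_SECONDARIES": "node2",
#       "INSTANCE_OS_TYPE": "debian",
#       "INSTANCE_STATUS": "up",
#       "INSTANCE_MEMORY": 512,
#       "INSTANCE_VCPUS": 1,
#       "INSTANCE_NIC0_IP": "192.0.2.10",
#       "INSTANCE_NIC0_BRIDGE": "xen-br0",
#       "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
#       "INSTANCE_NIC_COUNT": 1}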


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
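
# Illustrative use of the override dict (a hypothetical situation): report a
# status that differs from the configured one, e.g. for hooks built right
# after an instance has been stopped.
#
#   env = _BuildInstanceHookEnvByObject(instance, override={'status': "down"})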


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    - compares ganeti version
    - checks vg existence and size > 20G
    - checks config file checksum
    - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase; their output is logged
    in the verify output and their failure makes the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
    }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This handles the hooks' results and sends nicely-formatted feedback back
    to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn(" - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 0

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass
919 """Verify integrity of cluster disks.
922 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
924 vg_name = self.cfg.GetVGName()
925 nodes = utils.NiceSort(self.cfg.GetNodeList())
926 instances = [self.cfg.GetInstanceInfo(name)
927 for name in self.cfg.GetInstanceList()]
930 for inst in instances:
932 if (inst.status != "up" or
933 inst.disk_template not in constants.DTS_NET_MIRROR):
935 inst.MapLVsByNode(inst_lvs)
936 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
937 for node, vol_list in inst_lvs.iteritems():
939 nv_dict[(node, vol)] = inst
944 node_lvs = rpc.call_volume_list(nodes, vg_name)
951 if isinstance(lvs, basestring):
952 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
954 elif not isinstance(lvs, dict):
955 logger.Info("connection to node %s failed or invalid data returned" %
957 res_nodes.append(node)
960 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
961 inst = nv_dict.pop((node, lv_name), None)
962 if (not lv_online and inst is not None
963 and inst.name not in res_instances):
964 res_instances.append(inst.name)
966 # any leftover items in nv_dict are missing LVs, let's arrange the
968 for key, inst in nv_dict.iteritems():
969 if inst.name not in res_missing:
970 res_missing[inst.name] = []
971 res_missing[inst.name].append(key)


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
    }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
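
# Illustrative example (hypothetical objects; constructor arguments are
# abbreviated): a DRBD8 disk whose children are two LVs is reported as
# LVM-based through the recursion above.
#
#   lv_a = objects.Disk(dev_type=constants.LD_LV, ...)
#   lv_b = objects.Disk(dev_type=constants.LD_LV, ...)
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, children=[lv_a, lv_b])
#   _RecursiveCheckIfLVMBased(drbd)   # => True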


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["vg_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
    }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
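
# Illustrative use (a hypothetical context): after creating mirrored disks,
# an LU would typically wait for the initial sync before proceeding.
#
#   if not _WaitForSync(self.cfg, instance, self.proc):
#     raise errors.OpExecError("Disk sync did not complete cleanly")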


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

    Args:
      node_list: a list with the names of all nodes
      rlist: a map with node names as keys and OS objects as values

    Returns:
      map: a map with osnames as keys and as value another map, with
           node names as keys and lists of OS objects as values, e.g.:
             {"debian-etch": {"node1": [<object>,...],
                              "node2": [<object>,]}
             }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
    }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the node exists in the configuration
    - it does not have primary or secondary instances
    - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    self.context.RemoveNode(node.name)

    rpc.call_node_leave_cluster(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    # TODO: we could lock nodes only if the user asked for dynamic fields. For
    # that we need atomic ways to get info for a group of nodes from the
    # config, though.
    if not self.op.names:
      self.needed_locks[locking.LEVEL_NODE] = None
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.names)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # This of course is valid only if we locked the nodes
    self.wanted = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
    }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the new node is not already in the config
    - it is resolvable
    - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
    }

    return result


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
    succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If the ignore_primary is false, errors on the primary node are
  ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
2008 class LUStartupInstance(LogicalUnit):
2009 """Starts an instance.
2012 HPATH = "instance-start"
2013 HTYPE = constants.HTYPE_INSTANCE
2014 _OP_REQP = ["instance_name", "force"]
2017 def ExpandNames(self):
2018 self._ExpandAndLockInstance()
2019 self.needed_locks[locking.LEVEL_NODE] = []
2020 self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2022 def DeclareLocks(self, level):
2023 if level == locking.LEVEL_NODE:
2024 self._LockInstancesNodes()
2026 def BuildHooksEnv(self):
2029 This runs on master, primary and secondary nodes of the instance.
2033 "FORCE": self.op.force,
2035 env.update(_BuildInstanceHookEnvByObject(self.instance))
2036 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2037 list(self.instance.secondary_nodes))
2040 def CheckPrereq(self):
2041 """Check prerequisites.
2043 This checks that the instance is in the cluster.
2046 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2047 assert self.instance is not None, \
2048 "Cannot retrieve locked instance %s" % self.op.instance_name
2050 # check bridges existance
2051 _CheckInstanceBridgesExist(instance)
2053 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2054 "starting instance %s" % instance.name,
2057 def Exec(self, feedback_fn):
2058 """Start the instance.
2061 instance = self.instance
2062 force = self.op.force
2063 extra_args = getattr(self.op, "extra_args", "")
2065 self.cfg.MarkInstanceUp(instance.name)
2067 node_current = instance.primary_node
2069 _StartInstanceDisks(self.cfg, instance, force)
2071 if not rpc.call_instance_start(node_current, instance, extra_args):
2072 _ShutdownInstanceDisks(instance, self.cfg)
2073 raise errors.OpExecError("Could not start instance")
2076 class LURebootInstance(LogicalUnit):
2077 """Reboot an instance.
2080 HPATH = "instance-reboot"
2081 HTYPE = constants.HTYPE_INSTANCE
2082 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2085 def ExpandNames(self):
2086 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2087 constants.INSTANCE_REBOOT_HARD,
2088 constants.INSTANCE_REBOOT_FULL]:
2089 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2090 (constants.INSTANCE_REBOOT_SOFT,
2091 constants.INSTANCE_REBOOT_HARD,
2092 constants.INSTANCE_REBOOT_FULL))
2093 self._ExpandAndLockInstance()
2094 self.needed_locks[locking.LEVEL_NODE] = []
2095 self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2097 def DeclareLocks(self, level):
2098 if level == locking.LEVEL_NODE:
2099 # FIXME: lock only primary on (not constants.INSTANCE_REBOOT_FULL)
2100 self._LockInstancesNodes()
2102 def BuildHooksEnv(self):
2105 This runs on master, primary and secondary nodes of the instance.
2109 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2111 env.update(_BuildInstanceHookEnvByObject(self.instance))
2112 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2113 list(self.instance.secondary_nodes))
2116 def CheckPrereq(self):
2117 """Check prerequisites.
2119 This checks that the instance is in the cluster.
2122 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2123 assert self.instance is not None, \
2124 "Cannot retrieve locked instance %s" % self.op.instance_name
2126 # check bridge existence
2127 _CheckInstanceBridgesExist(instance)
2129 def Exec(self, feedback_fn):
2130 """Reboot the instance.
2133 instance = self.instance
2134 ignore_secondaries = self.op.ignore_secondaries
2135 reboot_type = self.op.reboot_type
2136 extra_args = getattr(self.op, "extra_args", "")
2138 node_current = instance.primary_node
2140 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2141 constants.INSTANCE_REBOOT_HARD]:
2142 if not rpc.call_instance_reboot(node_current, instance,
2143 reboot_type, extra_args):
2144 raise errors.OpExecError("Could not reboot instance")
2145 else:
2146 if not rpc.call_instance_shutdown(node_current, instance):
2147 raise errors.OpExecError("Could not shutdown instance for full reboot")
2148 _ShutdownInstanceDisks(instance, self.cfg)
2149 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2150 if not rpc.call_instance_start(node_current, instance, extra_args):
2151 _ShutdownInstanceDisks(instance, self.cfg)
2152 raise errors.OpExecError("Could not start instance for full reboot")
2154 self.cfg.MarkInstanceUp(instance.name)
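# Opcode sketch (hostname hypothetical); reboot_type must be one of the
# three INSTANCE_REBOOT_* constants validated in ExpandNames above:
#
#   op = opcodes.OpRebootInstance(instance_name="inst1.example.com",
#                                 reboot_type=constants.INSTANCE_REBOOT_SOFT,
#                                 ignore_secondaries=False)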
2157 class LUShutdownInstance(LogicalUnit):
2158 """Shutdown an instance.
2161 HPATH = "instance-stop"
2162 HTYPE = constants.HTYPE_INSTANCE
2163 _OP_REQP = ["instance_name"]
2166 def ExpandNames(self):
2167 self._ExpandAndLockInstance()
2168 self.needed_locks[locking.LEVEL_NODE] = []
2169 self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2171 def DeclareLocks(self, level):
2172 if level == locking.LEVEL_NODE:
2173 self._LockInstancesNodes()
2175 def BuildHooksEnv(self):
2178 This runs on master, primary and secondary nodes of the instance.
2181 env = _BuildInstanceHookEnvByObject(self.instance)
2182 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2183 list(self.instance.secondary_nodes))
2186 def CheckPrereq(self):
2187 """Check prerequisites.
2189 This checks that the instance is in the cluster.
2192 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2193 assert self.instance is not None, \
2194 "Cannot retrieve locked instance %s" % self.op.instance_name
2196 def Exec(self, feedback_fn):
2197 """Shutdown the instance.
2200 instance = self.instance
2201 node_current = instance.primary_node
2202 self.cfg.MarkInstanceDown(instance.name)
2203 if not rpc.call_instance_shutdown(node_current, instance):
2204 logger.Error("could not shutdown instance")
2206 _ShutdownInstanceDisks(instance, self.cfg)
2209 class LUReinstallInstance(LogicalUnit):
2210 """Reinstall an instance.
2213 HPATH = "instance-reinstall"
2214 HTYPE = constants.HTYPE_INSTANCE
2215 _OP_REQP = ["instance_name"]
2218 def ExpandNames(self):
2219 self._ExpandAndLockInstance()
2220 self.needed_locks[locking.LEVEL_NODE] = []
2221 self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2223 def DeclareLocks(self, level):
2224 if level == locking.LEVEL_NODE:
2225 self._LockInstancesNodes()
2227 def BuildHooksEnv(self):
2230 This runs on master, primary and secondary nodes of the instance.
2233 env = _BuildInstanceHookEnvByObject(self.instance)
2234 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2235 list(self.instance.secondary_nodes))
2238 def CheckPrereq(self):
2239 """Check prerequisites.
2241 This checks that the instance is in the cluster and is not running.
2244 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2245 assert instance is not None, \
2246 "Cannot retrieve locked instance %s" % self.op.instance_name
2248 if instance.disk_template == constants.DT_DISKLESS:
2249 raise errors.OpPrereqError("Instance '%s' has no disks" %
2250 self.op.instance_name)
2251 if instance.status != "down":
2252 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2253 self.op.instance_name)
2254 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2255 if remote_info:
2256 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2257 (self.op.instance_name,
2258 instance.primary_node))
2260 self.op.os_type = getattr(self.op, "os_type", None)
2261 if self.op.os_type is not None:
2263 pnode = self.cfg.GetNodeInfo(
2264 self.cfg.ExpandNodeName(instance.primary_node))
2265 if pnode is None:
2266 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2267 instance.primary_node)
2268 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2269 if not os_obj:
2270 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2271 " primary node" % self.op.os_type)
2273 self.instance = instance
2275 def Exec(self, feedback_fn):
2276 """Reinstall the instance.
2279 inst = self.instance
2281 if self.op.os_type is not None:
2282 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2283 inst.os = self.op.os_type
2284 self.cfg.AddInstance(inst)
2286 _StartInstanceDisks(self.cfg, inst, None)
2288 feedback_fn("Running the instance OS create scripts...")
2289 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2290 raise errors.OpExecError("Could not install OS for instance %s"
2291 " on node %s" %
2292 (inst.name, inst.primary_node))
2294 _ShutdownInstanceDisks(inst, self.cfg)
2297 class LURenameInstance(LogicalUnit):
2298 """Rename an instance.
2301 HPATH = "instance-rename"
2302 HTYPE = constants.HTYPE_INSTANCE
2303 _OP_REQP = ["instance_name", "new_name"]
2305 def BuildHooksEnv(self):
2308 This runs on master, primary and secondary nodes of the instance.
2311 env = _BuildInstanceHookEnvByObject(self.instance)
2312 env["INSTANCE_NEW_NAME"] = self.op.new_name
2313 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2314 list(self.instance.secondary_nodes))
2317 def CheckPrereq(self):
2318 """Check prerequisites.
2320 This checks that the instance is in the cluster and is not running.
2323 instance = self.cfg.GetInstanceInfo(
2324 self.cfg.ExpandInstanceName(self.op.instance_name))
2325 if instance is None:
2326 raise errors.OpPrereqError("Instance '%s' not known" %
2327 self.op.instance_name)
2328 if instance.status != "down":
2329 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2330 self.op.instance_name)
2331 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2333 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2334 (self.op.instance_name,
2335 instance.primary_node))
2336 self.instance = instance
2338 # new name verification
2339 name_info = utils.HostInfo(self.op.new_name)
2341 self.op.new_name = new_name = name_info.name
2342 instance_list = self.cfg.GetInstanceList()
2343 if new_name in instance_list:
2344 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2345 new_name)
2347 if not getattr(self.op, "ignore_ip", False):
2348 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2349 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2350 (name_info.ip, new_name))
2353 def Exec(self, feedback_fn):
2354 """Reinstall the instance.
2357 inst = self.instance
2358 old_name = inst.name
2360 if inst.disk_template == constants.DT_FILE:
2361 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2363 self.cfg.RenameInstance(inst.name, self.op.new_name)
2364 # Change the instance lock. This is definitely safe while we hold the BGL
2365 self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2366 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2368 # re-read the instance from the configuration after rename
2369 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2371 if inst.disk_template == constants.DT_FILE:
2372 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2373 result = rpc.call_file_storage_dir_rename(inst.primary_node,
2374 old_file_storage_dir,
2375 new_file_storage_dir)
2377 if not result:
2378 raise errors.OpExecError("Could not connect to node '%s' to rename"
2379 " directory '%s' to '%s' (but the instance"
2380 " has been renamed in Ganeti)" % (
2381 inst.primary_node, old_file_storage_dir,
2382 new_file_storage_dir))
2384 if not result[0]:
2385 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2386 " (but the instance has been renamed in"
2387 " Ganeti)" % (old_file_storage_dir,
2388 new_file_storage_dir))
2390 _StartInstanceDisks(self.cfg, inst, None)
2392 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2393 "sda", "sdb"):
2394 msg = ("Could not run OS rename script for instance %s on node %s (but the"
2395 " instance has been renamed in Ganeti)" %
2396 (inst.name, inst.primary_node))
2399 _ShutdownInstanceDisks(inst, self.cfg)
2402 class LURemoveInstance(LogicalUnit):
2403 """Remove an instance.
2406 HPATH = "instance-remove"
2407 HTYPE = constants.HTYPE_INSTANCE
2408 _OP_REQP = ["instance_name", "ignore_failures"]
2410 def BuildHooksEnv(self):
2413 This runs on master, primary and secondary nodes of the instance.
2416 env = _BuildInstanceHookEnvByObject(self.instance)
2417 nl = [self.sstore.GetMasterNode()]
2420 def CheckPrereq(self):
2421 """Check prerequisites.
2423 This checks that the instance is in the cluster.
2426 instance = self.cfg.GetInstanceInfo(
2427 self.cfg.ExpandInstanceName(self.op.instance_name))
2428 if instance is None:
2429 raise errors.OpPrereqError("Instance '%s' not known" %
2430 self.op.instance_name)
2431 self.instance = instance
2433 def Exec(self, feedback_fn):
2434 """Remove the instance.
2437 instance = self.instance
2438 logger.Info("shutting down instance %s on node %s" %
2439 (instance.name, instance.primary_node))
2441 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2442 if self.op.ignore_failures:
2443 feedback_fn("Warning: can't shutdown instance")
2444 else:
2445 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2446 (instance.name, instance.primary_node))
2448 logger.Info("removing block devices for instance %s" % instance.name)
2450 if not _RemoveDisks(instance, self.cfg):
2451 if self.op.ignore_failures:
2452 feedback_fn("Warning: can't remove instance's disks")
2453 else:
2454 raise errors.OpExecError("Can't remove instance's disks")
2456 logger.Info("removing instance %s out of cluster config" % instance.name)
2458 self.cfg.RemoveInstance(instance.name)
2459 # Remove the instance from the Ganeti Lock Manager
2460 self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2463 class LUQueryInstances(NoHooksLU):
2464 """Logical unit for querying instances.
2467 _OP_REQP = ["output_fields", "names"]
2470 def ExpandNames(self):
2471 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2472 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2473 "admin_state", "admin_ram",
2474 "disk_template", "ip", "mac", "bridge",
2475 "sda_size", "sdb_size", "vcpus", "tags"],
2476 dynamic=self.dynamic_fields,
2477 selected=self.op.output_fields)
2479 self.needed_locks = {}
2480 self.share_locks[locking.LEVEL_INSTANCE] = 1
2481 self.share_locks[locking.LEVEL_NODE] = 1
2483 # TODO: we could lock instances (and nodes) only if the user asked for
2484 # dynamic fields. For that we need atomic ways to get info for a group of
2485 # instances from the config, though.
2486 if not self.op.names:
2487 self.needed_locks[locking.LEVEL_INSTANCE] = None # Acquire all
2488 else:
2489 self.needed_locks[locking.LEVEL_INSTANCE] = \
2490 _GetWantedInstances(self, self.op.names)
2492 self.needed_locks[locking.LEVEL_NODE] = []
2493 self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2495 def DeclareLocks(self, level):
2496 # TODO: locking of nodes could be avoided when not querying them
2497 if level == locking.LEVEL_NODE:
2498 self._LockInstancesNodes()
2500 def CheckPrereq(self):
2501 """Check prerequisites.
2504 # This of course is valid only if we locked the instances
2505 self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE]
2507 def Exec(self, feedback_fn):
2508 """Computes the list of nodes and their attributes.
2511 instance_names = self.wanted
2512 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2513 in instance_names]
2515 # begin data gathering
2517 nodes = frozenset([inst.primary_node for inst in instance_list])
2519 bad_nodes = []
2520 if self.dynamic_fields.intersection(self.op.output_fields):
2521 live_data = {}
2522 node_data = rpc.call_all_instances_info(nodes)
2523 for name in nodes:
2524 result = node_data[name]
2525 if result:
2526 live_data.update(result)
2527 elif result == False:
2528 bad_nodes.append(name)
2529 # else no instance is alive
2530 else:
2531 live_data = dict([(name, {}) for name in instance_names])
2533 # end data gathering
2535 output = []
2536 for instance in instance_list:
2537 iout = []
2538 for field in self.op.output_fields:
2539 if field == "name":
2540 val = instance.name
2541 elif field == "os":
2542 val = instance.os
2543 elif field == "pnode":
2544 val = instance.primary_node
2545 elif field == "snodes":
2546 val = list(instance.secondary_nodes)
2547 elif field == "admin_state":
2548 val = (instance.status != "down")
2549 elif field == "oper_state":
2550 if instance.primary_node in bad_nodes:
2551 val = None
2552 else:
2553 val = bool(live_data.get(instance.name))
2554 elif field == "status":
2555 if instance.primary_node in bad_nodes:
2556 val = "ERROR_nodedown"
2557 else:
2558 running = bool(live_data.get(instance.name))
2559 if running:
2560 if instance.status != "down":
2561 val = "running"
2562 else:
2563 val = "ERROR_up"
2564 else:
2565 if instance.status != "down":
2566 val = "ERROR_down"
2567 else:
2568 val = "ADMIN_down"
2569 elif field == "admin_ram":
2570 val = instance.memory
2571 elif field == "oper_ram":
2572 if instance.primary_node in bad_nodes:
2573 val = None
2574 elif instance.name in live_data:
2575 val = live_data[instance.name].get("memory", "?")
2576 else:
2577 val = "-"
2578 elif field == "disk_template":
2579 val = instance.disk_template
2580 elif field == "ip":
2581 val = instance.nics[0].ip
2582 elif field == "bridge":
2583 val = instance.nics[0].bridge
2584 elif field == "mac":
2585 val = instance.nics[0].mac
2586 elif field == "sda_size" or field == "sdb_size":
2587 disk = instance.FindDisk(field[:3])
2588 if disk is None:
2589 val = None
2590 else:
2591 val = disk.size
2592 elif field == "vcpus":
2593 val = instance.vcpus
2594 elif field == "tags":
2595 val = list(instance.GetTags())
2596 else:
2597 raise errors.ParameterError(field)
2598 iout.append(val)
2599 output.append(iout)
2601 return output
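# Shape of the result (illustrative, with hypothetical instance names):
# one row per instance, one column per requested field, in the order of
# self.op.output_fields. For output_fields=["name", "oper_state"]:
#
#   [["inst1.example.com", True],
#    ["inst2.example.com", False]]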
2604 class LUFailoverInstance(LogicalUnit):
2605 """Failover an instance.
2608 HPATH = "instance-failover"
2609 HTYPE = constants.HTYPE_INSTANCE
2610 _OP_REQP = ["instance_name", "ignore_consistency"]
2613 def ExpandNames(self):
2614 self._ExpandAndLockInstance()
2615 self.needed_locks[locking.LEVEL_NODE] = []
2616 self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2618 def DeclareLocks(self, level):
2619 if level == locking.LEVEL_NODE:
2620 self._LockInstancesNodes()
2622 def BuildHooksEnv(self):
2625 This runs on master, primary and secondary nodes of the instance.
2629 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2631 env.update(_BuildInstanceHookEnvByObject(self.instance))
2632 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2635 def CheckPrereq(self):
2636 """Check prerequisites.
2638 This checks that the instance is in the cluster.
2641 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2642 assert self.instance is not None, \
2643 "Cannot retrieve locked instance %s" % self.op.instance_name
2645 if instance.disk_template not in constants.DTS_NET_MIRROR:
2646 raise errors.OpPrereqError("Instance's disk layout is not"
2647 " network mirrored, cannot failover.")
2649 secondary_nodes = instance.secondary_nodes
2650 if not secondary_nodes:
2651 raise errors.ProgrammerError("no secondary node but using "
2652 "a mirrored disk template")
2654 target_node = secondary_nodes[0]
2655 # check memory requirements on the secondary node
2656 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2657 instance.name, instance.memory)
2659 # check bridge existence
2660 brlist = [nic.bridge for nic in instance.nics]
2661 if not rpc.call_bridges_exist(target_node, brlist):
2662 raise errors.OpPrereqError("One or more target bridges %s do not"
2663 " exist on destination node '%s'" %
2664 (brlist, target_node))
2666 def Exec(self, feedback_fn):
2667 """Failover an instance.
2669 The failover is done by shutting it down on its present node and
2670 starting it on the secondary.
2673 instance = self.instance
2675 source_node = instance.primary_node
2676 target_node = instance.secondary_nodes[0]
2678 feedback_fn("* checking disk consistency between source and target")
2679 for dev in instance.disks:
2680 # for drbd, these are drbd over lvm
2681 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2682 if instance.status == "up" and not self.op.ignore_consistency:
2683 raise errors.OpExecError("Disk %s is degraded on target node,"
2684 " aborting failover." % dev.iv_name)
2686 feedback_fn("* shutting down instance on source node")
2687 logger.Info("Shutting down instance %s on node %s" %
2688 (instance.name, source_node))
2690 if not rpc.call_instance_shutdown(source_node, instance):
2691 if self.op.ignore_consistency:
2692 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2693 " anyway. Please make sure node %s is down" %
2694 (instance.name, source_node, source_node))
2695 else:
2696 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2697 (instance.name, source_node))
2699 feedback_fn("* deactivating the instance's disks on source node")
2700 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2701 raise errors.OpExecError("Can't shut down the instance's disks.")
2703 instance.primary_node = target_node
2704 # distribute new instance config to the other nodes
2705 self.cfg.Update(instance)
2707 # Only start the instance if it's marked as up
2708 if instance.status == "up":
2709 feedback_fn("* activating the instance's disks on target node")
2710 logger.Info("Starting instance %s on node %s" %
2711 (instance.name, target_node))
2713 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2714 ignore_secondaries=True)
2715 if not disks_ok:
2716 _ShutdownInstanceDisks(instance, self.cfg)
2717 raise errors.OpExecError("Can't activate the instance's disks")
2719 feedback_fn("* starting the instance on the target node")
2720 if not rpc.call_instance_start(target_node, instance, None):
2721 _ShutdownInstanceDisks(instance, self.cfg)
2722 raise errors.OpExecError("Could not start instance %s on node %s." %
2723 (instance.name, target_node))
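# Failover in short: verify disk consistency on the secondary, shut the
# instance down on the old primary, flip primary_node in the config, then
# reassemble the disks and (if the instance was up) start it on the new
# primary. Opcode sketch (hostname hypothetical):
#
#   op = opcodes.OpFailoverInstance(instance_name="inst1.example.com",
#                                   ignore_consistency=False)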
2726 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2727 """Create a tree of block devices on the primary node.
2729 This always creates all devices.
2732 if device.children:
2733 for child in device.children:
2734 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2735 return False
2737 cfg.SetDiskID(device, node)
2738 new_id = rpc.call_blockdev_create(node, device, device.size,
2739 instance.name, True, info)
2740 if not new_id:
2741 return False
2742 if device.physical_id is None:
2743 device.physical_id = new_id
2744 return True
2747 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2748 """Create a tree of block devices on a secondary node.
2750 If this device type has to be created on secondaries, create it and
2753 If not, just recurse to children keeping the same 'force' value.
2756 if device.CreateOnSecondary():
2757 force = True
2758 if device.children:
2759 for child in device.children:
2760 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2761 child, force, info):
2762 return False
2764 if not force:
2765 return True
2766 cfg.SetDiskID(device, node)
2767 new_id = rpc.call_blockdev_create(node, device, device.size,
2768 instance.name, False, info)
2769 if not new_id:
2770 return False
2771 if device.physical_id is None:
2772 device.physical_id = new_id
2773 return True
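# Note on the two helpers above: both walk the same Disk tree recursively;
# the difference is that on the primary node every device is created
# unconditionally, while on a secondary a device (and, via `force`, its
# subtree) is only created when CreateOnSecondary() says so. For a DRBD8
# disk this means the LV children get created on both nodes.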
2776 def _GenerateUniqueNames(cfg, exts):
2777 """Generate a suitable LV name.
2779 This will generate a logical volume name for the given instance.
2781 """
2782 results = []
2783 for val in exts:
2784 new_id = cfg.GenerateUniqueID()
2785 results.append("%s%s" % (new_id, val))
2786 return results
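# Example (illustrative): _GenerateUniqueNames(cfg, [".sda", ".sdb"])
# returns something like ["<uuid>.sda", "<uuid>.sdb"], where <uuid> comes
# from cfg.GenerateUniqueID(); the results are used as LV names below.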
2789 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2790 """Generate a drbd8 device complete with its children.
2793 port = cfg.AllocatePort()
2794 vgname = cfg.GetVGName()
2795 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2796 logical_id=(vgname, names[0]))
2797 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2798 logical_id=(vgname, names[1]))
2799 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2800 logical_id = (primary, secondary, port),
2801 children = [dev_data, dev_meta],
2802 iv_name=iv_name)
2803 return drbd_dev
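# Resulting device tree for one DRBD8 branch (a sketch):
#
#   drbd8  logical_id=(primary, secondary, port)
#   ├── LV data  (size MiB)
#   └── LV meta  (128 MiB)
#
# The fixed 128 MiB metadata volume is what _ComputeDiskSize below
# accounts for, twice, in the DRBD8 overhead.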
2806 def _GenerateDiskTemplate(cfg, template_name,
2807 instance_name, primary_node,
2808 secondary_nodes, disk_sz, swap_sz,
2809 file_storage_dir, file_driver):
2810 """Generate the entire disk layout for a given template type.
2813 #TODO: compute space requirements
2815 vgname = cfg.GetVGName()
2816 if template_name == constants.DT_DISKLESS:
2818 elif template_name == constants.DT_PLAIN:
2819 if len(secondary_nodes) != 0:
2820 raise errors.ProgrammerError("Wrong template configuration")
2822 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2823 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2824 logical_id=(vgname, names[0]),
2826 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2827 logical_id=(vgname, names[1]),
2829 disks = [sda_dev, sdb_dev]
2830 elif template_name == constants.DT_DRBD8:
2831 if len(secondary_nodes) != 1:
2832 raise errors.ProgrammerError("Wrong template configuration")
2833 remote_node = secondary_nodes[0]
2834 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2835 ".sdb_data", ".sdb_meta"])
2836 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2837 disk_sz, names[0:2], "sda")
2838 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2839 swap_sz, names[2:4], "sdb")
2840 disks = [drbd_sda_dev, drbd_sdb_dev]
2841 elif template_name == constants.DT_FILE:
2842 if len(secondary_nodes) != 0:
2843 raise errors.ProgrammerError("Wrong template configuration")
2845 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2846 iv_name="sda", logical_id=(file_driver,
2847 "%s/sda" % file_storage_dir))
2848 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2849 iv_name="sdb", logical_id=(file_driver,
2850 "%s/sdb" % file_storage_dir))
2851 disks = [file_sda_dev, file_sdb_dev]
2852 else:
2853 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2854 return disks
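# Example (illustrative): for template DT_DRBD8 with disk_sz=1024 and
# swap_sz=512 this returns two disks ("sda", "sdb"), each a DRBD8 device
# mirrored between primary_node and the single secondary, built from the
# data/meta LV pairs generated by _GenerateDRBD8Branch above.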
2857 def _GetInstanceInfoText(instance):
2858 """Compute that text that should be added to the disk's metadata.
2861 return "originstname+%s" % instance.name
2864 def _CreateDisks(cfg, instance):
2865 """Create all disks for an instance.
2867 This abstracts away some work from AddInstance.
2870 instance: the instance object
2873 True or False showing the success of the creation process
2876 info = _GetInstanceInfoText(instance)
2878 if instance.disk_template == constants.DT_FILE:
2879 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2880 result = rpc.call_file_storage_dir_create(instance.primary_node,
2884 logger.Error("Could not connect to node '%s'" % instance.primary_node)
2888 logger.Error("failed to create directory '%s'" % file_storage_dir)
2891 for device in instance.disks:
2892 logger.Info("creating volume %s for instance %s" %
2893 (device.iv_name, instance.name))
2895 for secondary_node in instance.secondary_nodes:
2896 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2897 device, False, info):
2898 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2899 (device.iv_name, device, secondary_node))
2900 return False
2902 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2903 instance, device, info):
2904 logger.Error("failed to create volume %s on primary!" %
2911 def _RemoveDisks(instance, cfg):
2912 """Remove all disks for an instance.
2914 This abstracts away some work from `AddInstance()` and
2915 `RemoveInstance()`. Note that in case some of the devices couldn't
2916 be removed, the removal will continue with the other ones (compare
2917 with `_CreateDisks()`).
2920 instance: the instance object
2923 True or False showing the success of the removal process
2926 logger.Info("removing block devices for instance %s" % instance.name)
2928 result = True
2929 for device in instance.disks:
2930 for node, disk in device.ComputeNodeTree(instance.primary_node):
2931 cfg.SetDiskID(disk, node)
2932 if not rpc.call_blockdev_remove(node, disk):
2933 logger.Error("could not remove block device %s on node %s,"
2934 " continuing anyway" %
2935 (device.iv_name, node))
2936 result = False
2938 if instance.disk_template == constants.DT_FILE:
2939 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2940 if not rpc.call_file_storage_dir_remove(instance.primary_node,
2941 file_storage_dir):
2942 logger.Error("could not remove directory '%s'" % file_storage_dir)
2943 result = False
2945 return result
2948 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2949 """Compute disk size requirements in the volume group
2951 This is currently hard-coded for the two-drive layout.
2954 # Required free disk space as a function of disk and swap space
2956 constants.DT_DISKLESS: None,
2957 constants.DT_PLAIN: disk_size + swap_size,
2958 # 256 MiB are added for DRBD metadata, 128 MiB for each DRBD device
2959 constants.DT_DRBD8: disk_size + swap_size + 256,
2960 constants.DT_FILE: None,
2963 if disk_template not in req_size_dict:
2964 raise errors.ProgrammerError("Disk template '%s' size requirement"
2965 " is unknown" % disk_template)
2967 return req_size_dict[disk_template]
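# Worked example: for DT_DRBD8 with disk_size=1024 and swap_size=512 the
# volume group must provide 1024 + 512 + 256 = 1792 MiB, the 256 MiB
# being the two 128 MiB DRBD metadata volumes; DT_DISKLESS and DT_FILE
# need no volume group space at all, hence None.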
2970 class LUCreateInstance(LogicalUnit):
2971 """Create an instance.
2974 HPATH = "instance-add"
2975 HTYPE = constants.HTYPE_INSTANCE
2976 _OP_REQP = ["instance_name", "mem_size", "disk_size",
2977 "disk_template", "swap_size", "mode", "start", "vcpus",
2978 "wait_for_sync", "ip_check", "mac"]
2980 def _RunAllocator(self):
2981 """Run the allocator based on input opcode.
2984 disks = [{"size": self.op.disk_size, "mode": "w"},
2985 {"size": self.op.swap_size, "mode": "w"}]
2986 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2987 "bridge": self.op.bridge}]
2988 ial = IAllocator(self.cfg, self.sstore,
2989 mode=constants.IALLOCATOR_MODE_ALLOC,
2990 name=self.op.instance_name,
2991 disk_template=self.op.disk_template,
2994 vcpus=self.op.vcpus,
2995 mem_size=self.op.mem_size,
3000 ial.Run(self.op.iallocator)
3002 if not ial.success:
3003 raise errors.OpPrereqError("Can't compute nodes using"
3004 " iallocator '%s': %s" % (self.op.iallocator,
3005 ial.info))
3006 if len(ial.nodes) != ial.required_nodes:
3007 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3008 " of nodes (%s), required %s" %
3009 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3010 self.op.pnode = ial.nodes[0]
3011 logger.ToStdout("Selected nodes for the instance: %s" %
3012 (", ".join(ial.nodes),))
3013 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3014 (self.op.instance_name, self.op.iallocator, ial.nodes))
3015 if ial.required_nodes == 2:
3016 self.op.snode = ial.nodes[1]
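# Allocator handshake in short (a sketch of the flow, not extra API): the
# LU builds an IAllocator request from the opcode (allocation mode,
# instance name, disk/nic specs), runs the script named by
# self.op.iallocator, and copies the returned nodes into self.op.pnode
# and, for two-node (DRBD8) layouts, self.op.snode.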
3018 def BuildHooksEnv(self):
3021 This runs on master, primary and secondary nodes of the instance.
3025 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3026 "INSTANCE_DISK_SIZE": self.op.disk_size,
3027 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3028 "INSTANCE_ADD_MODE": self.op.mode,
3030 if self.op.mode == constants.INSTANCE_IMPORT:
3031 env["INSTANCE_SRC_NODE"] = self.op.src_node
3032 env["INSTANCE_SRC_PATH"] = self.op.src_path
3033 env["INSTANCE_SRC_IMAGE"] = self.src_image
3035 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3036 primary_node=self.op.pnode,
3037 secondary_nodes=self.secondaries,
3038 status=self.instance_status,
3039 os_type=self.op.os_type,
3040 memory=self.op.mem_size,
3041 vcpus=self.op.vcpus,
3042 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3045 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3050 def CheckPrereq(self):
3051 """Check prerequisites.
3054 # set optional parameters to none if they don't exist
3055 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3056 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3057 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
3058 if not hasattr(self.op, attr):
3059 setattr(self.op, attr, None)
3061 if self.op.mode not in (constants.INSTANCE_CREATE,
3062 constants.INSTANCE_IMPORT):
3063 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3064 self.op.mode)
3066 if (not self.cfg.GetVGName() and
3067 self.op.disk_template not in constants.DTS_NOT_LVM):
3068 raise errors.OpPrereqError("Cluster does not support lvm-based"
3069 " instances")
3071 if self.op.mode == constants.INSTANCE_IMPORT:
3072 src_node = getattr(self.op, "src_node", None)
3073 src_path = getattr(self.op, "src_path", None)
3074 if src_node is None or src_path is None:
3075 raise errors.OpPrereqError("Importing an instance requires source"
3076 " node and path options")
3077 src_node_full = self.cfg.ExpandNodeName(src_node)
3078 if src_node_full is None:
3079 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3080 self.op.src_node = src_node = src_node_full
3082 if not os.path.isabs(src_path):
3083 raise errors.OpPrereqError("The source path must be absolute")
3085 export_info = rpc.call_export_info(src_node, src_path)
3087 if not export_info:
3088 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3090 if not export_info.has_section(constants.INISECT_EXP):
3091 raise errors.ProgrammerError("Corrupted export config")
3093 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3094 if (int(ei_version) != constants.EXPORT_VERSION):
3095 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3096 (ei_version, constants.EXPORT_VERSION))
3098 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3099 raise errors.OpPrereqError("Can't import instance with more than"
3100 " one data disk")
3102 # FIXME: are the old os-es, disk sizes, etc. useful?
3103 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3104 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3106 self.src_image = diskimage
3107 else: # INSTANCE_CREATE
3108 if getattr(self.op, "os_type", None) is None:
3109 raise errors.OpPrereqError("No guest OS specified")
3111 #### instance parameters check
3113 # disk template and mirror node verification
3114 if self.op.disk_template not in constants.DISK_TEMPLATES:
3115 raise errors.OpPrereqError("Invalid disk template name")
3117 # instance name verification
3118 hostname1 = utils.HostInfo(self.op.instance_name)
3120 self.op.instance_name = instance_name = hostname1.name
3121 instance_list = self.cfg.GetInstanceList()
3122 if instance_name in instance_list:
3123 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3124 instance_name)
3126 # ip validity checks
3127 ip = getattr(self.op, "ip", None)
3128 if ip is None or ip.lower() == "none":
3129 inst_ip = None
3130 elif ip.lower() == "auto":
3131 inst_ip = hostname1.ip
3132 else:
3133 if not utils.IsValidIP(ip):
3134 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3135 " like a valid IP" % ip)
3137 self.inst_ip = self.op.ip = inst_ip
3139 if self.op.start and not self.op.ip_check:
3140 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3141 " adding an instance in start mode")
3143 if self.op.ip_check:
3144 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3145 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3146 (hostname1.ip, instance_name))
3148 # MAC address verification
3149 if self.op.mac != "auto":
3150 if not utils.IsValidMac(self.op.mac.lower()):
3151 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3152 self.op.mac)
3154 # bridge verification
3155 bridge = getattr(self.op, "bridge", None)
3156 if bridge is None:
3157 self.op.bridge = self.cfg.GetDefBridge()
3158 else:
3159 self.op.bridge = bridge
3161 # boot order verification
3162 if self.op.hvm_boot_order is not None:
3163 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3164 raise errors.OpPrereqError("invalid boot order specified,"
3165 " must be one or more of [acdn]")
3166 # file storage checks
3167 if (self.op.file_driver and
3168 not self.op.file_driver in constants.FILE_DRIVER):
3169 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3170 self.op.file_driver)
3172 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3173 raise errors.OpPrereqError("File storage directory not a relative"
3174 " path")
3177 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3178 raise errors.OpPrereqError("One and only one of iallocator and primary"
3179 " node must be given")
3181 if self.op.iallocator is not None:
3182 self._RunAllocator()
3184 #### node related checks
3186 # check primary node
3187 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3188 if pnode is None:
3189 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3190 self.op.pnode)
3191 self.op.pnode = pnode.name
3192 self.pnode = pnode
3193 self.secondaries = []
3195 # mirror node verification
3196 if self.op.disk_template in constants.DTS_NET_MIRROR:
3197 if getattr(self.op, "snode", None) is None:
3198 raise errors.OpPrereqError("The networked disk templates need"
3199 " a mirror node")
3201 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3202 if snode_name is None:
3203 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3204 self.op.snode)
3205 elif snode_name == pnode.name:
3206 raise errors.OpPrereqError("The secondary node cannot be"
3207 " the primary node.")
3208 self.secondaries.append(snode_name)
3210 req_size = _ComputeDiskSize(self.op.disk_template,
3211 self.op.disk_size, self.op.swap_size)
3213 # Check lv size requirements
3214 if req_size is not None:
3215 nodenames = [pnode.name] + self.secondaries
3216 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3217 for node in nodenames:
3218 info = nodeinfo.get(node, None)
3219 if not info:
3220 raise errors.OpPrereqError("Cannot get current information"
3221 " from node '%s'" % node)
3222 vg_free = info.get('vg_free', None)
3223 if not isinstance(vg_free, int):
3224 raise errors.OpPrereqError("Can't compute free disk space on"
3225 " node %s" % node)
3226 if req_size > info['vg_free']:
3227 raise errors.OpPrereqError("Not enough disk space on target node %s."
3228 " %d MB available, %d MB required" %
3229 (node, info['vg_free'], req_size))
3232 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3233 if not os_obj:
3234 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3235 " primary node" % self.op.os_type)
3237 if self.op.kernel_path == constants.VALUE_NONE:
3238 raise errors.OpPrereqError("Can't set instance kernel to none")
3241 # bridge check on primary node
3242 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3243 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3244 " destination node '%s'" %
3245 (self.op.bridge, pnode.name))
3247 # memory check on primary node
3249 _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3250 "creating instance %s" % self.op.instance_name,
3253 # hvm_cdrom_image_path verification
3254 if self.op.hvm_cdrom_image_path is not None:
3255 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3256 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3257 " be an absolute path or None, not %s" %
3258 self.op.hvm_cdrom_image_path)
3259 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3260 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3261 " regular file or a symlink pointing to"
3262 " an existing regular file, not %s" %
3263 self.op.hvm_cdrom_image_path)
3265 # vnc_bind_address verification
3266 if self.op.vnc_bind_address is not None:
3267 if not utils.IsValidIP(self.op.vnc_bind_address):
3268 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3269 " like a valid IP address" %
3270 self.op.vnc_bind_address)
3272 # Xen HVM device type checks
3273 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3274 if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3275 raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3276 " hypervisor" % self.op.hvm_nic_type)
3277 if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3278 raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3279 " hypervisor" % self.op.hvm_disk_type)
3281 if self.op.start:
3282 self.instance_status = 'up'
3283 else:
3284 self.instance_status = 'down'
3286 def Exec(self, feedback_fn):
3287 """Create and add the instance to the cluster.
3290 instance = self.op.instance_name
3291 pnode_name = self.pnode.name
3293 if self.op.mac == "auto":
3294 mac_address = self.cfg.GenerateMAC()
3296 mac_address = self.op.mac
3298 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3299 if self.inst_ip is not None:
3300 nic.ip = self.inst_ip
3302 ht_kind = self.sstore.GetHypervisorType()
3303 if ht_kind in constants.HTS_REQ_PORT:
3304 network_port = self.cfg.AllocatePort()
3305 else:
3306 network_port = None
3308 if self.op.vnc_bind_address is None:
3309 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3311 # this is needed because os.path.join does not accept None arguments
3312 if self.op.file_storage_dir is None:
3313 string_file_storage_dir = ""
3315 string_file_storage_dir = self.op.file_storage_dir
3317 # build the full file storage dir path
3318 file_storage_dir = os.path.normpath(os.path.join(
3319 self.sstore.GetFileStorageDir(),
3320 string_file_storage_dir, instance))
3323 disks = _GenerateDiskTemplate(self.cfg,
3324 self.op.disk_template,
3325 instance, pnode_name,
3326 self.secondaries, self.op.disk_size,
3327 self.op.swap_size,
3328 file_storage_dir,
3329 self.op.file_driver)
3331 iobj = objects.Instance(name=instance, os=self.op.os_type,
3332 primary_node=pnode_name,
3333 memory=self.op.mem_size,
3334 vcpus=self.op.vcpus,
3335 nics=[nic], disks=disks,
3336 disk_template=self.op.disk_template,
3337 status=self.instance_status,
3338 network_port=network_port,
3339 kernel_path=self.op.kernel_path,
3340 initrd_path=self.op.initrd_path,
3341 hvm_boot_order=self.op.hvm_boot_order,
3342 hvm_acpi=self.op.hvm_acpi,
3343 hvm_pae=self.op.hvm_pae,
3344 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3345 vnc_bind_address=self.op.vnc_bind_address,
3346 hvm_nic_type=self.op.hvm_nic_type,
3347 hvm_disk_type=self.op.hvm_disk_type,
3348 )
3350 feedback_fn("* creating instance disks...")
3351 if not _CreateDisks(self.cfg, iobj):
3352 _RemoveDisks(iobj, self.cfg)
3353 raise errors.OpExecError("Device creation failed, reverting...")
3355 feedback_fn("adding instance %s to cluster config" % instance)
3357 self.cfg.AddInstance(iobj)
3358 # Add the new instance to the Ganeti Lock Manager
3359 self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3361 if self.op.wait_for_sync:
3362 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3363 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3364 # make sure the disks are not degraded (still sync-ing is ok)
3366 feedback_fn("* checking mirrors status")
3367 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3368 else:
3369 disk_abort = False
3371 if disk_abort:
3372 _RemoveDisks(iobj, self.cfg)
3373 self.cfg.RemoveInstance(iobj.name)
3374 # Remove the new instance from the Ganeti Lock Manager
3375 self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3376 raise errors.OpExecError("There are some degraded disks for"
3377 " this instance")
3379 feedback_fn("creating os for instance %s on node %s" %
3380 (instance, pnode_name))
3382 if iobj.disk_template != constants.DT_DISKLESS:
3383 if self.op.mode == constants.INSTANCE_CREATE:
3384 feedback_fn("* running the instance OS create scripts...")
3385 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3386 raise errors.OpExecError("could not add os for instance %s"
3387 " on node %s" %
3388 (instance, pnode_name))
3390 elif self.op.mode == constants.INSTANCE_IMPORT:
3391 feedback_fn("* running the instance OS import scripts...")
3392 src_node = self.op.src_node
3393 src_image = self.src_image
3394 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3395 src_node, src_image):
3396 raise errors.OpExecError("Could not import os for instance"
3397 " %s on node %s" %
3398 (instance, pnode_name))
3400 # also checked in the prereq part
3401 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3402 % self.op.mode)
3404 if self.op.start:
3405 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3406 feedback_fn("* starting instance...")
3407 if not rpc.call_instance_start(pnode_name, iobj, None):
3408 raise errors.OpExecError("Could not start instance")
3411 class LUConnectConsole(NoHooksLU):
3412 """Connect to an instance's console.
3414 This is somewhat special in that it returns the command line that
3415 you need to run on the master node in order to connect to the
3416 console.
3419 _OP_REQP = ["instance_name"]
3422 def ExpandNames(self):
3423 self._ExpandAndLockInstance()
3425 def CheckPrereq(self):
3426 """Check prerequisites.
3428 This checks that the instance is in the cluster.
3431 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3432 assert self.instance is not None, \
3433 "Cannot retrieve locked instance %s" % self.op.instance_name
3435 def Exec(self, feedback_fn):
3436 """Connect to the console of an instance
3439 instance = self.instance
3440 node = instance.primary_node
3442 node_insts = rpc.call_instance_list([node])[node]
3443 if node_insts is False:
3444 raise errors.OpExecError("Can't connect to node %s." % node)
3446 if instance.name not in node_insts:
3447 raise errors.OpExecError("Instance %s is not running." % instance.name)
3449 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3451 hyper = hypervisor.GetHypervisor()
3452 console_cmd = hyper.GetShellCommandForConsole(instance)
3455 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
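# Example (illustrative): for a Xen-based instance the returned value is
# an SSH command line built by SshRunner, conceptually like
#
#   ssh -t root@node1.example.com "xm console inst1.example.com"
#
# though the exact console command comes from the hypervisor abstraction
# (GetShellCommandForConsole) and varies per hypervisor.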
3458 class LUReplaceDisks(LogicalUnit):
3459 """Replace the disks of an instance.
3462 HPATH = "mirrors-replace"
3463 HTYPE = constants.HTYPE_INSTANCE
3464 _OP_REQP = ["instance_name", "mode", "disks"]
3466 def _RunAllocator(self):
3467 """Compute a new secondary node using an IAllocator.
3470 ial = IAllocator(self.cfg, self.sstore,
3471 mode=constants.IALLOCATOR_MODE_RELOC,
3472 name=self.op.instance_name,
3473 relocate_from=[self.sec_node])
3475 ial.Run(self.op.iallocator)
3477 if not ial.success:
3478 raise errors.OpPrereqError("Can't compute nodes using"
3479 " iallocator '%s': %s" % (self.op.iallocator,
3480 ial.info))
3481 if len(ial.nodes) != ial.required_nodes:
3482 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3483 " of nodes (%s), required %s" %
3484 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3485 self.op.remote_node = ial.nodes[0]
3486 logger.ToStdout("Selected new secondary for the instance: %s" %
3487 self.op.remote_node)
3489 def BuildHooksEnv(self):
3492 This runs on the master, the primary and all the secondaries.
3496 "MODE": self.op.mode,
3497 "NEW_SECONDARY": self.op.remote_node,
3498 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3500 env.update(_BuildInstanceHookEnvByObject(self.instance))
3502 self.sstore.GetMasterNode(),
3503 self.instance.primary_node,
3505 if self.op.remote_node is not None:
3506 nl.append(self.op.remote_node)
3509 def CheckPrereq(self):
3510 """Check prerequisites.
3512 This checks that the instance is in the cluster.
3515 if not hasattr(self.op, "remote_node"):
3516 self.op.remote_node = None
3518 instance = self.cfg.GetInstanceInfo(
3519 self.cfg.ExpandInstanceName(self.op.instance_name))
3520 if instance is None:
3521 raise errors.OpPrereqError("Instance '%s' not known" %
3522 self.op.instance_name)
3523 self.instance = instance
3524 self.op.instance_name = instance.name
3526 if instance.disk_template not in constants.DTS_NET_MIRROR:
3527 raise errors.OpPrereqError("Instance's disk layout is not"
3528 " network mirrored.")
3530 if len(instance.secondary_nodes) != 1:
3531 raise errors.OpPrereqError("The instance has a strange layout,"
3532 " expected one secondary but found %d" %
3533 len(instance.secondary_nodes))
3535 self.sec_node = instance.secondary_nodes[0]
3537 ia_name = getattr(self.op, "iallocator", None)
3538 if ia_name is not None:
3539 if self.op.remote_node is not None:
3540 raise errors.OpPrereqError("Give either the iallocator or the new"
3541 " secondary, not both")
3542 self.op.remote_node = self._RunAllocator()
3544 remote_node = self.op.remote_node
3545 if remote_node is not None:
3546 remote_node = self.cfg.ExpandNodeName(remote_node)
3547 if remote_node is None:
3548 raise errors.OpPrereqError("Node '%s' not known" %
3549 self.op.remote_node)
3550 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3552 self.remote_node_info = None
3553 if remote_node == instance.primary_node:
3554 raise errors.OpPrereqError("The specified node is the primary node of"
3555 " the instance.")
3556 elif remote_node == self.sec_node:
3557 if self.op.mode == constants.REPLACE_DISK_SEC:
3558 # this is for DRBD8, where we can't execute the same mode of
3559 # replacement as for drbd7 (no different port allocated)
3560 raise errors.OpPrereqError("Same secondary given, cannot execute"
3561 " replacement")
3562 if instance.disk_template == constants.DT_DRBD8:
3563 if (self.op.mode == constants.REPLACE_DISK_ALL and
3564 remote_node is not None):
3565 # switch to replace secondary mode
3566 self.op.mode = constants.REPLACE_DISK_SEC
3568 if self.op.mode == constants.REPLACE_DISK_ALL:
3569 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3570 " secondary disk replacement, not"
3572 elif self.op.mode == constants.REPLACE_DISK_PRI:
3573 if remote_node is not None:
3574 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3575 " the secondary while doing a primary"
3576 " node disk replacement")
3577 self.tgt_node = instance.primary_node
3578 self.oth_node = instance.secondary_nodes[0]
3579 elif self.op.mode == constants.REPLACE_DISK_SEC:
3580 self.new_node = remote_node # this can be None, in which case
3581 # we don't change the secondary
3582 self.tgt_node = instance.secondary_nodes[0]
3583 self.oth_node = instance.primary_node
3585 raise errors.ProgrammerError("Unhandled disk replace mode")
3587 for name in self.op.disks:
3588 if instance.FindDisk(name) is None:
3589 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3590 (name, instance.name))
3591 self.op.remote_node = remote_node
3593 def _ExecD8DiskOnly(self, feedback_fn):
3594 """Replace a disk on the primary or secondary for dbrd8.
3596 The algorithm for replace is quite complicated:
3597 - for each disk to be replaced:
3598 - create new LVs on the target node with unique names
3599 - detach old LVs from the drbd device
3600 - rename old LVs to name_replaced.<time_t>
3601 - rename new LVs to old LVs
3602 - attach the new LVs (with the old names now) to the drbd device
3603 - wait for sync across all devices
3604 - for each modified disk:
3605 - remove old LVs (which have the name name_replaced.<time_t>)
3607 Failures are not very well handled.
3609 """
3610 steps_total = 6
3611 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3612 instance = self.instance
3613 iv_names = {}
3614 vgname = self.cfg.GetVGName()
3615 cfg = self.cfg
3617 tgt_node = self.tgt_node
3618 oth_node = self.oth_node
3620 # Step: check device activation
3621 self.proc.LogStep(1, steps_total, "check device existence")
3622 info("checking volume groups")
3623 my_vg = cfg.GetVGName()
3624 results = rpc.call_vg_list([oth_node, tgt_node])
3625 if not results:
3626 raise errors.OpExecError("Can't list volume groups on the nodes")
3627 for node in oth_node, tgt_node:
3628 res = results.get(node, False)
3629 if not res or my_vg not in res:
3630 raise errors.OpExecError("Volume group '%s' not found on %s" %
3631 (my_vg, node))
3632 for dev in instance.disks:
3633 if not dev.iv_name in self.op.disks:
3634 continue
3635 for node in tgt_node, oth_node:
3636 info("checking %s on %s" % (dev.iv_name, node))
3637 cfg.SetDiskID(dev, node)
3638 if not rpc.call_blockdev_find(node, dev):
3639 raise errors.OpExecError("Can't find device %s on node %s" %
3640 (dev.iv_name, node))
3642 # Step: check other node consistency
3643 self.proc.LogStep(2, steps_total, "check peer consistency")
3644 for dev in instance.disks:
3645 if not dev.iv_name in self.op.disks:
3646 continue
3647 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3648 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3649 oth_node==instance.primary_node):
3650 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3651 " to replace disks on this node (%s)" %
3652 (oth_node, tgt_node))
3654 # Step: create new storage
3655 self.proc.LogStep(3, steps_total, "allocate new storage")
3656 for dev in instance.disks:
3657 if not dev.iv_name in self.op.disks:
3658 continue
3659 size = dev.size
3660 cfg.SetDiskID(dev, tgt_node)
3661 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3662 names = _GenerateUniqueNames(cfg, lv_names)
3663 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3664 logical_id=(vgname, names[0]))
3665 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3666 logical_id=(vgname, names[1]))
3667 new_lvs = [lv_data, lv_meta]
3668 old_lvs = dev.children
3669 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3670 info("creating new local storage on %s for %s" %
3671 (tgt_node, dev.iv_name))
3672 # since we *always* want to create this LV, we use the
3673 # _Create...OnPrimary (which forces the creation), even if we
3674 # are talking about the secondary node
3675 for new_lv in new_lvs:
3676 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3677 _GetInstanceInfoText(instance)):
3678 raise errors.OpExecError("Failed to create new LV named '%s' on"
3679 " node '%s'" %
3680 (new_lv.logical_id[1], tgt_node))
3682 # Step: for each lv, detach+rename*2+attach
3683 self.proc.LogStep(4, steps_total, "change drbd configuration")
3684 for dev, old_lvs, new_lvs in iv_names.itervalues():
3685 info("detaching %s drbd from local storage" % dev.iv_name)
3686 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3687 raise errors.OpExecError("Can't detach drbd from local storage on node"
3688 " %s for device %s" % (tgt_node, dev.iv_name))
3690 #cfg.Update(instance)
3692 # ok, we created the new LVs, so now we know we have the needed
3693 # storage; as such, we proceed on the target node to rename
3694 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3695 # using the assumption that logical_id == physical_id (which in
3696 # turn is the unique_id on that node)
3698 # FIXME(iustin): use a better name for the replaced LVs
3699 temp_suffix = int(time.time())
3700 ren_fn = lambda d, suff: (d.physical_id[0],
3701 d.physical_id[1] + "_replaced-%s" % suff)
3702 # build the rename list based on what LVs exist on the node
3703 rlist = []
3704 for to_ren in old_lvs:
3705 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3706 if find_res is not None: # device exists
3707 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3709 info("renaming the old LVs on the target node")
3710 if not rpc.call_blockdev_rename(tgt_node, rlist):
3711 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3712 # now we rename the new LVs to the old LVs
3713 info("renaming the new LVs on the target node")
3714 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3715 if not rpc.call_blockdev_rename(tgt_node, rlist):
3716 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3718 for old, new in zip(old_lvs, new_lvs):
3719 new.logical_id = old.logical_id
3720 cfg.SetDiskID(new, tgt_node)
3722 for disk in old_lvs:
3723 disk.logical_id = ren_fn(disk, temp_suffix)
3724 cfg.SetDiskID(disk, tgt_node)
3726 # now that the new lvs have the old name, we can add them to the device
3727 info("adding new mirror component on %s" % tgt_node)
3728 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3729 for new_lv in new_lvs:
3730 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3731 warning("Can't rollback device %s", hint="manually cleanup unused"
3732 " logical volumes")
3733 raise errors.OpExecError("Can't add local storage to drbd")
3735 dev.children = new_lvs
3736 cfg.Update(instance)
3738 # Step: wait for sync
3740 # this can fail as the old devices are degraded and _WaitForSync
3741 # does a combined result over all disks, so we don't check its
3742 # return value
3743 self.proc.LogStep(5, steps_total, "sync devices")
3744 _WaitForSync(cfg, instance, self.proc, unlock=True)
3746 # so check manually all the devices
3747 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3748 cfg.SetDiskID(dev, instance.primary_node)
3749 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3750 if is_degr:
3751 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3753 # Step: remove old storage
3754 self.proc.LogStep(6, steps_total, "removing old storage")
3755 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3756 info("remove logical volumes for %s" % name)
3757 for lv in old_lvs:
3758 cfg.SetDiskID(lv, tgt_node)
3759 if not rpc.call_blockdev_remove(tgt_node, lv):
3760 warning("Can't remove old LV", hint="manually remove unused LVs")
3763 def _ExecD8Secondary(self, feedback_fn):
3764 """Replace the secondary node for drbd8.
3766 The algorithm for replace is quite complicated:
3767 - for all disks of the instance:
3768 - create new LVs on the new node with same names
3769 - shutdown the drbd device on the old secondary
3770 - disconnect the drbd network on the primary
3771 - create the drbd device on the new secondary
3772 - network attach the drbd on the primary, using an artifice:
3773 the drbd code for Attach() will connect to the network if it
3774 finds a device which is connected to the good local disks but
3775 not network enabled
3776 - wait for sync across all devices
3777 - remove all disks from the old secondary
3779 Failures are not very well handled.
3781 """
3782 steps_total = 6
3783 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3784 instance = self.instance
3785 iv_names = {}
3786 vgname = self.cfg.GetVGName()
3787 cfg = self.cfg
3789 old_node = self.tgt_node
3790 new_node = self.new_node
3791 pri_node = instance.primary_node
3793 # Step: check device activation
3794 self.proc.LogStep(1, steps_total, "check device existence")
3795 info("checking volume groups")
3796 my_vg = cfg.GetVGName()
3797 results = rpc.call_vg_list([pri_node, new_node])
3798 if not results:
3799 raise errors.OpExecError("Can't list volume groups on the nodes")
3800 for node in pri_node, new_node:
3801 res = results.get(node, False)
3802 if not res or my_vg not in res:
3803 raise errors.OpExecError("Volume group '%s' not found on %s" %
3804 (my_vg, node))
3805 for dev in instance.disks:
3806 if not dev.iv_name in self.op.disks:
3807 continue
3808 info("checking %s on %s" % (dev.iv_name, pri_node))
3809 cfg.SetDiskID(dev, pri_node)
3810 if not rpc.call_blockdev_find(pri_node, dev):
3811 raise errors.OpExecError("Can't find device %s on node %s" %
3812 (dev.iv_name, pri_node))
3814 # Step: check other node consistency
3815 self.proc.LogStep(2, steps_total, "check peer consistency")
3816 for dev in instance.disks:
3817 if not dev.iv_name in self.op.disks:
3818 continue
3819 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3820 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3821 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3822 " unsafe to replace the secondary" %
3825 # Step: create new storage
3826 self.proc.LogStep(3, steps_total, "allocate new storage")
3827 for dev in instance.disks:
3829 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3830 # since we *always* want to create this LV, we use the
3831 # _Create...OnPrimary (which forces the creation), even if we
3832 # are talking about the secondary node
3833 for new_lv in dev.children:
3834 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3835 _GetInstanceInfoText(instance)):
3836 raise errors.OpExecError("Failed to create new LV named '%s' on"
3837 " node '%s'" %
3838 (new_lv.logical_id[1], new_node))
3840 iv_names[dev.iv_name] = (dev, dev.children)
3842 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3843 for dev in instance.disks:
3845 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3846 # create new devices on new_node
3847 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3848 logical_id=(pri_node, new_node,
3849 dev.logical_id[2]),
3850 children=dev.children)
3851 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3852 new_drbd, False,
3853 _GetInstanceInfoText(instance)):
3854 raise errors.OpExecError("Failed to create new DRBD on"
3855 " node '%s'" % new_node)
3857 for dev in instance.disks:
3858 # we have new devices, shutdown the drbd on the old secondary
3859 info("shutting down drbd for %s on old node" % dev.iv_name)
3860 cfg.SetDiskID(dev, old_node)
3861 if not rpc.call_blockdev_shutdown(old_node, dev):
3862 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3863 hint="Please cleanup this device manually as soon as possible")
3865 info("detaching primary drbds from the network (=> standalone)")
3867 for dev in instance.disks:
3868 cfg.SetDiskID(dev, pri_node)
3869 # set the physical (unique in bdev terms) id to None, meaning
3870 # detach from network
3871 dev.physical_id = (None,) * len(dev.physical_id)
3872 # and 'find' the device, which will 'fix' it to match the
3874 if rpc.call_blockdev_find(pri_node, dev):
3877 warning("Failed to detach drbd %s from network, unusual case" %
3881 # no detaches succeeded (very unlikely)
3882 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3884 # if we managed to detach at least one, we update all the disks of
3885 # the instance to point to the new secondary
3886 info("updating instance configuration")
3887 for dev in instance.disks:
3888 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3889 cfg.SetDiskID(dev, pri_node)
3890 cfg.Update(instance)
3892 # and now perform the drbd attach
3893 info("attaching primary drbds to new secondary (standalone => connected)")
3895 for dev in instance.disks:
3896 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3897 # since the attach is smart, it's enough to 'find' the device,
3898 # it will automatically activate the network, if the physical_id
3899 # is correct
3900 cfg.SetDiskID(dev, pri_node)
3901 if not rpc.call_blockdev_find(pri_node, dev):
3902 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3903 "please do a gnt-instance info to see the status of disks")
3905 # this can fail as the old devices are degraded and _WaitForSync
3906 # does a combined result over all disks, so we don't check its
3907 # return value
3908 self.proc.LogStep(5, steps_total, "sync devices")
3909 _WaitForSync(cfg, instance, self.proc, unlock=True)
3911 # so check manually all the devices
3912 for name, (dev, old_lvs) in iv_names.iteritems():
3913 cfg.SetDiskID(dev, pri_node)
3914 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3915 if is_degr:
3916 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3918 self.proc.LogStep(6, steps_total, "removing old storage")
3919 for name, (dev, old_lvs) in iv_names.iteritems():
3920 info("remove logical volumes for %s" % name)
3921 for lv in old_lvs:
3922 cfg.SetDiskID(lv, old_node)
3923 if not rpc.call_blockdev_remove(old_node, lv):
3924 warning("Can't remove LV on old secondary",
3925 hint="Cleanup stale volumes by hand")
3927 def Exec(self, feedback_fn):
3928 """Execute disk replacement.
3930 This dispatches the disk replacement to the appropriate handler.
3933 instance = self.instance
3935 # Activate the instance disks if we're replacing them on a down instance
3936 if instance.status == "down":
3937 op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3938 self.proc.ChainOpCode(op)
3940 if instance.disk_template == constants.DT_DRBD8:
3941 if self.op.remote_node is None:
3942 fn = self._ExecD8DiskOnly
3943 else:
3944 fn = self._ExecD8Secondary
3945 else:
3946 raise errors.ProgrammerError("Unhandled disk replacement case")
3948 ret = fn(feedback_fn)
3950 # Deactivate the instance disks if we're replacing them on a down instance
3951 if instance.status == "down":
3952 op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3953 self.proc.ChainOpCode(op)
3955 return ret
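# A minimal sketch of the standalone/reconnect trick used by
# _ExecD8Secondary above, shown in isolation: blanking physical_id and
# "finding" the device drops it from the network, while rewriting
# logical_id to the new peer and finding it again reconnects it. The
# cfg/rpc_mod/instance parameters stand for the same objects this module
# already uses; this is an illustration, not the LU's own method.
def _SketchDrbdDetachReattach(cfg, rpc_mod, instance, pri_node, new_node):
  for dev in instance.disks:
    cfg.SetDiskID(dev, pri_node)
    # no peer address => blockdev_find reconfigures the device standalone
    dev.physical_id = (None,) * len(dev.physical_id)
    rpc_mod.call_blockdev_find(pri_node, dev)
  for dev in instance.disks:
    # record the new secondary; SetDiskID recomputes physical_id, and
    # blockdev_find now attaches the device to the new peer
    dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
    cfg.SetDiskID(dev, pri_node)
    rpc_mod.call_blockdev_find(pri_node, dev)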
3958 class LUGrowDisk(LogicalUnit):
3959 """Grow a disk of an instance.
3962 HPATH = "disk-grow"
3963 HTYPE = constants.HTYPE_INSTANCE
3964 _OP_REQP = ["instance_name", "disk", "amount"]
3966 def BuildHooksEnv(self):
3967 """Build hooks env.
3969 This runs on the master, the primary and all the secondaries.
3971 """
3972 env = {
3973 "DISK": self.op.disk,
3974 "AMOUNT": self.op.amount,
3975 }
3976 env.update(_BuildInstanceHookEnvByObject(self.instance))
3977 nl = [
3978 self.sstore.GetMasterNode(),
3979 self.instance.primary_node,
3980 ]
3981 return env, nl, nl
3983 def CheckPrereq(self):
3984 """Check prerequisites.
3986 This checks that the instance is in the cluster.
3989 instance = self.cfg.GetInstanceInfo(
3990 self.cfg.ExpandInstanceName(self.op.instance_name))
3991 if instance is None:
3992 raise errors.OpPrereqError("Instance '%s' not known" %
3993 self.op.instance_name)
3994 self.instance = instance
3995 self.op.instance_name = instance.name
3997 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3998 raise errors.OpPrereqError("Instance's disk layout does not support"
3999 " growing.")
4001 if instance.FindDisk(self.op.disk) is None:
4002 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4003 (self.op.disk, instance.name))
4005 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4006 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4007 for node in nodenames:
4008 info = nodeinfo.get(node, None)
4009 if not info:
4010 raise errors.OpPrereqError("Cannot get current information"
4011 " from node '%s'" % node)
4012 vg_free = info.get('vg_free', None)
4013 if not isinstance(vg_free, int):
4014 raise errors.OpPrereqError("Can't compute free disk space on"
4015 " node %s" % node)
4016 if self.op.amount > info['vg_free']:
4017 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4018 " %d MiB available, %d MiB required" %
4019 (node, info['vg_free'], self.op.amount))
4021 def Exec(self, feedback_fn):
4022 """Execute disk grow.
4025 instance = self.instance
4026 disk = instance.FindDisk(self.op.disk)
4027 for node in (instance.secondary_nodes + (instance.primary_node,)):
4028 self.cfg.SetDiskID(disk, node)
4029 result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4030 if not result or not isinstance(result, tuple) or len(result) != 2:
4031 raise errors.OpExecError("grow request failed to node %s" % node)
4032 elif not result[0]:
4033 raise errors.OpExecError("grow request failed to node %s: %s" %
4034 (node, result[1]))
4035 disk.RecordGrow(self.op.amount)
4036 self.cfg.Update(instance)
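# A minimal usage sketch, assuming the matching opcode is named
# OpGrowDisk with fields mirroring _OP_REQP above; "proc" stands for a
# processor handle able to chain opcodes, as self.proc does inside LUs.
def _SketchGrowDisk(proc):
  # grow sda by 1024 MiB; CheckPrereq verifies vg_free on every node
  # holding a copy of the disk before any grow RPC is issued
  op = opcodes.OpGrowDisk(instance_name="inst1.example.com",
                          disk="sda", amount=1024)
  proc.ChainOpCode(op)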
4040 class LUQueryInstanceData(NoHooksLU):
4041 """Query runtime instance data.
4044 _OP_REQP = ["instances"]
4046 def CheckPrereq(self):
4047 """Check prerequisites.
4049 This only checks the optional instance list against the existing names.
4052 if not isinstance(self.op.instances, list):
4053 raise errors.OpPrereqError("Invalid argument type 'instances'")
4054 if self.op.instances:
4055 self.wanted_instances = []
4056 names = self.op.instances
4057 for name in names:
4058 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4059 if instance is None:
4060 raise errors.OpPrereqError("No such instance name '%s'" % name)
4061 self.wanted_instances.append(instance)
4062 else:
4063 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4064 in self.cfg.GetInstanceList()]
4068 def _ComputeDiskStatus(self, instance, snode, dev):
4069 """Compute block device status.
4072 self.cfg.SetDiskID(dev, instance.primary_node)
4073 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4074 if dev.dev_type in constants.LDS_DRBD:
4075 # we change the snode then (otherwise we use the one passed in)
4076 if dev.logical_id[0] == instance.primary_node:
4077 snode = dev.logical_id[1]
4078 else:
4079 snode = dev.logical_id[0]
4081 if snode:
4082 self.cfg.SetDiskID(dev, snode)
4083 dev_sstatus = rpc.call_blockdev_find(snode, dev)
4084 else:
4085 dev_sstatus = None
4087 if dev.children:
4088 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4089 for child in dev.children]
4090 else:
4091 dev_children = []
4093 data = {
4094 "iv_name": dev.iv_name,
4095 "dev_type": dev.dev_type,
4096 "logical_id": dev.logical_id,
4097 "physical_id": dev.physical_id,
4098 "pstatus": dev_pstatus,
4099 "sstatus": dev_sstatus,
4100 "children": dev_children,
4101 }
4103 return data
4105 def Exec(self, feedback_fn):
4106 """Gather and return data"""
4108 for instance in self.wanted_instances:
4109 remote_info = rpc.call_instance_info(instance.primary_node,
4111 if remote_info and "state" in remote_info:
4114 remote_state = "down"
4115 if instance.status == "down":
4116 config_state = "down"
4120 disks = [self._ComputeDiskStatus(instance, None, device)
4121 for device in instance.disks]
4124 "name": instance.name,
4125 "config_state": config_state,
4126 "run_state": remote_state,
4127 "pnode": instance.primary_node,
4128 "snodes": instance.secondary_nodes,
4130 "memory": instance.memory,
4131 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4133 "vcpus": instance.vcpus,
4136 htkind = self.sstore.GetHypervisorType()
4137 if htkind == constants.HT_XEN_PVM30:
4138 idict["kernel_path"] = instance.kernel_path
4139 idict["initrd_path"] = instance.initrd_path
4141 if htkind == constants.HT_XEN_HVM31:
4142 idict["hvm_boot_order"] = instance.hvm_boot_order
4143 idict["hvm_acpi"] = instance.hvm_acpi
4144 idict["hvm_pae"] = instance.hvm_pae
4145 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4146 idict["hvm_nic_type"] = instance.hvm_nic_type
4147 idict["hvm_disk_type"] = instance.hvm_disk_type
4149 if htkind in constants.HTS_REQ_PORT:
4150 idict["vnc_bind_address"] = instance.vnc_bind_address
4151 idict["network_port"] = instance.network_port
4153 result[instance.name] = idict
4155 return result
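# A sketch of the rough shape of one entry in the mapping returned
# above, for a hypothetical single-disk DRBD instance. pstatus/sstatus
# hold raw blockdev_find results (a status tuple, or None when there is
# no secondary), and "children" recurses into the backing devices via
# _ComputeDiskStatus; all names and values here are illustrative only.
_EXAMPLE_QUERY_RESULT_ENTRY = {
  "name": "inst1.example.com",
  "config_state": "up",
  "run_state": "up",
  "pnode": "node1.example.com",
  "snodes": ["node2.example.com"],
  "os": "debian-etch",
  "memory": 512,
  "nics": [("aa:00:00:11:22:33", "198.51.100.10", "xen-br0")],
  "disks": [{"iv_name": "sda", "dev_type": constants.LD_DRBD8,
             "logical_id": None, "physical_id": None,
             "pstatus": None, "sstatus": None, "children": []}],
  "vcpus": 1,
}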
4158 class LUSetInstanceParams(LogicalUnit):
4159 """Modifies an instances's parameters.
4162 HPATH = "instance-modify"
4163 HTYPE = constants.HTYPE_INSTANCE
4164 _OP_REQP = ["instance_name"]
4165 REQ_BGL = False
4167 def ExpandNames(self):
4168 self._ExpandAndLockInstance()
4170 def BuildHooksEnv(self):
4171 """Build hooks env.
4173 This runs on the master, primary and secondaries.
4175 """
4176 args = dict()
4177 if self.mem:
4178 args['memory'] = self.mem
4179 if self.vcpus:
4180 args['vcpus'] = self.vcpus
4181 if self.do_ip or self.do_bridge or self.mac:
4182 if self.do_ip:
4183 ip = self.ip
4184 else:
4185 ip = self.instance.nics[0].ip
4186 if self.bridge:
4187 bridge = self.bridge
4188 else:
4189 bridge = self.instance.nics[0].bridge
4190 if self.mac:
4191 mac = self.mac
4192 else:
4193 mac = self.instance.nics[0].mac
4194 args['nics'] = [(ip, bridge, mac)]
4195 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4196 nl = [self.sstore.GetMasterNode(),
4197 self.instance.primary_node] + list(self.instance.secondary_nodes)
4198 return env, nl, nl
4200 def CheckPrereq(self):
4201 """Check prerequisites.
4203 This only checks the instance list against the existing names.
4206 # FIXME: all the parameters could be checked before, in ExpandNames, or in
4207 # a separate CheckArguments function, if we implement one, so the operation
4208 # can be aborted without waiting for any lock, should it have an error...
4209 self.mem = getattr(self.op, "mem", None)
4210 self.vcpus = getattr(self.op, "vcpus", None)
4211 self.ip = getattr(self.op, "ip", None)
4212 self.mac = getattr(self.op, "mac", None)
4213 self.bridge = getattr(self.op, "bridge", None)
4214 self.kernel_path = getattr(self.op, "kernel_path", None)
4215 self.initrd_path = getattr(self.op, "initrd_path", None)
4216 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4217 self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4218 self.hvm_pae = getattr(self.op, "hvm_pae", None)
4219 self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
4220 self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
4221 self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4222 self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4223 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4224 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4225 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4226 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
4227 if all_parms.count(None) == len(all_parms):
4228 raise errors.OpPrereqError("No changes submitted")
4229 if self.mem is not None:
4230 try:
4231 self.mem = int(self.mem)
4232 except ValueError, err:
4233 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4234 if self.vcpus is not None:
4235 try:
4236 self.vcpus = int(self.vcpus)
4237 except ValueError, err:
4238 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4239 if self.ip is not None:
4240 self.do_ip = True
4241 if self.ip.lower() == "none":
4242 self.ip = None
4243 else:
4244 if not utils.IsValidIP(self.ip):
4245 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4246 else:
4247 self.do_ip = False
4248 self.do_bridge = (self.bridge is not None)
4249 if self.mac is not None:
4250 if self.cfg.IsMacInUse(self.mac):
4251 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4252 self.mac)
4253 if not utils.IsValidMac(self.mac):
4254 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4256 if self.kernel_path is not None:
4257 self.do_kernel_path = True
4258 if self.kernel_path == constants.VALUE_NONE:
4259 raise errors.OpPrereqError("Can't set instance to no kernel")
4261 if self.kernel_path != constants.VALUE_DEFAULT:
4262 if not os.path.isabs(self.kernel_path):
4263 raise errors.OpPrereqError("The kernel path must be an absolute"
4264 " filename")
4265 else:
4266 self.do_kernel_path = False
4268 if self.initrd_path is not None:
4269 self.do_initrd_path = True
4270 if self.initrd_path not in (constants.VALUE_NONE,
4271 constants.VALUE_DEFAULT):
4272 if not os.path.isabs(self.initrd_path):
4273 raise errors.OpPrereqError("The initrd path must be an absolute"
4274 " filename")
4275 else:
4276 self.do_initrd_path = False
4278 # boot order verification
4279 if self.hvm_boot_order is not None:
4280 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4281 if len(self.hvm_boot_order.strip("acdn")) != 0:
4282 raise errors.OpPrereqError("invalid boot order specified,"
4283 " must be one or more of [acdn]"
4286 # hvm_cdrom_image_path verification
4287 if self.op.hvm_cdrom_image_path is not None:
4288 if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4289 self.op.hvm_cdrom_image_path.lower() == "none"):
4290 raise errors.OpPrereqError("The path to the HVM CDROM image must"
4291 " be an absolute path or None, not %s" %
4292 self.op.hvm_cdrom_image_path)
4293 if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4294 self.op.hvm_cdrom_image_path.lower() == "none"):
4295 raise errors.OpPrereqError("The HVM CDROM image must either be a"
4296 " regular file or a symlink pointing to"
4297 " an existing regular file, not %s" %
4298 self.op.hvm_cdrom_image_path)
4300 # vnc_bind_address verification
4301 if self.op.vnc_bind_address is not None:
4302 if not utils.IsValidIP(self.op.vnc_bind_address):
4303 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4304 " like a valid IP address" %
4305 self.op.vnc_bind_address)
4307 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4308 assert self.instance is not None, \
4309 "Cannot retrieve locked instance %s" % self.op.instance_name
4312 def Exec(self, feedback_fn):
4313 """Modifies an instance.
4315 All parameters take effect only at the next restart of the instance.
4316 """
4317 result = []
4318 instance = self.instance
4319 if self.mem:
4320 instance.memory = self.mem
4321 result.append(("mem", self.mem))
4322 if self.vcpus:
4323 instance.vcpus = self.vcpus
4324 result.append(("vcpus", self.vcpus))
4325 if self.do_ip:
4326 instance.nics[0].ip = self.ip
4327 result.append(("ip", self.ip))
4328 if self.do_bridge:
4329 instance.nics[0].bridge = self.bridge
4330 result.append(("bridge", self.bridge))
4331 if self.mac:
4332 instance.nics[0].mac = self.mac
4333 result.append(("mac", self.mac))
4334 if self.do_kernel_path:
4335 instance.kernel_path = self.kernel_path
4336 result.append(("kernel_path", self.kernel_path))
4337 if self.do_initrd_path:
4338 instance.initrd_path = self.initrd_path
4339 result.append(("initrd_path", self.initrd_path))
4340 if self.hvm_boot_order:
4341 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4342 instance.hvm_boot_order = None
4343 else:
4344 instance.hvm_boot_order = self.hvm_boot_order
4345 result.append(("hvm_boot_order", self.hvm_boot_order))
4346 if self.hvm_acpi is not None:
4347 instance.hvm_acpi = self.hvm_acpi
4348 result.append(("hvm_acpi", self.hvm_acpi))
4349 if self.hvm_pae is not None:
4350 instance.hvm_pae = self.hvm_pae
4351 result.append(("hvm_pae", self.hvm_pae))
4352 if self.hvm_nic_type is not None:
4353 instance.hvm_nic_type = self.hvm_nic_type
4354 result.append(("hvm_nic_type", self.hvm_nic_type))
4355 if self.hvm_disk_type is not None:
4356 instance.hvm_disk_type = self.hvm_disk_type
4357 result.append(("hvm_disk_type", self.hvm_disk_type))
4358 if self.hvm_cdrom_image_path:
4359 if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4360 instance.hvm_cdrom_image_path = None
4361 else:
4362 instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4363 result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4364 if self.vnc_bind_address:
4365 instance.vnc_bind_address = self.vnc_bind_address
4366 result.append(("vnc_bind_address", self.vnc_bind_address))
4368 self.cfg.Update(instance)
4370 return result
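# A minimal usage sketch, assuming the matching opcode is named
# OpSetInstanceParams and accepts the optional fields read via getattr
# in CheckPrereq above; "proc" is a hypothetical processor handle.
def _SketchSetInstanceParams(proc):
  op = opcodes.OpSetInstanceParams(instance_name="inst1.example.com",
                                   mem=1024, vcpus=2)
  # Exec returns the applied changes as (name, new_value) pairs, e.g.
  # [("mem", 1024), ("vcpus", 2)]; they take effect at next restart
  return proc.ChainOpCode(op)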
4373 class LUQueryExports(NoHooksLU):
4374 """Query the exports list
4379 def CheckPrereq(self):
4380 """Check that the nodelist contains only existing nodes.
4383 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4385 def Exec(self, feedback_fn):
4386 """Compute the list of all the exported system images.
4388 Returns:
4389 a dictionary with the structure node->(export-list)
4390 where export-list is a list of the instances exported on
4391 that node.
4393 """
4394 return rpc.call_export_list(self.nodes)
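# A sketch of the shape of the value returned above: each queried node
# maps to the list of instance names whose exports it stores. The node
# and instance names are hypothetical.
_EXAMPLE_EXPORT_LIST = {
  "node1.example.com": ["inst1.example.com"],
  "node2.example.com": [],
}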
4397 class LUExportInstance(LogicalUnit):
4398 """Export an instance to an image in the cluster.
4401 HPATH = "instance-export"
4402 HTYPE = constants.HTYPE_INSTANCE
4403 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4405 def BuildHooksEnv(self):
4406 """Build hooks env.
4408 This will run on the master, primary node and target node.
4410 """
4411 env = {
4412 "EXPORT_NODE": self.op.target_node,
4413 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4414 }
4415 env.update(_BuildInstanceHookEnvByObject(self.instance))
4416 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4417 self.op.target_node]
4418 return env, nl, nl
4420 def CheckPrereq(self):
4421 """Check prerequisites.
4423 This checks that the instance and node names are valid.
4426 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4427 self.instance = self.cfg.GetInstanceInfo(instance_name)
4428 if self.instance is None:
4429 raise errors.OpPrereqError("Instance '%s' not found" %
4430 self.op.instance_name)
4433 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4434 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4436 if self.dst_node is None:
4437 raise errors.OpPrereqError("Destination node '%s' is unknown." %
4438 self.op.target_node)
4439 self.op.target_node = self.dst_node.name
4441 # instance disk type verification
4442 for disk in self.instance.disks:
4443 if disk.dev_type == constants.LD_FILE:
4444 raise errors.OpPrereqError("Export not supported for instances with"
4445 " file-based disks")
4447 def Exec(self, feedback_fn):
4448 """Export an instance to an image in the cluster.
4451 instance = self.instance
4452 dst_node = self.dst_node
4453 src_node = instance.primary_node
4454 if self.op.shutdown:
4455 # shutdown the instance, but not the disks
4456 if not rpc.call_instance_shutdown(src_node, instance):
4457 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4458 (instance.name, src_node))
4460 vgname = self.cfg.GetVGName()
4462 snap_disks = []
4464 try:
4465 for disk in instance.disks:
4466 if disk.iv_name == "sda":
4467 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4468 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4470 if not new_dev_name:
4471 logger.Error("could not snapshot block device %s on node %s" %
4472 (disk.logical_id[1], src_node))
4473 else:
4474 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4475 logical_id=(vgname, new_dev_name),
4476 physical_id=(vgname, new_dev_name),
4477 iv_name=disk.iv_name)
4478 snap_disks.append(new_dev)
4480 finally:
4481 if self.op.shutdown and instance.status == "up":
4482 if not rpc.call_instance_start(src_node, instance, None):
4483 _ShutdownInstanceDisks(instance, self.cfg)
4484 raise errors.OpExecError("Could not start instance")
4486 # TODO: check for size
4488 for dev in snap_disks:
4489 if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4490 logger.Error("could not export block device %s from node %s to node %s"
4491 % (dev.logical_id[1], src_node, dst_node.name))
4492 if not rpc.call_blockdev_remove(src_node, dev):
4493 logger.Error("could not remove snapshot block device %s from node %s" %
4494 (dev.logical_id[1], src_node))
4496 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4497 logger.Error("could not finalize export for instance %s on node %s" %
4498 (instance.name, dst_node.name))
4500 nodelist = self.cfg.GetNodeList()
4501 nodelist.remove(dst_node.name)
4503 # on one-node clusters nodelist will be empty after the removal
4504 # if we proceed the backup would be removed because OpQueryExports
4505 # substitutes an empty list with the full cluster node list.
4506 if nodelist:
4507 op = opcodes.OpQueryExports(nodes=nodelist)
4508 exportlist = self.proc.ChainOpCode(op)
4509 for node in exportlist:
4510 if instance.name in exportlist[node]:
4511 if not rpc.call_export_remove(node, instance.name):
4512 logger.Error("could not remove older export for instance %s"
4513 " on node %s" % (instance.name, node))
4516 class LURemoveExport(NoHooksLU):
4517 """Remove exports related to the named instance.
4520 _OP_REQP = ["instance_name"]
4522 def CheckPrereq(self):
4523 """Check prerequisites.
4527 def Exec(self, feedback_fn):
4528 """Remove any export.
4531 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4532 # If the instance was not found we'll try with the name that was passed in.
4533 # This will only work if it was an FQDN, though.
4534 fqdn_warn = False
4535 if not instance_name:
4536 fqdn_warn = True
4537 instance_name = self.op.instance_name
4539 op = opcodes.OpQueryExports(nodes=[])
4540 exportlist = self.proc.ChainOpCode(op)
4541 found = False
4542 for node in exportlist:
4543 if instance_name in exportlist[node]:
4544 found = True
4545 if not rpc.call_export_remove(node, instance_name):
4546 logger.Error("could not remove export for instance %s"
4547 " on node %s" % (instance_name, node))
4549 if fqdn_warn and not found:
4550 feedback_fn("Export not found. If trying to remove an export belonging"
4551 " to a deleted instance please use its Fully Qualified"
4555 class TagsLU(NoHooksLU):
4556 """Generic tags LU.
4558 This is an abstract class which is the parent of all the other tags LUs.
4560 """
4561 def CheckPrereq(self):
4562 """Check prerequisites.
4565 if self.op.kind == constants.TAG_CLUSTER:
4566 self.target = self.cfg.GetClusterInfo()
4567 elif self.op.kind == constants.TAG_NODE:
4568 name = self.cfg.ExpandNodeName(self.op.name)
4569 if name is None:
4570 raise errors.OpPrereqError("Invalid node name (%s)" %
4571 (self.op.name,))
4572 self.op.name = name
4573 self.target = self.cfg.GetNodeInfo(name)
4574 elif self.op.kind == constants.TAG_INSTANCE:
4575 name = self.cfg.ExpandInstanceName(self.op.name)
4576 if name is None:
4577 raise errors.OpPrereqError("Invalid instance name (%s)" %
4578 (self.op.name,))
4579 self.op.name = name
4580 self.target = self.cfg.GetInstanceInfo(name)
4581 else:
4582 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4583 (self.op.kind,))
4586 class LUGetTags(TagsLU):
4587 """Returns the tags of a given object.
4590 _OP_REQP = ["kind", "name"]
4592 def Exec(self, feedback_fn):
4593 """Returns the tag list.
4596 return list(self.target.GetTags())
4599 class LUSearchTags(NoHooksLU):
4600 """Searches the tags for a given pattern.
4603 _OP_REQP = ["pattern"]
4605 def CheckPrereq(self):
4606 """Check prerequisites.
4608 This checks the pattern passed for validity by compiling it.
4610 """
4611 try:
4612 self.re = re.compile(self.op.pattern)
4613 except re.error, err:
4614 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4615 (self.op.pattern, err))
4617 def Exec(self, feedback_fn):
4618 """Returns the tag list.
4622 tgts = [("/cluster", cfg.GetClusterInfo())]
4623 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4624 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4625 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4626 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4627 results = []
4628 for path, target in tgts:
4629 for tag in target.GetTags():
4630 if self.re.search(tag):
4631 results.append((path, tag))
4632 return results
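# A minimal usage sketch, assuming the matching opcode is named
# OpSearchTags; the pattern is an ordinary re regexp as compiled in
# CheckPrereq above, and the result is a list of (path, tag) pairs over
# the /cluster, /nodes/* and /instances/* namespaces built above.
def _SketchSearchTags(proc):
  op = opcodes.OpSearchTags(pattern="^env:")
  # e.g. [("/instances/inst1.example.com", "env:prod")]
  return proc.ChainOpCode(op)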
4635 class LUAddTags(TagsLU):
4636 """Sets a tag on a given object.
4639 _OP_REQP = ["kind", "name", "tags"]
4641 def CheckPrereq(self):
4642 """Check prerequisites.
4644 This checks the type and length of the tag name and value.
4647 TagsLU.CheckPrereq(self)
4648 for tag in self.op.tags:
4649 objects.TaggableObject.ValidateTag(tag)
4651 def Exec(self, feedback_fn):
4652 """Sets the tag.
4654 """
4655 try:
4656 for tag in self.op.tags:
4657 self.target.AddTag(tag)
4658 except errors.TagError, err:
4659 raise errors.OpExecError("Error while setting tag: %s" % str(err))
4661 self.cfg.Update(self.target)
4662 except errors.ConfigurationError:
4663 raise errors.OpRetryError("There has been a modification to the"
4664 " config file and the operation has been"
4665 " aborted. Please retry.")
4668 class LUDelTags(TagsLU):
4669 """Delete a list of tags from a given object.
4672 _OP_REQP = ["kind", "name", "tags"]
4674 def CheckPrereq(self):
4675 """Check prerequisites.
4677 This checks that we have the given tag.
4680 TagsLU.CheckPrereq(self)
4681 for tag in self.op.tags:
4682 objects.TaggableObject.ValidateTag(tag)
4683 del_tags = frozenset(self.op.tags)
4684 cur_tags = self.target.GetTags()
4685 if not del_tags <= cur_tags:
4686 diff_tags = del_tags - cur_tags
4687 diff_names = ["'%s'" % tag for tag in diff_tags]
4689 raise errors.OpPrereqError("Tag(s) %s not found" %
4690 (",".join(diff_names)))
4692 def Exec(self, feedback_fn):
4693 """Remove the tag from the object.
4696 for tag in self.op.tags:
4697 self.target.RemoveTag(tag)
4698 try:
4699 self.cfg.Update(self.target)
4700 except errors.ConfigurationError:
4701 raise errors.OpRetryError("There has been a modification to the"
4702 " config file and the operation has been"
4703 " aborted. Please retry.")
4706 class LUTestDelay(NoHooksLU):
4707 """Sleep for a specified amount of time.
4709 This LU sleeps on the master and/or nodes for a specified amount of
4710 time, in seconds.
4712 """
4713 _OP_REQP = ["duration", "on_master", "on_nodes"]
4714 REQ_BGL = False
4716 def ExpandNames(self):
4717 """Expand names and set required locks.
4719 This expands the node list, if any.
4722 self.needed_locks = {}
4723 if self.op.on_nodes:
4724 # _GetWantedNodes can be used here, but is not always appropriate to use
4725 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
4726 # more information.
4727 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4728 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4730 def CheckPrereq(self):
4731 """Check prerequisites.
4735 def Exec(self, feedback_fn):
4736 """Do the actual sleep.
4739 if self.op.on_master:
4740 if not utils.TestDelay(self.op.duration):
4741 raise errors.OpExecError("Error during master delay test")
4742 if self.op.on_nodes:
4743 result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4744 if not result:
4745 raise errors.OpExecError("Complete failure from rpc call")
4746 for node, node_result in result.items():
4747 if not node_result:
4748 raise errors.OpExecError("Failure during rpc call to node %s,"
4749 " result: %s" % (node, node_result))
4752 class IAllocator(object):
4753 """IAllocator framework.
4755 An IAllocator instance has four sets of attributes:
4756 - cfg/sstore that are needed to query the cluster
4757 - input data (all members of the _KEYS class attribute are required)
4758 - four buffer attributes (in|out_data|text), that represent the
4759 input (to the external script) in text and data structure format,
4760 and the output from it, again in two formats
4761 - the result variables from the script (success, info, nodes) for
4762 easy usage (a sketch of a matching external script follows this class)
4764 """
4765 _ALLO_KEYS = [
4766 "mem_size", "disks", "disk_template",
4767 "os", "tags", "nics", "vcpus",
4768 ]
4769 _RELO_KEYS = [
4770 "relocate_from",
4771 ]
4773 def __init__(self, cfg, sstore, mode, name, **kwargs):
4774 self.cfg = cfg
4775 self.sstore = sstore
4776 # init buffer variables
4777 self.in_text = self.out_text = self.in_data = self.out_data = None
4778 # init all input fields so that pylint is happy
4779 self.mode = mode
4780 self.name = name
4781 self.mem_size = self.disks = self.disk_template = None
4782 self.os = self.tags = self.nics = self.vcpus = None
4783 self.relocate_from = None
4785 self.required_nodes = None
4786 # init result fields
4787 self.success = self.info = self.nodes = None
4788 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4789 keyset = self._ALLO_KEYS
4790 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4791 keyset = self._RELO_KEYS
4793 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4794 " IAllocator" % self.mode)
4795 for key in kwargs:
4796 if key not in keyset:
4797 raise errors.ProgrammerError("Invalid input parameter '%s' to"
4798 " IAllocator" % key)
4799 setattr(self, key, kwargs[key])
4800 for key in keyset:
4801 if key not in kwargs:
4802 raise errors.ProgrammerError("Missing input parameter '%s' to"
4803 " IAllocator" % key)
4804 self._BuildInputData()
4806 def _ComputeClusterData(self):
4807 """Compute the generic allocator input data.
4809 This is the data that is independent of the actual operation.
4811 """
4812 cfg = self.cfg
4813 # cluster data
4814 data = {
4815 "version": 1,
4816 "cluster_name": self.sstore.GetClusterName(),
4817 "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4818 "hypervisor_type": self.sstore.GetHypervisorType(),
4819 # we don't have job IDs
4820 }
4822 i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4824 # node data
4825 node_results = {}
4826 node_list = cfg.GetNodeList()
4827 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4828 for nname in node_list:
4829 ninfo = cfg.GetNodeInfo(nname)
4830 if nname not in node_data or not isinstance(node_data[nname], dict):
4831 raise errors.OpExecError("Can't get data for node %s" % nname)
4832 remote_info = node_data[nname]
4833 for attr in ['memory_total', 'memory_free', 'memory_dom0',
4834 'vg_size', 'vg_free', 'cpu_total']:
4835 if attr not in remote_info:
4836 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4839 remote_info[attr] = int(remote_info[attr])
4840 except ValueError, err:
4841 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4842 " %s" % (nname, attr, str(err)))
4843 # compute memory used by primary instances
4844 i_p_mem = i_p_up_mem = 0
4845 for iinfo in i_list:
4846 if iinfo.primary_node == nname:
4847 i_p_mem += iinfo.memory
4848 if iinfo.status == "up":
4849 i_p_up_mem += iinfo.memory
4851 # compute memory used by instances
4852 pnr = {
4853 "tags": list(ninfo.GetTags()),
4854 "total_memory": remote_info['memory_total'],
4855 "reserved_memory": remote_info['memory_dom0'],
4856 "free_memory": remote_info['memory_free'],
4857 "i_pri_memory": i_p_mem,
4858 "i_pri_up_memory": i_p_up_mem,
4859 "total_disk": remote_info['vg_size'],
4860 "free_disk": remote_info['vg_free'],
4861 "primary_ip": ninfo.primary_ip,
4862 "secondary_ip": ninfo.secondary_ip,
4863 "total_cpus": remote_info['cpu_total'],
4865 node_results[nname] = pnr
4866 data["nodes"] = node_results
4870 for iinfo in i_list:
4871 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4872 for n in iinfo.nics]
4873 pir = {
4874 "tags": list(iinfo.GetTags()),
4875 "should_run": iinfo.status == "up",
4876 "vcpus": iinfo.vcpus,
4877 "memory": iinfo.memory,
4879 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4881 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4882 "disk_template": iinfo.disk_template,
4884 instance_data[iinfo.name] = pir
4886 data["instances"] = instance_data
4890 def _AddNewInstance(self):
4891 """Add new instance data to allocator structure.
4893 This in combination with _ComputeClusterData will create the
4894 correct structure needed as input for the allocator.
4896 The checks for the completeness of the opcode must have already been
4897 done.
4899 """
4900 data = self.in_data
4901 if len(self.disks) != 2:
4902 raise errors.OpExecError("Only two-disk configurations supported")
4904 disk_space = _ComputeDiskSize(self.disk_template,
4905 self.disks[0]["size"], self.disks[1]["size"])
4907 if self.disk_template in constants.DTS_NET_MIRROR:
4908 self.required_nodes = 2
4909 else:
4910 self.required_nodes = 1
4911 request = {
4912 "type": "allocate",
4913 "name": self.name,
4914 "disk_template": self.disk_template,
4915 "tags": self.tags,
4916 "os": self.os,
4917 "vcpus": self.vcpus,
4918 "memory": self.mem_size,
4919 "disks": self.disks,
4920 "disk_space_total": disk_space,
4921 "nics": self.nics,
4922 "required_nodes": self.required_nodes,
4923 }
4924 data["request"] = request
4926 def _AddRelocateInstance(self):
4927 """Add relocate instance data to allocator structure.
4929 This in combination with _ComputeClusterData will create the
4930 correct structure needed as input for the allocator.
4932 The checks for the completeness of the opcode must have already been
4933 done.
4935 """
4936 instance = self.cfg.GetInstanceInfo(self.name)
4937 if instance is None:
4938 raise errors.ProgrammerError("Unknown instance '%s' passed to"
4939 " IAllocator" % self.name)
4941 if instance.disk_template not in constants.DTS_NET_MIRROR:
4942 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4944 if len(instance.secondary_nodes) != 1:
4945 raise errors.OpPrereqError("Instance has not exactly one secondary node")
4947 self.required_nodes = 1
4949 disk_space = _ComputeDiskSize(instance.disk_template,
4950 instance.disks[0].size,
4951 instance.disks[1].size)
4953 request = {
4954 "type": "relocate",
4955 "name": self.name,
4956 "disk_space_total": disk_space,
4957 "required_nodes": self.required_nodes,
4958 "relocate_from": self.relocate_from,
4959 }
4960 self.in_data["request"] = request
4962 def _BuildInputData(self):
4963 """Build input data structures.
4966 self._ComputeClusterData()
4968 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4969 self._AddNewInstance()
4971 self._AddRelocateInstance()
4973 self.in_text = serializer.Dump(self.in_data)
4975 def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4976 """Run an instance allocator and return the results.
4981 result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
4983 if not isinstance(result, tuple) or len(result) != 4:
4984 raise errors.OpExecError("Invalid result from master iallocator runner")
4986 rcode, stdout, stderr, fail = result
4988 if rcode == constants.IARUN_NOTFOUND:
4989 raise errors.OpExecError("Can't find allocator '%s'" % name)
4990 elif rcode == constants.IARUN_FAILURE:
4991 raise errors.OpExecError("Instance allocator call failed: %s,"
4992 " output: %s" % (fail, stdout+stderr))
4993 self.out_text = stdout
4994 if validate:
4995 self._ValidateResult()
4997 def _ValidateResult(self):
4998 """Process the allocator results.
5000 This will process and if successful save the result in
5001 self.out_data and the other parameters.
5003 """
5004 try:
5005 rdict = serializer.Load(self.out_text)
5006 except Exception, err:
5007 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5009 if not isinstance(rdict, dict):
5010 raise errors.OpExecError("Can't parse iallocator results: not a dict")
5012 for key in "success", "info", "nodes":
5013 if key not in rdict:
5014 raise errors.OpExecError("Can't parse iallocator results:"
5015 " missing key '%s'" % key)
5016 setattr(self, key, rdict[key])
5018 if not isinstance(rdict["nodes"], list):
5019 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5021 self.out_data = rdict
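# A skeleton of an external allocator script, sketched only from the
# protocol visible in this class: the input is the serialized in_data
# (cluster-wide "nodes" and "instances" plus a "request"), and the reply
# must be a serialized dict carrying at least the "success", "info" and
# "nodes" keys checked by _ValidateResult above. The node selection
# below is a deliberately naive placeholder.
def _SketchAllocatorScript(input_path):
  import sys
  data = serializer.Load(open(input_path).read())
  request = data["request"]
  # naive: any node with enough free memory is a candidate
  candidates = [name for name, info in data["nodes"].items()
                if info["free_memory"] >= request.get("memory", 0)]
  chosen = candidates[:request["required_nodes"]]
  reply = {
    "success": len(chosen) == request["required_nodes"],
    "info": "selected %d node(s)" % len(chosen),
    "nodes": chosen,
  }
  sys.stdout.write(serializer.Dump(reply))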
5024 class LUTestAllocator(NoHooksLU):
5025 """Run allocator tests.
5027 This LU runs the allocator tests
5030 _OP_REQP = ["direction", "mode", "name"]
5032 def CheckPrereq(self):
5033 """Check prerequisites.
5035 This checks the opcode parameters depending on the direction and mode
5036 of the test.
5037 """
5038 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5039 for attr in ["name", "mem_size", "disks", "disk_template",
5040 "os", "tags", "nics", "vcpus"]:
5041 if not hasattr(self.op, attr):
5042 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5043 attr)
5044 iname = self.cfg.ExpandInstanceName(self.op.name)
5045 if iname is not None:
5046 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5047 iname)
5048 if not isinstance(self.op.nics, list):
5049 raise errors.OpPrereqError("Invalid parameter 'nics'")
5050 for row in self.op.nics:
5051 if (not isinstance(row, dict) or
5052 "mac" not in row or
5053 "ip" not in row or
5054 "bridge" not in row):
5055 raise errors.OpPrereqError("Invalid contents of the"
5056 " 'nics' parameter")
5057 if not isinstance(self.op.disks, list):
5058 raise errors.OpPrereqError("Invalid parameter 'disks'")
5059 if len(self.op.disks) != 2:
5060 raise errors.OpPrereqError("Only two-disk configurations supported")
5061 for row in self.op.disks:
5062 if (not isinstance(row, dict) or
5063 "size" not in row or
5064 not isinstance(row["size"], int) or
5065 "mode" not in row or
5066 row["mode"] not in ['r', 'w']):
5067 raise errors.OpPrereqError("Invalid contents of the"
5068 " 'disks' parameter")
5069 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5070 if not hasattr(self.op, "name"):
5071 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5072 fname = self.cfg.ExpandInstanceName(self.op.name)
5073 if fname is None:
5074 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5075 self.op.name)
5076 self.op.name = fname
5077 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5078 else:
5079 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5080 self.op.mode)
5082 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5083 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5084 raise errors.OpPrereqError("Missing allocator name")
5085 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5086 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5087 self.op.direction)
5089 def Exec(self, feedback_fn):
5090 """Run the allocator test.
5093 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5094 ial = IAllocator(self.cfg, self.sstore,
5095 mode=self.op.mode,
5096 name=self.op.name,
5097 mem_size=self.op.mem_size,
5098 disks=self.op.disks,
5099 disk_template=self.op.disk_template,
5100 os=self.op.os,
5101 tags=self.op.tags,
5102 nics=self.op.nics,
5103 vcpus=self.op.vcpus,
5104 )
5105 else:
5106 ial = IAllocator(self.cfg, self.sstore,
5107 mode=self.op.mode,
5108 name=self.op.name,
5109 relocate_from=list(self.relocate_from),
5110 )
5112 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5113 result = ial.in_text
5114 else:
5115 ial.Run(self.op.allocator, validate=False)
5116 result = ial.out_text
5117 return result
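# A minimal usage sketch, assuming the matching opcode is named
# OpTestAllocator. Direction "in" only builds and returns the request
# text that would be fed to an allocator; direction "out" additionally
# runs the named allocator script and returns its raw reply.
def _SketchTestAllocator(proc):
  op = opcodes.OpTestAllocator(direction=constants.IALLOCATOR_DIR_IN,
                               mode=constants.IALLOCATOR_MODE_RELOC,
                               name="inst1.example.com")
  return proc.ChainOpCode(op)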