4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
import os
import re
import time
import platform
import logging

from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
47 class LogicalUnit(object):
48 """Logical Unit base class.
50 Subclasses must follow these rules:
51 - implement ExpandNames
52 - implement CheckPrereq
54 - implement BuildHooksEnv
55 - redefine HPATH and HTYPE
56 - optionally redefine their run requirements:
57 REQ_MASTER: the LU needs to run on the master node
58 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60 Note that all commands require root permissions.
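A minimal, purely illustrative subclass (the class name is hypothetical) could
look like::

  class LUDoNothing(NoHooksLU):
    _OP_REQP = []

    def ExpandNames(self):
      self.needed_locks = {}

    def CheckPrereq(self):
      pass

    def Exec(self, feedback_fn):
      return True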
69 def __init__(self, processor, op, context, rpc):
70 """Constructor for LogicalUnit.
This needs to be overridden in derived classes in order to check op validity.
78 self.cfg = context.cfg
79 self.context = context
81 # Dicts used to declare locking needs to mcpu
82 self.needed_locks = None
83 self.acquired_locks = {}
84 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86 self.remove_locks = {}
87 # Used to force good behavior when calling helper functions
88 self.recalculate_locks = {}
91 self.LogWarning = processor.LogWarning
92 self.LogInfo = processor.LogInfo
94 for attr_name in self._OP_REQP:
95 attr_val = getattr(op, attr_name, None)
97 raise errors.OpPrereqError("Required parameter '%s' missing" %
100 if not self.cfg.IsCluster():
101 raise errors.OpPrereqError("Cluster not initialized yet,"
102 " use 'gnt-cluster init' first.")
104 master = self.cfg.GetMasterNode()
105 if master != utils.HostInfo().name:
106 raise errors.OpPrereqError("Commands must be run on the master"
110 """Returns the SshRunner object
114 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
117 ssh = property(fget=__GetSSH)
119 def ExpandNames(self):
120 """Expand names for this LU.
122 This method is called before starting to execute the opcode, and it should
123 update all the parameters of the opcode to their canonical form (e.g. a
124 short node name must be fully expanded after this method has successfully
completed). This way locking, hooks, logging, etc. can work correctly.
127 LUs which implement this method must also populate the self.needed_locks
128 member, as a dict with lock levels as keys, and a list of needed lock names
131 - use an empty dict if you don't need any lock
132 - if you don't need any lock at a particular level omit that level
133 - don't put anything for the BGL level
134 - if you want all locks at a level use locking.ALL_SET as a value
136 If you need to share locks (rather than acquire them exclusively) at one
137 level you can modify self.share_locks, setting a true value (usually 1) for
138 that level. By default locks are not shared.
142 # Acquire all nodes and one instance
143 self.needed_locks = {
144 locking.LEVEL_NODE: locking.ALL_SET,
145 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
147 # Acquire just two nodes
148 self.needed_locks = {
149 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
152 self.needed_locks = {} # No, you can't leave it to the default value None
155 # The implementation of this method is mandatory only if the new LU is
# concurrent, so that old LUs don't need to be changed all at the same time.
159 self.needed_locks = {} # Exclusive LUs don't need locks.
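If you need shared rather than exclusive locks, a sketch could be::

  # Acquire all node locks in shared mode
  self.share_locks[locking.LEVEL_NODE] = 1
  self.needed_locks = {
    locking.LEVEL_NODE: locking.ALL_SET,
  }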
161 raise NotImplementedError
163 def DeclareLocks(self, level):
164 """Declare LU locking needs for a level
166 While most LUs can just declare their locking needs at ExpandNames time,
167 sometimes there's the need to calculate some locks after having acquired
168 the ones before. This function is called just before acquiring locks at a
169 particular level, but after acquiring the ones at lower levels, and permits
170 such calculations. It can be used to modify self.needed_locks, and by
171 default it does nothing.
173 This function is only called if you have something already set in
174 self.needed_locks for the level.
176 @param level: Locking level which is going to be locked
177 @type level: member of ganeti.locking.LEVELS
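A sketch of a typical override, using the _LockInstancesNodes helper
described below, could be::

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()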
181 def CheckPrereq(self):
182 """Check prerequisites for this LU.
184 This method should check that the prerequisites for the execution
185 of this LU are fulfilled. It can do internode communication, but
it should be idempotent - no cluster or system changes are allowed.
189 The method should raise errors.OpPrereqError in case something is
190 not fulfilled. Its return value is ignored.
192 This method should also update all the parameters of the opcode to
193 their canonical form if it hasn't been done by ExpandNames before.
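A minimal sketch, assuming the opcode carries an instance name already
declared for locking in ExpandNames, could be::

  def CheckPrereq(self):
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance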
196 raise NotImplementedError
198 def Exec(self, feedback_fn):
201 This method should implement the actual work. It should raise
202 errors.OpExecError for failures that are somewhat dealt with in
206 raise NotImplementedError
208 def BuildHooksEnv(self):
209 """Build hooks environment for this LU.
This method should return a three-element tuple consisting of: a dict
212 containing the environment that will be used for running the
213 specific hook for this LU, a list of node names on which the hook
214 should run before the execution, and a list of node names on which
215 the hook should run after the execution.
The keys of the dict must not be prefixed with 'GANETI_', as this is
handled by the hooks runner. Also note additional keys will be
219 added by the hooks runner. If the LU doesn't define any
220 environment, an empty dict (and not None) should be returned.
222 No nodes should be returned as an empty list (and not None).
Note that if the HPATH for a LU class is None, this function will not
be called.
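A sketch of a possible implementation, assuming an opcode with a
node_name field and hooks that run only on the master, could be::

  def BuildHooksEnv(self):
    env = {"OP_TARGET": self.op.node_name}
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]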
228 raise NotImplementedError
230 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
231 """Notify the LU about the results of its hooks.
233 This method is called every time a hooks phase is executed, and notifies
234 the Logical Unit about the hooks' result. The LU can then use it to alter
235 its result based on the hooks. By default the method does nothing and the
236 previous result is passed back unchanged but any LU can define it if it
237 wants to use the local cluster hook-scripts somehow.
239 @param phase: one of L{constants.HOOKS_PHASE_POST} or
240 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
241 @param hook_results: the results of the multi-node hooks rpc call
@param feedback_fn: function used to send feedback back to the caller
243 @param lu_result: the previous Exec result this LU had, or None
245 @return: the new Exec result, based on the previous result
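A sketch of an override that only reacts to post-phase results could be::

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    if phase == constants.HOOKS_PHASE_POST and not hook_results:
      feedback_fn("No hook results received")
    return lu_result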
251 def _ExpandAndLockInstance(self):
252 """Helper function to expand and lock an instance.
254 Many LUs that work on an instance take its name in self.op.instance_name
255 and need to expand it and then declare the expanded name for locking. This
256 function does it, and then updates self.op.instance_name to the expanded
name. It also initializes needed_locks as a dict, if this hasn't been done
before.
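Typical usage from an LU's ExpandNames (a sketch mirroring other LUs in
this module) is::

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE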
261 if self.needed_locks is None:
262 self.needed_locks = {}
264 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
265 "_ExpandAndLockInstance called with instance-level locks set"
266 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
267 if expanded_name is None:
268 raise errors.OpPrereqError("Instance '%s' not known" %
269 self.op.instance_name)
270 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
271 self.op.instance_name = expanded_name
273 def _LockInstancesNodes(self, primary_only=False):
274 """Helper function to declare instances' nodes for locking.
276 This function should be called after locking one or more instances to lock
277 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
278 with all primary or secondary nodes for instances already locked and
279 present in self.needed_locks[locking.LEVEL_INSTANCE].
281 It should be called from DeclareLocks, and for safety only works if
282 self.recalculate_locks[locking.LEVEL_NODE] is set.
284 In the future it may grow parameters to just lock some instance's nodes, or
285 to just lock primaries or secondary nodes, if needed.
It should be called in DeclareLocks in a way similar to::
289 if level == locking.LEVEL_NODE:
290 self._LockInstancesNodes()
292 @type primary_only: boolean
293 @param primary_only: only lock primary nodes of locked instances
296 assert locking.LEVEL_NODE in self.recalculate_locks, \
297 "_LockInstancesNodes helper function called with no nodes to recalculate"
# TODO: check if we've really been called with the instance locks held
301 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
302 # future we might want to have different behaviors depending on the value
303 # of self.recalculate_locks[locking.LEVEL_NODE]
305 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
306 instance = self.context.cfg.GetInstanceInfo(instance_name)
307 wanted_nodes.append(instance.primary_node)
309 wanted_nodes.extend(instance.secondary_nodes)
311 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
312 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
313 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
314 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
316 del self.recalculate_locks[locking.LEVEL_NODE]
319 class NoHooksLU(LogicalUnit):
320 """Simple LU which runs no hooks.
322 This LU is intended as a parent for other LogicalUnits which will
323 run no hooks, in order to reduce duplicate code.
330 def _GetWantedNodes(lu, nodes):
331 """Returns list of checked and expanded node names.
333 @type lu: L{LogicalUnit}
334 @param lu: the logical unit on whose behalf we execute
336 @param nodes: list of node names or None for all nodes
338 @return: the list of nodes, sorted
@raise errors.OpPrereqError: if the nodes parameter is of the wrong type
342 if not isinstance(nodes, list):
343 raise errors.OpPrereqError("Invalid argument type 'nodes'")
346 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
347 " non-empty list of nodes whose name is to be expanded.")
351 node = lu.cfg.ExpandNodeName(name)
353 raise errors.OpPrereqError("No such node name '%s'" % name)
356 return utils.NiceSort(wanted)
359 def _GetWantedInstances(lu, instances):
360 """Returns list of checked and expanded instance names.
362 @type lu: L{LogicalUnit}
363 @param lu: the logical unit on whose behalf we execute
364 @type instances: list
365 @param instances: list of instance names or None for all instances
367 @return: the list of instances, sorted
368 @raise errors.OpPrereqError: if the instances parameter is wrong type
369 @raise errors.OpPrereqError: if any of the passed instances is not found
372 if not isinstance(instances, list):
373 raise errors.OpPrereqError("Invalid argument type 'instances'")
378 for name in instances:
379 instance = lu.cfg.ExpandInstanceName(name)
381 raise errors.OpPrereqError("No such instance name '%s'" % name)
382 wanted.append(instance)
385 wanted = lu.cfg.GetInstanceList()
386 return utils.NiceSort(wanted)
389 def _CheckOutputFields(static, dynamic, selected):
390 """Checks whether all selected fields are valid.
392 @type static: L{utils.FieldSet}
393 @param static: static fields set
394 @type dynamic: L{utils.FieldSet}
395 @param dynamic: dynamic fields set
402 delta = f.NonMatching(selected)
404 raise errors.OpPrereqError("Unknown output fields selected: %s"
408 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
409 memory, vcpus, nics):
410 """Builds instance related env variables for hooks
412 This builds the hook environment from individual variables.
415 @param name: the name of the instance
416 @type primary_node: string
417 @param primary_node: the name of the instance's primary node
418 @type secondary_nodes: list
419 @param secondary_nodes: list of secondary nodes as strings
420 @type os_type: string
421 @param os_type: the name of the instance's OS
@param status: the desired status of the instance
425 @param memory: the memory size of the instance
427 @param vcpus: the count of VCPUs the instance has
429 @param nics: list of tuples (ip, bridge, mac) representing
430 the NICs the instance has
432 @return: the hook environment for this instance
437 "INSTANCE_NAME": name,
438 "INSTANCE_PRIMARY": primary_node,
439 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
440 "INSTANCE_OS_TYPE": os_type,
441 "INSTANCE_STATUS": status,
442 "INSTANCE_MEMORY": memory,
443 "INSTANCE_VCPUS": vcpus,
447 nic_count = len(nics)
448 for idx, (ip, bridge, mac) in enumerate(nics):
451 env["INSTANCE_NIC%d_IP" % idx] = ip
452 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
453 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
457 env["INSTANCE_NIC_COUNT"] = nic_count
462 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
463 """Builds instance related env variables for hooks from an object.
465 @type lu: L{LogicalUnit}
466 @param lu: the logical unit on whose behalf we execute
467 @type instance: L{objects.Instance}
468 @param instance: the instance for which we should build the
471 @param override: dictionary with key/values that will override
474 @return: the hook environment dictionary
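A sketch of typical usage from an LU's BuildHooksEnv, where the optional
override dict replaces entries of the argument dict (here the 'status'
key), could be::

  env = _BuildInstanceHookEnvByObject(self, self.instance,
                                      override={"status": "down"})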
477 bep = lu.cfg.GetClusterInfo().FillBE(instance)
479 'name': instance.name,
480 'primary_node': instance.primary_node,
481 'secondary_nodes': instance.secondary_nodes,
482 'os_type': instance.os,
'status': instance.status,
484 'memory': bep[constants.BE_MEMORY],
485 'vcpus': bep[constants.BE_VCPUS],
486 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
489 args.update(override)
490 return _BuildInstanceHookEnv(**args)
493 def _CheckInstanceBridgesExist(lu, instance):
494 """Check that the brigdes needed by an instance exist.
497 # check bridges existance
498 brlist = [nic.bridge for nic in instance.nics]
499 if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
500 raise errors.OpPrereqError("one or more target bridges %s does not"
501 " exist on destination node '%s'" %
502 (brlist, instance.primary_node))
505 class LUDestroyCluster(NoHooksLU):
506 """Logical unit for destroying the cluster.
511 def CheckPrereq(self):
512 """Check prerequisites.
514 This checks whether the cluster is empty.
516 Any errors are signalled by raising errors.OpPrereqError.
519 master = self.cfg.GetMasterNode()
521 nodelist = self.cfg.GetNodeList()
522 if len(nodelist) != 1 or nodelist[0] != master:
523 raise errors.OpPrereqError("There are still %d node(s) in"
524 " this cluster." % (len(nodelist) - 1))
525 instancelist = self.cfg.GetInstanceList()
527 raise errors.OpPrereqError("There are still %d instance(s) in"
528 " this cluster." % len(instancelist))
530 def Exec(self, feedback_fn):
531 """Destroys the cluster.
534 master = self.cfg.GetMasterNode()
535 if not self.rpc.call_node_stop_master(master, False):
536 raise errors.OpExecError("Could not disable the master role")
537 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
538 utils.CreateBackup(priv_key)
539 utils.CreateBackup(pub_key)
543 class LUVerifyCluster(LogicalUnit):
544 """Verifies the cluster status.
547 HPATH = "cluster-verify"
548 HTYPE = constants.HTYPE_CLUSTER
549 _OP_REQP = ["skip_checks"]
552 def ExpandNames(self):
553 self.needed_locks = {
554 locking.LEVEL_NODE: locking.ALL_SET,
555 locking.LEVEL_INSTANCE: locking.ALL_SET,
557 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
559 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
560 remote_version, feedback_fn):
561 """Run multiple tests against a node.
565 - compares ganeti version
- checks vg existence and size > 20G
567 - checks config file checksum
568 - checks ssh to other nodes
571 @param node: the name of the node to check
572 @param file_list: required list of files
573 @param local_cksum: dictionary of local files and their checksums
575 @param vglist: dictionary of volume group names and their size
576 @param node_result: the results from the node
577 @param remote_version: the RPC version from the remote node
578 @param feedback_fn: function used to accumulate results
581 # compares ganeti version
582 local_version = constants.PROTOCOL_VERSION
583 if not remote_version:
584 feedback_fn(" - ERROR: connection to %s failed" % (node))
587 if local_version != remote_version:
588 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
589 (local_version, node, remote_version))
# checks vg existence and size > 20G
596 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
600 vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
601 constants.MIN_VG_SIZE)
603 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
607 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
610 # checks config file checksum
613 if 'filelist' not in node_result:
615 feedback_fn(" - ERROR: node hasn't returned file checksum data")
617 remote_cksum = node_result['filelist']
618 for file_name in file_list:
619 if file_name not in remote_cksum:
621 feedback_fn(" - ERROR: file '%s' missing" % file_name)
622 elif remote_cksum[file_name] != local_cksum[file_name]:
624 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
626 if 'nodelist' not in node_result:
628 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
630 if node_result['nodelist']:
632 for node in node_result['nodelist']:
633 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
634 (node, node_result['nodelist'][node]))
635 if 'node-net-test' not in node_result:
637 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
639 if node_result['node-net-test']:
641 nlist = utils.NiceSort(node_result['node-net-test'].keys())
643 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
644 (node, node_result['node-net-test'][node]))
646 hyp_result = node_result.get('hypervisor', None)
647 if isinstance(hyp_result, dict):
648 for hv_name, hv_result in hyp_result.iteritems():
649 if hv_result is not None:
650 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
651 (hv_name, hv_result))
654 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
655 node_instance, feedback_fn):
656 """Verify an instance.
658 This function checks to see if the required block devices are
659 available on the instance's node.
664 node_current = instanceconfig.primary_node
667 instanceconfig.MapLVsByNode(node_vol_should)
669 for node in node_vol_should:
670 for volume in node_vol_should[node]:
671 if node not in node_vol_is or volume not in node_vol_is[node]:
672 feedback_fn(" - ERROR: volume %s missing on node %s" %
if instanceconfig.status != 'down':
677 if (node_current not in node_instance or
678 not instance in node_instance[node_current]):
679 feedback_fn(" - ERROR: instance %s not running on node %s" %
680 (instance, node_current))
683 for node in node_instance:
if node != node_current:
685 if instance in node_instance[node]:
686 feedback_fn(" - ERROR: instance %s should not run on node %s" %
692 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
693 """Verify if there are any unknown volumes in the cluster.
The .os, .swap and backup volumes are ignored. All other volumes are
reported as unknown.
701 for node in node_vol_is:
702 for volume in node_vol_is[node]:
703 if node not in node_vol_should or volume not in node_vol_should[node]:
704 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
709 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
710 """Verify the list of running instances.
712 This checks what instances are running but unknown to the cluster.
716 for node in node_instance:
717 for runninginstance in node_instance[node]:
718 if runninginstance not in instancelist:
719 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
720 (runninginstance, node))
724 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
725 """Verify N+1 Memory Resilience.
Check that if one single node dies we can still start all the instances it
was primary for.
733 for node, nodeinfo in node_info.iteritems():
734 # This code checks that every node which is now listed as secondary has
# enough memory to host all instances it is supposed to, should a single
736 # other node in the cluster fail.
737 # FIXME: not ready for failover to an arbitrary node
738 # FIXME: does not support file-backed instances
739 # WARNING: we currently take into account down instances as well as up
740 # ones, considering that even if they're down someone might want to start
741 # them even in the event of a node failure.
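# Illustrative example (hypothetical numbers): if this node is secondary
# for two auto-balanced instances whose primary is node A, needing 2048
# and 1024 MiB of memory respectively, it must have at least 3072 MiB
# free to absorb a failure of node A.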
742 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
744 for instance in instances:
745 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
746 if bep[constants.BE_AUTO_BALANCE]:
747 needed_mem += bep[constants.BE_MEMORY]
748 if nodeinfo['mfree'] < needed_mem:
749 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
750 " failovers should node %s fail" % (node, prinode))
754 def CheckPrereq(self):
755 """Check prerequisites.
757 Transform the list of checks we're going to skip into a set and check that
758 all its members are valid.
761 self.skip_set = frozenset(self.op.skip_checks)
762 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
763 raise errors.OpPrereqError("Invalid checks to be skipped specified")
765 def BuildHooksEnv(self):
Cluster-Verify hooks are run only in the post phase; their failure causes
the output to be logged in the verify output and the verification to fail.
772 all_nodes = self.cfg.GetNodeList()
773 # TODO: populate the environment with useful information for verify hooks
775 return env, [], all_nodes
777 def Exec(self, feedback_fn):
778 """Verify integrity of cluster, performing various test on nodes.
782 feedback_fn("* Verifying global settings")
783 for msg in self.cfg.VerifyConfig():
784 feedback_fn(" - ERROR: %s" % msg)
786 vg_name = self.cfg.GetVGName()
787 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
788 nodelist = utils.NiceSort(self.cfg.GetNodeList())
789 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
790 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
791 i_non_redundant = [] # Non redundant instances
792 i_non_a_balanced = [] # Non auto-balanced instances
798 # FIXME: verify OS list
801 file_names.append(constants.SSL_CERT_FILE)
802 file_names.append(constants.CLUSTER_CONF_FILE)
803 local_checksums = utils.FingerprintFiles(file_names)
805 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
806 all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
807 all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
808 all_vglist = self.rpc.call_vg_list(nodelist)
809 node_verify_param = {
810 'filelist': file_names,
811 'nodelist': nodelist,
812 'hypervisor': hypervisors,
813 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
814 for node in nodeinfo]
816 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
817 self.cfg.GetClusterName())
818 all_rversion = self.rpc.call_version(nodelist)
819 all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
820 self.cfg.GetHypervisorType())
822 cluster = self.cfg.GetClusterInfo()
823 for node in nodelist:
824 feedback_fn("* Verifying node %s" % node)
825 result = self._VerifyNode(node, file_names, local_checksums,
826 all_vglist[node], all_nvinfo[node],
827 all_rversion[node], feedback_fn)
831 volumeinfo = all_volumeinfo[node]
833 if isinstance(volumeinfo, basestring):
834 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
835 (node, volumeinfo[-400:].encode('string_escape')))
837 node_volume[node] = {}
838 elif not isinstance(volumeinfo, dict):
839 feedback_fn(" - ERROR: connection to %s failed" % (node,))
843 node_volume[node] = volumeinfo
846 nodeinstance = all_instanceinfo[node]
847 if type(nodeinstance) != list:
848 feedback_fn(" - ERROR: connection to %s failed" % (node,))
852 node_instance[node] = nodeinstance
855 nodeinfo = all_ninfo[node]
856 if not isinstance(nodeinfo, dict):
857 feedback_fn(" - ERROR: connection to %s failed" % (node,))
863 "mfree": int(nodeinfo['memory_free']),
864 "dfree": int(nodeinfo['vg_free']),
867 # dictionary holding all instances this node is secondary for,
868 # grouped by their primary node. Each key is a cluster node, and each
869 # value is a list of instances which have the key as primary and the
# current node as secondary. This is handy to calculate N+1 memory
# availability if you can only failover from a primary to its secondary.
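# An illustrative shape of this mapping (hypothetical names) would be
#   {"nodeA": ["inst1", "inst2"], "nodeB": ["inst3"]}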
873 "sinst-by-pnode": {},
876 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
882 for instance in instancelist:
883 feedback_fn("* Verifying instance %s" % instance)
884 inst_config = self.cfg.GetInstanceInfo(instance)
885 result = self._VerifyInstance(instance, inst_config, node_volume,
886 node_instance, feedback_fn)
889 inst_config.MapLVsByNode(node_vol_should)
891 instance_cfg[instance] = inst_config
893 pnode = inst_config.primary_node
894 if pnode in node_info:
895 node_info[pnode]['pinst'].append(instance)
897 feedback_fn(" - ERROR: instance %s, connection to primary node"
898 " %s failed" % (instance, pnode))
901 # If the instance is non-redundant we cannot survive losing its primary
902 # node, so we are not N+1 compliant. On the other hand we have no disk
# templates with more than one secondary so that situation is not well
# supported.
905 # FIXME: does not support file-backed instances
906 if len(inst_config.secondary_nodes) == 0:
907 i_non_redundant.append(instance)
908 elif len(inst_config.secondary_nodes) > 1:
909 feedback_fn(" - WARNING: multiple secondaries for instance %s"
912 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
913 i_non_a_balanced.append(instance)
915 for snode in inst_config.secondary_nodes:
916 if snode in node_info:
917 node_info[snode]['sinst'].append(instance)
918 if pnode not in node_info[snode]['sinst-by-pnode']:
919 node_info[snode]['sinst-by-pnode'][pnode] = []
920 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
922 feedback_fn(" - ERROR: instance %s, connection to secondary node"
923 " %s failed" % (instance, snode))
925 feedback_fn("* Verifying orphan volumes")
926 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
930 feedback_fn("* Verifying remaining instances")
931 result = self._VerifyOrphanInstances(instancelist, node_instance,
935 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
936 feedback_fn("* Verifying N+1 Memory redundancy")
937 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
940 feedback_fn("* Other Notes")
942 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
943 % len(i_non_redundant))
946 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
947 % len(i_non_a_balanced))
951 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
952 """Analize the post-hooks' result
954 This method analyses the hook result, handles it, and sends some
955 nicely-formatted feedback back to the user.
957 @param phase: one of L{constants.HOOKS_PHASE_POST} or
958 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
959 @param hooks_results: the results of the multi-node hooks rpc call
@param feedback_fn: function used to send feedback back to the caller
961 @param lu_result: previous Exec result
962 @return: the new Exec result, based on the previous result
# We only really run POST phase hooks, and are only interested in their results
968 if phase == constants.HOOKS_PHASE_POST:
969 # Used to change hooks' output to proper indentation
970 indent_re = re.compile('^', re.M)
971 feedback_fn("* Hooks Results")
972 if not hooks_results:
973 feedback_fn(" - ERROR: general communication failure")
976 for node_name in hooks_results:
977 show_node_header = True
978 res = hooks_results[node_name]
979 if res is False or not isinstance(res, list):
980 feedback_fn(" Communication failure")
983 for script, hkr, output in res:
984 if hkr == constants.HKR_FAIL:
985 # The node header is only shown once, if there are
986 # failing hooks on that node
988 feedback_fn(" Node %s:" % node_name)
989 show_node_header = False
990 feedback_fn(" ERROR: Script %s failed, output:" % script)
991 output = indent_re.sub(' ', output)
992 feedback_fn("%s" % output)
998 class LUVerifyDisks(NoHooksLU):
999 """Verifies the cluster disks status.
1005 def ExpandNames(self):
1006 self.needed_locks = {
1007 locking.LEVEL_NODE: locking.ALL_SET,
1008 locking.LEVEL_INSTANCE: locking.ALL_SET,
1010 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1012 def CheckPrereq(self):
1013 """Check prerequisites.
1015 This has no prerequisites.
1020 def Exec(self, feedback_fn):
1021 """Verify integrity of cluster disks.
1024 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1026 vg_name = self.cfg.GetVGName()
1027 nodes = utils.NiceSort(self.cfg.GetNodeList())
1028 instances = [self.cfg.GetInstanceInfo(name)
1029 for name in self.cfg.GetInstanceList()]
1032 for inst in instances:
1034 if (inst.status != "up" or
1035 inst.disk_template not in constants.DTS_NET_MIRROR):
1037 inst.MapLVsByNode(inst_lvs)
1038 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1039 for node, vol_list in inst_lvs.iteritems():
1040 for vol in vol_list:
1041 nv_dict[(node, vol)] = inst
1046 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1051 lvs = node_lvs[node]
1053 if isinstance(lvs, basestring):
1054 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1055 res_nlvm[node] = lvs
1056 elif not isinstance(lvs, dict):
1057 logging.warning("Connection to node %s failed or invalid data"
1059 res_nodes.append(node)
1062 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1063 inst = nv_dict.pop((node, lv_name), None)
1064 if (not lv_online and inst is not None
1065 and inst.name not in res_instances):
1066 res_instances.append(inst.name)
1068 # any leftover items in nv_dict are missing LVs, let's arrange the
1070 for key, inst in nv_dict.iteritems():
1071 if inst.name not in res_missing:
1072 res_missing[inst.name] = []
1073 res_missing[inst.name].append(key)
1078 class LURenameCluster(LogicalUnit):
1079 """Rename the cluster.
1082 HPATH = "cluster-rename"
1083 HTYPE = constants.HTYPE_CLUSTER
1086 def BuildHooksEnv(self):
1091 "OP_TARGET": self.cfg.GetClusterName(),
1092 "NEW_NAME": self.op.name,
1094 mn = self.cfg.GetMasterNode()
1095 return env, [mn], [mn]
1097 def CheckPrereq(self):
1098 """Verify that the passed name is a valid one.
1101 hostname = utils.HostInfo(self.op.name)
1103 new_name = hostname.name
1104 self.ip = new_ip = hostname.ip
1105 old_name = self.cfg.GetClusterName()
1106 old_ip = self.cfg.GetMasterIP()
1107 if new_name == old_name and new_ip == old_ip:
1108 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1109 " cluster has changed")
1110 if new_ip != old_ip:
1111 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1112 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1113 " reachable on the network. Aborting." %
1116 self.op.name = new_name
1118 def Exec(self, feedback_fn):
1119 """Rename the cluster.
1122 clustername = self.op.name
1125 # shutdown the master IP
1126 master = self.cfg.GetMasterNode()
1127 if not self.rpc.call_node_stop_master(master, False):
1128 raise errors.OpExecError("Could not disable the master role")
1133 ss.SetKey(ss.SS_MASTER_IP, ip)
1134 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1136 # Distribute updated ss config to all nodes
1137 myself = self.cfg.GetNodeInfo(master)
1138 dist_nodes = self.cfg.GetNodeList()
1139 if myself.name in dist_nodes:
1140 dist_nodes.remove(myself.name)
1142 logging.debug("Copying updated ssconf data to all nodes")
1143 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1144 fname = ss.KeyToFilename(keyname)
1145 result = self.rpc.call_upload_file(dist_nodes, fname)
1146 for to_node in dist_nodes:
1147 if not result[to_node]:
1148 self.LogWarning("Copy of file %s to node %s failed",
1151 if not self.rpc.call_node_start_master(master, False):
1152 self.LogWarning("Could not re-enable the master role on"
1153 " the master, please restart manually.")
1156 def _RecursiveCheckIfLVMBased(disk):
1157 """Check if the given disk or its children are lvm-based.
1159 @type disk: L{objects.Disk}
1160 @param disk: the disk to check
1162 @return: boolean indicating whether a LD_LV dev_type was found or not
1166 for chdisk in disk.children:
1167 if _RecursiveCheckIfLVMBased(chdisk):
1169 return disk.dev_type == constants.LD_LV
1172 class LUSetClusterParams(LogicalUnit):
1173 """Change the parameters of the cluster.
1176 HPATH = "cluster-modify"
1177 HTYPE = constants.HTYPE_CLUSTER
1181 def ExpandNames(self):
1182 # FIXME: in the future maybe other cluster params won't require checking on
1183 # all nodes to be modified.
1184 self.needed_locks = {
1185 locking.LEVEL_NODE: locking.ALL_SET,
1187 self.share_locks[locking.LEVEL_NODE] = 1
1189 def BuildHooksEnv(self):
1194 "OP_TARGET": self.cfg.GetClusterName(),
1195 "NEW_VG_NAME": self.op.vg_name,
1197 mn = self.cfg.GetMasterNode()
1198 return env, [mn], [mn]
1200 def CheckPrereq(self):
1201 """Check prerequisites.
1203 This checks whether the given params don't conflict and
1204 if the given volume group is valid.
1207 # FIXME: This only works because there is only one parameter that can be
1208 # changed or removed.
1209 if self.op.vg_name is not None and not self.op.vg_name:
1210 instances = self.cfg.GetAllInstancesInfo().values()
1211 for inst in instances:
1212 for disk in inst.disks:
1213 if _RecursiveCheckIfLVMBased(disk):
1214 raise errors.OpPrereqError("Cannot disable lvm storage while"
1215 " lvm-based instances exist")
1217 node_list = self.acquired_locks[locking.LEVEL_NODE]
1219 # if vg_name not None, checks given volume group on all nodes
1221 vglist = self.rpc.call_vg_list(node_list)
1222 for node in node_list:
1223 vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1224 constants.MIN_VG_SIZE)
1226 raise errors.OpPrereqError("Error on node '%s': %s" %
1229 self.cluster = cluster = self.cfg.GetClusterInfo()
1230 # beparams changes do not need validation (we can't validate?),
1231 # but we still process here
1232 if self.op.beparams:
1233 self.new_beparams = cluster.FillDict(
1234 cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1236 # hypervisor list/parameters
1237 self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1238 if self.op.hvparams:
1239 if not isinstance(self.op.hvparams, dict):
1240 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1241 for hv_name, hv_dict in self.op.hvparams.items():
1242 if hv_name not in self.new_hvparams:
1243 self.new_hvparams[hv_name] = hv_dict
1245 self.new_hvparams[hv_name].update(hv_dict)
1247 if self.op.enabled_hypervisors is not None:
1248 self.hv_list = self.op.enabled_hypervisors
1250 self.hv_list = cluster.enabled_hypervisors
1252 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1253 # either the enabled list has changed, or the parameters have, validate
1254 for hv_name, hv_params in self.new_hvparams.items():
1255 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1256 (self.op.enabled_hypervisors and
1257 hv_name in self.op.enabled_hypervisors)):
1258 # either this is a new hypervisor, or its parameters have changed
1259 hv_class = hypervisor.GetHypervisor(hv_name)
1260 hv_class.CheckParameterSyntax(hv_params)
1261 _CheckHVParams(self, node_list, hv_name, hv_params)
1263 def Exec(self, feedback_fn):
1264 """Change the parameters of the cluster.
1267 if self.op.vg_name is not None:
1268 if self.op.vg_name != self.cfg.GetVGName():
1269 self.cfg.SetVGName(self.op.vg_name)
1271 feedback_fn("Cluster LVM configuration already in desired"
1272 " state, not changing")
1273 if self.op.hvparams:
1274 self.cluster.hvparams = self.new_hvparams
1275 if self.op.enabled_hypervisors is not None:
1276 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1277 if self.op.beparams:
1278 self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1279 self.cfg.Update(self.cluster)
1282 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1283 """Sleep and poll for an instance's disk to sync.
1286 if not instance.disks:
1290 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1292 node = instance.primary_node
1294 for dev in instance.disks:
1295 lu.cfg.SetDiskID(dev, node)
1301 cumul_degraded = False
1302 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1304 lu.LogWarning("Can't get any data from node %s", node)
1307 raise errors.RemoteError("Can't contact node %s for mirror data,"
1308 " aborting." % node)
1312 for i in range(len(rstats)):
1315 lu.LogWarning("Can't compute data for node %s/%s",
1316 node, instance.disks[i].iv_name)
1318 # we ignore the ldisk parameter
1319 perc_done, est_time, is_degraded, _ = mstat
1320 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1321 if perc_done is not None:
1323 if est_time is not None:
1324 rem_time = "%d estimated seconds remaining" % est_time
1327 rem_time = "no time estimate"
1328 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1329 (instance.disks[i].iv_name, perc_done, rem_time))
1333 time.sleep(min(60, max_time))
1336 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1337 return not cumul_degraded
1340 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1341 """Check that mirrors are not degraded.
1343 The ldisk parameter, if True, will change the test from the
1344 is_degraded attribute (which represents overall non-ok status for
1345 the device(s)) to the ldisk (representing the local storage status).
1348 lu.cfg.SetDiskID(dev, node)
1355 if on_primary or dev.AssembleOnSecondary():
1356 rstats = lu.rpc.call_blockdev_find(node, dev)
1358 logging.warning("Node %s: disk degraded, not found or node down", node)
1361 result = result and (not rstats[idx])
1363 for child in dev.children:
1364 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1369 class LUDiagnoseOS(NoHooksLU):
1370 """Logical unit for OS diagnose/query.
1373 _OP_REQP = ["output_fields", "names"]
1375 _FIELDS_STATIC = utils.FieldSet()
1376 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1378 def ExpandNames(self):
1380 raise errors.OpPrereqError("Selective OS query not supported")
1382 _CheckOutputFields(static=self._FIELDS_STATIC,
1383 dynamic=self._FIELDS_DYNAMIC,
1384 selected=self.op.output_fields)
1386 # Lock all nodes, in shared mode
1387 self.needed_locks = {}
1388 self.share_locks[locking.LEVEL_NODE] = 1
1389 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1391 def CheckPrereq(self):
1392 """Check prerequisites.
1397 def _DiagnoseByOS(node_list, rlist):
1398 """Remaps a per-node return list into an a per-os per-node dictionary
1400 @param node_list: a list with the names of all nodes
1401 @param rlist: a map with node names as keys and OS objects as values
@return: a dictionary with os names as keys and, as values, another
map with node names as keys and lists of OS objects as values, e.g.::
1407 {"debian-etch": {"node1": [<object>,...],
1408 "node2": [<object>,]}
1413 for node_name, nr in rlist.iteritems():
1417 if os_obj.name not in all_os:
1418 # build a list of nodes for this os containing empty lists
1419 # for each node in node_list
1420 all_os[os_obj.name] = {}
1421 for nname in node_list:
1422 all_os[os_obj.name][nname] = []
1423 all_os[os_obj.name][node_name].append(os_obj)
1426 def Exec(self, feedback_fn):
1427 """Compute the list of OSes.
1430 node_list = self.acquired_locks[locking.LEVEL_NODE]
1431 node_data = self.rpc.call_os_diagnose(node_list)
1432 if node_data == False:
1433 raise errors.OpExecError("Can't gather the list of OSes")
1434 pol = self._DiagnoseByOS(node_list, node_data)
1436 for os_name, os_data in pol.iteritems():
1438 for field in self.op.output_fields:
1441 elif field == "valid":
1442 val = utils.all([osl and osl[0] for osl in os_data.values()])
1443 elif field == "node_status":
1445 for node_name, nos_list in os_data.iteritems():
1446 val[node_name] = [(v.status, v.path) for v in nos_list]
1448 raise errors.ParameterError(field)
1455 class LURemoveNode(LogicalUnit):
1456 """Logical unit for removing a node.
1459 HPATH = "node-remove"
1460 HTYPE = constants.HTYPE_NODE
1461 _OP_REQP = ["node_name"]
1463 def BuildHooksEnv(self):
1466 This doesn't run on the target node in the pre phase as a failed
1467 node would then be impossible to remove.
1471 "OP_TARGET": self.op.node_name,
1472 "NODE_NAME": self.op.node_name,
1474 all_nodes = self.cfg.GetNodeList()
1475 all_nodes.remove(self.op.node_name)
1476 return env, all_nodes, all_nodes
1478 def CheckPrereq(self):
1479 """Check prerequisites.
1482 - the node exists in the configuration
1483 - it does not have primary or secondary instances
1484 - it's not the master
1486 Any errors are signalled by raising errors.OpPrereqError.
1489 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1493 instance_list = self.cfg.GetInstanceList()
1495 masternode = self.cfg.GetMasterNode()
1496 if node.name == masternode:
1497 raise errors.OpPrereqError("Node is the master node,"
1498 " you need to failover first.")
1500 for instance_name in instance_list:
1501 instance = self.cfg.GetInstanceInfo(instance_name)
1502 if node.name == instance.primary_node:
1503 raise errors.OpPrereqError("Instance %s still running on the node,"
1504 " please remove first." % instance_name)
1505 if node.name in instance.secondary_nodes:
1506 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1507 " please remove first." % instance_name)
1508 self.op.node_name = node.name
1511 def Exec(self, feedback_fn):
1512 """Removes the node from the cluster.
1516 logging.info("Stopping the node daemon and removing configs from node %s",
1519 self.context.RemoveNode(node.name)
1521 self.rpc.call_node_leave_cluster(node.name)
1524 class LUQueryNodes(NoHooksLU):
1525 """Logical unit for querying nodes.
1528 _OP_REQP = ["output_fields", "names"]
1530 _FIELDS_DYNAMIC = utils.FieldSet(
1532 "mtotal", "mnode", "mfree",
1537 _FIELDS_STATIC = utils.FieldSet(
1538 "name", "pinst_cnt", "sinst_cnt",
1539 "pinst_list", "sinst_list",
1540 "pip", "sip", "tags",
1544 def ExpandNames(self):
1545 _CheckOutputFields(static=self._FIELDS_STATIC,
1546 dynamic=self._FIELDS_DYNAMIC,
1547 selected=self.op.output_fields)
1549 self.needed_locks = {}
1550 self.share_locks[locking.LEVEL_NODE] = 1
1553 self.wanted = _GetWantedNodes(self, self.op.names)
1555 self.wanted = locking.ALL_SET
1557 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1559 # if we don't request only static fields, we need to lock the nodes
1560 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1563 def CheckPrereq(self):
1564 """Check prerequisites.
# The validation of the node list is done in _GetWantedNodes if the list
# is non-empty; if it is empty, there's no validation to do
1571 def Exec(self, feedback_fn):
1572 """Computes the list of nodes and their attributes.
1575 all_info = self.cfg.GetAllNodesInfo()
1577 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1578 elif self.wanted != locking.ALL_SET:
1579 nodenames = self.wanted
1580 missing = set(nodenames).difference(all_info.keys())
1582 raise errors.OpExecError(
1583 "Some nodes were removed before retrieving their data: %s" % missing)
1585 nodenames = all_info.keys()
1587 nodenames = utils.NiceSort(nodenames)
1588 nodelist = [all_info[name] for name in nodenames]
1590 # begin data gathering
1594 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1595 self.cfg.GetHypervisorType())
1596 for name in nodenames:
1597 nodeinfo = node_data.get(name, None)
1600 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1601 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1602 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1603 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1604 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1605 "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1606 "bootid": nodeinfo['bootid'],
1609 live_data[name] = {}
1611 live_data = dict.fromkeys(nodenames, {})
1613 node_to_primary = dict([(name, set()) for name in nodenames])
1614 node_to_secondary = dict([(name, set()) for name in nodenames])
1616 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1617 "sinst_cnt", "sinst_list"))
1618 if inst_fields & frozenset(self.op.output_fields):
1619 instancelist = self.cfg.GetInstanceList()
1621 for instance_name in instancelist:
1622 inst = self.cfg.GetInstanceInfo(instance_name)
1623 if inst.primary_node in node_to_primary:
1624 node_to_primary[inst.primary_node].add(inst.name)
1625 for secnode in inst.secondary_nodes:
1626 if secnode in node_to_secondary:
1627 node_to_secondary[secnode].add(inst.name)
1629 # end data gathering
1632 for node in nodelist:
1634 for field in self.op.output_fields:
1637 elif field == "pinst_list":
1638 val = list(node_to_primary[node.name])
1639 elif field == "sinst_list":
1640 val = list(node_to_secondary[node.name])
1641 elif field == "pinst_cnt":
1642 val = len(node_to_primary[node.name])
1643 elif field == "sinst_cnt":
1644 val = len(node_to_secondary[node.name])
1645 elif field == "pip":
1646 val = node.primary_ip
1647 elif field == "sip":
1648 val = node.secondary_ip
1649 elif field == "tags":
1650 val = list(node.GetTags())
1651 elif field == "serial_no":
1652 val = node.serial_no
1653 elif self._FIELDS_DYNAMIC.Matches(field):
1654 val = live_data[node.name].get(field, None)
1656 raise errors.ParameterError(field)
1657 node_output.append(val)
1658 output.append(node_output)
1663 class LUQueryNodeVolumes(NoHooksLU):
1664 """Logical unit for getting volumes on node(s).
1667 _OP_REQP = ["nodes", "output_fields"]
1669 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1670 _FIELDS_STATIC = utils.FieldSet("node")
1672 def ExpandNames(self):
1673 _CheckOutputFields(static=self._FIELDS_STATIC,
1674 dynamic=self._FIELDS_DYNAMIC,
1675 selected=self.op.output_fields)
1677 self.needed_locks = {}
1678 self.share_locks[locking.LEVEL_NODE] = 1
1679 if not self.op.nodes:
1680 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1682 self.needed_locks[locking.LEVEL_NODE] = \
1683 _GetWantedNodes(self, self.op.nodes)
1685 def CheckPrereq(self):
1686 """Check prerequisites.
1688 This checks that the fields required are valid output fields.
1691 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1693 def Exec(self, feedback_fn):
1694 """Computes the list of nodes and their attributes.
1697 nodenames = self.nodes
1698 volumes = self.rpc.call_node_volumes(nodenames)
1700 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1701 in self.cfg.GetInstanceList()]
1703 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1706 for node in nodenames:
1707 if node not in volumes or not volumes[node]:
1710 node_vols = volumes[node][:]
1711 node_vols.sort(key=lambda vol: vol['dev'])
1713 for vol in node_vols:
1715 for field in self.op.output_fields:
1718 elif field == "phys":
1722 elif field == "name":
1724 elif field == "size":
1725 val = int(float(vol['size']))
1726 elif field == "instance":
1728 if node not in lv_by_node[inst]:
1730 if vol['name'] in lv_by_node[inst][node]:
1736 raise errors.ParameterError(field)
1737 node_output.append(str(val))
1739 output.append(node_output)
1744 class LUAddNode(LogicalUnit):
1745 """Logical unit for adding node to the cluster.
1749 HTYPE = constants.HTYPE_NODE
1750 _OP_REQP = ["node_name"]
1752 def BuildHooksEnv(self):
1755 This will run on all nodes before, and on all nodes + the new node after.
1759 "OP_TARGET": self.op.node_name,
1760 "NODE_NAME": self.op.node_name,
1761 "NODE_PIP": self.op.primary_ip,
1762 "NODE_SIP": self.op.secondary_ip,
1764 nodes_0 = self.cfg.GetNodeList()
1765 nodes_1 = nodes_0 + [self.op.node_name, ]
1766 return env, nodes_0, nodes_1
1768 def CheckPrereq(self):
1769 """Check prerequisites.
1772 - the new node is not already in the config
- its parameters (single/dual homed) match the cluster
1776 Any errors are signalled by raising errors.OpPrereqError.
1779 node_name = self.op.node_name
1782 dns_data = utils.HostInfo(node_name)
1784 node = dns_data.name
1785 primary_ip = self.op.primary_ip = dns_data.ip
1786 secondary_ip = getattr(self.op, "secondary_ip", None)
1787 if secondary_ip is None:
1788 secondary_ip = primary_ip
1789 if not utils.IsValidIP(secondary_ip):
1790 raise errors.OpPrereqError("Invalid secondary IP given")
1791 self.op.secondary_ip = secondary_ip
1793 node_list = cfg.GetNodeList()
1794 if not self.op.readd and node in node_list:
1795 raise errors.OpPrereqError("Node %s is already in the configuration" %
1797 elif self.op.readd and node not in node_list:
1798 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1800 for existing_node_name in node_list:
1801 existing_node = cfg.GetNodeInfo(existing_node_name)
1803 if self.op.readd and node == existing_node_name:
1804 if (existing_node.primary_ip != primary_ip or
1805 existing_node.secondary_ip != secondary_ip):
1806 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1807 " address configuration as before")
1810 if (existing_node.primary_ip == primary_ip or
1811 existing_node.secondary_ip == primary_ip or
1812 existing_node.primary_ip == secondary_ip or
1813 existing_node.secondary_ip == secondary_ip):
1814 raise errors.OpPrereqError("New node ip address(es) conflict with"
1815 " existing node %s" % existing_node.name)
1817 # check that the type of the node (single versus dual homed) is the
1818 # same as for the master
1819 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1820 master_singlehomed = myself.secondary_ip == myself.primary_ip
1821 newbie_singlehomed = secondary_ip == primary_ip
1822 if master_singlehomed != newbie_singlehomed:
1823 if master_singlehomed:
1824 raise errors.OpPrereqError("The master has no private ip but the"
1825 " new node has one")
1827 raise errors.OpPrereqError("The master has a private ip but the"
1828 " new node doesn't have one")
# checks reachability
1831 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1832 raise errors.OpPrereqError("Node not reachable by ping")
1834 if not newbie_singlehomed:
1835 # check reachability from my secondary ip to newbie's secondary ip
1836 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1837 source=myself.secondary_ip):
1838 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1839 " based ping to noded port")
1841 self.new_node = objects.Node(name=node,
1842 primary_ip=primary_ip,
1843 secondary_ip=secondary_ip)
1845 def Exec(self, feedback_fn):
1846 """Adds the new node to the cluster.
1849 new_node = self.new_node
1850 node = new_node.name
1852 # check connectivity
1853 result = self.rpc.call_version([node])[node]
1855 if constants.PROTOCOL_VERSION == result:
1856 logging.info("Communication to node %s fine, sw version %s match",
1859 raise errors.OpExecError("Version mismatch master version %s,"
1860 " node version %s" %
1861 (constants.PROTOCOL_VERSION, result))
1863 raise errors.OpExecError("Cannot get version from the new node")
1866 logging.info("Copy ssh key to node %s", node)
1867 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1869 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1870 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1876 keyarray.append(f.read())
1880 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1882 keyarray[3], keyarray[4], keyarray[5])
1885 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1887 # Add node to our /etc/hosts, and add key to known_hosts
1888 utils.AddHostToEtcHosts(new_node.name)
1890 if new_node.secondary_ip != new_node.primary_ip:
1891 if not self.rpc.call_node_has_ip_address(new_node.name,
1892 new_node.secondary_ip):
1893 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1894 " you gave (%s). Please fix and re-run this"
1895 " command." % new_node.secondary_ip)
1897 node_verify_list = [self.cfg.GetMasterNode()]
1898 node_verify_param = {
1900 # TODO: do a node-net-test as well?
1903 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1904 self.cfg.GetClusterName())
1905 for verifier in node_verify_list:
1906 if not result[verifier]:
1907 raise errors.OpExecError("Cannot communicate with %s's node daemon"
1908 " for remote verification" % verifier)
1909 if result[verifier]['nodelist']:
1910 for failed in result[verifier]['nodelist']:
1911 feedback_fn("ssh/hostname verification failed %s -> %s" %
1912 (verifier, result[verifier]['nodelist'][failed]))
1913 raise errors.OpExecError("ssh/hostname verification failed.")
1915 # Distribute updated /etc/hosts and known_hosts to all nodes,
1916 # including the node just added
1917 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1918 dist_nodes = self.cfg.GetNodeList()
1919 if not self.op.readd:
1920 dist_nodes.append(node)
1921 if myself.name in dist_nodes:
1922 dist_nodes.remove(myself.name)
1924 logging.debug("Copying hosts and known_hosts to all nodes")
1925 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1926 result = self.rpc.call_upload_file(dist_nodes, fname)
1927 for to_node in dist_nodes:
1928 if not result[to_node]:
1929 logging.error("Copy of file %s to node %s failed", fname, to_node)
1932 if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1933 to_copy.append(constants.VNC_PASSWORD_FILE)
1934 for fname in to_copy:
1935 result = self.rpc.call_upload_file([node], fname)
1936 if not result[node]:
1937 logging.error("Could not copy file %s to node %s", fname, node)
1940 self.context.ReaddNode(new_node)
1942 self.context.AddNode(new_node)
1945 class LUQueryClusterInfo(NoHooksLU):
1946 """Query cluster configuration.
1953 def ExpandNames(self):
1954 self.needed_locks = {}
1956 def CheckPrereq(self):
1957 """No prerequsites needed for this LU.
1962 def Exec(self, feedback_fn):
1963 """Return cluster config.
1966 cluster = self.cfg.GetClusterInfo()
1968 "software_version": constants.RELEASE_VERSION,
1969 "protocol_version": constants.PROTOCOL_VERSION,
1970 "config_version": constants.CONFIG_VERSION,
1971 "os_api_version": constants.OS_API_VERSION,
1972 "export_version": constants.EXPORT_VERSION,
1973 "architecture": (platform.architecture()[0], platform.machine()),
1974 "name": cluster.cluster_name,
1975 "master": cluster.master_node,
1976 "default_hypervisor": cluster.default_hypervisor,
1977 "enabled_hypervisors": cluster.enabled_hypervisors,
1978 "hvparams": cluster.hvparams,
1979 "beparams": cluster.beparams,
1985 class LUQueryConfigValues(NoHooksLU):
1986 """Return configuration values.
1991 _FIELDS_DYNAMIC = utils.FieldSet()
1992 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
1994 def ExpandNames(self):
1995 self.needed_locks = {}
1997 _CheckOutputFields(static=self._FIELDS_STATIC,
1998 dynamic=self._FIELDS_DYNAMIC,
1999 selected=self.op.output_fields)
2001 def CheckPrereq(self):
2002 """No prerequisites.
2007 def Exec(self, feedback_fn):
2008 """Dump a representation of the cluster config to the standard output.
2012 for field in self.op.output_fields:
2013 if field == "cluster_name":
2014 entry = self.cfg.GetClusterName()
2015 elif field == "master_node":
2016 entry = self.cfg.GetMasterNode()
2017 elif field == "drain_flag":
2018 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2020 raise errors.ParameterError(field)
2021 values.append(entry)
2025 class LUActivateInstanceDisks(NoHooksLU):
2026 """Bring up an instance's disks.
2029 _OP_REQP = ["instance_name"]
2032 def ExpandNames(self):
2033 self._ExpandAndLockInstance()
2034 self.needed_locks[locking.LEVEL_NODE] = []
2035 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2037 def DeclareLocks(self, level):
2038 if level == locking.LEVEL_NODE:
2039 self._LockInstancesNodes()
2041 def CheckPrereq(self):
2042 """Check prerequisites.
2044 This checks that the instance is in the cluster.
2047 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2048 assert self.instance is not None, \
2049 "Cannot retrieve locked instance %s" % self.op.instance_name
2051 def Exec(self, feedback_fn):
2052 """Activate the disks.
2055 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2057 raise errors.OpExecError("Cannot activate block devices")
2062 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2063 """Prepare the block devices for an instance.
2065 This sets up the block devices on all nodes.
2067 @type lu: L{LogicalUnit}
2068 @param lu: the logical unit on whose behalf we execute
2069 @type instance: L{objects.Instance}
2070 @param instance: the instance for whose disks we assemble
2071 @type ignore_secondaries: boolean
2072 @param ignore_secondaries: if true, errors on secondary nodes
2073 won't result in an error return from the function
2074 @return: a tuple (disks_ok, device_info), where disks_ok is False if any
2075 disk failed to assemble, and device_info is a list of
2076 (host, instance_visible_name, node_visible_name) tuples with the mapping from node devices to instance devices
2081 iname = instance.name
2082 # With the two passes mechanism we try to reduce the window of
2083 # opportunity for the race condition of switching DRBD to primary
2084 # before handshaking occurred, but we do not eliminate it
2086 # The proper fix would be to wait (with some limits) until the
2087 # connection has been made and drbd transitions from WFConnection
2088 # into any other network-connected state (Connected, SyncTarget, SyncSource, etc.)
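# Sketch of the ordering used below for one DRBD disk with primary node A and
# secondary node B (illustrative, not an exhaustive DRBD state description):
#   pass 1: call_blockdev_assemble on A and B with is_primary=False, so both
#           sides come up as Secondary and start the network handshake
#   pass 2: call_blockdev_assemble on A again with is_primary=True, promoting it
# The residual race is pass 2 promoting A while the handshake is still in
# WFConnection; waiting for a connected state in between would close it.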
2091 # 1st pass, assemble on all nodes in secondary mode
2092 for inst_disk in instance.disks:
2093 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2094 lu.cfg.SetDiskID(node_disk, node)
2095 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2097 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2098 " (is_primary=False, pass=1)",
2099 inst_disk.iv_name, node)
2100 if not ignore_secondaries:
2103 # FIXME: race condition on drbd migration to primary
2105 # 2nd pass, do only the primary node
2106 for inst_disk in instance.disks:
2107 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2108 if node != instance.primary_node:
2110 lu.cfg.SetDiskID(node_disk, node)
2111 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2113 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2114 " (is_primary=True, pass=2)",
2115 inst_disk.iv_name, node)
2117 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2119 # leave the disks configured for the primary node
2120 # this is a workaround that would be fixed better by
2121 # improving the logical/physical id handling
2122 for disk in instance.disks:
2123 lu.cfg.SetDiskID(disk, instance.primary_node)
2125 return disks_ok, device_info
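# Example return value, assuming a single DRBD disk on primary node
# node1.example.com (illustrative names; the third element is whatever the
# primary-mode call_blockdev_assemble returned, here assumed to be the
# node-visible device):
#   (True, [("node1.example.com", "disk/0", "/dev/drbd0")])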
2128 def _StartInstanceDisks(lu, instance, force):
2129 """Start the disks of an instance.
2132 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2133 ignore_secondaries=force)
2135 _ShutdownInstanceDisks(lu, instance)
2136 if force is not None and not force:
2137 lu.proc.LogWarning("", hint="If the message above refers to a secondary node,"
2139 " you can retry the operation using '--force'.")
2140 raise errors.OpExecError("Disk consistency error")
2143 class LUDeactivateInstanceDisks(NoHooksLU):
2144 """Shutdown an instance's disks.
2147 _OP_REQP = ["instance_name"]
2150 def ExpandNames(self):
2151 self._ExpandAndLockInstance()
2152 self.needed_locks[locking.LEVEL_NODE] = []
2153 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2155 def DeclareLocks(self, level):
2156 if level == locking.LEVEL_NODE:
2157 self._LockInstancesNodes()
2159 def CheckPrereq(self):
2160 """Check prerequisites.
2162 This checks that the instance is in the cluster.
2165 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2166 assert self.instance is not None, \
2167 "Cannot retrieve locked instance %s" % self.op.instance_name
2169 def Exec(self, feedback_fn):
2170 """Deactivate the disks
2173 instance = self.instance
2174 _SafeShutdownInstanceDisks(self, instance)
2177 def _SafeShutdownInstanceDisks(lu, instance):
2178 """Shutdown block devices of an instance.
2180 This function checks if an instance is running, before calling
2181 _ShutdownInstanceDisks.
2184 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2185 [instance.hypervisor])
2186 ins_l = ins_l[instance.primary_node]
2187 if not isinstance(ins_l, list):
2188 raise errors.OpExecError("Can't contact node '%s'" %
2189 instance.primary_node)
2191 if instance.name in ins_l:
2192 raise errors.OpExecError("Instance is running, can't shutdown"
2195 _ShutdownInstanceDisks(lu, instance)
2198 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2199 """Shutdown block devices of an instance.
2201 This does the shutdown on all nodes of the instance.
2203 If ignore_primary is false, errors on the primary node make the shutdown be reported as failed; otherwise they are only logged.
2208 for disk in instance.disks:
2209 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2210 lu.cfg.SetDiskID(top_disk, node)
2211 if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2212 logging.error("Could not shutdown block device %s on node %s",
2214 if not ignore_primary or node != instance.primary_node:
2219 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2220 """Checks if a node has enough free memory.
2222 This function checks if a given node has the needed amount of free
2223 memory. In case the node has less memory or we cannot get the
2224 information from the node, this function raises an OpPrereqError
2227 @type lu: C{LogicalUnit}
2228 @param lu: a logical unit from which we get configuration data
2230 @param node: the node to check
2231 @type reason: C{str}
2232 @param reason: string to use in the error message
2233 @type requested: C{int}
2234 @param requested: the amount of memory in MiB to check for
2235 @type hypervisor: C{str}
2236 @param hypervisor: the hypervisor to ask for memory stats
2237 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2238 we cannot check the node
2241 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2242 if not nodeinfo or not isinstance(nodeinfo, dict):
2243 raise errors.OpPrereqError("Could not contact node %s for resource"
2244 " information" % (node,))
2246 free_mem = nodeinfo[node].get('memory_free')
2247 if not isinstance(free_mem, int):
2248 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2249 " was '%s'" % (node, free_mem))
2250 if requested > free_mem:
2251 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2252 " needed %s MiB, available %s MiB" %
2253 (node, reason, requested, free_mem))
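# Worked example (illustrative values): requesting 4096 MiB on a node whose
# call_node_info reply contains memory_free=2048 raises
#   OpPrereqError("Not enough memory on node node1 for starting instance
#   inst1: needed 4096 MiB, available 2048 MiB")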
2256 class LUStartupInstance(LogicalUnit):
2257 """Starts an instance.
2260 HPATH = "instance-start"
2261 HTYPE = constants.HTYPE_INSTANCE
2262 _OP_REQP = ["instance_name", "force"]
2265 def ExpandNames(self):
2266 self._ExpandAndLockInstance()
2268 def BuildHooksEnv(self):
2271 This runs on master, primary and secondary nodes of the instance.
2275 "FORCE": self.op.force,
2277 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2278 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2279 list(self.instance.secondary_nodes))
2282 def CheckPrereq(self):
2283 """Check prerequisites.
2285 This checks that the instance is in the cluster.
2288 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2289 assert self.instance is not None, \
2290 "Cannot retrieve locked instance %s" % self.op.instance_name
2292 bep = self.cfg.GetClusterInfo().FillBE(instance)
2293 # check bridges existence
2294 _CheckInstanceBridgesExist(self, instance)
2296 _CheckNodeFreeMemory(self, instance.primary_node,
2297 "starting instance %s" % instance.name,
2298 bep[constants.BE_MEMORY], instance.hypervisor)
2300 def Exec(self, feedback_fn):
2301 """Start the instance.
2304 instance = self.instance
2305 force = self.op.force
2306 extra_args = getattr(self.op, "extra_args", "")
2308 self.cfg.MarkInstanceUp(instance.name)
2310 node_current = instance.primary_node
2312 _StartInstanceDisks(self, instance, force)
2314 if not self.rpc.call_instance_start(node_current, instance, extra_args):
2315 _ShutdownInstanceDisks(self, instance)
2316 raise errors.OpExecError("Could not start instance")
2319 class LURebootInstance(LogicalUnit):
2320 """Reboot an instance.
2323 HPATH = "instance-reboot"
2324 HTYPE = constants.HTYPE_INSTANCE
2325 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2328 def ExpandNames(self):
2329 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2330 constants.INSTANCE_REBOOT_HARD,
2331 constants.INSTANCE_REBOOT_FULL]:
2332 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2333 (constants.INSTANCE_REBOOT_SOFT,
2334 constants.INSTANCE_REBOOT_HARD,
2335 constants.INSTANCE_REBOOT_FULL))
2336 self._ExpandAndLockInstance()
2338 def BuildHooksEnv(self):
2341 This runs on master, primary and secondary nodes of the instance.
2345 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2347 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2348 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2349 list(self.instance.secondary_nodes))
2352 def CheckPrereq(self):
2353 """Check prerequisites.
2355 This checks that the instance is in the cluster.
2358 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2359 assert self.instance is not None, \
2360 "Cannot retrieve locked instance %s" % self.op.instance_name
2362 # check bridges existence
2363 _CheckInstanceBridgesExist(self, instance)
2365 def Exec(self, feedback_fn):
2366 """Reboot the instance.
2369 instance = self.instance
2370 ignore_secondaries = self.op.ignore_secondaries
2371 reboot_type = self.op.reboot_type
2372 extra_args = getattr(self.op, "extra_args", "")
2374 node_current = instance.primary_node
2376 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2377 constants.INSTANCE_REBOOT_HARD]:
2378 if not self.rpc.call_instance_reboot(node_current, instance,
2379 reboot_type, extra_args):
2380 raise errors.OpExecError("Could not reboot instance")
2382 if not self.rpc.call_instance_shutdown(node_current, instance):
2383 raise errors.OpExecError("could not shutdown instance for full reboot")
2384 _ShutdownInstanceDisks(self, instance)
2385 _StartInstanceDisks(self, instance, ignore_secondaries)
2386 if not self.rpc.call_instance_start(node_current, instance, extra_args):
2387 _ShutdownInstanceDisks(self, instance)
2388 raise errors.OpExecError("Could not start instance for full reboot")
2390 self.cfg.MarkInstanceUp(instance.name)
2393 class LUShutdownInstance(LogicalUnit):
2394 """Shutdown an instance.
2397 HPATH = "instance-stop"
2398 HTYPE = constants.HTYPE_INSTANCE
2399 _OP_REQP = ["instance_name"]
2402 def ExpandNames(self):
2403 self._ExpandAndLockInstance()
2405 def BuildHooksEnv(self):
2408 This runs on master, primary and secondary nodes of the instance.
2411 env = _BuildInstanceHookEnvByObject(self, self.instance)
2412 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2413 list(self.instance.secondary_nodes))
2416 def CheckPrereq(self):
2417 """Check prerequisites.
2419 This checks that the instance is in the cluster.
2422 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2423 assert self.instance is not None, \
2424 "Cannot retrieve locked instance %s" % self.op.instance_name
2426 def Exec(self, feedback_fn):
2427 """Shutdown the instance.
2430 instance = self.instance
2431 node_current = instance.primary_node
2432 self.cfg.MarkInstanceDown(instance.name)
2433 if not self.rpc.call_instance_shutdown(node_current, instance):
2434 self.proc.LogWarning("Could not shutdown instance")
2436 _ShutdownInstanceDisks(self, instance)
2439 class LUReinstallInstance(LogicalUnit):
2440 """Reinstall an instance.
2443 HPATH = "instance-reinstall"
2444 HTYPE = constants.HTYPE_INSTANCE
2445 _OP_REQP = ["instance_name"]
2448 def ExpandNames(self):
2449 self._ExpandAndLockInstance()
2451 def BuildHooksEnv(self):
2454 This runs on master, primary and secondary nodes of the instance.
2457 env = _BuildInstanceHookEnvByObject(self, self.instance)
2458 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2459 list(self.instance.secondary_nodes))
2462 def CheckPrereq(self):
2463 """Check prerequisites.
2465 This checks that the instance is in the cluster and is not running.
2468 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2469 assert instance is not None, \
2470 "Cannot retrieve locked instance %s" % self.op.instance_name
2472 if instance.disk_template == constants.DT_DISKLESS:
2473 raise errors.OpPrereqError("Instance '%s' has no disks" %
2474 self.op.instance_name)
2475 if instance.status != "down":
2476 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2477 self.op.instance_name)
2478 remote_info = self.rpc.call_instance_info(instance.primary_node,
2480 instance.hypervisor)
2482 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2483 (self.op.instance_name,
2484 instance.primary_node))
2486 self.op.os_type = getattr(self.op, "os_type", None)
2487 if self.op.os_type is not None:
2489 pnode = self.cfg.GetNodeInfo(
2490 self.cfg.ExpandNodeName(instance.primary_node))
2492 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2494 os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2496 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2497 " primary node" % self.op.os_type)
2499 self.instance = instance
2501 def Exec(self, feedback_fn):
2502 """Reinstall the instance.
2505 inst = self.instance
2507 if self.op.os_type is not None:
2508 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2509 inst.os = self.op.os_type
2510 self.cfg.Update(inst)
2512 _StartInstanceDisks(self, inst, None)
2514 feedback_fn("Running the instance OS create scripts...")
2515 if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2516 raise errors.OpExecError("Could not install OS for instance %s"
2518 (inst.name, inst.primary_node))
2520 _ShutdownInstanceDisks(self, inst)
2523 class LURenameInstance(LogicalUnit):
2524 """Rename an instance.
2527 HPATH = "instance-rename"
2528 HTYPE = constants.HTYPE_INSTANCE
2529 _OP_REQP = ["instance_name", "new_name"]
2531 def BuildHooksEnv(self):
2534 This runs on master, primary and secondary nodes of the instance.
2537 env = _BuildInstanceHookEnvByObject(self, self.instance)
2538 env["INSTANCE_NEW_NAME"] = self.op.new_name
2539 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2540 list(self.instance.secondary_nodes))
2543 def CheckPrereq(self):
2544 """Check prerequisites.
2546 This checks that the instance is in the cluster and is not running.
2549 instance = self.cfg.GetInstanceInfo(
2550 self.cfg.ExpandInstanceName(self.op.instance_name))
2551 if instance is None:
2552 raise errors.OpPrereqError("Instance '%s' not known" %
2553 self.op.instance_name)
2554 if instance.status != "down":
2555 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2556 self.op.instance_name)
2557 remote_info = self.rpc.call_instance_info(instance.primary_node,
2559 instance.hypervisor)
2561 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2562 (self.op.instance_name,
2563 instance.primary_node))
2564 self.instance = instance
2566 # new name verification
2567 name_info = utils.HostInfo(self.op.new_name)
2569 self.op.new_name = new_name = name_info.name
2570 instance_list = self.cfg.GetInstanceList()
2571 if new_name in instance_list:
2572 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2575 if not getattr(self.op, "ignore_ip", False):
2576 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2577 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2578 (name_info.ip, new_name))
2581 def Exec(self, feedback_fn):
2582 """Reinstall the instance.
2585 inst = self.instance
2586 old_name = inst.name
2588 if inst.disk_template == constants.DT_FILE:
2589 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2591 self.cfg.RenameInstance(inst.name, self.op.new_name)
2592 # Change the instance lock. This is definitely safe while we hold the BGL
2593 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2594 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2596 # re-read the instance from the configuration after rename
2597 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2599 if inst.disk_template == constants.DT_FILE:
2600 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2601 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2602 old_file_storage_dir,
2603 new_file_storage_dir)
2606 raise errors.OpExecError("Could not connect to node '%s' to rename"
2607 " directory '%s' to '%s' (but the instance"
2608 " has been renamed in Ganeti)" % (
2609 inst.primary_node, old_file_storage_dir,
2610 new_file_storage_dir))
2613 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2614 " (but the instance has been renamed in"
2615 " Ganeti)" % (old_file_storage_dir,
2616 new_file_storage_dir))
2618 _StartInstanceDisks(self, inst, None)
2620 if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2622 msg = ("Could not run OS rename script for instance %s on node %s"
2623 " (but the instance has been renamed in Ganeti)" %
2624 (inst.name, inst.primary_node))
2625 self.proc.LogWarning(msg)
2627 _ShutdownInstanceDisks(self, inst)
2630 class LURemoveInstance(LogicalUnit):
2631 """Remove an instance.
2634 HPATH = "instance-remove"
2635 HTYPE = constants.HTYPE_INSTANCE
2636 _OP_REQP = ["instance_name", "ignore_failures"]
2639 def ExpandNames(self):
2640 self._ExpandAndLockInstance()
2641 self.needed_locks[locking.LEVEL_NODE] = []
2642 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2644 def DeclareLocks(self, level):
2645 if level == locking.LEVEL_NODE:
2646 self._LockInstancesNodes()
2648 def BuildHooksEnv(self):
2651 This runs on master, primary and secondary nodes of the instance.
2654 env = _BuildInstanceHookEnvByObject(self, self.instance)
2655 nl = [self.cfg.GetMasterNode()]
2658 def CheckPrereq(self):
2659 """Check prerequisites.
2661 This checks that the instance is in the cluster.
2664 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2665 assert self.instance is not None, \
2666 "Cannot retrieve locked instance %s" % self.op.instance_name
2668 def Exec(self, feedback_fn):
2669 """Remove the instance.
2672 instance = self.instance
2673 logging.info("Shutting down instance %s on node %s",
2674 instance.name, instance.primary_node)
2676 if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2677 if self.op.ignore_failures:
2678 feedback_fn("Warning: can't shutdown instance")
2680 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2681 (instance.name, instance.primary_node))
2683 logging.info("Removing block devices for instance %s", instance.name)
2685 if not _RemoveDisks(self, instance):
2686 if self.op.ignore_failures:
2687 feedback_fn("Warning: can't remove instance's disks")
2689 raise errors.OpExecError("Can't remove instance's disks")
2691 logging.info("Removing instance %s out of cluster config", instance.name)
2693 self.cfg.RemoveInstance(instance.name)
2694 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2697 class LUQueryInstances(NoHooksLU):
2698 """Logical unit for querying instances.
2701 _OP_REQP = ["output_fields", "names"]
2703 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2704 "admin_state", "admin_ram",
2705 "disk_template", "ip", "mac", "bridge",
2706 "sda_size", "sdb_size", "vcpus", "tags",
2707 "network_port", "beparams",
2708 "(disk).(size)/([0-9]+)",
2710 "(nic).(mac|ip|bridge)/([0-9]+)",
2711 "(nic).(macs|ips|bridges)",
2712 "(disk|nic).(count)",
2713 "serial_no", "hypervisor", "hvparams",] +
2715 for name in constants.HVS_PARAMETERS] +
2717 for name in constants.BES_PARAMETERS])
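# The parenthesised entries above are parameterised fields; assuming
# utils.FieldSet treats them as regular expressions (as the group handling in
# Exec suggests), examples of accepted field names are:
#   "disk.size/0"  -> size of the first disk   (groups: "disk", "size", "0")
#   "nic.mac/1"    -> MAC of the second NIC    (groups: "nic", "mac", "1")
#   "nic.bridges"  -> list of all NIC bridges  (groups: "nic", "bridges")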
2718 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2721 def ExpandNames(self):
2722 _CheckOutputFields(static=self._FIELDS_STATIC,
2723 dynamic=self._FIELDS_DYNAMIC,
2724 selected=self.op.output_fields)
2726 self.needed_locks = {}
2727 self.share_locks[locking.LEVEL_INSTANCE] = 1
2728 self.share_locks[locking.LEVEL_NODE] = 1
2731 self.wanted = _GetWantedInstances(self, self.op.names)
2733 self.wanted = locking.ALL_SET
2735 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2737 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2738 self.needed_locks[locking.LEVEL_NODE] = []
2739 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2741 def DeclareLocks(self, level):
2742 if level == locking.LEVEL_NODE and self.do_locking:
2743 self._LockInstancesNodes()
2745 def CheckPrereq(self):
2746 """Check prerequisites.
2751 def Exec(self, feedback_fn):
2752 """Computes the list of nodes and their attributes.
2755 all_info = self.cfg.GetAllInstancesInfo()
2757 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2758 elif self.wanted != locking.ALL_SET:
2759 instance_names = self.wanted
2760 missing = set(instance_names).difference(all_info.keys())
2762 raise errors.OpExecError(
2763 "Some instances were removed before retrieving their data: %s"
2766 instance_names = all_info.keys()
2768 instance_names = utils.NiceSort(instance_names)
2769 instance_list = [all_info[iname] for iname in instance_names]
2771 # begin data gathering
2773 nodes = frozenset([inst.primary_node for inst in instance_list])
2774 hv_list = list(set([inst.hypervisor for inst in instance_list]))
2779 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2781 result = node_data[name]
2783 live_data.update(result)
2784 elif result == False:
2785 bad_nodes.append(name)
2786 # else no instance is alive
2788 live_data = dict([(name, {}) for name in instance_names])
2790 # end data gathering
2795 for instance in instance_list:
2797 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2798 i_be = self.cfg.GetClusterInfo().FillBE(instance)
2799 for field in self.op.output_fields:
2800 st_match = self._FIELDS_STATIC.Matches(field)
2805 elif field == "pnode":
2806 val = instance.primary_node
2807 elif field == "snodes":
2808 val = list(instance.secondary_nodes)
2809 elif field == "admin_state":
2810 val = (instance.status != "down")
2811 elif field == "oper_state":
2812 if instance.primary_node in bad_nodes:
2815 val = bool(live_data.get(instance.name))
2816 elif field == "status":
2817 if instance.primary_node in bad_nodes:
2818 val = "ERROR_nodedown"
2820 running = bool(live_data.get(instance.name))
2822 if instance.status != "down":
2827 if instance.status != "down":
2831 elif field == "oper_ram":
2832 if instance.primary_node in bad_nodes:
2834 elif instance.name in live_data:
2835 val = live_data[instance.name].get("memory", "?")
2838 elif field == "disk_template":
2839 val = instance.disk_template
2841 val = instance.nics[0].ip
2842 elif field == "bridge":
2843 val = instance.nics[0].bridge
2844 elif field == "mac":
2845 val = instance.nics[0].mac
2846 elif field == "sda_size" or field == "sdb_size":
2847 idx = ord(field[2]) - ord('a')
2849 val = instance.FindDisk(idx).size
2850 except errors.OpPrereqError:
2852 elif field == "tags":
2853 val = list(instance.GetTags())
2854 elif field == "serial_no":
2855 val = instance.serial_no
2856 elif field == "network_port":
2857 val = instance.network_port
2858 elif field == "hypervisor":
2859 val = instance.hypervisor
2860 elif field == "hvparams":
2862 elif (field.startswith(HVPREFIX) and
2863 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2864 val = i_hv.get(field[len(HVPREFIX):], None)
2865 elif field == "beparams":
2867 elif (field.startswith(BEPREFIX) and
2868 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2869 val = i_be.get(field[len(BEPREFIX):], None)
2870 elif st_match and st_match.groups():
2871 # matches a variable list
2872 st_groups = st_match.groups()
2873 if st_groups and st_groups[0] == "disk":
2874 if st_groups[1] == "count":
2875 val = len(instance.disks)
2876 elif st_groups[1] == "sizes":
2877 val = [disk.size for disk in instance.disks]
2878 elif st_groups[1] == "size":
2880 val = instance.FindDisk(st_groups[2]).size
2881 except errors.OpPrereqError:
2884 assert False, "Unhandled disk parameter"
2885 elif st_groups[0] == "nic":
2886 if st_groups[1] == "count":
2887 val = len(instance.nics)
2888 elif st_groups[1] == "macs":
2889 val = [nic.mac for nic in instance.nics]
2890 elif st_groups[1] == "ips":
2891 val = [nic.ip for nic in instance.nics]
2892 elif st_groups[1] == "bridges":
2893 val = [nic.bridge for nic in instance.nics]
2896 nic_idx = int(st_groups[2])
2897 if nic_idx >= len(instance.nics):
2900 if st_groups[1] == "mac":
2901 val = instance.nics[nic_idx].mac
2902 elif st_groups[1] == "ip":
2903 val = instance.nics[nic_idx].ip
2904 elif st_groups[1] == "bridge":
2905 val = instance.nics[nic_idx].bridge
2907 assert False, "Unhandled NIC parameter"
2909 assert False, "Unhandled variable parameter"
2911 raise errors.ParameterError(field)
2918 class LUFailoverInstance(LogicalUnit):
2919 """Failover an instance.
2922 HPATH = "instance-failover"
2923 HTYPE = constants.HTYPE_INSTANCE
2924 _OP_REQP = ["instance_name", "ignore_consistency"]
2927 def ExpandNames(self):
2928 self._ExpandAndLockInstance()
2929 self.needed_locks[locking.LEVEL_NODE] = []
2930 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2932 def DeclareLocks(self, level):
2933 if level == locking.LEVEL_NODE:
2934 self._LockInstancesNodes()
2936 def BuildHooksEnv(self):
2939 This runs on master, primary and secondary nodes of the instance.
2943 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2945 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2946 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2949 def CheckPrereq(self):
2950 """Check prerequisites.
2952 This checks that the instance is in the cluster.
2955 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2956 assert self.instance is not None, \
2957 "Cannot retrieve locked instance %s" % self.op.instance_name
2959 bep = self.cfg.GetClusterInfo().FillBE(instance)
2960 if instance.disk_template not in constants.DTS_NET_MIRROR:
2961 raise errors.OpPrereqError("Instance's disk layout is not"
2962 " network mirrored, cannot failover.")
2964 secondary_nodes = instance.secondary_nodes
2965 if not secondary_nodes:
2966 raise errors.ProgrammerError("no secondary node but using "
2967 "a mirrored disk template")
2969 target_node = secondary_nodes[0]
2970 # check memory requirements on the secondary node
2971 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2972 instance.name, bep[constants.BE_MEMORY],
2973 instance.hypervisor)
2975 # check bridge existence
2976 brlist = [nic.bridge for nic in instance.nics]
2977 if not self.rpc.call_bridges_exist(target_node, brlist):
2978 raise errors.OpPrereqError("One or more target bridges %s does not"
2979 " exist on destination node '%s'" %
2980 (brlist, target_node))
2982 def Exec(self, feedback_fn):
2983 """Failover an instance.
2985 The failover is done by shutting it down on its present node and
2986 starting it on the secondary.
2989 instance = self.instance
2991 source_node = instance.primary_node
2992 target_node = instance.secondary_nodes[0]
2994 feedback_fn("* checking disk consistency between source and target")
2995 for dev in instance.disks:
2996 # for drbd, these are drbd over lvm
2997 if not _CheckDiskConsistency(self, dev, target_node, False):
2998 if instance.status == "up" and not self.op.ignore_consistency:
2999 raise errors.OpExecError("Disk %s is degraded on target node,"
3000 " aborting failover." % dev.iv_name)
3002 feedback_fn("* shutting down instance on source node")
3003 logging.info("Shutting down instance %s on node %s",
3004 instance.name, source_node)
3006 if not self.rpc.call_instance_shutdown(source_node, instance):
3007 if self.op.ignore_consistency:
3008 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3010 " anyway. Please make sure node %s is down",
3011 instance.name, source_node, source_node)
3013 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3014 (instance.name, source_node))
3016 feedback_fn("* deactivating the instance's disks on source node")
3017 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3018 raise errors.OpExecError("Can't shut down the instance's disks.")
3020 instance.primary_node = target_node
3021 # distribute new instance config to the other nodes
3022 self.cfg.Update(instance)
3024 # Only start the instance if it's marked as up
3025 if instance.status == "up":
3026 feedback_fn("* activating the instance's disks on target node")
3027 logging.info("Starting instance %s on node %s",
3028 instance.name, target_node)
3030 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3031 ignore_secondaries=True)
3033 _ShutdownInstanceDisks(self, instance)
3034 raise errors.OpExecError("Can't activate the instance's disks")
3036 feedback_fn("* starting the instance on the target node")
3037 if not self.rpc.call_instance_start(target_node, instance, None):
3038 _ShutdownInstanceDisks(self, instance)
3039 raise errors.OpExecError("Could not start instance %s on node %s." %
3040 (instance.name, target_node))
3043 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3044 """Create a tree of block devices on the primary node.
3046 This always creates all devices.
3050 for child in device.children:
3051 if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3054 lu.cfg.SetDiskID(device, node)
3055 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3056 instance.name, True, info)
3059 if device.physical_id is None:
3060 device.physical_id = new_id
3064 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3065 """Create a tree of block devices on a secondary node.
3067 If this device type has to be created on secondaries, create it and all its children.
3070 If not, just recurse to children keeping the same 'force' value.
3073 if device.CreateOnSecondary():
3076 for child in device.children:
3077 if not _CreateBlockDevOnSecondary(lu, node, instance,
3078 child, force, info):
3083 lu.cfg.SetDiskID(device, node)
3084 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3085 instance.name, False, info)
3088 if device.physical_id is None:
3089 device.physical_id = new_id
3093 def _GenerateUniqueNames(lu, exts):
3094 """Generate a suitable LV name.
3096 This will generate a logical volume name for the given instance.
3101 new_id = lu.cfg.GenerateUniqueID()
3102 results.append("%s%s" % (new_id, val))
3106 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3108 """Generate a drbd8 device complete with its children.
3111 port = lu.cfg.AllocatePort()
3112 vgname = lu.cfg.GetVGName()
3113 shared_secret = lu.cfg.GenerateDRBDSecret()
3114 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3115 logical_id=(vgname, names[0]))
3116 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3117 logical_id=(vgname, names[1]))
3118 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3119 logical_id=(primary, secondary, port,
3122 children=[dev_data, dev_meta],
3127 def _GenerateDiskTemplate(lu, template_name,
3128 instance_name, primary_node,
3129 secondary_nodes, disk_info,
3130 file_storage_dir, file_driver):
3131 """Generate the entire disk layout for a given template type.
3134 #TODO: compute space requirements
3136 vgname = lu.cfg.GetVGName()
3137 disk_count = len(disk_info)
3139 if template_name == constants.DT_DISKLESS:
3141 elif template_name == constants.DT_PLAIN:
3142 if len(secondary_nodes) != 0:
3143 raise errors.ProgrammerError("Wrong template configuration")
3145 names = _GenerateUniqueNames(lu, [".disk%d" % i
3146 for i in range(disk_count)])
3147 for idx, disk in enumerate(disk_info):
3148 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3149 logical_id=(vgname, names[idx]),
3150 iv_name = "disk/%d" % idx)
3151 disks.append(disk_dev)
3152 elif template_name == constants.DT_DRBD8:
3153 if len(secondary_nodes) != 1:
3154 raise errors.ProgrammerError("Wrong template configuration")
3155 remote_node = secondary_nodes[0]
3156 minors = lu.cfg.AllocateDRBDMinor(
3157 [primary_node, remote_node] * len(disk_info), instance_name)
3159 names = _GenerateUniqueNames(lu,
3160 [".disk%d_%s" % (i, s)
3161 for i in range(disk_count)
3162 for s in ("data", "meta")
3164 for idx, disk in enumerate(disk_info):
3165 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3166 disk["size"], names[idx*2:idx*2+2],
3168 minors[idx*2], minors[idx*2+1])
3169 disks.append(disk_dev)
3170 elif template_name == constants.DT_FILE:
3171 if len(secondary_nodes) != 0:
3172 raise errors.ProgrammerError("Wrong template configuration")
3174 for idx, disk in enumerate(disk_info):
3176 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3177 iv_name="disk/%d" % idx,
3178 logical_id=(file_driver,
3179 "%s/disk%d" % (file_storage_dir,
3181 disks.append(disk_dev)
3183 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3187 def _GetInstanceInfoText(instance):
3188 """Compute that text that should be added to the disk's metadata.
3191 return "originstname+%s" % instance.name
3194 def _CreateDisks(lu, instance):
3195 """Create all disks for an instance.
3197 This abstracts away some work from AddInstance.
3199 @type lu: L{LogicalUnit}
3200 @param lu: the logical unit on whose behalf we execute
3201 @type instance: L{objects.Instance}
3202 @param instance: the instance whose disks we should create
3204 @return: the success of the creation
3207 info = _GetInstanceInfoText(instance)
3209 if instance.disk_template == constants.DT_FILE:
3210 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3211 result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3215 logging.error("Could not connect to node '%s'", instance.primary_node)
3219 logging.error("Failed to create directory '%s'", file_storage_dir)
3222 for device in instance.disks:
3223 logging.info("Creating volume %s for instance %s",
3224 device.iv_name, instance.name)
3226 for secondary_node in instance.secondary_nodes:
3227 if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3228 device, False, info):
3229 logging.error("Failed to create volume %s (%s) on secondary node %s!",
3230 device.iv_name, device, secondary_node)
3233 if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3234 instance, device, info):
3235 logging.error("Failed to create volume %s on primary!", device.iv_name)
3241 def _RemoveDisks(lu, instance):
3242 """Remove all disks for an instance.
3244 This abstracts away some work from `AddInstance()` and
3245 `RemoveInstance()`. Note that in case some of the devices couldn't
3246 be removed, the removal will continue with the other ones (compare
3247 with `_CreateDisks()`).
3249 @type lu: L{LogicalUnit}
3250 @param lu: the logical unit on whose behalf we execute
3251 @type instance: L{objects.Instance}
3252 @param instance: the instance whose disks we should remove
3254 @return: the success of the removal
3257 logging.info("Removing block devices for instance %s", instance.name)
3260 for device in instance.disks:
3261 for node, disk in device.ComputeNodeTree(instance.primary_node):
3262 lu.cfg.SetDiskID(disk, node)
3263 if not lu.rpc.call_blockdev_remove(node, disk):
3264 lu.proc.LogWarning("Could not remove block device %s on node %s,"
3265 " continuing anyway", device.iv_name, node)
3268 if instance.disk_template == constants.DT_FILE:
3269 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3270 if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3272 logging.error("Could not remove directory '%s'", file_storage_dir)
3278 def _ComputeDiskSize(disk_template, disks):
3279 """Compute disk size requirements in the volume group
3282 # Required free disk space as a function of the requested disks
3284 constants.DT_DISKLESS: None,
3285 constants.DT_PLAIN: sum(d["size"] for d in disks),
3286 # 128 MB are added for drbd metadata for each disk
3287 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3288 constants.DT_FILE: None,
3291 if disk_template not in req_size_dict:
3292 raise errors.ProgrammerError("Disk template '%s' size requirement"
3293 " is unknown" % disk_template)
3295 return req_size_dict[disk_template]
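# Worked example (illustrative sizes): for two disks of 1024 and 256 MiB,
# DT_PLAIN needs 1024 + 256 = 1280 MiB of free space in the volume group,
# DT_DRBD8 needs (1024 + 128) + (256 + 128) = 1536 MiB because of the per-disk
# metadata, and DT_DISKLESS/DT_FILE need no LVM space at all (None).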
3298 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3299 """Hypervisor parameter validation.
3301 This function abstracts the hypervisor parameter validation to be
3302 used in both instance create and instance modify.
3304 @type lu: L{LogicalUnit}
3305 @param lu: the logical unit for which we check
3306 @type nodenames: list
3307 @param nodenames: the list of nodes on which we should check
3308 @type hvname: string
3309 @param hvname: the name of the hypervisor we should use
3310 @type hvparams: dict
3311 @param hvparams: the parameters which we need to check
3312 @raise errors.OpPrereqError: if the parameters are not valid
3315 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3318 for node in nodenames:
3319 info = hvinfo.get(node, None)
3320 if not info or not isinstance(info, (tuple, list)):
3321 raise errors.OpPrereqError("Cannot get current information"
3322 " from node '%s' (%s)" % (node, info))
3324 raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3328 class LUCreateInstance(LogicalUnit):
3329 """Create an instance.
3332 HPATH = "instance-add"
3333 HTYPE = constants.HTYPE_INSTANCE
3334 _OP_REQP = ["instance_name", "disks", "disk_template",
3336 "wait_for_sync", "ip_check", "nics",
3337 "hvparams", "beparams"]
3340 def _ExpandNode(self, node):
3341 """Expands and checks one node name.
3344 node_full = self.cfg.ExpandNodeName(node)
3345 if node_full is None:
3346 raise errors.OpPrereqError("Unknown node %s" % node)
3349 def ExpandNames(self):
3350 """ExpandNames for CreateInstance.
3352 Figure out the right locks for instance creation.
3355 self.needed_locks = {}
3357 # set optional parameters to None if they don't exist
3358 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3359 if not hasattr(self.op, attr):
3360 setattr(self.op, attr, None)
3362 # cheap checks, mostly valid constants given
3364 # verify creation mode
3365 if self.op.mode not in (constants.INSTANCE_CREATE,
3366 constants.INSTANCE_IMPORT):
3367 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3370 # disk template and mirror node verification
3371 if self.op.disk_template not in constants.DISK_TEMPLATES:
3372 raise errors.OpPrereqError("Invalid disk template name")
3374 if self.op.hypervisor is None:
3375 self.op.hypervisor = self.cfg.GetHypervisorType()
3377 cluster = self.cfg.GetClusterInfo()
3378 enabled_hvs = cluster.enabled_hypervisors
3379 if self.op.hypervisor not in enabled_hvs:
3380 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3381 " cluster (%s)" % (self.op.hypervisor,
3382 ",".join(enabled_hvs)))
3384 # check hypervisor parameter syntax (locally)
3386 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3388 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3389 hv_type.CheckParameterSyntax(filled_hvp)
3391 # fill and remember the beparams dict
3392 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3395 #### instance parameters check
3397 # instance name verification
3398 hostname1 = utils.HostInfo(self.op.instance_name)
3399 self.op.instance_name = instance_name = hostname1.name
3401 # this is just a preventive check, but someone might still add this
3402 # instance in the meantime, and creation will fail at lock-add time
3403 if instance_name in self.cfg.GetInstanceList():
3404 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3407 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3411 for nic in self.op.nics:
3412 # ip validity checks
3413 ip = nic.get("ip", None)
3414 if ip is None or ip.lower() == "none":
3416 elif ip.lower() == constants.VALUE_AUTO:
3417 nic_ip = hostname1.ip
3419 if not utils.IsValidIP(ip):
3420 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3421 " like a valid IP" % ip)
3424 # MAC address verification
3425 mac = nic.get("mac", constants.VALUE_AUTO)
3426 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3427 if not utils.IsValidMac(mac.lower()):
3428 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3430 # bridge verification
3431 bridge = nic.get("bridge", self.cfg.GetDefBridge())
3432 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
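# Example self.op.nics input handled by the loop above (illustrative values):
#   [{"ip": constants.VALUE_AUTO},
#    {"ip": "192.0.2.10", "mac": "aa:00:00:11:22:33", "bridge": "xen-br1"}]
# VALUE_AUTO resolves the IP from the instance's own hostname, a missing MAC
# defaults to VALUE_AUTO (and is generated at creation time), and a missing
# bridge falls back to the cluster's default bridge.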
3434 # disk checks/pre-build
3436 for disk in self.op.disks:
3437 mode = disk.get("mode", constants.DISK_RDWR)
3438 if mode not in constants.DISK_ACCESS_SET:
3439 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3441 size = disk.get("size", None)
3443 raise errors.OpPrereqError("Missing disk size")
3447 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3448 self.disks.append({"size": size, "mode": mode})
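# Example self.op.disks input accepted by the checks above (illustrative):
#   [{"size": 1024}, {"size": 256, "mode": constants.DISK_RDWR}]
# Every entry must carry a size (in MiB); the access mode defaults to
# constants.DISK_RDWR and must be one of constants.DISK_ACCESS_SET.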
3450 # used in CheckPrereq for ip ping check
3451 self.check_ip = hostname1.ip
3453 # file storage checks
3454 if (self.op.file_driver and
3455 not self.op.file_driver in constants.FILE_DRIVER):
3456 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3457 self.op.file_driver)
3459 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3460 raise errors.OpPrereqError("File storage directory path not absolute")
3462 ### Node/iallocator related checks
3463 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3464 raise errors.OpPrereqError("One and only one of iallocator and primary"
3465 " node must be given")
3467 if self.op.iallocator:
3468 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3470 self.op.pnode = self._ExpandNode(self.op.pnode)
3471 nodelist = [self.op.pnode]
3472 if self.op.snode is not None:
3473 self.op.snode = self._ExpandNode(self.op.snode)
3474 nodelist.append(self.op.snode)
3475 self.needed_locks[locking.LEVEL_NODE] = nodelist
3477 # in case of import lock the source node too
3478 if self.op.mode == constants.INSTANCE_IMPORT:
3479 src_node = getattr(self.op, "src_node", None)
3480 src_path = getattr(self.op, "src_path", None)
3482 if src_node is None or src_path is None:
3483 raise errors.OpPrereqError("Importing an instance requires source"
3484 " node and path options")
3486 if not os.path.isabs(src_path):
3487 raise errors.OpPrereqError("The source path must be absolute")
3489 self.op.src_node = src_node = self._ExpandNode(src_node)
3490 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3491 self.needed_locks[locking.LEVEL_NODE].append(src_node)
3493 else: # INSTANCE_CREATE
3494 if getattr(self.op, "os_type", None) is None:
3495 raise errors.OpPrereqError("No guest OS specified")
3497 def _RunAllocator(self):
3498 """Run the allocator based on input opcode.
3501 nics = [n.ToDict() for n in self.nics]
3502 ial = IAllocator(self,
3503 mode=constants.IALLOCATOR_MODE_ALLOC,
3504 name=self.op.instance_name,
3505 disk_template=self.op.disk_template,
3508 vcpus=self.be_full[constants.BE_VCPUS],
3509 mem_size=self.be_full[constants.BE_MEMORY],
3514 ial.Run(self.op.iallocator)
3517 raise errors.OpPrereqError("Can't compute nodes using"
3518 " iallocator '%s': %s" % (self.op.iallocator,
3520 if len(ial.nodes) != ial.required_nodes:
3521 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3522 " of nodes (%s), required %s" %
3523 (self.op.iallocator, len(ial.nodes),
3524 ial.required_nodes))
3525 self.op.pnode = ial.nodes[0]
3526 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3527 self.op.instance_name, self.op.iallocator,
3528 ", ".join(ial.nodes))
3529 if ial.required_nodes == 2:
3530 self.op.snode = ial.nodes[1]
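# Example (hypothetical allocator reply): with ial.required_nodes == 2 and
# ial.nodes == ["node3.example.com", "node7.example.com"], the instance gets
# node3.example.com as its primary node and node7.example.com as the DRBD
# secondary.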
3532 def BuildHooksEnv(self):
3535 This runs on master, primary and secondary nodes of the instance.
3539 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3540 "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3541 "INSTANCE_ADD_MODE": self.op.mode,
3543 if self.op.mode == constants.INSTANCE_IMPORT:
3544 env["INSTANCE_SRC_NODE"] = self.op.src_node
3545 env["INSTANCE_SRC_PATH"] = self.op.src_path
3546 env["INSTANCE_SRC_IMAGES"] = self.src_images
3548 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3549 primary_node=self.op.pnode,
3550 secondary_nodes=self.secondaries,
3551 status=self.instance_status,
3552 os_type=self.op.os_type,
3553 memory=self.be_full[constants.BE_MEMORY],
3554 vcpus=self.be_full[constants.BE_VCPUS],
3555 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3558 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3563 def CheckPrereq(self):
3564 """Check prerequisites.
3567 if (not self.cfg.GetVGName() and
3568 self.op.disk_template not in constants.DTS_NOT_LVM):
3569 raise errors.OpPrereqError("Cluster does not support lvm-based"
3573 if self.op.mode == constants.INSTANCE_IMPORT:
3574 src_node = self.op.src_node
3575 src_path = self.op.src_path
3577 export_info = self.rpc.call_export_info(src_node, src_path)
3580 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3582 if not export_info.has_section(constants.INISECT_EXP):
3583 raise errors.ProgrammerError("Corrupted export config")
3585 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3586 if (int(ei_version) != constants.EXPORT_VERSION):
3587 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3588 (ei_version, constants.EXPORT_VERSION))
3590 # Check that the new instance doesn't have less disks than the export
3591 instance_disks = len(self.disks)
3592 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3593 if instance_disks < export_disks:
3594 raise errors.OpPrereqError("Not enough disks to import."
3595 " (instance: %d, export: %d)" %
3598 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3600 for idx in range(export_disks):
3601 option = 'disk%d_dump' % idx
3602 if export_info.has_option(constants.INISECT_INS, option):
3603 # FIXME: are the old os-es, disk sizes, etc. useful?
3604 export_name = export_info.get(constants.INISECT_INS, option)
3605 image = os.path.join(src_path, export_name)
3606 disk_images.append(image)
3608 disk_images.append(False)
3610 self.src_images = disk_images
3612 if self.op.mac == constants.VALUE_AUTO:
3613 old_name = export_info.get(constants.INISECT_INS, 'name')
3614 if self.op.instance_name == old_name:
3615 # FIXME: adjust every nic, when we'll be able to create instances
3616 # with more than one
3617 if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
3618 self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')
3620 # ip ping checks (we use the same ip that was resolved in ExpandNames)
3622 if self.op.start and not self.op.ip_check:
3623 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3624 " adding an instance in start mode")
3626 if self.op.ip_check:
3627 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3628 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3629 (self.check_ip, self.op.instance_name))
3633 if self.op.iallocator is not None:
3634 self._RunAllocator()
3636 #### node related checks
3638 # check primary node
3639 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3640 assert self.pnode is not None, \
3641 "Cannot retrieve locked node %s" % self.op.pnode
3642 self.secondaries = []
3644 # mirror node verification
3645 if self.op.disk_template in constants.DTS_NET_MIRROR:
3646 if self.op.snode is None:
3647 raise errors.OpPrereqError("The networked disk templates need"
3649 if self.op.snode == pnode.name:
3650 raise errors.OpPrereqError("The secondary node cannot be"
3651 " the primary node.")
3652 self.secondaries.append(self.op.snode)
3654 nodenames = [pnode.name] + self.secondaries
3656 req_size = _ComputeDiskSize(self.op.disk_template,
3659 # Check lv size requirements
3660 if req_size is not None:
3661 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3663 for node in nodenames:
3664 info = nodeinfo.get(node, None)
3666 raise errors.OpPrereqError("Cannot get current information"
3667 " from node '%s'" % node)
3668 vg_free = info.get('vg_free', None)
3669 if not isinstance(vg_free, int):
3670 raise errors.OpPrereqError("Can't compute free disk space on"
3672 if req_size > info['vg_free']:
3673 raise errors.OpPrereqError("Not enough disk space on target node %s."
3674 " %d MB available, %d MB required" %
3675 (node, info['vg_free'], req_size))
3677 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3680 os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3682 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3683 " primary node" % self.op.os_type)
3685 # bridge check on primary node
3686 bridges = [n.bridge for n in self.nics]
3687 if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
3688 raise errors.OpPrereqError("one of the target bridges '%s' does not"
3690 " destination node '%s'" %
3691 (",".join(bridges), pnode.name))
3693 # memory check on primary node
3695 _CheckNodeFreeMemory(self, self.pnode.name,
3696 "creating instance %s" % self.op.instance_name,
3697 self.be_full[constants.BE_MEMORY],
3701 self.instance_status = 'up'
3703 self.instance_status = 'down'
3705 def Exec(self, feedback_fn):
3706 """Create and add the instance to the cluster.
3709 instance = self.op.instance_name
3710 pnode_name = self.pnode.name
3712 for nic in self.nics:
3713 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3714 nic.mac = self.cfg.GenerateMAC()
3716 ht_kind = self.op.hypervisor
3717 if ht_kind in constants.HTS_REQ_PORT:
3718 network_port = self.cfg.AllocatePort()
3722 ##if self.op.vnc_bind_address is None:
3723 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3725 # this is needed because os.path.join does not accept None arguments
3726 if self.op.file_storage_dir is None:
3727 string_file_storage_dir = ""
3729 string_file_storage_dir = self.op.file_storage_dir
3731 # build the full file storage dir path
3732 file_storage_dir = os.path.normpath(os.path.join(
3733 self.cfg.GetFileStorageDir(),
3734 string_file_storage_dir, instance))
3737 disks = _GenerateDiskTemplate(self,
3738 self.op.disk_template,
3739 instance, pnode_name,
3743 self.op.file_driver)
3745 iobj = objects.Instance(name=instance, os=self.op.os_type,
3746 primary_node=pnode_name,
3747 nics=self.nics, disks=disks,
3748 disk_template=self.op.disk_template,
3749 status=self.instance_status,
3750 network_port=network_port,
3751 beparams=self.op.beparams,
3752 hvparams=self.op.hvparams,
3753 hypervisor=self.op.hypervisor,
3756 feedback_fn("* creating instance disks...")
3757 if not _CreateDisks(self, iobj):
3758 _RemoveDisks(self, iobj)
3759 self.cfg.ReleaseDRBDMinors(instance)
3760 raise errors.OpExecError("Device creation failed, reverting...")
3762 feedback_fn("adding instance %s to cluster config" % instance)
3764 self.cfg.AddInstance(iobj)
3765 # Declare that we don't want to remove the instance lock anymore, as we've
3766 # added the instance to the config
3767 del self.remove_locks[locking.LEVEL_INSTANCE]
3768 # Remove the temp. assignments for the instance's DRBDs
3769 self.cfg.ReleaseDRBDMinors(instance)
3771 if self.op.wait_for_sync:
3772 disk_abort = not _WaitForSync(self, iobj)
3773 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3774 # make sure the disks are not degraded (still sync-ing is ok)
3776 feedback_fn("* checking mirrors status")
3777 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3782 _RemoveDisks(self, iobj)
3783 self.cfg.RemoveInstance(iobj.name)
3784 # Make sure the instance lock gets removed
3785 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3786 raise errors.OpExecError("There are some degraded disks for"
3789 feedback_fn("creating os for instance %s on node %s" %
3790 (instance, pnode_name))
3792 if iobj.disk_template != constants.DT_DISKLESS:
3793 if self.op.mode == constants.INSTANCE_CREATE:
3794 feedback_fn("* running the instance OS create scripts...")
3795 if not self.rpc.call_instance_os_add(pnode_name, iobj):
3796 raise errors.OpExecError("could not add os for instance %s"
3798 (instance, pnode_name))
3800 elif self.op.mode == constants.INSTANCE_IMPORT:
3801 feedback_fn("* running the instance OS import scripts...")
3802 src_node = self.op.src_node
3803 src_images = self.src_images
3804 cluster_name = self.cfg.GetClusterName()
3805 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
3806 src_node, src_images,
3808 for idx, result in enumerate(import_result):
3810 self.LogWarning("Could not image %s for on instance %s, disk %d,"
3811 " on node %s" % (src_images[idx], instance, idx,
3814 # also checked in the prereq part
3815 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3819 logging.info("Starting instance %s on node %s", instance, pnode_name)
3820 feedback_fn("* starting instance...")
3821 if not self.rpc.call_instance_start(pnode_name, iobj, None):
3822 raise errors.OpExecError("Could not start instance")
3825 class LUConnectConsole(NoHooksLU):
3826 """Connect to an instance's console.
3828 This is somewhat special in that it returns the command line that
3829 you need to run on the master node in order to connect to the console.
3833 _OP_REQP = ["instance_name"]
3836 def ExpandNames(self):
3837 self._ExpandAndLockInstance()
3839 def CheckPrereq(self):
3840 """Check prerequisites.
3842 This checks that the instance is in the cluster.
3845 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3846 assert self.instance is not None, \
3847 "Cannot retrieve locked instance %s" % self.op.instance_name
3849 def Exec(self, feedback_fn):
3850 """Connect to the console of an instance
3853 instance = self.instance
3854 node = instance.primary_node
3856 node_insts = self.rpc.call_instance_list([node],
3857 [instance.hypervisor])[node]
3858 if node_insts is False:
3859 raise errors.OpExecError("Can't connect to node %s." % node)
3861 if instance.name not in node_insts:
3862 raise errors.OpExecError("Instance %s is not running." % instance.name)
3864 logging.debug("Connecting to console of %s on %s", instance.name, node)
3866 hyper = hypervisor.GetHypervisor(instance.hypervisor)
3867 console_cmd = hyper.GetShellCommandForConsole(instance)
3870 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
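# The value returned to the caller is a complete ssh command line to run on
# the master node, e.g. (illustrative; the inner command depends entirely on
# the hypervisor's GetShellCommandForConsole):
#   ssh -t ... root@node1.example.com 'xm console inst1.example.com'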
3873 class LUReplaceDisks(LogicalUnit):
3874 """Replace the disks of an instance.
3877 HPATH = "mirrors-replace"
3878 HTYPE = constants.HTYPE_INSTANCE
3879 _OP_REQP = ["instance_name", "mode", "disks"]
3882 def ExpandNames(self):
3883 self._ExpandAndLockInstance()
3885 if not hasattr(self.op, "remote_node"):
3886 self.op.remote_node = None
3888 ia_name = getattr(self.op, "iallocator", None)
3889 if ia_name is not None:
3890 if self.op.remote_node is not None:
3891 raise errors.OpPrereqError("Give either the iallocator or the new"
3892 " secondary, not both")
3893 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3894 elif self.op.remote_node is not None:
3895 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3896 if remote_node is None:
3897 raise errors.OpPrereqError("Node '%s' not known" %
3898 self.op.remote_node)
3899 self.op.remote_node = remote_node
3900 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3901 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3903 self.needed_locks[locking.LEVEL_NODE] = []
3904 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3906 def DeclareLocks(self, level):
3907 # If we're not already locking all nodes in the set we have to declare the
3908 # instance's primary/secondary nodes.
3909 if (level == locking.LEVEL_NODE and
3910 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3911 self._LockInstancesNodes()
3913 def _RunAllocator(self):
3914 """Compute a new secondary node using an IAllocator.
3917 ial = IAllocator(self,
3918 mode=constants.IALLOCATOR_MODE_RELOC,
3919 name=self.op.instance_name,
3920 relocate_from=[self.sec_node])
3922 ial.Run(self.op.iallocator)
if not ial.success:
3925 raise errors.OpPrereqError("Can't compute nodes using"
3926 " iallocator '%s': %s" % (self.op.iallocator,
ial.info))
3928 if len(ial.nodes) != ial.required_nodes:
3929 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3930 " of nodes (%s), required %s" %
3931 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3932 self.op.remote_node = ial.nodes[0]
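# exactly one node was requested (checked above), so the first entry returned
# by the allocator is the new secondary.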
3933 self.LogInfo("Selected new secondary for the instance: %s",
3934 self.op.remote_node)
3936 def BuildHooksEnv(self):
3939 This runs on the master, the primary and all the secondaries.
3943 "MODE": self.op.mode,
3944 "NEW_SECONDARY": self.op.remote_node,
3945 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3947 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
nl = [
3949 self.cfg.GetMasterNode(),
3950 self.instance.primary_node,
]
3952 if self.op.remote_node is not None:
3953 nl.append(self.op.remote_node)
3956 def CheckPrereq(self):
3957 """Check prerequisites.
3959 This checks that the instance is in the cluster.
3962 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3963 assert instance is not None, \
3964 "Cannot retrieve locked instance %s" % self.op.instance_name
3965 self.instance = instance
3967 if instance.disk_template not in constants.DTS_NET_MIRROR:
3968 raise errors.OpPrereqError("Instance's disk layout is not"
3969 " network mirrored.")
3971 if len(instance.secondary_nodes) != 1:
3972 raise errors.OpPrereqError("The instance has a strange layout,"
3973 " expected one secondary but found %d" %
3974 len(instance.secondary_nodes))
3976 self.sec_node = instance.secondary_nodes[0]
3978 ia_name = getattr(self.op, "iallocator", None)
3979 if ia_name is not None:
3980 self._RunAllocator()
3982 remote_node = self.op.remote_node
3983 if remote_node is not None:
3984 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3985 assert self.remote_node_info is not None, \
3986 "Cannot retrieve locked node %s" % remote_node
else:
3988 self.remote_node_info = None
3989 if remote_node == instance.primary_node:
3990 raise errors.OpPrereqError("The specified node is the primary node of"
3992 elif remote_node == self.sec_node:
3993 if self.op.mode == constants.REPLACE_DISK_SEC:
3994 # this is for DRBD8, where we can't execute the same mode of
3995 # replacement as for drbd7 (no different port allocated)
3996 raise errors.OpPrereqError("Same secondary given, cannot execute"
3998 if instance.disk_template == constants.DT_DRBD8:
3999 if (self.op.mode == constants.REPLACE_DISK_ALL and
4000 remote_node is not None):
4001 # switch to replace secondary mode
4002 self.op.mode = constants.REPLACE_DISK_SEC
4004 if self.op.mode == constants.REPLACE_DISK_ALL:
4005 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4006 " secondary disk replacement, not"
4008 elif self.op.mode == constants.REPLACE_DISK_PRI:
4009 if remote_node is not None:
4010 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4011 " the secondary while doing a primary"
4012 " node disk replacement")
4013 self.tgt_node = instance.primary_node
4014 self.oth_node = instance.secondary_nodes[0]
4015 elif self.op.mode == constants.REPLACE_DISK_SEC:
4016 self.new_node = remote_node # this can be None, in which case
4017 # we don't change the secondary
4018 self.tgt_node = instance.secondary_nodes[0]
4019 self.oth_node = instance.primary_node
else:
4021 raise errors.ProgrammerError("Unhandled disk replace mode")
4023 if not self.op.disks:
4024 self.op.disks = range(len(instance.disks))
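# an empty disk list means "replace all disks"; each index is validated below
# via FindDisk, which fails for unknown disks.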
4026 for disk_idx in self.op.disks:
4027 instance.FindDisk(disk_idx)
4029 def _ExecD8DiskOnly(self, feedback_fn):
4030 """Replace a disk on the primary or secondary for dbrd8.
4032 The algorithm for replace is quite complicated:
4034 1. for each disk to be replaced:
4036 1. create new LVs on the target node with unique names
4037 1. detach old LVs from the drbd device
4038 1. rename old LVs to name_replaced.<time_t>
4039 1. rename new LVs to old LVs
4040 1. attach the new LVs (with the old names now) to the drbd device
4042 1. wait for sync across all devices
4044 1. for each modified disk:
4046 1. remove old LVs (which have the name name_replaces.<time_t>)
4048 Failures are not very well handled.
"""
steps_total = 6
4052 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4053 instance = self.instance
iv_names = {}
4055 vgname = self.cfg.GetVGName()
cfg = self.cfg
4058 tgt_node = self.tgt_node
4059 oth_node = self.oth_node
4061 # Step: check device activation
4062 self.proc.LogStep(1, steps_total, "check device existence")
4063 info("checking volume groups")
4064 my_vg = cfg.GetVGName()
4065 results = self.rpc.call_vg_list([oth_node, tgt_node])
if not results:
4067 raise errors.OpExecError("Can't list volume groups on the nodes")
4068 for node in oth_node, tgt_node:
4069 res = results.get(node, False)
4070 if not res or my_vg not in res:
4071 raise errors.OpExecError("Volume group '%s' not found on %s" %
4073 for idx, dev in enumerate(instance.disks):
4074 if idx not in self.op.disks:
4076 for node in tgt_node, oth_node:
4077 info("checking disk/%d on %s" % (idx, node))
4078 cfg.SetDiskID(dev, node)
4079 if not self.rpc.call_blockdev_find(node, dev):
4080 raise errors.OpExecError("Can't find disk/%d on node %s" %
4083 # Step: check other node consistency
4084 self.proc.LogStep(2, steps_total, "check peer consistency")
4085 for idx, dev in enumerate(instance.disks):
4086 if idx not in self.op.disks:
4088 info("checking disk/%d consistency on %s" % (idx, oth_node))
4089 if not _CheckDiskConsistency(self, dev, oth_node,
4090 oth_node==instance.primary_node):
4091 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4092 " to replace disks on this node (%s)" %
4093 (oth_node, tgt_node))
4095 # Step: create new storage
4096 self.proc.LogStep(3, steps_total, "allocate new storage")
4097 for idx, dev in enumerate(instance.disks):
4098 if idx not in self.op.disks:
continue
size = dev.size
4101 cfg.SetDiskID(dev, tgt_node)
4102 lv_names = [".disk%d_%s" % (idx, suf)
4103 for suf in ["data", "meta"]]
4104 names = _GenerateUniqueNames(self, lv_names)
4105 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4106 logical_id=(vgname, names[0]))
4107 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4108 logical_id=(vgname, names[1]))
4109 new_lvs = [lv_data, lv_meta]
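# each drbd8 disk is backed by a data LV and a small metadata LV; this fresh
# pair will replace the current children of the drbd device further down.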
4110 old_lvs = dev.children
4111 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4112 info("creating new local storage on %s for %s" %
4113 (tgt_node, dev.iv_name))
4114 # since we *always* want to create this LV, we use the
4115 # _Create...OnPrimary (which forces the creation), even if we
4116 # are talking about the secondary node
4117 for new_lv in new_lvs:
4118 if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4119 _GetInstanceInfoText(instance)):
4120 raise errors.OpExecError("Failed to create new LV named '%s' on"
" node %s" %
4122 (new_lv.logical_id[1], tgt_node))
4124 # Step: for each lv, detach+rename*2+attach
4125 self.proc.LogStep(4, steps_total, "change drbd configuration")
4126 for dev, old_lvs, new_lvs in iv_names.itervalues():
4127 info("detaching %s drbd from local storage" % dev.iv_name)
4128 if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4129 raise errors.OpExecError("Can't detach drbd from local storage on node"
4130 " %s for device %s" % (tgt_node, dev.iv_name))
4132 #cfg.Update(instance)
4134 # ok, we created the new LVs, so now we know we have the needed
4135 # storage; as such, we proceed on the target node to rename
4136 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4137 # using the assumption that logical_id == physical_id (which in
4138 # turn is the unique_id on that node)
4140 # FIXME(iustin): use a better name for the replaced LVs
4141 temp_suffix = int(time.time())
4142 ren_fn = lambda d, suff: (d.physical_id[0],
4143 d.physical_id[1] + "_replaced-%s" % suff)
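# ren_fn keeps the volume group and appends a timestamped "_replaced-" suffix
# to the LV name, so the old volumes stay findable until they are removed.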
4144 # build the rename list based on what LVs exist on the node
rlist = []
4146 for to_ren in old_lvs:
4147 find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4148 if find_res is not None: # device exists
4149 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4151 info("renaming the old LVs on the target node")
4152 if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4153 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4154 # now we rename the new LVs to the old LVs
4155 info("renaming the new LVs on the target node")
4156 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4157 if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4158 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4160 for old, new in zip(old_lvs, new_lvs):
4161 new.logical_id = old.logical_id
4162 cfg.SetDiskID(new, tgt_node)
4164 for disk in old_lvs:
4165 disk.logical_id = ren_fn(disk, temp_suffix)
4166 cfg.SetDiskID(disk, tgt_node)
4168 # now that the new lvs have the old name, we can add them to the device
4169 info("adding new mirror component on %s" % tgt_node)
4170 if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4171 for new_lv in new_lvs:
4172 if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4173 warning("Can't rollback device %s", hint="manually cleanup unused"
4175 raise errors.OpExecError("Can't add local storage to drbd")
4177 dev.children = new_lvs
4178 cfg.Update(instance)
4180 # Step: wait for sync
4182 # this can fail as the old devices are degraded and _WaitForSync
4183 # does a combined result over all disks, so we don't check its
# return value here
4185 self.proc.LogStep(5, steps_total, "sync devices")
4186 _WaitForSync(self, instance, unlock=True)
4188 # so check manually all the devices
4189 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4190 cfg.SetDiskID(dev, instance.primary_node)
4191 is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
if is_degr:
4193 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4195 # Step: remove old storage
4196 self.proc.LogStep(6, steps_total, "removing old storage")
4197 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4198 info("remove logical volumes for %s" % name)
4200 cfg.SetDiskID(lv, tgt_node)
4201 if not self.rpc.call_blockdev_remove(tgt_node, lv):
4202 warning("Can't remove old LV", hint="manually remove unused LVs")
4205 def _ExecD8Secondary(self, feedback_fn):
4206 """Replace the secondary node for drbd8.
4208 The algorithm for replace is quite complicated:
4209 - for all disks of the instance:
4210 - create new LVs on the new node with same names
4211 - shutdown the drbd device on the old secondary
4212 - disconnect the drbd network on the primary
4213 - create the drbd device on the new secondary
4214 - network attach the drbd on the primary, using an artifice:
4215 the drbd code for Attach() will connect to the network if it
4216 finds a device which is connected to the good local disks but not network enabled
4218 - wait for sync across all devices
4219 - remove all disks from the old secondary
4221 Failures are not very well handled.
"""
steps_total = 6
4225 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4226 instance = self.instance
iv_names = {}
4228 vgname = self.cfg.GetVGName()
cfg = self.cfg
4231 old_node = self.tgt_node
4232 new_node = self.new_node
4233 pri_node = instance.primary_node
4235 # Step: check device activation
4236 self.proc.LogStep(1, steps_total, "check device existence")
4237 info("checking volume groups")
4238 my_vg = cfg.GetVGName()
4239 results = self.rpc.call_vg_list([pri_node, new_node])
if not results:
4241 raise errors.OpExecError("Can't list volume groups on the nodes")
4242 for node in pri_node, new_node:
4243 res = results.get(node, False)
4244 if not res or my_vg not in res:
4245 raise errors.OpExecError("Volume group '%s' not found on %s" %
4247 for idx, dev in enumerate(instance.disks):
4248 if idx not in self.op.disks:
4250 info("checking disk/%d on %s" % (idx, pri_node))
4251 cfg.SetDiskID(dev, pri_node)
4252 if not self.rpc.call_blockdev_find(pri_node, dev):
4253 raise errors.OpExecError("Can't find disk/%d on node %s" %
4256 # Step: check other node consistency
4257 self.proc.LogStep(2, steps_total, "check peer consistency")
4258 for idx, dev in enumerate(instance.disks):
4259 if idx not in self.op.disks:
4261 info("checking disk/%d consistency on %s" % (idx, pri_node))
4262 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4263 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4264 " unsafe to replace the secondary" %
4267 # Step: create new storage
4268 self.proc.LogStep(3, steps_total, "allocate new storage")
4269 for idx, dev in enumerate(instance.disks):
4271 info("adding new local storage on %s for disk/%d" %
4273 # since we *always* want to create this LV, we use the
4274 # _Create...OnPrimary (which forces the creation), even if we
4275 # are talking about the secondary node
4276 for new_lv in dev.children:
4277 if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4278 _GetInstanceInfoText(instance)):
4279 raise errors.OpExecError("Failed to create new LV named '%s' on"
" node %s" %
4281 (new_lv.logical_id[1], new_node))
4283 # Step 4: drbd minors and drbd setup changes
4284 # after this, we must manually remove the drbd minors on both the
4285 # error and the success paths
4286 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4288 logging.debug("Allocated minors %s" % (minors,))
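# one DRBD minor is reserved per instance disk, all of them on the new
# secondary node.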
4289 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4290 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4292 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4293 # create new devices on new_node
4294 if pri_node == dev.logical_id[0]:
4295 new_logical_id = (pri_node, new_node,
4296 dev.logical_id[2], dev.logical_id[3], new_minor,
4299 new_logical_id = (new_node, pri_node,
4300 dev.logical_id[2], new_minor, dev.logical_id[4],
4302 iv_names[idx] = (dev, dev.children, new_logical_id)
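# remember the disk, its current backing LVs and the future logical_id so the
# configuration can be updated once the primary has been detached from the old
# secondary.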
4303 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4305 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4306 logical_id=new_logical_id,
4307 children=dev.children)
4308 if not _CreateBlockDevOnSecondary(self, new_node, instance,
4310 _GetInstanceInfoText(instance)):
4311 self.cfg.ReleaseDRBDMinors(instance.name)
4312 raise errors.OpExecError("Failed to create new DRBD on"
4313 " node '%s'" % new_node)
4315 for idx, dev in enumerate(instance.disks):
4316 # we have new devices, shutdown the drbd on the old secondary
4317 info("shutting down drbd for disk/%d on old node" % idx)
4318 cfg.SetDiskID(dev, old_node)
4319 if not self.rpc.call_blockdev_shutdown(old_node, dev):
4320 warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4321 hint="Please cleanup this device manually as soon as possible")
4323 info("detaching primary drbds from the network (=> standalone)")
4325 for idx, dev in enumerate(instance.disks):
4326 cfg.SetDiskID(dev, pri_node)
4327 # set the network part of the physical (unique in bdev terms) id
4328 # to None, meaning detach from network
4329 dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4330 # and 'find' the device, which will 'fix' it to match the
4332 if self.rpc.call_blockdev_find(pri_node, dev):
4335 warning("Failed to detach drbd disk/%d from network, unusual case" %
4339 # no detaches succeeded (very unlikely)
4340 self.cfg.ReleaseDRBDMinors(instance.name)
4341 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4343 # if we managed to detach at least one, we update all the disks of
4344 # the instance to point to the new secondary
4345 info("updating instance configuration")
4346 for dev, _, new_logical_id in iv_names.itervalues():
4347 dev.logical_id = new_logical_id
4348 cfg.SetDiskID(dev, pri_node)
4349 cfg.Update(instance)
4350 # we can remove now the temp minors as now the new values are
4351 # written to the config file (and therefore stable)
4352 self.cfg.ReleaseDRBDMinors(instance.name)
4354 # and now perform the drbd attach
4355 info("attaching primary drbds to new secondary (standalone => connected)")
4357 for idx, dev in enumerate(instance.disks):
4358 info("attaching primary drbd for disk/%d to new secondary node" % idx)
4359 # since the attach is smart, it's enough to 'find' the device,
4360 # it will automatically activate the network, if the physical_id
4362 cfg.SetDiskID(dev, pri_node)
4363 logging.debug("Disk to attach: %s", dev)
4364 if not self.rpc.call_blockdev_find(pri_node, dev):
4365 warning("can't attach drbd disk/%d to new secondary!" % idx,
4366 "please do a gnt-instance info to see the status of disks")
4368 # this can fail as the old devices are degraded and _WaitForSync
4369 # does a combined result over all disks, so we don't check its
# return value here
4371 self.proc.LogStep(5, steps_total, "sync devices")
4372 _WaitForSync(self, instance, unlock=True)
4374 # so check manually all the devices
4375 for idx, (dev, old_lvs, _) in iv_names.iteritems():
4376 cfg.SetDiskID(dev, pri_node)
4377 is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
if is_degr:
4379 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4381 self.proc.LogStep(6, steps_total, "removing old storage")
4382 for idx, (dev, old_lvs, _) in iv_names.iteritems():
4383 info("remove logical volumes for disk/%d" % idx)
4385 cfg.SetDiskID(lv, old_node)
4386 if not self.rpc.call_blockdev_remove(old_node, lv):
4387 warning("Can't remove LV on old secondary",
4388 hint="Cleanup stale volumes by hand")
4390 def Exec(self, feedback_fn):
4391 """Execute disk replacement.
4393 This dispatches the disk replacement to the appropriate handler.
4396 instance = self.instance
4398 # Activate the instance disks if we're replacing them on a down instance
4399 if instance.status == "down":
4400 _StartInstanceDisks(self, instance, True)
4402 if instance.disk_template == constants.DT_DRBD8:
4403 if self.op.remote_node is None:
4404 fn = self._ExecD8DiskOnly
else:
4406 fn = self._ExecD8Secondary
else:
4408 raise errors.ProgrammerError("Unhandled disk replacement case")
4410 ret = fn(feedback_fn)
4412 # Deactivate the instance disks if we're replacing them on a down instance
4413 if instance.status == "down":
4414 _SafeShutdownInstanceDisks(self, instance)
4419 class LUGrowDisk(LogicalUnit):
4420 """Grow a disk of an instance.
4424 HTYPE = constants.HTYPE_INSTANCE
4425 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4428 def ExpandNames(self):
4429 self._ExpandAndLockInstance()
4430 self.needed_locks[locking.LEVEL_NODE] = []
4431 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4433 def DeclareLocks(self, level):
4434 if level == locking.LEVEL_NODE:
4435 self._LockInstancesNodes()
4437 def BuildHooksEnv(self):
4440 This runs on the master, the primary and all the secondaries.
4444 "DISK": self.op.disk,
4445 "AMOUNT": self.op.amount,
4447 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
nl = [
4449 self.cfg.GetMasterNode(),
4450 self.instance.primary_node,
]
4454 def CheckPrereq(self):
4455 """Check prerequisites.
4457 This checks that the instance is in the cluster.
4460 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4461 assert instance is not None, \
4462 "Cannot retrieve locked instance %s" % self.op.instance_name
4464 self.instance = instance
4466 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4467 raise errors.OpPrereqError("Instance's disk layout does not support"
4470 self.disk = instance.FindDisk(self.op.disk)
4472 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4473 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4474 instance.hypervisor)
4475 for node in nodenames:
4476 info = nodeinfo.get(node, None)
if not info:
4478 raise errors.OpPrereqError("Cannot get current information"
4479 " from node '%s'" % node)
4480 vg_free = info.get('vg_free', None)
4481 if not isinstance(vg_free, int):
4482 raise errors.OpPrereqError("Can't compute free disk space on"
4484 if self.op.amount > info['vg_free']:
4485 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4486 " %d MiB available, %d MiB required" %
4487 (node, info['vg_free'], self.op.amount))
4489 def Exec(self, feedback_fn):
4490 """Execute disk grow.
4493 instance = self.instance
disk = self.disk
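# grow the backing device on every node that stores this disk: the secondaries
# first, the primary last.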
4495 for node in (instance.secondary_nodes + (instance.primary_node,)):
4496 self.cfg.SetDiskID(disk, node)
4497 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4498 if (not result or not isinstance(result, (list, tuple)) or
4500 raise errors.OpExecError("grow request failed to node %s" % node)
4502 raise errors.OpExecError("grow request failed to node %s: %s" %
4504 disk.RecordGrow(self.op.amount)
4505 self.cfg.Update(instance)
4506 if self.op.wait_for_sync:
4507 disk_abort = not _WaitForSync(self, instance)
if disk_abort:
4509 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4510 " status.\nPlease check the instance.")
4513 class LUQueryInstanceData(NoHooksLU):
4514 """Query runtime instance data.
4517 _OP_REQP = ["instances", "static"]
4520 def ExpandNames(self):
4521 self.needed_locks = {}
4522 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4524 if not isinstance(self.op.instances, list):
4525 raise errors.OpPrereqError("Invalid argument type 'instances'")
4527 if self.op.instances:
4528 self.wanted_names = []
4529 for name in self.op.instances:
4530 full_name = self.cfg.ExpandInstanceName(name)
4531 if full_name is None:
4532 raise errors.OpPrereqError("Instance '%s' not known" %
4533 name)
4534 self.wanted_names.append(full_name)
4535 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4537 self.wanted_names = None
4538 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4540 self.needed_locks[locking.LEVEL_NODE] = []
4541 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4543 def DeclareLocks(self, level):
4544 if level == locking.LEVEL_NODE:
4545 self._LockInstancesNodes()
4547 def CheckPrereq(self):
4548 """Check prerequisites.
4550 This only checks the optional instance list against the existing names.
4553 if self.wanted_names is None:
4554 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4556 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4557 in self.wanted_names]
4560 def _ComputeDiskStatus(self, instance, snode, dev):
4561 """Compute block device status.
4564 static = self.op.static
4566 self.cfg.SetDiskID(dev, instance.primary_node)
4567 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4571 if dev.dev_type in constants.LDS_DRBD:
4572 # we change the snode then (otherwise we use the one passed in)
4573 if dev.logical_id[0] == instance.primary_node:
4574 snode = dev.logical_id[1]
4576 snode = dev.logical_id[0]
4578 if snode and not static:
4579 self.cfg.SetDiskID(dev, snode)
4580 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4585 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4586 for child in dev.children]
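# child devices (e.g. the LVs backing a DRBD disk) are reported recursively,
# so the caller gets the full device tree for each disk.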
4591 "iv_name": dev.iv_name,
4592 "dev_type": dev.dev_type,
4593 "logical_id": dev.logical_id,
4594 "physical_id": dev.physical_id,
4595 "pstatus": dev_pstatus,
4596 "sstatus": dev_sstatus,
4597 "children": dev_children,
4602 def Exec(self, feedback_fn):
4603 """Gather and return data"""
4606 cluster = self.cfg.GetClusterInfo()
4608 for instance in self.wanted_instances:
4609 if not self.op.static:
4610 remote_info = self.rpc.call_instance_info(instance.primary_node,
4612 instance.hypervisor)
4613 if remote_info and "state" in remote_info:
4616 remote_state = "down"
4619 if instance.status == "down":
4620 config_state = "down"
4624 disks = [self._ComputeDiskStatus(instance, None, device)
4625 for device in instance.disks]
4628 "name": instance.name,
4629 "config_state": config_state,
4630 "run_state": remote_state,
4631 "pnode": instance.primary_node,
4632 "snodes": instance.secondary_nodes,
4634 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4636 "hypervisor": instance.hypervisor,
4637 "network_port": instance.network_port,
4638 "hv_instance": instance.hvparams,
4639 "hv_actual": cluster.FillHV(instance),
4640 "be_instance": instance.beparams,
4641 "be_actual": cluster.FillBE(instance),
4644 result[instance.name] = idict
4649 class LUSetInstanceParams(LogicalUnit):
4650 """Modifies an instances's parameters.
4653 HPATH = "instance-modify"
4654 HTYPE = constants.HTYPE_INSTANCE
4655 _OP_REQP = ["instance_name", "hvparams"]
4658 def ExpandNames(self):
4659 self._ExpandAndLockInstance()
4660 self.needed_locks[locking.LEVEL_NODE] = []
4661 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4664 def DeclareLocks(self, level):
4665 if level == locking.LEVEL_NODE:
4666 self._LockInstancesNodes()
4668 def BuildHooksEnv(self):
4671 This runs on the master, primary and secondaries.
4675 if constants.BE_MEMORY in self.be_new:
4676 args['memory'] = self.be_new[constants.BE_MEMORY]
4677 if constants.BE_VCPUS in self.be_new:
4678 args['vcpus'] = self.be_new[constants.BE_VCPUS]
4679 if self.do_ip or self.do_bridge or self.mac:
4683 ip = self.instance.nics[0].ip
4685 bridge = self.bridge
4687 bridge = self.instance.nics[0].bridge
4691 mac = self.instance.nics[0].mac
4692 args['nics'] = [(ip, bridge, mac)]
4693 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4694 nl = [self.cfg.GetMasterNode(),
4695 self.instance.primary_node] + list(self.instance.secondary_nodes)
4698 def CheckPrereq(self):
4699 """Check prerequisites.
4701 This only checks the instance list against the existing names.
4704 # FIXME: all the parameters could be checked before, in ExpandNames, or in
4705 # a separate CheckArguments function, if we implement one, so the operation
4706 # can be aborted without waiting for any lock, should it have an error...
4707 self.ip = getattr(self.op, "ip", None)
4708 self.mac = getattr(self.op, "mac", None)
4709 self.bridge = getattr(self.op, "bridge", None)
4710 self.kernel_path = getattr(self.op, "kernel_path", None)
4711 self.initrd_path = getattr(self.op, "initrd_path", None)
4712 self.force = getattr(self.op, "force", None)
4713 all_parms = [self.ip, self.bridge, self.mac]
4714 if (all_parms.count(None) == len(all_parms) and
4715 not self.op.hvparams and
4716 not self.op.beparams):
4717 raise errors.OpPrereqError("No changes submitted")
4718 for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4719 val = self.op.beparams.get(item, None)
4723 except ValueError, err:
4724 raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4725 self.op.beparams[item] = val
4726 if self.ip is not None:
4728 if self.ip.lower() == "none":
4731 if not utils.IsValidIP(self.ip):
4732 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4735 self.do_bridge = (self.bridge is not None)
4736 if self.mac is not None:
4737 if self.cfg.IsMacInUse(self.mac):
4738 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4740 if not utils.IsValidMac(self.mac):
4741 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4743 # checking the new params on the primary/secondary nodes
4745 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4746 assert self.instance is not None, \
4747 "Cannot retrieve locked instance %s" % self.op.instance_name
4748 pnode = self.instance.primary_node
nodelist = [pnode]
4750 nodelist.extend(instance.secondary_nodes)
4752 # hvparams processing
4753 if self.op.hvparams:
4754 i_hvdict = copy.deepcopy(instance.hvparams)
4755 for key, val in self.op.hvparams.iteritems():
4763 cluster = self.cfg.GetClusterInfo()
4764 hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4767 hypervisor.GetHypervisor(
4768 instance.hypervisor).CheckParameterSyntax(hv_new)
4769 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4770 self.hv_new = hv_new # the new actual values
4771 self.hv_inst = i_hvdict # the new dict (without defaults)
4773 self.hv_new = self.hv_inst = {}
4775 # beparams processing
4776 if self.op.beparams:
4777 i_bedict = copy.deepcopy(instance.beparams)
4778 for key, val in self.op.beparams.iteritems():
4786 cluster = self.cfg.GetClusterInfo()
4787 be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4789 self.be_new = be_new # the new actual values
4790 self.be_inst = i_bedict # the new dict (without defaults)
else:
4792 self.be_new = self.be_inst = {}
self.warn = []
4796 if constants.BE_MEMORY in self.op.beparams and not self.force:
4797 mem_check_list = [pnode]
4798 if be_new[constants.BE_AUTO_BALANCE]:
4799 # either we changed auto_balance to yes or it was from before
4800 mem_check_list.extend(instance.secondary_nodes)
4801 instance_info = self.rpc.call_instance_info(pnode, instance.name,
4802 instance.hypervisor)
4803 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4804 instance.hypervisor)
4806 if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4807 # Assume the primary node is unreachable and go ahead
4808 self.warn.append("Can't get info from primary node %s" % pnode)
4811 current_mem = instance_info['memory']
4813 # Assume instance not running
4814 # (there is a slight race condition here, but it's not very probable,
4815 # and we have no other way to check)
4817 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4818 nodeinfo[pnode]['memory_free'])
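# memory still missing on the primary after subtracting what the instance
# already uses and what the node has free; a positive value means the instance
# could no longer start there with the new setting.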
if miss_mem > 0:
4820 raise errors.OpPrereqError("This change will prevent the instance"
4821 " from starting, due to %d MB of memory"
4822 " missing on its primary node" % miss_mem)
4824 if be_new[constants.BE_AUTO_BALANCE]:
4825 for node in instance.secondary_nodes:
4826 if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4827 self.warn.append("Can't get info from secondary node %s" % node)
4828 elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4829 self.warn.append("Not enough memory to failover instance to"
4830 " secondary node %s" % node)
4834 def Exec(self, feedback_fn):
4835 """Modifies an instance.
4837 All parameters take effect only at the next restart of the instance.
4839 # Process here the warnings from CheckPrereq, as we don't have a
4840 # feedback_fn there.
4841 for warn in self.warn:
4842 feedback_fn("WARNING: %s" % warn)
result = []
4845 instance = self.instance
4847 instance.nics[0].ip = self.ip
4848 result.append(("ip", self.ip))
4850 instance.nics[0].bridge = self.bridge
4851 result.append(("bridge", self.bridge))
4853 instance.nics[0].mac = self.mac
4854 result.append(("mac", self.mac))
4855 if self.op.hvparams:
4856 instance.hvparams = self.hv_new
4857 for key, val in self.op.hvparams.iteritems():
4858 result.append(("hv/%s" % key, val))
4859 if self.op.beparams:
4860 instance.beparams = self.be_inst
4861 for key, val in self.op.beparams.iteritems():
4862 result.append(("be/%s" % key, val))
4864 self.cfg.Update(instance)
4869 class LUQueryExports(NoHooksLU):
4870 """Query the exports list
4873 _OP_REQP = ['nodes']
4876 def ExpandNames(self):
4877 self.needed_locks = {}
4878 self.share_locks[locking.LEVEL_NODE] = 1
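# listing exports only reads data, so shared node locks are enough and do not
# block other jobs.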
4879 if not self.op.nodes:
4880 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4882 self.needed_locks[locking.LEVEL_NODE] = \
4883 _GetWantedNodes(self, self.op.nodes)
4885 def CheckPrereq(self):
4886 """Check prerequisites.
4889 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4891 def Exec(self, feedback_fn):
4892 """Compute the list of all the exported system images.
4895 @return: a dictionary with the structure node->(export-list)
4896 where export-list is a list of the instances exported on that node.
4900 return self.rpc.call_export_list(self.nodes)
4903 class LUExportInstance(LogicalUnit):
4904 """Export an instance to an image in the cluster.
4907 HPATH = "instance-export"
4908 HTYPE = constants.HTYPE_INSTANCE
4909 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4912 def ExpandNames(self):
4913 self._ExpandAndLockInstance()
4914 # FIXME: lock only instance primary and destination node
4916 # Sad but true, for now we have to lock all nodes, as we don't know where
4917 # the previous export might be, and in this LU we search for it and
4918 # remove it from its current node. In the future we could fix this by:
4919 # - making a tasklet to search (share-lock all), then create the new one,
4920 # then one to remove it afterwards
4921 # - removing the removal operation altogether
4922 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4924 def DeclareLocks(self, level):
4925 """Last minute lock declaration."""
4926 # All nodes are locked anyway, so nothing to do here.
4928 def BuildHooksEnv(self):
4931 This will run on the master, primary node and target node.
4935 "EXPORT_NODE": self.op.target_node,
4936 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4938 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4939 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4940 self.op.target_node]
4943 def CheckPrereq(self):
4944 """Check prerequisites.
4946 This checks that the instance and node names are valid.
4949 instance_name = self.op.instance_name
4950 self.instance = self.cfg.GetInstanceInfo(instance_name)
4951 assert self.instance is not None, \
4952 "Cannot retrieve locked instance %s" % self.op.instance_name
4954 self.dst_node = self.cfg.GetNodeInfo(
4955 self.cfg.ExpandNodeName(self.op.target_node))
4957 assert self.dst_node is not None, \
4958 "Cannot retrieve locked node %s" % self.op.target_node
4960 # instance disk type verification
4961 for disk in self.instance.disks:
4962 if disk.dev_type == constants.LD_FILE:
4963 raise errors.OpPrereqError("Export not supported for instances with"
4964 " file-based disks")
4966 def Exec(self, feedback_fn):
4967 """Export an instance to an image in the cluster.
4970 instance = self.instance
4971 dst_node = self.dst_node
4972 src_node = instance.primary_node
4973 if self.op.shutdown:
4974 # shutdown the instance, but not the disks
4975 if not self.rpc.call_instance_shutdown(src_node, instance):
4976 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4977 (instance.name, src_node))
4979 vgname = self.cfg.GetVGName()
4984 for disk in instance.disks:
4985 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4986 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4988 if not new_dev_name:
4989 self.LogWarning("Could not snapshot block device %s on node %s",
4990 disk.logical_id[1], src_node)
4991 snap_disks.append(False)
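# keep a placeholder so disk indexes stay aligned and the export loop below
# can tell which disks were actually snapshotted.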
4993 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4994 logical_id=(vgname, new_dev_name),
4995 physical_id=(vgname, new_dev_name),
4996 iv_name=disk.iv_name)
4997 snap_disks.append(new_dev)
5000 if self.op.shutdown and instance.status == "up":
5001 if not self.rpc.call_instance_start(src_node, instance, None):
5002 _ShutdownInstanceDisks(self, instance)
5003 raise errors.OpExecError("Could not start instance")
5005 # TODO: check for size
5007 cluster_name = self.cfg.GetClusterName()
5008 for idx, dev in enumerate(snap_disks):
5010 if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5011 instance, cluster_name, idx):
5012 self.LogWarning("Could not export block device %s from node %s to"
5013 " node %s", dev.logical_id[1], src_node,
5015 if not self.rpc.call_blockdev_remove(src_node, dev):
5016 self.LogWarning("Could not remove snapshot block device %s from node"
5017 " %s", dev.logical_id[1], src_node)
5019 if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5020 self.LogWarning("Could not finalize export for instance %s on node %s",
5021 instance.name, dst_node.name)
5023 nodelist = self.cfg.GetNodeList()
5024 nodelist.remove(dst_node.name)
5026 # on one-node clusters nodelist will be empty after the removal
5027 # if we proceed the backup would be removed because OpQueryExports
5028 # substitutes an empty list with the full cluster node list.
if nodelist:
5030 exportlist = self.rpc.call_export_list(nodelist)
5031 for node in exportlist:
5032 if instance.name in exportlist[node]:
5033 if not self.rpc.call_export_remove(node, instance.name):
5034 self.LogWarning("Could not remove older export for instance %s"
5035 " on node %s", instance.name, node)
5038 class LURemoveExport(NoHooksLU):
5039 """Remove exports related to the named instance.
5042 _OP_REQP = ["instance_name"]
5045 def ExpandNames(self):
5046 self.needed_locks = {}
5047 # We need all nodes to be locked in order for RemoveExport to work, but we
5048 # don't need to lock the instance itself, as nothing will happen to it (and
5049 # we can remove exports also for a removed instance)
5050 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5052 def CheckPrereq(self):
5053 """Check prerequisites.
5057 def Exec(self, feedback_fn):
5058 """Remove any export.
5061 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5062 # If the instance was not found we'll try with the name that was passed in.
5063 # This will only work if it was an FQDN, though.
fqdn_warn = False
5065 if not instance_name:
fqdn_warn = True
5067 instance_name = self.op.instance_name
5069 exportlist = self.rpc.call_export_list(self.acquired_locks[
5070 locking.LEVEL_NODE])
found = False
5072 for node in exportlist:
5073 if instance_name in exportlist[node]:
found = True
5075 if not self.rpc.call_export_remove(node, instance_name):
5076 logging.error("Could not remove export for instance %s"
5077 " on node %s", instance_name, node)
5079 if fqdn_warn and not found:
5080 feedback_fn("Export not found. If trying to remove an export belonging"
5081 " to a deleted instance please use its Fully Qualified"
5085 class TagsLU(NoHooksLU):
5088 This is an abstract class which is the parent of all the other tags LUs.
5092 def ExpandNames(self):
5093 self.needed_locks = {}
5094 if self.op.kind == constants.TAG_NODE:
5095 name = self.cfg.ExpandNodeName(self.op.name)
5097 raise errors.OpPrereqError("Invalid node name (%s)" %
5100 self.needed_locks[locking.LEVEL_NODE] = name
5101 elif self.op.kind == constants.TAG_INSTANCE:
5102 name = self.cfg.ExpandInstanceName(self.op.name)
5104 raise errors.OpPrereqError("Invalid instance name (%s)" %
5107 self.needed_locks[locking.LEVEL_INSTANCE] = name
5109 def CheckPrereq(self):
5110 """Check prerequisites.
5113 if self.op.kind == constants.TAG_CLUSTER:
5114 self.target = self.cfg.GetClusterInfo()
5115 elif self.op.kind == constants.TAG_NODE:
5116 self.target = self.cfg.GetNodeInfo(self.op.name)
5117 elif self.op.kind == constants.TAG_INSTANCE:
5118 self.target = self.cfg.GetInstanceInfo(self.op.name)
5120 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5124 class LUGetTags(TagsLU):
5125 """Returns the tags of a given object.
5128 _OP_REQP = ["kind", "name"]
5131 def Exec(self, feedback_fn):
5132 """Returns the tag list.
5135 return list(self.target.GetTags())
5138 class LUSearchTags(NoHooksLU):
5139 """Searches the tags for a given pattern.
5142 _OP_REQP = ["pattern"]
5145 def ExpandNames(self):
5146 self.needed_locks = {}
5148 def CheckPrereq(self):
5149 """Check prerequisites.
5151 This checks the pattern passed for validity by compiling it.
5155 self.re = re.compile(self.op.pattern)
5156 except re.error, err:
5157 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5158 (self.op.pattern, err))
5160 def Exec(self, feedback_fn):
5161 """Returns the tag list.
5165 tgts = [("/cluster", cfg.GetClusterInfo())]
5166 ilist = cfg.GetAllInstancesInfo().values()
5167 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5168 nlist = cfg.GetAllNodesInfo().values()
5169 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5171 for path, target in tgts:
5172 for tag in target.GetTags():
5173 if self.re.search(tag):
5174 results.append((path, tag))
5178 class LUAddTags(TagsLU):
5179 """Sets a tag on a given object.
5182 _OP_REQP = ["kind", "name", "tags"]
5185 def CheckPrereq(self):
5186 """Check prerequisites.
5188 This checks the type and length of the tag name and value.
5191 TagsLU.CheckPrereq(self)
5192 for tag in self.op.tags:
5193 objects.TaggableObject.ValidateTag(tag)
5195 def Exec(self, feedback_fn):
5200 for tag in self.op.tags:
5201 self.target.AddTag(tag)
5202 except errors.TagError, err:
5203 raise errors.OpExecError("Error while setting tag: %s" % str(err))
5205 self.cfg.Update(self.target)
5206 except errors.ConfigurationError:
5207 raise errors.OpRetryError("There has been a modification to the"
5208 " config file and the operation has been"
5209 " aborted. Please retry.")
5212 class LUDelTags(TagsLU):
5213 """Delete a list of tags from a given object.
5216 _OP_REQP = ["kind", "name", "tags"]
5219 def CheckPrereq(self):
5220 """Check prerequisites.
5222 This checks that we have the given tag.
5225 TagsLU.CheckPrereq(self)
5226 for tag in self.op.tags:
5227 objects.TaggableObject.ValidateTag(tag)
5228 del_tags = frozenset(self.op.tags)
5229 cur_tags = self.target.GetTags()
5230 if not del_tags <= cur_tags:
5231 diff_tags = del_tags - cur_tags
5232 diff_names = ["'%s'" % tag for tag in diff_tags]
5234 raise errors.OpPrereqError("Tag(s) %s not found" %
5235 (",".join(diff_names)))
5237 def Exec(self, feedback_fn):
5238 """Remove the tag from the object.
5241 for tag in self.op.tags:
5242 self.target.RemoveTag(tag)
5244 self.cfg.Update(self.target)
5245 except errors.ConfigurationError:
5246 raise errors.OpRetryError("There has been a modification to the"
5247 " config file and the operation has been"
5248 " aborted. Please retry.")
5251 class LUTestDelay(NoHooksLU):
5252 """Sleep for a specified amount of time.
5254 This LU sleeps on the master and/or nodes for a specified amount of time.
5258 _OP_REQP = ["duration", "on_master", "on_nodes"]
5261 def ExpandNames(self):
5262 """Expand names and set required locks.
5264 This expands the node list, if any.
5267 self.needed_locks = {}
5268 if self.op.on_nodes:
5269 # _GetWantedNodes can be used here, but is not always appropriate to use
5270 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for more information.
5272 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5273 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5275 def CheckPrereq(self):
5276 """Check prerequisites.
5280 def Exec(self, feedback_fn):
5281 """Do the actual sleep.
5284 if self.op.on_master:
5285 if not utils.TestDelay(self.op.duration):
5286 raise errors.OpExecError("Error during master delay test")
5287 if self.op.on_nodes:
5288 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5290 raise errors.OpExecError("Complete failure from rpc call")
5291 for node, node_result in result.items():
5293 raise errors.OpExecError("Failure during rpc call to node %s,"
5294 " result: %s" % (node, node_result))
5297 class IAllocator(object):
5298 """IAllocator framework.
5300 An IAllocator instance has four sets of attributes:
5301 - cfg that is needed to query the cluster
5302 - input data (all members of the _KEYS class attribute are required)
5303 - four buffer attributes (in|out_data|text), that represent the
5304 input (to the external script) in text and data structure format,
5305 and the output from it, again in two formats
5306 - the result variables from the script (success, info, nodes) for
5311 "mem_size", "disks", "disk_template",
5312 "os", "tags", "nics", "vcpus",
5318 def __init__(self, lu, mode, name, **kwargs):
5320 # init buffer variables
5321 self.in_text = self.out_text = self.in_data = self.out_data = None
5322 # init all input fields so that pylint is happy
5325 self.mem_size = self.disks = self.disk_template = None
5326 self.os = self.tags = self.nics = self.vcpus = None
5327 self.relocate_from = None
5329 self.required_nodes = None
5330 # init result fields
5331 self.success = self.info = self.nodes = None
5332 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5333 keyset = self._ALLO_KEYS
5334 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5335 keyset = self._RELO_KEYS
5337 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5338 " IAllocator" % self.mode)
5340 if key not in keyset:
5341 raise errors.ProgrammerError("Invalid input parameter '%s' to"
5342 " IAllocator" % key)
5343 setattr(self, key, kwargs[key])
5345 if key not in kwargs:
5346 raise errors.ProgrammerError("Missing input parameter '%s' to"
5347 " IAllocator" % key)
5348 self._BuildInputData()
5350 def _ComputeClusterData(self):
5351 """Compute the generic allocator input data.
5353 This is the data that is independent of the actual operation.
5357 cluster_info = cfg.GetClusterInfo()
5361 "cluster_name": cfg.GetClusterName(),
5362 "cluster_tags": list(cluster_info.GetTags()),
5363 "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5364 # we don't have job IDs
5368 cluster = self.cfg.GetClusterInfo()
5369 for iname in cfg.GetInstanceList():
5370 i_obj = cfg.GetInstanceInfo(iname)
5371 i_list.append((i_obj, cluster.FillBE(i_obj)))
5375 node_list = cfg.GetNodeList()
5376 # FIXME: here we have information for only one hypervisor, but
5377 # instances can belong to different hypervisors
5378 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5379 cfg.GetHypervisorType())
5380 for nname in node_list:
5381 ninfo = cfg.GetNodeInfo(nname)
5382 if nname not in node_data or not isinstance(node_data[nname], dict):
5383 raise errors.OpExecError("Can't get data for node %s" % nname)
5384 remote_info = node_data[nname]
5385 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5386 'vg_size', 'vg_free', 'cpu_total']:
5387 if attr not in remote_info:
5388 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5391 remote_info[attr] = int(remote_info[attr])
5392 except ValueError, err:
5393 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5394 " %s" % (nname, attr, str(err)))
5395 # compute memory used by primary instances
5396 i_p_mem = i_p_up_mem = 0
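# track the memory committed to instances that have this node as primary,
# both for all such instances and for only the ones marked as running.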
5397 for iinfo, beinfo in i_list:
5398 if iinfo.primary_node == nname:
5399 i_p_mem += beinfo[constants.BE_MEMORY]
5400 if iinfo.status == "up":
5401 i_p_up_mem += beinfo[constants.BE_MEMORY]
5403 # compute memory used by instances
5405 "tags": list(ninfo.GetTags()),
5406 "total_memory": remote_info['memory_total'],
5407 "reserved_memory": remote_info['memory_dom0'],
5408 "free_memory": remote_info['memory_free'],
5409 "i_pri_memory": i_p_mem,
5410 "i_pri_up_memory": i_p_up_mem,
5411 "total_disk": remote_info['vg_size'],
5412 "free_disk": remote_info['vg_free'],
5413 "primary_ip": ninfo.primary_ip,
5414 "secondary_ip": ninfo.secondary_ip,
5415 "total_cpus": remote_info['cpu_total'],
5417 node_results[nname] = pnr
5418 data["nodes"] = node_results
5422 for iinfo, beinfo in i_list:
5423 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5424 for n in iinfo.nics]
5426 "tags": list(iinfo.GetTags()),
5427 "should_run": iinfo.status == "up",
5428 "vcpus": beinfo[constants.BE_VCPUS],
5429 "memory": beinfo[constants.BE_MEMORY],
5431 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5433 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5434 "disk_template": iinfo.disk_template,
5435 "hypervisor": iinfo.hypervisor,
5437 instance_data[iinfo.name] = pir
5439 data["instances"] = instance_data
5443 def _AddNewInstance(self):
5444 """Add new instance data to allocator structure.
5446 This in combination with _ComputeClusterData will create the
5447 correct structure needed as input for the allocator.
5449 The checks for the completeness of the opcode must have already been
5454 if len(self.disks) != 2:
5455 raise errors.OpExecError("Only two-disk configurations supported")
5457 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
5459 if self.disk_template in constants.DTS_NET_MIRROR:
5460 self.required_nodes = 2
5462 self.required_nodes = 1
5466 "disk_template": self.disk_template,
5469 "vcpus": self.vcpus,
5470 "memory": self.mem_size,
5471 "disks": self.disks,
5472 "disk_space_total": disk_space,
5474 "required_nodes": self.required_nodes,
5476 data["request"] = request
5478 def _AddRelocateInstance(self):
5479 """Add relocate instance data to allocator structure.
5481 This in combination with _ComputeClusterData will create the
5482 correct structure needed as input for the allocator.
5484 The checks for the completeness of the opcode must have already been
5488 instance = self.lu.cfg.GetInstanceInfo(self.name)
5489 if instance is None:
5490 raise errors.ProgrammerError("Unknown instance '%s' passed to"
5491 " IAllocator" % self.name)
5493 if instance.disk_template not in constants.DTS_NET_MIRROR:
5494 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5496 if len(instance.secondary_nodes) != 1:
5497 raise errors.OpPrereqError("Instance has not exactly one secondary node")
5499 self.required_nodes = 1
5500 disk_sizes = [{'size': disk.size} for disk in instance.disks]
5501 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
5506 "disk_space_total": disk_space,
5507 "required_nodes": self.required_nodes,
5508 "relocate_from": self.relocate_from,
5510 self.in_data["request"] = request
5512 def _BuildInputData(self):
5513 """Build input data structures.
5516 self._ComputeClusterData()
5518 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5519 self._AddNewInstance()
5521 self._AddRelocateInstance()
5523 self.in_text = serializer.Dump(self.in_data)
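# The serialized request is what the external allocator script receives; as a
# rough sketch (showing only the keys built above), it looks like:
#   {"cluster_name": ..., "cluster_tags": [...], "nodes": {...},
#    "instances": {...}, "request": {...}}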
5525 def Run(self, name, validate=True, call_fn=None):
5526 """Run an instance allocator and return the results.
5530 call_fn = self.lu.rpc.call_iallocator_runner
5533 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5535 if not isinstance(result, (list, tuple)) or len(result) != 4:
5536 raise errors.OpExecError("Invalid result from master iallocator runner")
5538 rcode, stdout, stderr, fail = result
5540 if rcode == constants.IARUN_NOTFOUND:
5541 raise errors.OpExecError("Can't find allocator '%s'" % name)
5542 elif rcode == constants.IARUN_FAILURE:
5543 raise errors.OpExecError("Instance allocator call failed: %s,"
5544 " output: %s" % (fail, stdout+stderr))
5545 self.out_text = stdout
5547 self._ValidateResult()
5549 def _ValidateResult(self):
5550 """Process the allocator results.
5552 This will process and if successful save the result in
5553 self.out_data and the other parameters.
5557 rdict = serializer.Load(self.out_text)
5558 except Exception, err:
5559 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5561 if not isinstance(rdict, dict):
5562 raise errors.OpExecError("Can't parse iallocator results: not a dict")
5564 for key in "success", "info", "nodes":
5565 if key not in rdict:
5566 raise errors.OpExecError("Can't parse iallocator results:"
5567 " missing key '%s'" % key)
5568 setattr(self, key, rdict[key])
5570 if not isinstance(rdict["nodes"], list):
5571 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5573 self.out_data = rdict
5576 class LUTestAllocator(NoHooksLU):
5577 """Run allocator tests.
5579 This LU runs the allocator tests
5582 _OP_REQP = ["direction", "mode", "name"]
5584 def CheckPrereq(self):
5585 """Check prerequisites.
5587 This checks the opcode parameters depending on the direction and mode of the test.
5590 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5591 for attr in ["name", "mem_size", "disks", "disk_template",
5592 "os", "tags", "nics", "vcpus"]:
5593 if not hasattr(self.op, attr):
5594 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5596 iname = self.cfg.ExpandInstanceName(self.op.name)
5597 if iname is not None:
5598 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5600 if not isinstance(self.op.nics, list):
5601 raise errors.OpPrereqError("Invalid parameter 'nics'")
5602 for row in self.op.nics:
5603 if (not isinstance(row, dict) or
5606 "bridge" not in row):
5607 raise errors.OpPrereqError("Invalid contents of the"
5608 " 'nics' parameter")
5609 if not isinstance(self.op.disks, list):
5610 raise errors.OpPrereqError("Invalid parameter 'disks'")
5611 if len(self.op.disks) != 2:
5612 raise errors.OpPrereqError("Only two-disk configurations supported")
5613 for row in self.op.disks:
5614 if (not isinstance(row, dict) or
5615 "size" not in row or
5616 not isinstance(row["size"], int) or
5617 "mode" not in row or
5618 row["mode"] not in ['r', 'w']):
5619 raise errors.OpPrereqError("Invalid contents of the"
5620 " 'disks' parameter")
5621 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5622 if not hasattr(self.op, "name"):
5623 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5624 fname = self.cfg.ExpandInstanceName(self.op.name)
5626 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5628 self.op.name = fname
5629 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5631 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5634 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5635 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5636 raise errors.OpPrereqError("Missing allocator name")
5637 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5638 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5641 def Exec(self, feedback_fn):
5642 """Run the allocator test.
5645 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5646 ial = IAllocator(self,
5649 mem_size=self.op.mem_size,
5650 disks=self.op.disks,
5651 disk_template=self.op.disk_template,
5655 vcpus=self.op.vcpus,
5658 ial = IAllocator(self,
5661 relocate_from=list(self.relocate_from),
5664 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5665 result = ial.in_text
5667 ial.Run(self.op.allocator, validate=False)
5668 result = ial.out_text