4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
49 class LogicalUnit(object):
50 """Logical Unit base class.
52 Subclasses must follow these rules:
53 - implement ExpandNames
54 - implement CheckPrereq
56 - implement BuildHooksEnv
57 - redefine HPATH and HTYPE
58 - optionally redefine their run requirements:
59 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61 Note that all commands require root permissions.
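A minimal sketch of a subclass following these rules (LUExampleNoop and its
hook path are hypothetical illustrations, not LUs defined in this module)::

  class LUExampleNoop(LogicalUnit):
    HPATH = "example-noop"
    HTYPE = constants.HTYPE_CLUSTER
    _OP_REQP = []

    def ExpandNames(self):
      self.needed_locks = {}

    def CheckPrereq(self):
      pass

    def BuildHooksEnv(self):
      return {}, [], []

    def Exec(self, feedback_fn):
      feedback_fn("nothing to do")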
69 def __init__(self, processor, op, context, rpc):
70 """Constructor for LogicalUnit.
72 This needs to be overridden in derived classes in order to check op
78 self.cfg = context.cfg
79 self.context = context
81 # Dicts used to declare locking needs to mcpu
82 self.needed_locks = None
83 self.acquired_locks = {}
84 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86 self.remove_locks = {}
87 # Used to force good behavior when calling helper functions
88 self.recalculate_locks = {}
91 self.LogWarning = processor.LogWarning
92 self.LogInfo = processor.LogInfo
94 for attr_name in self._OP_REQP:
95 attr_val = getattr(op, attr_name, None)
97 raise errors.OpPrereqError("Required parameter '%s' missing" %
102 """Returns the SshRunner object
106 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
109 ssh = property(fget=__GetSSH)
111 def CheckArguments(self):
112 """Check syntactic validity for the opcode arguments.
114 This method is for doing a simple syntactic check and for ensuring
115 the validity of opcode parameters, without any cluster-related
116 checks. While the same can be accomplished in ExpandNames and/or
117 CheckPrereq, doing these separately is better because:
119 - ExpandNames is left as purely a lock-related function
120 - CheckPrereq is run after we have acquired locks (and possible
123 The function is allowed to change the self.op attribute so that
124 later methods no longer need to worry about missing parameters.
129 def ExpandNames(self):
130 """Expand names for this LU.
132 This method is called before starting to execute the opcode, and it should
133 update all the parameters of the opcode to their canonical form (e.g. a
134 short node name must be fully expanded after this method has successfully
135 completed). This way locking, hooks, logging, etc. can work correctly.
137 LUs which implement this method must also populate the self.needed_locks
138 member, as a dict with lock levels as keys, and a list of needed lock names
141 - use an empty dict if you don't need any lock
142 - if you don't need any lock at a particular level omit that level
143 - don't put anything for the BGL level
144 - if you want all locks at a level use locking.ALL_SET as a value
146 If you need to share locks (rather than acquire them exclusively) at one
147 level you can modify self.share_locks, setting a true value (usually 1) for
148 that level. By default locks are not shared.
152 # Acquire all nodes and one instance
153 self.needed_locks = {
154 locking.LEVEL_NODE: locking.ALL_SET,
155 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
157 # Acquire just two nodes
158 self.needed_locks = {
159 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
162 self.needed_locks = {} # No, you can't leave it to the default value None
165 # The implementation of this method is mandatory only if the new LU is
166 # concurrent, so that old LUs don't need to be changed all at the same
169 self.needed_locks = {} # Exclusive LUs don't need locks.
171 raise NotImplementedError
173 def DeclareLocks(self, level):
174 """Declare LU locking needs for a level
176 While most LUs can just declare their locking needs at ExpandNames time,
177 sometimes there's the need to calculate some locks after having acquired
178 the ones before. This function is called just before acquiring locks at a
179 particular level, but after acquiring the ones at lower levels, and permits
180 such calculations. It can be used to modify self.needed_locks, and by
181 default it does nothing.
183 This function is only called if you have something already set in
184 self.needed_locks for the level.
186 @param level: Locking level which is going to be locked
187 @type level: member of ganeti.locking.LEVELS
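A typical override, relying on the _LockInstancesNodes helper defined
further below, might look like (illustrative sketch only)::

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()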
191 def CheckPrereq(self):
192 """Check prerequisites for this LU.
194 This method should check that the prerequisites for the execution
195 of this LU are fulfilled. It can do internode communication, but
196 it should be idempotent - no cluster or system changes are
199 The method should raise errors.OpPrereqError in case something is
200 not fulfilled. Its return value is ignored.
202 This method should also update all the parameters of the opcode to
203 their canonical form if it hasn't been done by ExpandNames before.
206 raise NotImplementedError
208 def Exec(self, feedback_fn):
211 This method should implement the actual work. It should raise
212 errors.OpExecError for failures that are somewhat dealt with in
216 raise NotImplementedError
218 def BuildHooksEnv(self):
219 """Build hooks environment for this LU.
221 This method should return a three-element tuple consisting of: a dict
222 containing the environment that will be used for running the
223 specific hook for this LU, a list of node names on which the hook
224 should run before the execution, and a list of node names on which
225 the hook should run after the execution.
227 The keys of the dict must not have 'GANETI_' prefixed as this will
228 be handled in the hooks runner. Also note additional keys will be
229 added by the hooks runner. If the LU doesn't define any
230 environment, an empty dict (and not None) should be returned.
232 If no nodes are needed, an empty list (and not None) should be returned.
234 Note that if the HPATH for a LU class is None, this function will
238 raise NotImplementedError
240 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241 """Notify the LU about the results of its hooks.
243 This method is called every time a hooks phase is executed, and notifies
244 the Logical Unit about the hooks' result. The LU can then use it to alter
245 its result based on the hooks. By default the method does nothing and the
246 previous result is passed back unchanged but any LU can define it if it
247 wants to use the local cluster hook-scripts somehow.
249 @param phase: one of L{constants.HOOKS_PHASE_POST} or
250 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251 @param hook_results: the results of the multi-node hooks rpc call
252 @param feedback_fn: function used to send feedback back to the caller
253 @param lu_result: the previous Exec result this LU had, or None
255 @return: the new Exec result, based on the previous result
261 def _ExpandAndLockInstance(self):
262 """Helper function to expand and lock an instance.
264 Many LUs that work on an instance take its name in self.op.instance_name
265 and need to expand it and then declare the expanded name for locking. This
266 function does it, and then updates self.op.instance_name to the expanded
267 name. It also initializes needed_locks as a dict, if this hasn't been done
271 if self.needed_locks is None:
272 self.needed_locks = {}
274 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275 "_ExpandAndLockInstance called with instance-level locks set"
276 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277 if expanded_name is None:
278 raise errors.OpPrereqError("Instance '%s' not known" %
279 self.op.instance_name)
280 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281 self.op.instance_name = expanded_name
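# Illustrative sketch (hypothetical LU code, not defined in this module) of
# how an instance-level LU typically uses the helper above from ExpandNames,
# deferring the node locks to DeclareLocks:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE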
283 def _LockInstancesNodes(self, primary_only=False):
284 """Helper function to declare instances' nodes for locking.
286 This function should be called after locking one or more instances to lock
287 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288 with all primary or secondary nodes for instances already locked and
289 present in self.needed_locks[locking.LEVEL_INSTANCE].
291 It should be called from DeclareLocks, and for safety only works if
292 self.recalculate_locks[locking.LEVEL_NODE] is set.
294 In the future it may grow parameters to just lock some instance's nodes, or
295 to just lock primaries or secondary nodes, if needed.
297 It should be called in DeclareLocks in a way similar to::
299 if level == locking.LEVEL_NODE:
300 self._LockInstancesNodes()
302 @type primary_only: boolean
303 @param primary_only: only lock primary nodes of locked instances
306 assert locking.LEVEL_NODE in self.recalculate_locks, \
307 "_LockInstancesNodes helper function called with no nodes to recalculate"
309 # TODO: check if we've really been called with the instance locks held
311 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312 # future we might want to have different behaviors depending on the value
313 # of self.recalculate_locks[locking.LEVEL_NODE]
315 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316 instance = self.context.cfg.GetInstanceInfo(instance_name)
317 wanted_nodes.append(instance.primary_node)
319 wanted_nodes.extend(instance.secondary_nodes)
321 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
326 del self.recalculate_locks[locking.LEVEL_NODE]
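# Continuing the sketch above: the matching DeclareLocks, here locking only
# the primary nodes of the already-locked instances (illustrative only):
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes(primary_only=True)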
329 class NoHooksLU(LogicalUnit):
330 """Simple LU which runs no hooks.
332 This LU is intended as a parent for other LogicalUnits which will
333 run no hooks, in order to reduce duplicate code.
340 def _GetWantedNodes(lu, nodes):
341 """Returns list of checked and expanded node names.
343 @type lu: L{LogicalUnit}
344 @param lu: the logical unit on whose behalf we execute
346 @param nodes: list of node names or None for all nodes
348 @return: the list of nodes, sorted
349 @raise errors.OpPrereqError: if the nodes parameter is wrong type
352 if not isinstance(nodes, list):
353 raise errors.OpPrereqError("Invalid argument type 'nodes'")
356 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357 " non-empty list of nodes whose name is to be expanded.")
361 node = lu.cfg.ExpandNodeName(name)
363 raise errors.OpPrereqError("No such node name '%s'" % name)
366 return utils.NiceSort(wanted)
369 def _GetWantedInstances(lu, instances):
370 """Returns list of checked and expanded instance names.
372 @type lu: L{LogicalUnit}
373 @param lu: the logical unit on whose behalf we execute
374 @type instances: list
375 @param instances: list of instance names or None for all instances
377 @return: the list of instances, sorted
378 @raise errors.OpPrereqError: if the instances parameter is wrong type
379 @raise errors.OpPrereqError: if any of the passed instances is not found
382 if not isinstance(instances, list):
383 raise errors.OpPrereqError("Invalid argument type 'instances'")
388 for name in instances:
389 instance = lu.cfg.ExpandInstanceName(name)
391 raise errors.OpPrereqError("No such instance name '%s'" % name)
392 wanted.append(instance)
395 wanted = lu.cfg.GetInstanceList()
396 return utils.NiceSort(wanted)
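# Usage sketch of the two helpers above (hypothetical CheckPrereq code, not
# taken from an actual LU in this module):
#
#   self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)
#   self.wanted_instances = _GetWantedInstances(self, self.op.instances)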
399 def _CheckOutputFields(static, dynamic, selected):
400 """Checks whether all selected fields are valid.
402 @type static: L{utils.FieldSet}
403 @param static: static fields set
404 @type dynamic: L{utils.FieldSet}
405 @param dynamic: dynamic fields set
412 delta = f.NonMatching(selected)
414 raise errors.OpPrereqError("Unknown output fields selected: %s"
418 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
419 memory, vcpus, nics):
420 """Builds instance related env variables for hooks
422 This builds the hook environment from individual variables.
425 @param name: the name of the instance
426 @type primary_node: string
427 @param primary_node: the name of the instance's primary node
428 @type secondary_nodes: list
429 @param secondary_nodes: list of secondary nodes as strings
430 @type os_type: string
431 @param os_type: the name of the instance's OS
433 @param status: the desired status of the instance
435 @param memory: the memory size of the instance
437 @param vcpus: the count of VCPUs the instance has
439 @param nics: list of tuples (ip, bridge, mac) representing
440 the NICs the instance has
442 @return: the hook environment for this instance
447 "INSTANCE_NAME": name,
448 "INSTANCE_PRIMARY": primary_node,
449 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
450 "INSTANCE_OS_TYPE": os_type,
451 "INSTANCE_STATUS": status,
452 "INSTANCE_MEMORY": memory,
453 "INSTANCE_VCPUS": vcpus,
457 nic_count = len(nics)
458 for idx, (ip, bridge, mac) in enumerate(nics):
461 env["INSTANCE_NIC%d_IP" % idx] = ip
462 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
463 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
467 env["INSTANCE_NIC_COUNT"] = nic_count
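# As a hedged illustration, an instance "inst1" with a single NIC would yield,
# among others, the following keys (values are made up; the GANETI_ prefix is
# added later by the hooks runner):
#
#   INSTANCE_NAME=inst1  INSTANCE_PRIMARY=node1  INSTANCE_NIC_COUNT=1
#   INSTANCE_NIC0_IP=198.51.100.10  INSTANCE_NIC0_BRIDGE=xen-br0
#   INSTANCE_NIC0_HWADDR=aa:00:00:00:00:01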
472 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
473 """Builds instance related env variables for hooks from an object.
475 @type lu: L{LogicalUnit}
476 @param lu: the logical unit on whose behalf we execute
477 @type instance: L{objects.Instance}
478 @param instance: the instance for which we should build the
481 @param override: dictionary with key/values that will override
484 @return: the hook environment dictionary
487 bep = lu.cfg.GetClusterInfo().FillBE(instance)
489 'name': instance.name,
490 'primary_node': instance.primary_node,
491 'secondary_nodes': instance.secondary_nodes,
492 'os_type': instance.os,
493 'status': instance.status,
494 'memory': bep[constants.BE_MEMORY],
495 'vcpus': bep[constants.BE_VCPUS],
496 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
499 args.update(override)
500 return _BuildInstanceHookEnv(**args)
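# Usage sketch (illustrative only): an LU changing an instance's memory could
# build the environment from the still-unmodified config object and override
# just the affected value:
#
#   env = _BuildInstanceHookEnvByObject(self, instance,
#                                       override={"memory": new_memory})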
503 def _CheckInstanceBridgesExist(lu, instance):
504 """Check that the bridges needed by an instance exist.
507 # check bridges existence
508 brlist = [nic.bridge for nic in instance.nics]
509 result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
512 raise errors.OpPrereqError("One or more target bridges %s do not"
513 " exist on destination node '%s'" %
514 (brlist, instance.primary_node))
517 class LUDestroyCluster(NoHooksLU):
518 """Logical unit for destroying the cluster.
523 def CheckPrereq(self):
524 """Check prerequisites.
526 This checks whether the cluster is empty.
528 Any errors are signalled by raising errors.OpPrereqError.
531 master = self.cfg.GetMasterNode()
533 nodelist = self.cfg.GetNodeList()
534 if len(nodelist) != 1 or nodelist[0] != master:
535 raise errors.OpPrereqError("There are still %d node(s) in"
536 " this cluster." % (len(nodelist) - 1))
537 instancelist = self.cfg.GetInstanceList()
539 raise errors.OpPrereqError("There are still %d instance(s) in"
540 " this cluster." % len(instancelist))
542 def Exec(self, feedback_fn):
543 """Destroys the cluster.
546 master = self.cfg.GetMasterNode()
547 result = self.rpc.call_node_stop_master(master, False)
550 raise errors.OpExecError("Could not disable the master role")
551 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
552 utils.CreateBackup(priv_key)
553 utils.CreateBackup(pub_key)
557 class LUVerifyCluster(LogicalUnit):
558 """Verifies the cluster status.
561 HPATH = "cluster-verify"
562 HTYPE = constants.HTYPE_CLUSTER
563 _OP_REQP = ["skip_checks"]
566 def ExpandNames(self):
567 self.needed_locks = {
568 locking.LEVEL_NODE: locking.ALL_SET,
569 locking.LEVEL_INSTANCE: locking.ALL_SET,
571 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
573 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
574 node_result, feedback_fn, master_files):
575 """Run multiple tests against a node.
579 - compares ganeti version
580 - checks vg existence and size > 20G
581 - checks config file checksum
582 - checks ssh to other nodes
584 @type nodeinfo: L{objects.Node}
585 @param nodeinfo: the node to check
586 @param file_list: required list of files
587 @param local_cksum: dictionary of local files and their checksums
588 @param node_result: the results from the node
589 @param feedback_fn: function used to accumulate results
590 @param master_files: list of files that only masters should have
595 # main result, node_result should be a non-empty dict
596 if not node_result or not isinstance(node_result, dict):
597 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
600 # compares ganeti version
601 local_version = constants.PROTOCOL_VERSION
602 remote_version = node_result.get('version', None)
603 if not remote_version:
604 feedback_fn(" - ERROR: connection to %s failed" % (node))
607 if local_version != remote_version:
608 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
609 (local_version, node, remote_version))
612 # checks vg existence and size > 20G
615 vglist = node_result.get(constants.NV_VGLIST, None)
617 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
621 vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
622 constants.MIN_VG_SIZE)
624 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
627 # checks config file checksum
629 remote_cksum = node_result.get(constants.NV_FILELIST, None)
630 if not isinstance(remote_cksum, dict):
632 feedback_fn(" - ERROR: node hasn't returned file checksum data")
634 for file_name in file_list:
635 node_is_mc = nodeinfo.master_candidate
636 must_have_file = file_name not in master_files
637 if file_name not in remote_cksum:
638 if node_is_mc or must_have_file:
640 feedback_fn(" - ERROR: file '%s' missing" % file_name)
641 elif remote_cksum[file_name] != local_cksum[file_name]:
642 if node_is_mc or must_have_file:
644 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
646 # not candidate and this is not a must-have file
648 feedback_fn(" - ERROR: non master-candidate has old/wrong file"
651 # all good, except non-master/non-must have combination
652 if not node_is_mc and not must_have_file:
653 feedback_fn(" - ERROR: file '%s' should not exist on non master"
654 " candidates" % file_name)
658 if constants.NV_NODELIST not in node_result:
660 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
662 if node_result[constants.NV_NODELIST]:
664 for node in node_result[constants.NV_NODELIST]:
665 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
666 (node, node_result[constants.NV_NODELIST][node]))
668 if constants.NV_NODENETTEST not in node_result:
670 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
672 if node_result[constants.NV_NODENETTEST]:
674 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
676 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
677 (node, node_result[constants.NV_NODENETTEST][node]))
679 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
680 if isinstance(hyp_result, dict):
681 for hv_name, hv_result in hyp_result.iteritems():
682 if hv_result is not None:
683 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
684 (hv_name, hv_result))
687 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
688 node_instance, feedback_fn):
689 """Verify an instance.
691 This function checks to see if the required block devices are
692 available on the instance's node.
697 node_current = instanceconfig.primary_node
700 instanceconfig.MapLVsByNode(node_vol_should)
702 for node in node_vol_should:
703 for volume in node_vol_should[node]:
704 if node not in node_vol_is or volume not in node_vol_is[node]:
705 feedback_fn(" - ERROR: volume %s missing on node %s" %
709 if not instanceconfig.status == 'down':
710 if (node_current not in node_instance or
711 not instance in node_instance[node_current]):
712 feedback_fn(" - ERROR: instance %s not running on node %s" %
713 (instance, node_current))
716 for node in node_instance:
717 if (not node == node_current):
718 if instance in node_instance[node]:
719 feedback_fn(" - ERROR: instance %s should not run on node %s" %
725 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726 """Verify if there are any unknown volumes in the cluster.
728 The .os, .swap and backup volumes are ignored. All other volumes are
734 for node in node_vol_is:
735 for volume in node_vol_is[node]:
736 if node not in node_vol_should or volume not in node_vol_should[node]:
737 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
742 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743 """Verify the list of running instances.
745 This checks what instances are running but unknown to the cluster.
749 for node in node_instance:
750 for runninginstance in node_instance[node]:
751 if runninginstance not in instancelist:
752 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
753 (runninginstance, node))
757 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
758 """Verify N+1 Memory Resilience.
760 Check that if one single node dies we can still start all the instances it
766 for node, nodeinfo in node_info.iteritems():
767 # This code checks that every node which is now listed as secondary has
768 # enough memory to host all instances it is supposed to, should a single
769 # other node in the cluster fail.
770 # FIXME: not ready for failover to an arbitrary node
771 # FIXME: does not support file-backed instances
772 # WARNING: we currently take into account down instances as well as up
773 # ones, considering that even if they're down someone might want to start
774 # them even in the event of a node failure.
775 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
777 for instance in instances:
778 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
779 if bep[constants.BE_AUTO_BALANCE]:
780 needed_mem += bep[constants.BE_MEMORY]
781 if nodeinfo['mfree'] < needed_mem:
782 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
783 " failovers should node %s fail" % (node, prinode))
787 def CheckPrereq(self):
788 """Check prerequisites.
790 Transform the list of checks we're going to skip into a set and check that
791 all its members are valid.
794 self.skip_set = frozenset(self.op.skip_checks)
795 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
796 raise errors.OpPrereqError("Invalid checks to be skipped specified")
798 def BuildHooksEnv(self):
801 Cluster-Verify hooks just run in the post phase and their failure makes
802 the output be logged in the verify output and the verification fail.
805 all_nodes = self.cfg.GetNodeList()
806 # TODO: populate the environment with useful information for verify hooks
808 return env, [], all_nodes
810 def Exec(self, feedback_fn):
811 """Verify integrity of cluster, performing various tests on nodes.
815 feedback_fn("* Verifying global settings")
816 for msg in self.cfg.VerifyConfig():
817 feedback_fn(" - ERROR: %s" % msg)
819 vg_name = self.cfg.GetVGName()
820 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
821 nodelist = utils.NiceSort(self.cfg.GetNodeList())
822 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
823 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
824 i_non_redundant = [] # Non redundant instances
825 i_non_a_balanced = [] # Non auto-balanced instances
831 # FIXME: verify OS list
833 master_files = [constants.CLUSTER_CONF_FILE]
835 file_names = ssconf.SimpleStore().GetFileList()
836 file_names.append(constants.SSL_CERT_FILE)
837 file_names.extend(master_files)
839 local_checksums = utils.FingerprintFiles(file_names)
841 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
842 node_verify_param = {
843 constants.NV_FILELIST: file_names,
844 constants.NV_NODELIST: nodelist,
845 constants.NV_HYPERVISOR: hypervisors,
846 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
847 node.secondary_ip) for node in nodeinfo],
848 constants.NV_LVLIST: vg_name,
849 constants.NV_INSTANCELIST: hypervisors,
850 constants.NV_VGLIST: None,
851 constants.NV_VERSION: None,
852 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
854 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
855 self.cfg.GetClusterName())
857 cluster = self.cfg.GetClusterInfo()
858 master_node = self.cfg.GetMasterNode()
859 for node_i in nodeinfo:
861 nresult = all_nvinfo[node].data
863 if node == master_node:
865 elif node_i.master_candidate:
866 ntype = "master candidate"
869 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
871 if all_nvinfo[node].failed or not isinstance(nresult, dict):
872 feedback_fn(" - ERROR: connection to %s failed" % (node,))
876 result = self._VerifyNode(node_i, file_names, local_checksums,
877 nresult, feedback_fn, master_files)
880 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
881 if isinstance(lvdata, basestring):
882 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
883 (node, lvdata.encode('string_escape')))
885 node_volume[node] = {}
886 elif not isinstance(lvdata, dict):
887 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
891 node_volume[node] = lvdata
894 idata = nresult.get(constants.NV_INSTANCELIST, None)
895 if not isinstance(idata, list):
896 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
901 node_instance[node] = idata
904 nodeinfo = nresult.get(constants.NV_HVINFO, None)
905 if not isinstance(nodeinfo, dict):
906 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
912 "mfree": int(nodeinfo['memory_free']),
913 "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
916 # dictionary holding all instances this node is secondary for,
917 # grouped by their primary node. Each key is a cluster node, and each
918 # value is a list of instances which have the key as primary and the
919 # current node as secondary. this is handy to calculate N+1 memory
920 # availability if you can only failover from a primary to its
922 "sinst-by-pnode": {},
925 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
931 for instance in instancelist:
932 feedback_fn("* Verifying instance %s" % instance)
933 inst_config = self.cfg.GetInstanceInfo(instance)
934 result = self._VerifyInstance(instance, inst_config, node_volume,
935 node_instance, feedback_fn)
938 inst_config.MapLVsByNode(node_vol_should)
940 instance_cfg[instance] = inst_config
942 pnode = inst_config.primary_node
943 if pnode in node_info:
944 node_info[pnode]['pinst'].append(instance)
946 feedback_fn(" - ERROR: instance %s, connection to primary node"
947 " %s failed" % (instance, pnode))
950 # If the instance is non-redundant we cannot survive losing its primary
951 # node, so we are not N+1 compliant. On the other hand we have no disk
952 # templates with more than one secondary so that situation is not well
954 # FIXME: does not support file-backed instances
955 if len(inst_config.secondary_nodes) == 0:
956 i_non_redundant.append(instance)
957 elif len(inst_config.secondary_nodes) > 1:
958 feedback_fn(" - WARNING: multiple secondaries for instance %s"
961 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
962 i_non_a_balanced.append(instance)
964 for snode in inst_config.secondary_nodes:
965 if snode in node_info:
966 node_info[snode]['sinst'].append(instance)
967 if pnode not in node_info[snode]['sinst-by-pnode']:
968 node_info[snode]['sinst-by-pnode'][pnode] = []
969 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
971 feedback_fn(" - ERROR: instance %s, connection to secondary node"
972 " %s failed" % (instance, snode))
974 feedback_fn("* Verifying orphan volumes")
975 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
979 feedback_fn("* Verifying remaining instances")
980 result = self._VerifyOrphanInstances(instancelist, node_instance,
984 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
985 feedback_fn("* Verifying N+1 Memory redundancy")
986 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
989 feedback_fn("* Other Notes")
991 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
992 % len(i_non_redundant))
995 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
996 % len(i_non_a_balanced))
1000 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1001 """Analyze the post-hooks' result
1003 This method analyses the hook result, handles it, and sends some
1004 nicely-formatted feedback back to the user.
1006 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1007 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1008 @param hooks_results: the results of the multi-node hooks rpc call
1009 @param feedback_fn: function used to send feedback back to the caller
1010 @param lu_result: previous Exec result
1011 @return: the new Exec result, based on the previous result
1015 # We only really run POST phase hooks, and are only interested in
1017 if phase == constants.HOOKS_PHASE_POST:
1018 # Used to change hooks' output to proper indentation
1019 indent_re = re.compile('^', re.M)
1020 feedback_fn("* Hooks Results")
1021 if not hooks_results:
1022 feedback_fn(" - ERROR: general communication failure")
1025 for node_name in hooks_results:
1026 show_node_header = True
1027 res = hooks_results[node_name]
1028 if res.failed or res.data is False or not isinstance(res.data, list):
1029 feedback_fn(" Communication failure in hooks execution")
1032 for script, hkr, output in res.data:
1033 if hkr == constants.HKR_FAIL:
1034 # The node header is only shown once, if there are
1035 # failing hooks on that node
1036 if show_node_header:
1037 feedback_fn(" Node %s:" % node_name)
1038 show_node_header = False
1039 feedback_fn(" ERROR: Script %s failed, output:" % script)
1040 output = indent_re.sub(' ', output)
1041 feedback_fn("%s" % output)
1047 class LUVerifyDisks(NoHooksLU):
1048 """Verifies the cluster disks status.
1054 def ExpandNames(self):
1055 self.needed_locks = {
1056 locking.LEVEL_NODE: locking.ALL_SET,
1057 locking.LEVEL_INSTANCE: locking.ALL_SET,
1059 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1061 def CheckPrereq(self):
1062 """Check prerequisites.
1064 This has no prerequisites.
1069 def Exec(self, feedback_fn):
1070 """Verify integrity of cluster disks.
1073 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1075 vg_name = self.cfg.GetVGName()
1076 nodes = utils.NiceSort(self.cfg.GetNodeList())
1077 instances = [self.cfg.GetInstanceInfo(name)
1078 for name in self.cfg.GetInstanceList()]
1081 for inst in instances:
1083 if (inst.status != "up" or
1084 inst.disk_template not in constants.DTS_NET_MIRROR):
1086 inst.MapLVsByNode(inst_lvs)
1087 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1088 for node, vol_list in inst_lvs.iteritems():
1089 for vol in vol_list:
1090 nv_dict[(node, vol)] = inst
1095 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1100 lvs = node_lvs[node]
1102 self.LogWarning("Connection to node %s failed: %s" %
1106 if isinstance(lvs, basestring):
1107 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1108 res_nlvm[node] = lvs
1109 elif not isinstance(lvs, dict):
1110 logging.warning("Connection to node %s failed or invalid data"
1112 res_nodes.append(node)
1115 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1116 inst = nv_dict.pop((node, lv_name), None)
1117 if (not lv_online and inst is not None
1118 and inst.name not in res_instances):
1119 res_instances.append(inst.name)
1121 # any leftover items in nv_dict are missing LVs, let's arrange the
1123 for key, inst in nv_dict.iteritems():
1124 if inst.name not in res_missing:
1125 res_missing[inst.name] = []
1126 res_missing[inst.name].append(key)
1131 class LURenameCluster(LogicalUnit):
1132 """Rename the cluster.
1135 HPATH = "cluster-rename"
1136 HTYPE = constants.HTYPE_CLUSTER
1139 def BuildHooksEnv(self):
1144 "OP_TARGET": self.cfg.GetClusterName(),
1145 "NEW_NAME": self.op.name,
1147 mn = self.cfg.GetMasterNode()
1148 return env, [mn], [mn]
1150 def CheckPrereq(self):
1151 """Verify that the passed name is a valid one.
1154 hostname = utils.HostInfo(self.op.name)
1156 new_name = hostname.name
1157 self.ip = new_ip = hostname.ip
1158 old_name = self.cfg.GetClusterName()
1159 old_ip = self.cfg.GetMasterIP()
1160 if new_name == old_name and new_ip == old_ip:
1161 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1162 " cluster has changed")
1163 if new_ip != old_ip:
1164 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1165 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1166 " reachable on the network. Aborting." %
1169 self.op.name = new_name
1171 def Exec(self, feedback_fn):
1172 """Rename the cluster.
1175 clustername = self.op.name
1178 # shutdown the master IP
1179 master = self.cfg.GetMasterNode()
1180 result = self.rpc.call_node_stop_master(master, False)
1181 if result.failed or not result.data:
1182 raise errors.OpExecError("Could not disable the master role")
1185 cluster = self.cfg.GetClusterInfo()
1186 cluster.cluster_name = clustername
1187 cluster.master_ip = ip
1188 self.cfg.Update(cluster)
1190 # update the known hosts file
1191 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1192 node_list = self.cfg.GetNodeList()
1194 node_list.remove(master)
1197 result = self.rpc.call_upload_file(node_list,
1198 constants.SSH_KNOWN_HOSTS_FILE)
1199 for to_node, to_result in result.iteritems():
1200 if to_result.failed or not to_result.data:
1201 logging.error("Copy of file %s to node %s failed", constants.SSH_KNOWN_HOSTS_FILE, to_node)
1204 result = self.rpc.call_node_start_master(master, False)
1205 if result.failed or not result.data:
1206 self.LogWarning("Could not re-enable the master role on"
1207 " the master, please restart manually.")
1210 def _RecursiveCheckIfLVMBased(disk):
1211 """Check if the given disk or its children are lvm-based.
1213 @type disk: L{objects.Disk}
1214 @param disk: the disk to check
1216 @return: boolean indicating whether a LD_LV dev_type was found or not
1220 for chdisk in disk.children:
1221 if _RecursiveCheckIfLVMBased(chdisk):
1223 return disk.dev_type == constants.LD_LV
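# Worked example (illustrative only, disk objects built by hand rather than
# taken from the configuration; assumes constants.LD_DRBD8 as the mirrored
# device type):
#
#   lv = objects.Disk(dev_type=constants.LD_LV, children=[])
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, children=[lv])
#   _RecursiveCheckIfLVMBased(drbd)   # True, found via the LV child
#   _RecursiveCheckIfLVMBased(
#     objects.Disk(dev_type=constants.LD_DRBD8, children=[]))  # False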
1226 class LUSetClusterParams(LogicalUnit):
1227 """Change the parameters of the cluster.
1230 HPATH = "cluster-modify"
1231 HTYPE = constants.HTYPE_CLUSTER
1235 def CheckParameters(self):
1239 if not hasattr(self.op, "candidate_pool_size"):
1240 self.op.candidate_pool_size = None
1241 if self.op.candidate_pool_size is not None:
1243 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1244 except ValueError, err:
1245 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1247 if self.op.candidate_pool_size < 1:
1248 raise errors.OpPrereqError("At least one master candidate needed")
1250 def ExpandNames(self):
1251 # FIXME: in the future maybe other cluster params won't require checking on
1252 # all nodes to be modified.
1253 self.needed_locks = {
1254 locking.LEVEL_NODE: locking.ALL_SET,
1256 self.share_locks[locking.LEVEL_NODE] = 1
1258 def BuildHooksEnv(self):
1263 "OP_TARGET": self.cfg.GetClusterName(),
1264 "NEW_VG_NAME": self.op.vg_name,
1266 mn = self.cfg.GetMasterNode()
1267 return env, [mn], [mn]
1269 def CheckPrereq(self):
1270 """Check prerequisites.
1272 This checks whether the given params don't conflict and
1273 if the given volume group is valid.
1276 # FIXME: This only works because there is only one parameter that can be
1277 # changed or removed.
1278 if self.op.vg_name is not None and not self.op.vg_name:
1279 instances = self.cfg.GetAllInstancesInfo().values()
1280 for inst in instances:
1281 for disk in inst.disks:
1282 if _RecursiveCheckIfLVMBased(disk):
1283 raise errors.OpPrereqError("Cannot disable lvm storage while"
1284 " lvm-based instances exist")
1286 node_list = self.acquired_locks[locking.LEVEL_NODE]
1288 # if vg_name not None, checks given volume group on all nodes
1290 vglist = self.rpc.call_vg_list(node_list)
1291 for node in node_list:
1292 if vglist[node].failed:
1293 # ignoring down node
1294 self.LogWarning("Node %s unreachable/error, ignoring" % node)
1296 vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1298 constants.MIN_VG_SIZE)
1300 raise errors.OpPrereqError("Error on node '%s': %s" %
1303 self.cluster = cluster = self.cfg.GetClusterInfo()
1304 # validate beparams changes
1305 if self.op.beparams:
1306 utils.CheckBEParams(self.op.beparams)
1307 self.new_beparams = cluster.FillDict(
1308 cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1310 # hypervisor list/parameters
1311 self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1312 if self.op.hvparams:
1313 if not isinstance(self.op.hvparams, dict):
1314 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1315 for hv_name, hv_dict in self.op.hvparams.items():
1316 if hv_name not in self.new_hvparams:
1317 self.new_hvparams[hv_name] = hv_dict
1319 self.new_hvparams[hv_name].update(hv_dict)
1321 if self.op.enabled_hypervisors is not None:
1322 self.hv_list = self.op.enabled_hypervisors
1324 self.hv_list = cluster.enabled_hypervisors
1326 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1327 # either the enabled list has changed, or the parameters have, validate
1328 for hv_name, hv_params in self.new_hvparams.items():
1329 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1330 (self.op.enabled_hypervisors and
1331 hv_name in self.op.enabled_hypervisors)):
1332 # either this is a new hypervisor, or its parameters have changed
1333 hv_class = hypervisor.GetHypervisor(hv_name)
1334 hv_class.CheckParameterSyntax(hv_params)
1335 _CheckHVParams(self, node_list, hv_name, hv_params)
1337 def Exec(self, feedback_fn):
1338 """Change the parameters of the cluster.
1341 if self.op.vg_name is not None:
1342 if self.op.vg_name != self.cfg.GetVGName():
1343 self.cfg.SetVGName(self.op.vg_name)
1345 feedback_fn("Cluster LVM configuration already in desired"
1346 " state, not changing")
1347 if self.op.hvparams:
1348 self.cluster.hvparams = self.new_hvparams
1349 if self.op.enabled_hypervisors is not None:
1350 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1351 if self.op.beparams:
1352 self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1353 if self.op.candidate_pool_size is not None:
1354 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1356 self.cfg.Update(self.cluster)
1358 # we want to update nodes after the cluster so that if any errors
1359 # happen, we have recorded and saved the cluster info
1360 if self.op.candidate_pool_size is not None:
1361 node_info = self.cfg.GetAllNodesInfo().values()
1362 num_candidates = len([node for node in node_info
1363 if node.master_candidate])
1364 num_nodes = len(node_info)
1365 if num_candidates < self.op.candidate_pool_size:
1366 random.shuffle(node_info)
1367 for node in node_info:
1368 if num_candidates >= self.op.candidate_pool_size:
1370 if node.master_candidate:
1372 node.master_candidate = True
1373 self.LogInfo("Promoting node %s to master candidate", node.name)
1374 self.cfg.Update(node)
1375 self.context.ReaddNode(node)
1377 elif num_candidates > self.op.candidate_pool_size:
1378 self.LogInfo("Note: more nodes are candidates (%d) than the new value"
1379 " of candidate_pool_size (%d)" %
1380 (num_candidates, self.op.candidate_pool_size))
1383 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1384 """Sleep and poll for an instance's disk to sync.
1387 if not instance.disks:
1391 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1393 node = instance.primary_node
1395 for dev in instance.disks:
1396 lu.cfg.SetDiskID(dev, node)
1402 cumul_degraded = False
1403 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1404 if rstats.failed or not rstats.data:
1405 lu.LogWarning("Can't get any data from node %s", node)
1408 raise errors.RemoteError("Can't contact node %s for mirror data,"
1409 " aborting." % node)
1412 rstats = rstats.data
1414 for i in range(len(rstats)):
1417 lu.LogWarning("Can't compute data for node %s/%s",
1418 node, instance.disks[i].iv_name)
1420 # we ignore the ldisk parameter
1421 perc_done, est_time, is_degraded, _ = mstat
1422 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1423 if perc_done is not None:
1425 if est_time is not None:
1426 rem_time = "%d estimated seconds remaining" % est_time
1429 rem_time = "no time estimate"
1430 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1431 (instance.disks[i].iv_name, perc_done, rem_time))
1435 time.sleep(min(60, max_time))
1438 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1439 return not cumul_degraded
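# Usage sketch (illustrative): callers that have just created or replaced
# disks wait for the sync and treat a False return value as degraded:
#
#   if not _WaitForSync(self, instance):
#     raise errors.OpExecError("Disk sync-out for instance %s failed" %
#                              instance.name)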
1442 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1443 """Check that mirrors are not degraded.
1445 The ldisk parameter, if True, will change the test from the
1446 is_degraded attribute (which represents overall non-ok status for
1447 the device(s)) to the ldisk (representing the local storage status).
1450 lu.cfg.SetDiskID(dev, node)
1457 if on_primary or dev.AssembleOnSecondary():
1458 rstats = lu.rpc.call_blockdev_find(node, dev)
1459 if rstats.failed or not rstats.data:
1460 logging.warning("Node %s: disk degraded, not found or node down", node)
1463 result = result and (not rstats.data[idx])
1465 for child in dev.children:
1466 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1471 class LUDiagnoseOS(NoHooksLU):
1472 """Logical unit for OS diagnose/query.
1475 _OP_REQP = ["output_fields", "names"]
1477 _FIELDS_STATIC = utils.FieldSet()
1478 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1480 def ExpandNames(self):
1482 raise errors.OpPrereqError("Selective OS query not supported")
1484 _CheckOutputFields(static=self._FIELDS_STATIC,
1485 dynamic=self._FIELDS_DYNAMIC,
1486 selected=self.op.output_fields)
1488 # Lock all nodes, in shared mode
1489 self.needed_locks = {}
1490 self.share_locks[locking.LEVEL_NODE] = 1
1491 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1493 def CheckPrereq(self):
1494 """Check prerequisites.
1499 def _DiagnoseByOS(node_list, rlist):
1500 """Remaps a per-node return list into a per-os per-node dictionary
1502 @param node_list: a list with the names of all nodes
1503 @param rlist: a map with node names as keys and OS objects as values
1506 @returns: a dictionary with osnames as keys and as value another map, with
1507 nodes as keys and list of OS objects as values, eg::
1509 {"debian-etch": {"node1": [<object>,...],
1510 "node2": [<object>,]}
1515 for node_name, nr in rlist.iteritems():
1516 if nr.failed or not nr.data:
1518 for os_obj in nr.data:
1519 if os_obj.name not in all_os:
1520 # build a list of nodes for this os containing empty lists
1521 # for each node in node_list
1522 all_os[os_obj.name] = {}
1523 for nname in node_list:
1524 all_os[os_obj.name][nname] = []
1525 all_os[os_obj.name][node_name].append(os_obj)
1528 def Exec(self, feedback_fn):
1529 """Compute the list of OSes.
1532 node_list = self.acquired_locks[locking.LEVEL_NODE]
1533 node_data = self.rpc.call_os_diagnose(node_list)
1534 if node_data == False:
1535 raise errors.OpExecError("Can't gather the list of OSes")
1536 pol = self._DiagnoseByOS(node_list, node_data)
1538 for os_name, os_data in pol.iteritems():
1540 for field in self.op.output_fields:
1543 elif field == "valid":
1544 val = utils.all([osl and osl[0] for osl in os_data.values()])
1545 elif field == "node_status":
1547 for node_name, nos_list in os_data.iteritems():
1548 val[node_name] = [(v.status, v.path) for v in nos_list]
1550 raise errors.ParameterError(field)
1557 class LURemoveNode(LogicalUnit):
1558 """Logical unit for removing a node.
1561 HPATH = "node-remove"
1562 HTYPE = constants.HTYPE_NODE
1563 _OP_REQP = ["node_name"]
1565 def BuildHooksEnv(self):
1568 This doesn't run on the target node in the pre phase as a failed
1569 node would then be impossible to remove.
1573 "OP_TARGET": self.op.node_name,
1574 "NODE_NAME": self.op.node_name,
1576 all_nodes = self.cfg.GetNodeList()
1577 all_nodes.remove(self.op.node_name)
1578 return env, all_nodes, all_nodes
1580 def CheckPrereq(self):
1581 """Check prerequisites.
1584 - the node exists in the configuration
1585 - it does not have primary or secondary instances
1586 - it's not the master
1588 Any errors are signalled by raising errors.OpPrereqError.
1591 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1593 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1595 instance_list = self.cfg.GetInstanceList()
1597 masternode = self.cfg.GetMasterNode()
1598 if node.name == masternode:
1599 raise errors.OpPrereqError("Node is the master node,"
1600 " you need to failover first.")
1602 for instance_name in instance_list:
1603 instance = self.cfg.GetInstanceInfo(instance_name)
1604 if node.name == instance.primary_node:
1605 raise errors.OpPrereqError("Instance %s still running on the node,"
1606 " please remove first." % instance_name)
1607 if node.name in instance.secondary_nodes:
1608 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1609 " please remove first." % instance_name)
1610 self.op.node_name = node.name
1613 def Exec(self, feedback_fn):
1614 """Removes the node from the cluster.
1618 logging.info("Stopping the node daemon and removing configs from node %s",
1621 self.context.RemoveNode(node.name)
1623 self.rpc.call_node_leave_cluster(node.name)
1626 class LUQueryNodes(NoHooksLU):
1627 """Logical unit for querying nodes.
1630 _OP_REQP = ["output_fields", "names"]
1632 _FIELDS_DYNAMIC = utils.FieldSet(
1634 "mtotal", "mnode", "mfree",
1639 _FIELDS_STATIC = utils.FieldSet(
1640 "name", "pinst_cnt", "sinst_cnt",
1641 "pinst_list", "sinst_list",
1642 "pip", "sip", "tags",
1648 def ExpandNames(self):
1649 _CheckOutputFields(static=self._FIELDS_STATIC,
1650 dynamic=self._FIELDS_DYNAMIC,
1651 selected=self.op.output_fields)
1653 self.needed_locks = {}
1654 self.share_locks[locking.LEVEL_NODE] = 1
1657 self.wanted = _GetWantedNodes(self, self.op.names)
1659 self.wanted = locking.ALL_SET
1661 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1663 # if we don't request only static fields, we need to lock the nodes
1664 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1667 def CheckPrereq(self):
1668 """Check prerequisites.
1671 # The validation of the node list is done in the _GetWantedNodes,
1672 # if non empty, and if empty, there's no validation to do
1675 def Exec(self, feedback_fn):
1676 """Computes the list of nodes and their attributes.
1679 all_info = self.cfg.GetAllNodesInfo()
1681 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1682 elif self.wanted != locking.ALL_SET:
1683 nodenames = self.wanted
1684 missing = set(nodenames).difference(all_info.keys())
1686 raise errors.OpExecError(
1687 "Some nodes were removed before retrieving their data: %s" % missing)
1689 nodenames = all_info.keys()
1691 nodenames = utils.NiceSort(nodenames)
1692 nodelist = [all_info[name] for name in nodenames]
1694 # begin data gathering
1698 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1699 self.cfg.GetHypervisorType())
1700 for name in nodenames:
1701 nodeinfo = node_data[name]
1702 if not nodeinfo.failed and nodeinfo.data:
1703 nodeinfo = nodeinfo.data
1704 fn = utils.TryConvert
1706 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1707 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1708 "mfree": fn(int, nodeinfo.get('memory_free', None)),
1709 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1710 "dfree": fn(int, nodeinfo.get('vg_free', None)),
1711 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1712 "bootid": nodeinfo.get('bootid', None),
1715 live_data[name] = {}
1717 live_data = dict.fromkeys(nodenames, {})
1719 node_to_primary = dict([(name, set()) for name in nodenames])
1720 node_to_secondary = dict([(name, set()) for name in nodenames])
1722 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1723 "sinst_cnt", "sinst_list"))
1724 if inst_fields & frozenset(self.op.output_fields):
1725 instancelist = self.cfg.GetInstanceList()
1727 for instance_name in instancelist:
1728 inst = self.cfg.GetInstanceInfo(instance_name)
1729 if inst.primary_node in node_to_primary:
1730 node_to_primary[inst.primary_node].add(inst.name)
1731 for secnode in inst.secondary_nodes:
1732 if secnode in node_to_secondary:
1733 node_to_secondary[secnode].add(inst.name)
1735 master_node = self.cfg.GetMasterNode()
1737 # end data gathering
1740 for node in nodelist:
1742 for field in self.op.output_fields:
1745 elif field == "pinst_list":
1746 val = list(node_to_primary[node.name])
1747 elif field == "sinst_list":
1748 val = list(node_to_secondary[node.name])
1749 elif field == "pinst_cnt":
1750 val = len(node_to_primary[node.name])
1751 elif field == "sinst_cnt":
1752 val = len(node_to_secondary[node.name])
1753 elif field == "pip":
1754 val = node.primary_ip
1755 elif field == "sip":
1756 val = node.secondary_ip
1757 elif field == "tags":
1758 val = list(node.GetTags())
1759 elif field == "serial_no":
1760 val = node.serial_no
1761 elif field == "master_candidate":
1762 val = node.master_candidate
1763 elif field == "master":
1764 val = node.name == master_node
1765 elif self._FIELDS_DYNAMIC.Matches(field):
1766 val = live_data[node.name].get(field, None)
1768 raise errors.ParameterError(field)
1769 node_output.append(val)
1770 output.append(node_output)
1775 class LUQueryNodeVolumes(NoHooksLU):
1776 """Logical unit for getting volumes on node(s).
1779 _OP_REQP = ["nodes", "output_fields"]
1781 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1782 _FIELDS_STATIC = utils.FieldSet("node")
1784 def ExpandNames(self):
1785 _CheckOutputFields(static=self._FIELDS_STATIC,
1786 dynamic=self._FIELDS_DYNAMIC,
1787 selected=self.op.output_fields)
1789 self.needed_locks = {}
1790 self.share_locks[locking.LEVEL_NODE] = 1
1791 if not self.op.nodes:
1792 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1794 self.needed_locks[locking.LEVEL_NODE] = \
1795 _GetWantedNodes(self, self.op.nodes)
1797 def CheckPrereq(self):
1798 """Check prerequisites.
1800 This checks that the fields required are valid output fields.
1803 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1805 def Exec(self, feedback_fn):
1806 """Computes the list of nodes and their attributes.
1809 nodenames = self.nodes
1810 volumes = self.rpc.call_node_volumes(nodenames)
1812 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1813 in self.cfg.GetInstanceList()]
1815 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1818 for node in nodenames:
1819 if node not in volumes or volumes[node].failed or not volumes[node].data:
1822 node_vols = volumes[node].data[:]
1823 node_vols.sort(key=lambda vol: vol['dev'])
1825 for vol in node_vols:
1827 for field in self.op.output_fields:
1830 elif field == "phys":
1834 elif field == "name":
1836 elif field == "size":
1837 val = int(float(vol['size']))
1838 elif field == "instance":
1840 if node not in lv_by_node[inst]:
1842 if vol['name'] in lv_by_node[inst][node]:
1848 raise errors.ParameterError(field)
1849 node_output.append(str(val))
1851 output.append(node_output)
1856 class LUAddNode(LogicalUnit):
1857 """Logical unit for adding node to the cluster.
1861 HTYPE = constants.HTYPE_NODE
1862 _OP_REQP = ["node_name"]
1864 def BuildHooksEnv(self):
1867 This will run on all nodes before, and on all nodes + the new node after.
1871 "OP_TARGET": self.op.node_name,
1872 "NODE_NAME": self.op.node_name,
1873 "NODE_PIP": self.op.primary_ip,
1874 "NODE_SIP": self.op.secondary_ip,
1876 nodes_0 = self.cfg.GetNodeList()
1877 nodes_1 = nodes_0 + [self.op.node_name, ]
1878 return env, nodes_0, nodes_1
1880 def CheckPrereq(self):
1881 """Check prerequisites.
1884 - the new node is not already in the config
1886 - its parameters (single/dual homed) match the cluster
1888 Any errors are signalled by raising errors.OpPrereqError.
1891 node_name = self.op.node_name
1894 dns_data = utils.HostInfo(node_name)
1896 node = dns_data.name
1897 primary_ip = self.op.primary_ip = dns_data.ip
1898 secondary_ip = getattr(self.op, "secondary_ip", None)
1899 if secondary_ip is None:
1900 secondary_ip = primary_ip
1901 if not utils.IsValidIP(secondary_ip):
1902 raise errors.OpPrereqError("Invalid secondary IP given")
1903 self.op.secondary_ip = secondary_ip
1905 node_list = cfg.GetNodeList()
1906 if not self.op.readd and node in node_list:
1907 raise errors.OpPrereqError("Node %s is already in the configuration" %
1909 elif self.op.readd and node not in node_list:
1910 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1912 for existing_node_name in node_list:
1913 existing_node = cfg.GetNodeInfo(existing_node_name)
1915 if self.op.readd and node == existing_node_name:
1916 if (existing_node.primary_ip != primary_ip or
1917 existing_node.secondary_ip != secondary_ip):
1918 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1919 " address configuration as before")
1922 if (existing_node.primary_ip == primary_ip or
1923 existing_node.secondary_ip == primary_ip or
1924 existing_node.primary_ip == secondary_ip or
1925 existing_node.secondary_ip == secondary_ip):
1926 raise errors.OpPrereqError("New node ip address(es) conflict with"
1927 " existing node %s" % existing_node.name)
1929 # check that the type of the node (single versus dual homed) is the
1930 # same as for the master
1931 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1932 master_singlehomed = myself.secondary_ip == myself.primary_ip
1933 newbie_singlehomed = secondary_ip == primary_ip
1934 if master_singlehomed != newbie_singlehomed:
1935 if master_singlehomed:
1936 raise errors.OpPrereqError("The master has no private ip but the"
1937 " new node has one")
1939 raise errors.OpPrereqError("The master has a private ip but the"
1940 " new node doesn't have one")
1942 # checks reachability
1943 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1944 raise errors.OpPrereqError("Node not reachable by ping")
1946 if not newbie_singlehomed:
1947 # check reachability from my secondary ip to newbie's secondary ip
1948 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1949 source=myself.secondary_ip):
1950 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1951 " based ping to noded port")
1953 self.new_node = objects.Node(name=node,
1954 primary_ip=primary_ip,
1955 secondary_ip=secondary_ip)
1957 def Exec(self, feedback_fn):
1958 """Adds the new node to the cluster.
1961 new_node = self.new_node
1962 node = new_node.name
1964 # check connectivity
1965 result = self.rpc.call_version([node])[node]
1968 if constants.PROTOCOL_VERSION == result.data:
1969 logging.info("Communication to node %s fine, sw version %s match",
1972 raise errors.OpExecError("Version mismatch master version %s,"
1973 " node version %s" %
1974 (constants.PROTOCOL_VERSION, result.data))
1976 raise errors.OpExecError("Cannot get version from the new node")
1979 logging.info("Copy ssh key to node %s", node)
1980 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1982 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1983 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1989 keyarray.append(f.read())
1993 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1995 keyarray[3], keyarray[4], keyarray[5])
1997 if result.failed or not result.data:
1998 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2000 # Add node to our /etc/hosts, and add key to known_hosts
2001 utils.AddHostToEtcHosts(new_node.name)
2003 if new_node.secondary_ip != new_node.primary_ip:
2004 result = self.rpc.call_node_has_ip_address(new_node.name,
2005 new_node.secondary_ip)
2006 if result.failed or not result.data:
2007 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2008 " you gave (%s). Please fix and re-run this"
2009 " command." % new_node.secondary_ip)
2011 node_verify_list = [self.cfg.GetMasterNode()]
2012 node_verify_param = {
2014 # TODO: do a node-net-test as well?
2017 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2018 self.cfg.GetClusterName())
2019 for verifier in node_verify_list:
2020 if result.failed or not result[verifier].data:
2021 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2022 " for remote verification" % verifier)
2023 if result[verifier].data['nodelist']:
2024 for failed in result[verifier].data['nodelist']:
2025 feedback_fn("ssh/hostname verification failed %s -> %s" %
2026 (verifier, result[verifier].data['nodelist'][failed]))
2027 raise errors.OpExecError("ssh/hostname verification failed.")
2029 # Distribute updated /etc/hosts and known_hosts to all nodes,
2030 # including the node just added
2031 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2032 dist_nodes = self.cfg.GetNodeList()
2033 if not self.op.readd:
2034 dist_nodes.append(node)
2035 if myself.name in dist_nodes:
2036 dist_nodes.remove(myself.name)
2038 logging.debug("Copying hosts and known_hosts to all nodes")
2039 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2040 result = self.rpc.call_upload_file(dist_nodes, fname)
2041 for to_node, to_result in result.iteritems():
2042 if to_result.failed or not to_result.data:
2043 logging.error("Copy of file %s to node %s failed", fname, to_node)
2046 if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2047 to_copy.append(constants.VNC_PASSWORD_FILE)
2048 for fname in to_copy:
2049 result = self.rpc.call_upload_file([node], fname)
2050 if result[node].failed or not result[node]:
2051 logging.error("Could not copy file %s to node %s", fname, node)
2054 self.context.ReaddNode(new_node)
2056 self.context.AddNode(new_node)
2059 class LUSetNodeParams(LogicalUnit):
2060 """Modifies the parameters of a node.
2063 HPATH = "node-modify"
2064 HTYPE = constants.HTYPE_NODE
2065 _OP_REQP = ["node_name"]
2068 def CheckArguments(self):
2069 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2070 if node_name is None:
2071 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2072 self.op.node_name = node_name
2073 if not hasattr(self.op, 'master_candidate'):
2074 raise errors.OpPrereqError("Please pass at least one modification")
2075 self.op.master_candidate = bool(self.op.master_candidate)
2077 def ExpandNames(self):
2078 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2080 def BuildHooksEnv(self):
2083 This runs on the master node.
2087 "OP_TARGET": self.op.node_name,
2088 "MASTER_CANDIDATE": str(self.op.master_candidate),
2090 nl = [self.cfg.GetMasterNode(),
2094 def CheckPrereq(self):
2095 """Check prerequisites.
2097 This checks that, when demoting a node from master candidate status, the candidate pool does not fall below the configured size.
2100 force = self.force = self.op.force
2102 if self.op.master_candidate == False:
2103 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2104 node_info = self.cfg.GetAllNodesInfo().values()
2105 num_candidates = len([node for node in node_info
2106 if node.master_candidate])
2107 if num_candidates <= cp_size:
2108 msg = ("Not enough master candidates (desired"
2109 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2111 self.LogWarning(msg)
2113 raise errors.OpPrereqError(msg)
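# Illustrative arithmetic for the check above (not part of the original code):
# with candidate_pool_size = 10 and 10 current master candidates, demoting this
# node would leave 9 candidates, so the message is emitted; when the force flag
# is set it is only logged as a warning, otherwise an OpPrereqError is raised.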
2117 def Exec(self, feedback_fn):
2121 node = self.cfg.GetNodeInfo(self.op.node_name)
2125 if self.op.master_candidate is not None:
2126 node.master_candidate = self.op.master_candidate
2127 result.append(("master_candidate", str(self.op.master_candidate)))
2129 # this will trigger configuration file update, if needed
2130 self.cfg.Update(node)
2131 # this will trigger job queue propagation or cleanup
2132 self.context.ReaddNode(node)
2137 class LUQueryClusterInfo(NoHooksLU):
2138 """Query cluster configuration.
2144 def ExpandNames(self):
2145 self.needed_locks = {}
2147 def CheckPrereq(self):
2148 """No prerequisites needed for this LU.
2153 def Exec(self, feedback_fn):
2154 """Return cluster config.
2157 cluster = self.cfg.GetClusterInfo()
2159 "software_version": constants.RELEASE_VERSION,
2160 "protocol_version": constants.PROTOCOL_VERSION,
2161 "config_version": constants.CONFIG_VERSION,
2162 "os_api_version": constants.OS_API_VERSION,
2163 "export_version": constants.EXPORT_VERSION,
2164 "architecture": (platform.architecture()[0], platform.machine()),
2165 "name": cluster.cluster_name,
2166 "master": cluster.master_node,
2167 "default_hypervisor": cluster.default_hypervisor,
2168 "enabled_hypervisors": cluster.enabled_hypervisors,
2169 "hvparams": cluster.hvparams,
2170 "beparams": cluster.beparams,
2171 "candidate_pool_size": cluster.candidate_pool_size,
2177 class LUQueryConfigValues(NoHooksLU):
2178 """Return configuration values.
2183 _FIELDS_DYNAMIC = utils.FieldSet()
2184 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2186 def ExpandNames(self):
2187 self.needed_locks = {}
2189 _CheckOutputFields(static=self._FIELDS_STATIC,
2190 dynamic=self._FIELDS_DYNAMIC,
2191 selected=self.op.output_fields)
2193 def CheckPrereq(self):
2194 """No prerequisites.
2199 def Exec(self, feedback_fn):
2200 """Collect and return the requested configuration values.
2204 for field in self.op.output_fields:
2205 if field == "cluster_name":
2206 entry = self.cfg.GetClusterName()
2207 elif field == "master_node":
2208 entry = self.cfg.GetMasterNode()
2209 elif field == "drain_flag":
2210 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2212 raise errors.ParameterError(field)
2213 values.append(entry)
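# Illustrative example (not in the original module): with
# output_fields = ["cluster_name", "drain_flag"] the values list built above
# could end up as ["cluster1.example.com", False], assuming the queue drain
# file does not exist; the cluster name here is made up.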
2217 class LUActivateInstanceDisks(NoHooksLU):
2218 """Bring up an instance's disks.
2221 _OP_REQP = ["instance_name"]
2224 def ExpandNames(self):
2225 self._ExpandAndLockInstance()
2226 self.needed_locks[locking.LEVEL_NODE] = []
2227 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2229 def DeclareLocks(self, level):
2230 if level == locking.LEVEL_NODE:
2231 self._LockInstancesNodes()
2233 def CheckPrereq(self):
2234 """Check prerequisites.
2236 This checks that the instance is in the cluster.
2239 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2240 assert self.instance is not None, \
2241 "Cannot retrieve locked instance %s" % self.op.instance_name
2243 def Exec(self, feedback_fn):
2244 """Activate the disks.
2247 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2249 raise errors.OpExecError("Cannot activate block devices")
2254 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2255 """Prepare the block devices for an instance.
2257 This sets up the block devices on all nodes.
2259 @type lu: L{LogicalUnit}
2260 @param lu: the logical unit on whose behalf we execute
2261 @type instance: L{objects.Instance}
2262 @param instance: the instance for whose disks we assemble
2263 @type ignore_secondaries: boolean
2264 @param ignore_secondaries: if true, errors on secondary nodes
2265 won't result in an error return from the function
2266 @return: a (disks_ok, device_info) tuple; disks_ok is False if assembly
2267 failed, and device_info is a list of (host, instance_visible_name,
2268 node_visible_name) tuples mapping node devices to instance devices
2273 iname = instance.name
2274 # With the two passes mechanism we try to reduce the window of
2275 # opportunity for the race condition of switching DRBD to primary
2276 # before handshaking occurred, but we do not eliminate it
2278 # The proper fix would be to wait (with some limits) until the
2279 # connection has been made and drbd transitions from WFConnection
2280 # into any other network-connected state (Connected, SyncTarget, SyncSource, etc.)
2283 # 1st pass, assemble on all nodes in secondary mode
2284 for inst_disk in instance.disks:
2285 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2286 lu.cfg.SetDiskID(node_disk, node)
2287 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2288 if result.failed or not result:
2289 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2290 " (is_primary=False, pass=1)",
2291 inst_disk.iv_name, node)
2292 if not ignore_secondaries:
2295 # FIXME: race condition on drbd migration to primary
2297 # 2nd pass, do only the primary node
2298 for inst_disk in instance.disks:
2299 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2300 if node != instance.primary_node:
2302 lu.cfg.SetDiskID(node_disk, node)
2303 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2304 if result.failed or not result:
2305 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2306 " (is_primary=True, pass=2)",
2307 inst_disk.iv_name, node)
2309 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2311 # leave the disks configured for the primary node
2312 # this is a workaround; a better fix would be to improve
2313 # the handling of the logical/physical ids
2314 for disk in instance.disks:
2315 lu.cfg.SetDiskID(disk, instance.primary_node)
2317 return disks_ok, device_info
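# Illustrative sketch of the device_info returned above (not part of the
# original code): for a single-disk DRBD instance it could look like
#   [("node1.example.com", "disk/0", <node-visible device, e.g. "/dev/drbd0">)]
# i.e. one (host, instance_visible_name, node_visible_name) entry per disk on
# the primary node; the hostname and device path here are made up.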
2320 def _StartInstanceDisks(lu, instance, force):
2321 """Start the disks of an instance.
2324 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2325 ignore_secondaries=force)
2327 _ShutdownInstanceDisks(lu, instance)
2328 if force is not None and not force:
2329 lu.proc.LogWarning("", hint="If the message above refers to a"
2331 " you can retry the operation using '--force'.")
2332 raise errors.OpExecError("Disk consistency error")
2335 class LUDeactivateInstanceDisks(NoHooksLU):
2336 """Shutdown an instance's disks.
2339 _OP_REQP = ["instance_name"]
2342 def ExpandNames(self):
2343 self._ExpandAndLockInstance()
2344 self.needed_locks[locking.LEVEL_NODE] = []
2345 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2347 def DeclareLocks(self, level):
2348 if level == locking.LEVEL_NODE:
2349 self._LockInstancesNodes()
2351 def CheckPrereq(self):
2352 """Check prerequisites.
2354 This checks that the instance is in the cluster.
2357 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2358 assert self.instance is not None, \
2359 "Cannot retrieve locked instance %s" % self.op.instance_name
2361 def Exec(self, feedback_fn):
2362 """Deactivate the disks
2365 instance = self.instance
2366 _SafeShutdownInstanceDisks(self, instance)
2369 def _SafeShutdownInstanceDisks(lu, instance):
2370 """Shutdown block devices of an instance.
2372 This function checks if an instance is running, before calling
2373 _ShutdownInstanceDisks.
2376 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2377 [instance.hypervisor])
2378 ins_l = ins_l[instance.primary_node]
2379 if ins_l.failed or not isinstance(ins_l.data, list):
2380 raise errors.OpExecError("Can't contact node '%s'" %
2381 instance.primary_node)
2383 if instance.name in ins_l.data:
2384 raise errors.OpExecError("Instance is running, can't shutdown"
2387 _ShutdownInstanceDisks(lu, instance)
2390 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2391 """Shutdown block devices of an instance.
2393 This does the shutdown on all nodes of the instance.
2395 If ignore_primary is false, errors on the primary node cause the shutdown to be reported as failed; otherwise primary-node errors are only logged.
2400 for disk in instance.disks:
2401 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2402 lu.cfg.SetDiskID(top_disk, node)
2403 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2404 if result.failed or not result.data:
2405 logging.error("Could not shutdown block device %s on node %s",
2407 if not ignore_primary or node != instance.primary_node:
2412 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2413 """Checks if a node has enough free memory.
2415 This function checks whether a given node has the needed amount of free
2416 memory. In case the node has less memory or we cannot get the
2417 information from the node, this function raises an OpPrereqError exception.
2420 @type lu: C{LogicalUnit}
2421 @param lu: a logical unit from which we get configuration data
2423 @param node: the node to check
2424 @type reason: C{str}
2425 @param reason: string to use in the error message
2426 @type requested: C{int}
2427 @param requested: the amount of memory in MiB to check for
2428 @type hypervisor: C{str}
2429 @param hypervisor: the hypervisor to ask for memory stats
2430 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2431 we cannot check the node
2434 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2435 nodeinfo[node].Raise()
2436 free_mem = nodeinfo[node].data.get('memory_free')
2437 if not isinstance(free_mem, int):
2438 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2439 " was '%s'" % (node, free_mem))
2440 if requested > free_mem:
2441 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2442 " needed %s MiB, available %s MiB" %
2443 (node, reason, requested, free_mem))
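# Hypothetical usage sketch (mirrors the real calls made by the startup and
# failover LUs below; target_node, instance and bep are assumed to have been
# looked up already in a LU's CheckPrereq):
#   _CheckNodeFreeMemory(self, target_node,
#                        "migrating instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)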
2446 class LUStartupInstance(LogicalUnit):
2447 """Starts an instance.
2450 HPATH = "instance-start"
2451 HTYPE = constants.HTYPE_INSTANCE
2452 _OP_REQP = ["instance_name", "force"]
2455 def ExpandNames(self):
2456 self._ExpandAndLockInstance()
2458 def BuildHooksEnv(self):
2461 This runs on master, primary and secondary nodes of the instance.
2465 "FORCE": self.op.force,
2467 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2468 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2469 list(self.instance.secondary_nodes))
2472 def CheckPrereq(self):
2473 """Check prerequisites.
2475 This checks that the instance is in the cluster.
2478 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2479 assert self.instance is not None, \
2480 "Cannot retrieve locked instance %s" % self.op.instance_name
2482 bep = self.cfg.GetClusterInfo().FillBE(instance)
2483 # check bridge existence
2484 _CheckInstanceBridgesExist(self, instance)
2486 _CheckNodeFreeMemory(self, instance.primary_node,
2487 "starting instance %s" % instance.name,
2488 bep[constants.BE_MEMORY], instance.hypervisor)
2490 def Exec(self, feedback_fn):
2491 """Start the instance.
2494 instance = self.instance
2495 force = self.op.force
2496 extra_args = getattr(self.op, "extra_args", "")
2498 self.cfg.MarkInstanceUp(instance.name)
2500 node_current = instance.primary_node
2502 _StartInstanceDisks(self, instance, force)
2504 result = self.rpc.call_instance_start(node_current, instance, extra_args)
2505 if result.failed or not result.data:
2506 _ShutdownInstanceDisks(self, instance)
2507 raise errors.OpExecError("Could not start instance")
2510 class LURebootInstance(LogicalUnit):
2511 """Reboot an instance.
2514 HPATH = "instance-reboot"
2515 HTYPE = constants.HTYPE_INSTANCE
2516 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2519 def ExpandNames(self):
2520 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2521 constants.INSTANCE_REBOOT_HARD,
2522 constants.INSTANCE_REBOOT_FULL]:
2523 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2524 (constants.INSTANCE_REBOOT_SOFT,
2525 constants.INSTANCE_REBOOT_HARD,
2526 constants.INSTANCE_REBOOT_FULL))
2527 self._ExpandAndLockInstance()
2529 def BuildHooksEnv(self):
2532 This runs on master, primary and secondary nodes of the instance.
2536 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2538 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2539 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2540 list(self.instance.secondary_nodes))
2543 def CheckPrereq(self):
2544 """Check prerequisites.
2546 This checks that the instance is in the cluster.
2549 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2550 assert self.instance is not None, \
2551 "Cannot retrieve locked instance %s" % self.op.instance_name
2553 # check bridge existence
2554 _CheckInstanceBridgesExist(self, instance)
2556 def Exec(self, feedback_fn):
2557 """Reboot the instance.
2560 instance = self.instance
2561 ignore_secondaries = self.op.ignore_secondaries
2562 reboot_type = self.op.reboot_type
2563 extra_args = getattr(self.op, "extra_args", "")
2565 node_current = instance.primary_node
2567 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2568 constants.INSTANCE_REBOOT_HARD]:
2569 result = self.rpc.call_instance_reboot(node_current, instance,
2570 reboot_type, extra_args)
2571 if result.failed or not result.data:
2572 raise errors.OpExecError("Could not reboot instance")
2574 if not self.rpc.call_instance_shutdown(node_current, instance):
2575 raise errors.OpExecError("could not shutdown instance for full reboot")
2576 _ShutdownInstanceDisks(self, instance)
2577 _StartInstanceDisks(self, instance, ignore_secondaries)
2578 result = self.rpc.call_instance_start(node_current, instance, extra_args)
2579 if result.failed or not result.data:
2580 _ShutdownInstanceDisks(self, instance)
2581 raise errors.OpExecError("Could not start instance for full reboot")
2583 self.cfg.MarkInstanceUp(instance.name)
2586 class LUShutdownInstance(LogicalUnit):
2587 """Shutdown an instance.
2590 HPATH = "instance-stop"
2591 HTYPE = constants.HTYPE_INSTANCE
2592 _OP_REQP = ["instance_name"]
2595 def ExpandNames(self):
2596 self._ExpandAndLockInstance()
2598 def BuildHooksEnv(self):
2601 This runs on master, primary and secondary nodes of the instance.
2604 env = _BuildInstanceHookEnvByObject(self, self.instance)
2605 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2606 list(self.instance.secondary_nodes))
2609 def CheckPrereq(self):
2610 """Check prerequisites.
2612 This checks that the instance is in the cluster.
2615 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2616 assert self.instance is not None, \
2617 "Cannot retrieve locked instance %s" % self.op.instance_name
2619 def Exec(self, feedback_fn):
2620 """Shutdown the instance.
2623 instance = self.instance
2624 node_current = instance.primary_node
2625 self.cfg.MarkInstanceDown(instance.name)
2626 result = self.rpc.call_instance_shutdown(node_current, instance)
2627 if result.failed or not result.data:
2628 self.proc.LogWarning("Could not shutdown instance")
2630 _ShutdownInstanceDisks(self, instance)
2633 class LUReinstallInstance(LogicalUnit):
2634 """Reinstall an instance.
2637 HPATH = "instance-reinstall"
2638 HTYPE = constants.HTYPE_INSTANCE
2639 _OP_REQP = ["instance_name"]
2642 def ExpandNames(self):
2643 self._ExpandAndLockInstance()
2645 def BuildHooksEnv(self):
2648 This runs on master, primary and secondary nodes of the instance.
2651 env = _BuildInstanceHookEnvByObject(self, self.instance)
2652 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2653 list(self.instance.secondary_nodes))
2656 def CheckPrereq(self):
2657 """Check prerequisites.
2659 This checks that the instance is in the cluster and is not running.
2662 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2663 assert instance is not None, \
2664 "Cannot retrieve locked instance %s" % self.op.instance_name
2666 if instance.disk_template == constants.DT_DISKLESS:
2667 raise errors.OpPrereqError("Instance '%s' has no disks" %
2668 self.op.instance_name)
2669 if instance.status != "down":
2670 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2671 self.op.instance_name)
2672 remote_info = self.rpc.call_instance_info(instance.primary_node,
2674 instance.hypervisor)
2675 if remote_info.failed or remote_info.data:
2676 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2677 (self.op.instance_name,
2678 instance.primary_node))
2680 self.op.os_type = getattr(self.op, "os_type", None)
2681 if self.op.os_type is not None:
2683 pnode = self.cfg.GetNodeInfo(
2684 self.cfg.ExpandNodeName(instance.primary_node))
2686 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2688 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2690 if not isinstance(result.data, objects.OS):
2691 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2692 " primary node" % self.op.os_type)
2694 self.instance = instance
2696 def Exec(self, feedback_fn):
2697 """Reinstall the instance.
2700 inst = self.instance
2702 if self.op.os_type is not None:
2703 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2704 inst.os = self.op.os_type
2705 self.cfg.Update(inst)
2707 _StartInstanceDisks(self, inst, None)
2709 feedback_fn("Running the instance OS create scripts...")
2710 result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2713 raise errors.OpExecError("Could not install OS for instance %s"
2715 (inst.name, inst.primary_node))
2717 _ShutdownInstanceDisks(self, inst)
2720 class LURenameInstance(LogicalUnit):
2721 """Rename an instance.
2724 HPATH = "instance-rename"
2725 HTYPE = constants.HTYPE_INSTANCE
2726 _OP_REQP = ["instance_name", "new_name"]
2728 def BuildHooksEnv(self):
2731 This runs on master, primary and secondary nodes of the instance.
2734 env = _BuildInstanceHookEnvByObject(self, self.instance)
2735 env["INSTANCE_NEW_NAME"] = self.op.new_name
2736 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2737 list(self.instance.secondary_nodes))
2740 def CheckPrereq(self):
2741 """Check prerequisites.
2743 This checks that the instance is in the cluster and is not running.
2746 instance = self.cfg.GetInstanceInfo(
2747 self.cfg.ExpandInstanceName(self.op.instance_name))
2748 if instance is None:
2749 raise errors.OpPrereqError("Instance '%s' not known" %
2750 self.op.instance_name)
2751 if instance.status != "down":
2752 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2753 self.op.instance_name)
2754 remote_info = self.rpc.call_instance_info(instance.primary_node,
2756 instance.hypervisor)
2758 if remote_info.data:
2759 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2760 (self.op.instance_name,
2761 instance.primary_node))
2762 self.instance = instance
2764 # new name verification
2765 name_info = utils.HostInfo(self.op.new_name)
2767 self.op.new_name = new_name = name_info.name
2768 instance_list = self.cfg.GetInstanceList()
2769 if new_name in instance_list:
2770 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2773 if not getattr(self.op, "ignore_ip", False):
2774 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2775 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2776 (name_info.ip, new_name))
2779 def Exec(self, feedback_fn):
2780 """Rename the instance.
2783 inst = self.instance
2784 old_name = inst.name
2786 if inst.disk_template == constants.DT_FILE:
2787 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2789 self.cfg.RenameInstance(inst.name, self.op.new_name)
2790 # Change the instance lock. This is definitely safe while we hold the BGL
2791 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2792 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2794 # re-read the instance from the configuration after rename
2795 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2797 if inst.disk_template == constants.DT_FILE:
2798 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2799 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2800 old_file_storage_dir,
2801 new_file_storage_dir)
2804 raise errors.OpExecError("Could not connect to node '%s' to rename"
2805 " directory '%s' to '%s' (but the instance"
2806 " has been renamed in Ganeti)" % (
2807 inst.primary_node, old_file_storage_dir,
2808 new_file_storage_dir))
2810 if not result.data[0]:
2811 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2812 " (but the instance has been renamed in"
2813 " Ganeti)" % (old_file_storage_dir,
2814 new_file_storage_dir))
2816 _StartInstanceDisks(self, inst, None)
2818 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2820 if result.failed or not result.data:
2821 msg = ("Could not run OS rename script for instance %s on node %s"
2822 " (but the instance has been renamed in Ganeti)" %
2823 (inst.name, inst.primary_node))
2824 self.proc.LogWarning(msg)
2826 _ShutdownInstanceDisks(self, inst)
2829 class LURemoveInstance(LogicalUnit):
2830 """Remove an instance.
2833 HPATH = "instance-remove"
2834 HTYPE = constants.HTYPE_INSTANCE
2835 _OP_REQP = ["instance_name", "ignore_failures"]
2838 def ExpandNames(self):
2839 self._ExpandAndLockInstance()
2840 self.needed_locks[locking.LEVEL_NODE] = []
2841 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2843 def DeclareLocks(self, level):
2844 if level == locking.LEVEL_NODE:
2845 self._LockInstancesNodes()
2847 def BuildHooksEnv(self):
2850 This runs on master, primary and secondary nodes of the instance.
2853 env = _BuildInstanceHookEnvByObject(self, self.instance)
2854 nl = [self.cfg.GetMasterNode()]
2857 def CheckPrereq(self):
2858 """Check prerequisites.
2860 This checks that the instance is in the cluster.
2863 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2864 assert self.instance is not None, \
2865 "Cannot retrieve locked instance %s" % self.op.instance_name
2867 def Exec(self, feedback_fn):
2868 """Remove the instance.
2871 instance = self.instance
2872 logging.info("Shutting down instance %s on node %s",
2873 instance.name, instance.primary_node)
2875 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2876 if result.failed or not result.data:
2877 if self.op.ignore_failures:
2878 feedback_fn("Warning: can't shutdown instance")
2880 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2881 (instance.name, instance.primary_node))
2883 logging.info("Removing block devices for instance %s", instance.name)
2885 if not _RemoveDisks(self, instance):
2886 if self.op.ignore_failures:
2887 feedback_fn("Warning: can't remove instance's disks")
2889 raise errors.OpExecError("Can't remove instance's disks")
2891 logging.info("Removing instance %s from the cluster config", instance.name)
2893 self.cfg.RemoveInstance(instance.name)
2894 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2897 class LUQueryInstances(NoHooksLU):
2898 """Logical unit for querying instances.
2901 _OP_REQP = ["output_fields", "names"]
2903 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2904 "admin_state", "admin_ram",
2905 "disk_template", "ip", "mac", "bridge",
2906 "sda_size", "sdb_size", "vcpus", "tags",
2907 "network_port", "beparams",
2908 "(disk).(size)/([0-9]+)",
2910 "(nic).(mac|ip|bridge)/([0-9]+)",
2911 "(nic).(macs|ips|bridges)",
2912 "(disk|nic).(count)",
2913 "serial_no", "hypervisor", "hvparams",] +
2915 for name in constants.HVS_PARAMETERS] +
2917 for name in constants.BES_PARAMETERS])
2918 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
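# Example field names accepted by the static field set above (illustrative,
# derived from the patterns): plain fields such as "name" or "os", and
# parameterized fields such as "disk.count", "disk.size/0", "nic.macs" or
# "nic.mac/1"; per-parameter hypervisor and backend values are exposed through
# the HVPREFIX/BEPREFIX-prefixed fields handled further down in Exec.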
2921 def ExpandNames(self):
2922 _CheckOutputFields(static=self._FIELDS_STATIC,
2923 dynamic=self._FIELDS_DYNAMIC,
2924 selected=self.op.output_fields)
2926 self.needed_locks = {}
2927 self.share_locks[locking.LEVEL_INSTANCE] = 1
2928 self.share_locks[locking.LEVEL_NODE] = 1
2931 self.wanted = _GetWantedInstances(self, self.op.names)
2933 self.wanted = locking.ALL_SET
2935 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2937 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2938 self.needed_locks[locking.LEVEL_NODE] = []
2939 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2941 def DeclareLocks(self, level):
2942 if level == locking.LEVEL_NODE and self.do_locking:
2943 self._LockInstancesNodes()
2945 def CheckPrereq(self):
2946 """Check prerequisites.
2951 def Exec(self, feedback_fn):
2952 """Computes the list of instances and their attributes.
2955 all_info = self.cfg.GetAllInstancesInfo()
2957 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2958 elif self.wanted != locking.ALL_SET:
2959 instance_names = self.wanted
2960 missing = set(instance_names).difference(all_info.keys())
2962 raise errors.OpExecError(
2963 "Some instances were removed before retrieving their data: %s"
2966 instance_names = all_info.keys()
2968 instance_names = utils.NiceSort(instance_names)
2969 instance_list = [all_info[iname] for iname in instance_names]
2971 # begin data gathering
2973 nodes = frozenset([inst.primary_node for inst in instance_list])
2974 hv_list = list(set([inst.hypervisor for inst in instance_list]))
2979 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2981 result = node_data[name]
2983 bad_nodes.append(name)
2986 live_data.update(result.data)
2987 # else no instance is alive
2989 live_data = dict([(name, {}) for name in instance_names])
2991 # end data gathering
2996 for instance in instance_list:
2998 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2999 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3000 for field in self.op.output_fields:
3001 st_match = self._FIELDS_STATIC.Matches(field)
3006 elif field == "pnode":
3007 val = instance.primary_node
3008 elif field == "snodes":
3009 val = list(instance.secondary_nodes)
3010 elif field == "admin_state":
3011 val = (instance.status != "down")
3012 elif field == "oper_state":
3013 if instance.primary_node in bad_nodes:
3016 val = bool(live_data.get(instance.name))
3017 elif field == "status":
3018 if instance.primary_node in bad_nodes:
3019 val = "ERROR_nodedown"
3021 running = bool(live_data.get(instance.name))
3023 if instance.status != "down":
3028 if instance.status != "down":
3032 elif field == "oper_ram":
3033 if instance.primary_node in bad_nodes:
3035 elif instance.name in live_data:
3036 val = live_data[instance.name].get("memory", "?")
3039 elif field == "disk_template":
3040 val = instance.disk_template
3042 val = instance.nics[0].ip
3043 elif field == "bridge":
3044 val = instance.nics[0].bridge
3045 elif field == "mac":
3046 val = instance.nics[0].mac
3047 elif field == "sda_size" or field == "sdb_size":
3048 idx = ord(field[2]) - ord('a')
3050 val = instance.FindDisk(idx).size
3051 except errors.OpPrereqError:
3053 elif field == "tags":
3054 val = list(instance.GetTags())
3055 elif field == "serial_no":
3056 val = instance.serial_no
3057 elif field == "network_port":
3058 val = instance.network_port
3059 elif field == "hypervisor":
3060 val = instance.hypervisor
3061 elif field == "hvparams":
3063 elif (field.startswith(HVPREFIX) and
3064 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3065 val = i_hv.get(field[len(HVPREFIX):], None)
3066 elif field == "beparams":
3068 elif (field.startswith(BEPREFIX) and
3069 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3070 val = i_be.get(field[len(BEPREFIX):], None)
3071 elif st_match and st_match.groups():
3072 # matches a variable list
3073 st_groups = st_match.groups()
3074 if st_groups and st_groups[0] == "disk":
3075 if st_groups[1] == "count":
3076 val = len(instance.disks)
3077 elif st_groups[1] == "sizes":
3078 val = [disk.size for disk in instance.disks]
3079 elif st_groups[1] == "size":
3081 val = instance.FindDisk(st_groups[2]).size
3082 except errors.OpPrereqError:
3085 assert False, "Unhandled disk parameter"
3086 elif st_groups[0] == "nic":
3087 if st_groups[1] == "count":
3088 val = len(instance.nics)
3089 elif st_groups[1] == "macs":
3090 val = [nic.mac for nic in instance.nics]
3091 elif st_groups[1] == "ips":
3092 val = [nic.ip for nic in instance.nics]
3093 elif st_groups[1] == "bridges":
3094 val = [nic.bridge for nic in instance.nics]
3097 nic_idx = int(st_groups[2])
3098 if nic_idx >= len(instance.nics):
3101 if st_groups[1] == "mac":
3102 val = instance.nics[nic_idx].mac
3103 elif st_groups[1] == "ip":
3104 val = instance.nics[nic_idx].ip
3105 elif st_groups[1] == "bridge":
3106 val = instance.nics[nic_idx].bridge
3108 assert False, "Unhandled NIC parameter"
3110 assert False, "Unhandled variable parameter"
3112 raise errors.ParameterError(field)
3119 class LUFailoverInstance(LogicalUnit):
3120 """Failover an instance.
3123 HPATH = "instance-failover"
3124 HTYPE = constants.HTYPE_INSTANCE
3125 _OP_REQP = ["instance_name", "ignore_consistency"]
3128 def ExpandNames(self):
3129 self._ExpandAndLockInstance()
3130 self.needed_locks[locking.LEVEL_NODE] = []
3131 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3133 def DeclareLocks(self, level):
3134 if level == locking.LEVEL_NODE:
3135 self._LockInstancesNodes()
3137 def BuildHooksEnv(self):
3140 This runs on master, primary and secondary nodes of the instance.
3144 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3146 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3147 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3150 def CheckPrereq(self):
3151 """Check prerequisites.
3153 This checks that the instance is in the cluster.
3156 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3157 assert self.instance is not None, \
3158 "Cannot retrieve locked instance %s" % self.op.instance_name
3160 bep = self.cfg.GetClusterInfo().FillBE(instance)
3161 if instance.disk_template not in constants.DTS_NET_MIRROR:
3162 raise errors.OpPrereqError("Instance's disk layout is not"
3163 " network mirrored, cannot failover.")
3165 secondary_nodes = instance.secondary_nodes
3166 if not secondary_nodes:
3167 raise errors.ProgrammerError("no secondary node but using "
3168 "a mirrored disk template")
3170 target_node = secondary_nodes[0]
3171 # check memory requirements on the secondary node
3172 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3173 instance.name, bep[constants.BE_MEMORY],
3174 instance.hypervisor)
3176 # check bridge existence
3177 brlist = [nic.bridge for nic in instance.nics]
3178 result = self.rpc.call_bridges_exist(target_node, brlist)
3181 raise errors.OpPrereqError("One or more target bridges %s does not"
3182 " exist on destination node '%s'" %
3183 (brlist, target_node))
3185 def Exec(self, feedback_fn):
3186 """Failover an instance.
3188 The failover is done by shutting it down on its present node and
3189 starting it on the secondary.
3192 instance = self.instance
3194 source_node = instance.primary_node
3195 target_node = instance.secondary_nodes[0]
3197 feedback_fn("* checking disk consistency between source and target")
3198 for dev in instance.disks:
3199 # for drbd, these are drbd over lvm
3200 if not _CheckDiskConsistency(self, dev, target_node, False):
3201 if instance.status == "up" and not self.op.ignore_consistency:
3202 raise errors.OpExecError("Disk %s is degraded on target node,"
3203 " aborting failover." % dev.iv_name)
3205 feedback_fn("* shutting down instance on source node")
3206 logging.info("Shutting down instance %s on node %s",
3207 instance.name, source_node)
3209 result = self.rpc.call_instance_shutdown(source_node, instance)
3210 if result.failed or not result.data:
3211 if self.op.ignore_consistency:
3212 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3214 " anyway. Please make sure node %s is down",
3215 instance.name, source_node, source_node)
3217 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3218 (instance.name, source_node))
3220 feedback_fn("* deactivating the instance's disks on source node")
3221 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3222 raise errors.OpExecError("Can't shut down the instance's disks.")
3224 instance.primary_node = target_node
3225 # distribute new instance config to the other nodes
3226 self.cfg.Update(instance)
3228 # Only start the instance if it's marked as up
3229 if instance.status == "up":
3230 feedback_fn("* activating the instance's disks on target node")
3231 logging.info("Starting instance %s on node %s",
3232 instance.name, target_node)
3234 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3235 ignore_secondaries=True)
3237 _ShutdownInstanceDisks(self, instance)
3238 raise errors.OpExecError("Can't activate the instance's disks")
3240 feedback_fn("* starting the instance on the target node")
3241 result = self.rpc.call_instance_start(target_node, instance, None)
3242 if result.failed or not result.data:
3243 _ShutdownInstanceDisks(self, instance)
3244 raise errors.OpExecError("Could not start instance %s on node %s." %
3245 (instance.name, target_node))
3248 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3249 """Create a tree of block devices on the primary node.
3251 This always creates all devices.
3255 for child in device.children:
3256 if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3259 lu.cfg.SetDiskID(device, node)
3260 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3261 instance.name, True, info)
3262 if new_id.failed or not new_id.data:
3264 if device.physical_id is None:
3265 device.physical_id = new_id
3269 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3270 """Create a tree of block devices on a secondary node.
3272 If this device type has to be created on secondaries, create it and all its children.
3275 If not, just recurse to children keeping the same 'force' value.
3278 if device.CreateOnSecondary():
3281 for child in device.children:
3282 if not _CreateBlockDevOnSecondary(lu, node, instance,
3283 child, force, info):
3288 lu.cfg.SetDiskID(device, node)
3289 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3290 instance.name, False, info)
3291 if new_id.failed or not new_id.data:
3293 if device.physical_id is None:
3294 device.physical_id = new_id
3298 def _GenerateUniqueNames(lu, exts):
3299 """Generate suitable LV names.
3301 This will generate unique logical volume names, one for each extension in C{exts}.
3306 new_id = lu.cfg.GenerateUniqueID()
3307 results.append("%s%s" % (new_id, val))
3311 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3313 """Generate a drbd8 device complete with its children.
3316 port = lu.cfg.AllocatePort()
3317 vgname = lu.cfg.GetVGName()
3318 shared_secret = lu.cfg.GenerateDRBDSecret()
3319 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3320 logical_id=(vgname, names[0]))
3321 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3322 logical_id=(vgname, names[1]))
3323 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3324 logical_id=(primary, secondary, port,
3327 children=[dev_data, dev_meta],
3332 def _GenerateDiskTemplate(lu, template_name,
3333 instance_name, primary_node,
3334 secondary_nodes, disk_info,
3335 file_storage_dir, file_driver,
3337 """Generate the entire disk layout for a given template type.
3340 #TODO: compute space requirements
3342 vgname = lu.cfg.GetVGName()
3343 disk_count = len(disk_info)
3345 if template_name == constants.DT_DISKLESS:
3347 elif template_name == constants.DT_PLAIN:
3348 if len(secondary_nodes) != 0:
3349 raise errors.ProgrammerError("Wrong template configuration")
3351 names = _GenerateUniqueNames(lu, [".disk%d" % i
3352 for i in range(disk_count)])
3353 for idx, disk in enumerate(disk_info):
3354 disk_index = idx + base_index
3355 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3356 logical_id=(vgname, names[idx]),
3357 iv_name="disk/%d" % disk_index)
3358 disks.append(disk_dev)
3359 elif template_name == constants.DT_DRBD8:
3360 if len(secondary_nodes) != 1:
3361 raise errors.ProgrammerError("Wrong template configuration")
3362 remote_node = secondary_nodes[0]
3363 minors = lu.cfg.AllocateDRBDMinor(
3364 [primary_node, remote_node] * len(disk_info), instance_name)
3366 names = _GenerateUniqueNames(lu,
3367 [".disk%d_%s" % (i, s)
3368 for i in range(disk_count)
3369 for s in ("data", "meta")
3371 for idx, disk in enumerate(disk_info):
3372 disk_index = idx + base_index
3373 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3374 disk["size"], names[idx*2:idx*2+2],
3375 "disk/%d" % disk_index,
3376 minors[idx*2], minors[idx*2+1])
3377 disks.append(disk_dev)
3378 elif template_name == constants.DT_FILE:
3379 if len(secondary_nodes) != 0:
3380 raise errors.ProgrammerError("Wrong template configuration")
3382 for idx, disk in enumerate(disk_info):
3383 disk_index = idx + base_index
3384 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3385 iv_name="disk/%d" % disk_index,
3386 logical_id=(file_driver,
3387 "%s/disk%d" % (file_storage_dir,
3389 disks.append(disk_dev)
3391 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
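# Worked example (illustrative only): for a two-disk DT_DRBD8 instance the
# code above allocates four DRBD minors ([primary, remote] once per disk),
# generates four LV names of the form "<unique-id>.disk0_data",
# "<unique-id>.disk0_meta", "<unique-id>.disk1_data", "<unique-id>.disk1_meta",
# and builds two DRBD8 disks with iv_names "disk/0" and "disk/1".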
3395 def _GetInstanceInfoText(instance):
3396 """Compute the text that should be added to the disk's metadata.
3399 return "originstname+%s" % instance.name
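# Example (illustrative): for an instance named "inst1.example.com" the
# metadata text returned above would be "originstname+inst1.example.com".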
3402 def _CreateDisks(lu, instance):
3403 """Create all disks for an instance.
3405 This abstracts away some work from AddInstance.
3407 @type lu: L{LogicalUnit}
3408 @param lu: the logical unit on whose behalf we execute
3409 @type instance: L{objects.Instance}
3410 @param instance: the instance whose disks we should create
3412 @return: the success of the creation
3415 info = _GetInstanceInfoText(instance)
3417 if instance.disk_template == constants.DT_FILE:
3418 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3419 result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3422 if result.failed or not result.data:
3423 logging.error("Could not connect to node '%s'", instance.primary_node)
3426 if not result.data[0]:
3427 logging.error("Failed to create directory '%s'", file_storage_dir)
3430 # Note: this needs to be kept in sync with adding of disks in
3431 # LUSetInstanceParams
3432 for device in instance.disks:
3433 logging.info("Creating volume %s for instance %s",
3434 device.iv_name, instance.name)
3436 for secondary_node in instance.secondary_nodes:
3437 if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3438 device, False, info):
3439 logging.error("Failed to create volume %s (%s) on secondary node %s!",
3440 device.iv_name, device, secondary_node)
3443 if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3444 instance, device, info):
3445 logging.error("Failed to create volume %s on primary!", device.iv_name)
3451 def _RemoveDisks(lu, instance):
3452 """Remove all disks for an instance.
3454 This abstracts away some work from `AddInstance()` and
3455 `RemoveInstance()`. Note that in case some of the devices couldn't
3456 be removed, the removal will continue with the other ones (compare
3457 with `_CreateDisks()`).
3459 @type lu: L{LogicalUnit}
3460 @param lu: the logical unit on whose behalf we execute
3461 @type instance: L{objects.Instance}
3462 @param instance: the instance whose disks we should remove
3464 @return: the success of the removal
3467 logging.info("Removing block devices for instance %s", instance.name)
3470 for device in instance.disks:
3471 for node, disk in device.ComputeNodeTree(instance.primary_node):
3472 lu.cfg.SetDiskID(disk, node)
3473 result = lu.rpc.call_blockdev_remove(node, disk)
3474 if result.failed or not result.data:
3475 lu.proc.LogWarning("Could not remove block device %s on node %s,"
3476 " continuing anyway", device.iv_name, node)
3479 if instance.disk_template == constants.DT_FILE:
3480 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3481 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3483 if result.failed or not result.data:
3484 logging.error("Could not remove directory '%s'", file_storage_dir)
3490 def _ComputeDiskSize(disk_template, disks):
3491 """Compute disk size requirements in the volume group
3494 # Required free disk space as a function of the disk template and disk sizes
3496 constants.DT_DISKLESS: None,
3497 constants.DT_PLAIN: sum(d["size"] for d in disks),
3498 # 128 MB are added for drbd metadata for each disk
3499 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3500 constants.DT_FILE: None,
3503 if disk_template not in req_size_dict:
3504 raise errors.ProgrammerError("Disk template '%s' size requirement"
3505 " is unknown" % disk_template)
3507 return req_size_dict[disk_template]
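# Worked example (illustrative): for DT_DRBD8 with two disks of 1024 and
# 2048 MiB the required volume group space is (1024 + 128) + (2048 + 128)
# = 3328 MiB, while DT_FILE and DT_DISKLESS need no LVM space at all (None).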
3510 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3511 """Hypervisor parameter validation.
3513 This function abstracts the hypervisor parameter validation to be
3514 used in both instance create and instance modify.
3516 @type lu: L{LogicalUnit}
3517 @param lu: the logical unit for which we check
3518 @type nodenames: list
3519 @param nodenames: the list of nodes on which we should check
3520 @type hvname: string
3521 @param hvname: the name of the hypervisor we should use
3522 @type hvparams: dict
3523 @param hvparams: the parameters which we need to check
3524 @raise errors.OpPrereqError: if the parameters are not valid
3527 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3530 for node in nodenames:
3533 if not info.data or not isinstance(info.data, (tuple, list)):
3534 raise errors.OpPrereqError("Cannot get current information"
3535 " from node '%s' (%s)" % (node, info.data))
3536 if not info.data[0]:
3537 raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3538 " %s" % info.data[1])
3541 class LUCreateInstance(LogicalUnit):
3542 """Create an instance.
3545 HPATH = "instance-add"
3546 HTYPE = constants.HTYPE_INSTANCE
3547 _OP_REQP = ["instance_name", "disks", "disk_template",
3549 "wait_for_sync", "ip_check", "nics",
3550 "hvparams", "beparams"]
3553 def _ExpandNode(self, node):
3554 """Expands and checks one node name.
3557 node_full = self.cfg.ExpandNodeName(node)
3558 if node_full is None:
3559 raise errors.OpPrereqError("Unknown node %s" % node)
3562 def ExpandNames(self):
3563 """ExpandNames for CreateInstance.
3565 Figure out the right locks for instance creation.
3568 self.needed_locks = {}
3570 # set optional parameters to none if they don't exist
3571 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3572 if not hasattr(self.op, attr):
3573 setattr(self.op, attr, None)
3575 # cheap checks, mostly valid constants given
3577 # verify creation mode
3578 if self.op.mode not in (constants.INSTANCE_CREATE,
3579 constants.INSTANCE_IMPORT):
3580 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3583 # disk template and mirror node verification
3584 if self.op.disk_template not in constants.DISK_TEMPLATES:
3585 raise errors.OpPrereqError("Invalid disk template name")
3587 if self.op.hypervisor is None:
3588 self.op.hypervisor = self.cfg.GetHypervisorType()
3590 cluster = self.cfg.GetClusterInfo()
3591 enabled_hvs = cluster.enabled_hypervisors
3592 if self.op.hypervisor not in enabled_hvs:
3593 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3594 " cluster (%s)" % (self.op.hypervisor,
3595 ",".join(enabled_hvs)))
3597 # check hypervisor parameter syntax (locally)
3599 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3601 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3602 hv_type.CheckParameterSyntax(filled_hvp)
3604 # fill and remember the beparams dict
3605 utils.CheckBEParams(self.op.beparams)
3606 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3609 #### instance parameters check
3611 # instance name verification
3612 hostname1 = utils.HostInfo(self.op.instance_name)
3613 self.op.instance_name = instance_name = hostname1.name
3615 # this is just a preventive check, but someone might still add this
3616 # instance in the meantime, and creation will fail at lock-add time
3617 if instance_name in self.cfg.GetInstanceList():
3618 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3621 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3625 for nic in self.op.nics:
3626 # ip validity checks
3627 ip = nic.get("ip", None)
3628 if ip is None or ip.lower() == "none":
3630 elif ip.lower() == constants.VALUE_AUTO:
3631 nic_ip = hostname1.ip
3633 if not utils.IsValidIP(ip):
3634 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3635 " like a valid IP" % ip)
3638 # MAC address verification
3639 mac = nic.get("mac", constants.VALUE_AUTO)
3640 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3641 if not utils.IsValidMac(mac.lower()):
3642 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3644 # bridge verification
3645 bridge = nic.get("bridge", self.cfg.GetDefBridge())
3646 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3648 # disk checks/pre-build
3650 for disk in self.op.disks:
3651 mode = disk.get("mode", constants.DISK_RDWR)
3652 if mode not in constants.DISK_ACCESS_SET:
3653 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3655 size = disk.get("size", None)
3657 raise errors.OpPrereqError("Missing disk size")
3661 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3662 self.disks.append({"size": size, "mode": mode})
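# Illustrative opcode input accepted by the loops above (the concrete values
# are made up): nics=[{"mac": constants.VALUE_AUTO, "ip": None,
# "bridge": "xen-br0"}] and disks=[{"size": 10240}, {"size": 2048,
# "mode": constants.DISK_RDWR}]; a missing "mode" defaults to DISK_RDWR and a
# missing "bridge" falls back to self.cfg.GetDefBridge().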
3664 # used in CheckPrereq for ip ping check
3665 self.check_ip = hostname1.ip
3667 # file storage checks
3668 if (self.op.file_driver and
3669 not self.op.file_driver in constants.FILE_DRIVER):
3670 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3671 self.op.file_driver)
3673 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3674 raise errors.OpPrereqError("File storage directory path not absolute")
3676 ### Node/iallocator related checks
3677 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3678 raise errors.OpPrereqError("One and only one of iallocator and primary"
3679 " node must be given")
3681 if self.op.iallocator:
3682 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3684 self.op.pnode = self._ExpandNode(self.op.pnode)
3685 nodelist = [self.op.pnode]
3686 if self.op.snode is not None:
3687 self.op.snode = self._ExpandNode(self.op.snode)
3688 nodelist.append(self.op.snode)
3689 self.needed_locks[locking.LEVEL_NODE] = nodelist
3691 # in case of import, lock the source node too
3692 if self.op.mode == constants.INSTANCE_IMPORT:
3693 src_node = getattr(self.op, "src_node", None)
3694 src_path = getattr(self.op, "src_path", None)
3696 if src_path is None:
3697 self.op.src_path = src_path = self.op.instance_name
3699 if src_node is None:
3700 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3701 self.op.src_node = None
3702 if os.path.isabs(src_path):
3703 raise errors.OpPrereqError("Importing an instance from an absolute"
3704 " path requires a source node option.")
3706 self.op.src_node = src_node = self._ExpandNode(src_node)
3707 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3708 self.needed_locks[locking.LEVEL_NODE].append(src_node)
3709 if not os.path.isabs(src_path):
3710 self.op.src_path = src_path = \
3711 os.path.join(constants.EXPORT_DIR, src_path)
3713 else: # INSTANCE_CREATE
3714 if getattr(self.op, "os_type", None) is None:
3715 raise errors.OpPrereqError("No guest OS specified")
3717 def _RunAllocator(self):
3718 """Run the allocator based on input opcode.
3721 nics = [n.ToDict() for n in self.nics]
3722 ial = IAllocator(self,
3723 mode=constants.IALLOCATOR_MODE_ALLOC,
3724 name=self.op.instance_name,
3725 disk_template=self.op.disk_template,
3728 vcpus=self.be_full[constants.BE_VCPUS],
3729 mem_size=self.be_full[constants.BE_MEMORY],
3732 hypervisor=self.op.hypervisor,
3735 ial.Run(self.op.iallocator)
3738 raise errors.OpPrereqError("Can't compute nodes using"
3739 " iallocator '%s': %s" % (self.op.iallocator,
3741 if len(ial.nodes) != ial.required_nodes:
3742 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3743 " of nodes (%s), required %s" %
3744 (self.op.iallocator, len(ial.nodes),
3745 ial.required_nodes))
3746 self.op.pnode = ial.nodes[0]
3747 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3748 self.op.instance_name, self.op.iallocator,
3749 ", ".join(ial.nodes))
3750 if ial.required_nodes == 2:
3751 self.op.snode = ial.nodes[1]
3753 def BuildHooksEnv(self):
3756 This runs on master, primary and secondary nodes of the instance.
3760 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3761 "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3762 "INSTANCE_ADD_MODE": self.op.mode,
3764 if self.op.mode == constants.INSTANCE_IMPORT:
3765 env["INSTANCE_SRC_NODE"] = self.op.src_node
3766 env["INSTANCE_SRC_PATH"] = self.op.src_path
3767 env["INSTANCE_SRC_IMAGES"] = self.src_images
3769 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3770 primary_node=self.op.pnode,
3771 secondary_nodes=self.secondaries,
3772 status=self.instance_status,
3773 os_type=self.op.os_type,
3774 memory=self.be_full[constants.BE_MEMORY],
3775 vcpus=self.be_full[constants.BE_VCPUS],
3776 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3779 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3784 def CheckPrereq(self):
3785 """Check prerequisites.
3788 if (not self.cfg.GetVGName() and
3789 self.op.disk_template not in constants.DTS_NOT_LVM):
3790 raise errors.OpPrereqError("Cluster does not support lvm-based"
3794 if self.op.mode == constants.INSTANCE_IMPORT:
3795 src_node = self.op.src_node
3796 src_path = self.op.src_path
3798 if src_node is None:
3799 exp_list = self.rpc.call_export_list(
3800 self.acquired_locks[locking.LEVEL_NODE])
3802 for node in exp_list:
3803 if not exp_list[node].failed and src_path in exp_list[node].data:
3805 self.op.src_node = src_node = node
3806 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3810 raise errors.OpPrereqError("No export found for relative path %s" %
3813 result = self.rpc.call_export_info(src_node, src_path)
3816 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3818 export_info = result.data
3819 if not export_info.has_section(constants.INISECT_EXP):
3820 raise errors.ProgrammerError("Corrupted export config")
3822 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3823 if (int(ei_version) != constants.EXPORT_VERSION):
3824 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3825 (ei_version, constants.EXPORT_VERSION))
3827 # Check that the new instance doesn't have fewer disks than the export
3828 instance_disks = len(self.disks)
3829 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3830 if instance_disks < export_disks:
3831 raise errors.OpPrereqError("Not enough disks to import."
3832 " (instance: %d, export: %d)" %
3833 (instance_disks, export_disks))
3835 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3837 for idx in range(export_disks):
3838 option = 'disk%d_dump' % idx
3839 if export_info.has_option(constants.INISECT_INS, option):
3840 # FIXME: are the old os-es, disk sizes, etc. useful?
3841 export_name = export_info.get(constants.INISECT_INS, option)
3842 image = os.path.join(src_path, export_name)
3843 disk_images.append(image)
3845 disk_images.append(False)
3847 self.src_images = disk_images
3849 old_name = export_info.get(constants.INISECT_INS, 'name')
3850 # FIXME: int() here could throw a ValueError on broken exports
3851 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3852 if self.op.instance_name == old_name:
3853 for idx, nic in enumerate(self.nics):
3854 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3855 nic_mac_ini = 'nic%d_mac' % idx
3856 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3858 # ip ping checks (we use the same ip that was resolved in ExpandNames)
3859 if self.op.start and not self.op.ip_check:
3860 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3861 " adding an instance in start mode")
3863 if self.op.ip_check:
3864 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3865 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3866 (self.check_ip, self.op.instance_name))
3870 if self.op.iallocator is not None:
3871 self._RunAllocator()
3873 #### node related checks
3875 # check primary node
3876 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3877 assert self.pnode is not None, \
3878 "Cannot retrieve locked node %s" % self.op.pnode
3879 self.secondaries = []
3881 # mirror node verification
3882 if self.op.disk_template in constants.DTS_NET_MIRROR:
3883 if self.op.snode is None:
3884 raise errors.OpPrereqError("The networked disk templates need"
3886 if self.op.snode == pnode.name:
3887 raise errors.OpPrereqError("The secondary node cannot be"
3888 " the primary node.")
3889 self.secondaries.append(self.op.snode)
3891 nodenames = [pnode.name] + self.secondaries
3893 req_size = _ComputeDiskSize(self.op.disk_template,
3896 # Check lv size requirements
3897 if req_size is not None:
3898 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3900 for node in nodenames:
3901 info = nodeinfo[node]
3905 raise errors.OpPrereqError("Cannot get current information"
3906 " from node '%s'" % node)
3907 vg_free = info.get('vg_free', None)
3908 if not isinstance(vg_free, int):
3909 raise errors.OpPrereqError("Can't compute free disk space on"
3911 if req_size > info['vg_free']:
3912 raise errors.OpPrereqError("Not enough disk space on target node %s."
3913 " %d MB available, %d MB required" %
3914 (node, info['vg_free'], req_size))
3916 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3919 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3921 if not isinstance(result.data, objects.OS):
3922 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3923 " primary node" % self.op.os_type)
3925 # bridge check on primary node
3926 bridges = [n.bridge for n in self.nics]
3927 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
3930 raise errors.OpPrereqError("One of the target bridges '%s' does not"
3931 " exist on destination node '%s'" %
3932 (",".join(bridges), pnode.name))
3934 # memory check on primary node
3936 _CheckNodeFreeMemory(self, self.pnode.name,
3937 "creating instance %s" % self.op.instance_name,
3938 self.be_full[constants.BE_MEMORY],
3942 self.instance_status = 'up'
3944 self.instance_status = 'down'
3946 def Exec(self, feedback_fn):
3947 """Create and add the instance to the cluster.
3950 instance = self.op.instance_name
3951 pnode_name = self.pnode.name
3953 for nic in self.nics:
3954 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3955 nic.mac = self.cfg.GenerateMAC()
3957 ht_kind = self.op.hypervisor
3958 if ht_kind in constants.HTS_REQ_PORT:
3959 network_port = self.cfg.AllocatePort()
3963 ##if self.op.vnc_bind_address is None:
3964 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3966 # this is needed because os.path.join does not accept None arguments
3967 if self.op.file_storage_dir is None:
3968 string_file_storage_dir = ""
3970 string_file_storage_dir = self.op.file_storage_dir
3972 # build the full file storage dir path
3973 file_storage_dir = os.path.normpath(os.path.join(
3974 self.cfg.GetFileStorageDir(),
3975 string_file_storage_dir, instance))
3978 disks = _GenerateDiskTemplate(self,
3979 self.op.disk_template,
3980 instance, pnode_name,
3984 self.op.file_driver,
3987 iobj = objects.Instance(name=instance, os=self.op.os_type,
3988 primary_node=pnode_name,
3989 nics=self.nics, disks=disks,
3990 disk_template=self.op.disk_template,
3991 status=self.instance_status,
3992 network_port=network_port,
3993 beparams=self.op.beparams,
3994 hvparams=self.op.hvparams,
3995 hypervisor=self.op.hypervisor,
3998 feedback_fn("* creating instance disks...")
3999 if not _CreateDisks(self, iobj):
4000 _RemoveDisks(self, iobj)
4001 self.cfg.ReleaseDRBDMinors(instance)
4002 raise errors.OpExecError("Device creation failed, reverting...")
4004 feedback_fn("adding instance %s to cluster config" % instance)
4006 self.cfg.AddInstance(iobj)
4007 # Declare that we don't want to remove the instance lock anymore, as we've
4008 # added the instance to the config
4009 del self.remove_locks[locking.LEVEL_INSTANCE]
4010 # Remove the temporary assignments for the instance's DRBDs
4011 self.cfg.ReleaseDRBDMinors(instance)
4012 # Unlock all the nodes
4013 if self.op.mode == constants.INSTANCE_IMPORT:
4014 nodes_keep = [self.op.src_node]
4015 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4016 if node != self.op.src_node]
4017 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4018 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4020 self.context.glm.release(locking.LEVEL_NODE)
4021 del self.acquired_locks[locking.LEVEL_NODE]
4023 if self.op.wait_for_sync:
4024 disk_abort = not _WaitForSync(self, iobj)
4025 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4026 # make sure the disks are not degraded (still sync-ing is ok)
4028 feedback_fn("* checking mirrors status")
4029 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4034 _RemoveDisks(self, iobj)
4035 self.cfg.RemoveInstance(iobj.name)
4036 # Make sure the instance lock gets removed
4037 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4038 raise errors.OpExecError("There are some degraded disks for"
4041 feedback_fn("creating os for instance %s on node %s" %
4042 (instance, pnode_name))
4044 if iobj.disk_template != constants.DT_DISKLESS:
4045 if self.op.mode == constants.INSTANCE_CREATE:
4046 feedback_fn("* running the instance OS create scripts...")
4047 result = self.rpc.call_instance_os_add(pnode_name, iobj)
4050 raise errors.OpExecError("Could not add os for instance %s"
4052 (instance, pnode_name))
4054 elif self.op.mode == constants.INSTANCE_IMPORT:
4055 feedback_fn("* running the instance OS import scripts...")
4056 src_node = self.op.src_node
4057 src_images = self.src_images
4058 cluster_name = self.cfg.GetClusterName()
4059 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4060 src_node, src_images,
4062 import_result.Raise()
4063 for idx, result in enumerate(import_result.data):
4065 self.LogWarning("Could not import the image %s for instance"
4066 " %s, disk %d, on node %s" %
4067 (src_images[idx], instance, idx, pnode_name))
4069 # also checked in the prereq part
4070 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4074 logging.info("Starting instance %s on node %s", instance, pnode_name)
4075 feedback_fn("* starting instance...")
4076 result = self.rpc.call_instance_start(pnode_name, iobj, None)
4079 raise errors.OpExecError("Could not start instance")
4082 class LUConnectConsole(NoHooksLU):
4083 """Connect to an instance's console.
4085 This is somewhat special in that it returns the command line that
4086 you need to run on the master node in order to connect to the
4090 _OP_REQP = ["instance_name"]
4093 def ExpandNames(self):
4094 self._ExpandAndLockInstance()
4096 def CheckPrereq(self):
4097 """Check prerequisites.
4099 This checks that the instance is in the cluster.
4102 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4103 assert self.instance is not None, \
4104 "Cannot retrieve locked instance %s" % self.op.instance_name
4106 def Exec(self, feedback_fn):
4107 """Connect to the console of an instance
4110 instance = self.instance
4111 node = instance.primary_node
4113 node_insts = self.rpc.call_instance_list([node],
4114 [instance.hypervisor])[node]
4117 if instance.name not in node_insts.data:
4118 raise errors.OpExecError("Instance %s is not running." % instance.name)
4120 logging.debug("Connecting to console of %s on %s", instance.name, node)
4122 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4123 console_cmd = hyper.GetShellCommandForConsole(instance)
4126 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4129 class LUReplaceDisks(LogicalUnit):
4130 """Replace the disks of an instance.
4133 HPATH = "mirrors-replace"
4134 HTYPE = constants.HTYPE_INSTANCE
4135 _OP_REQP = ["instance_name", "mode", "disks"]
4138 def ExpandNames(self):
4139 self._ExpandAndLockInstance()
4141 if not hasattr(self.op, "remote_node"):
4142 self.op.remote_node = None
4144 ia_name = getattr(self.op, "iallocator", None)
4145 if ia_name is not None:
4146 if self.op.remote_node is not None:
4147 raise errors.OpPrereqError("Give either the iallocator or the new"
4148 " secondary, not both")
4149 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4150 elif self.op.remote_node is not None:
4151 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4152 if remote_node is None:
4153 raise errors.OpPrereqError("Node '%s' not known" %
4154 self.op.remote_node)
4155 self.op.remote_node = remote_node
4156 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4157 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4159 self.needed_locks[locking.LEVEL_NODE] = []
4160 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4162 def DeclareLocks(self, level):
4163 # If we're not already locking all nodes in the set we have to declare the
4164 # instance's primary/secondary nodes.
4165 if (level == locking.LEVEL_NODE and
4166 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4167 self._LockInstancesNodes()
4169 def _RunAllocator(self):
4170 """Compute a new secondary node using an IAllocator.
4173 ial = IAllocator(self,
4174 mode=constants.IALLOCATOR_MODE_RELOC,
4175 name=self.op.instance_name,
4176 relocate_from=[self.sec_node])
4178 ial.Run(self.op.iallocator)
4181 raise errors.OpPrereqError("Can't compute nodes using"
4182 " iallocator '%s': %s" % (self.op.iallocator,
4184 if len(ial.nodes) != ial.required_nodes:
4185 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4186 " of nodes (%s), required %s" %
4187 (len(ial.nodes), ial.required_nodes))
4188 self.op.remote_node = ial.nodes[0]
4189 self.LogInfo("Selected new secondary for the instance: %s",
4190 self.op.remote_node)
4192 def BuildHooksEnv(self):
4195 This runs on the master, the primary and all the secondaries.
4199 "MODE": self.op.mode,
4200 "NEW_SECONDARY": self.op.remote_node,
4201 "OLD_SECONDARY": self.instance.secondary_nodes[0],
4203 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4205 self.cfg.GetMasterNode(),
4206 self.instance.primary_node,
4208 if self.op.remote_node is not None:
4209 nl.append(self.op.remote_node)
4212 def CheckPrereq(self):
4213 """Check prerequisites.
4215 This checks that the instance is in the cluster.
4218 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4219 assert instance is not None, \
4220 "Cannot retrieve locked instance %s" % self.op.instance_name
4221 self.instance = instance
4223 if instance.disk_template not in constants.DTS_NET_MIRROR:
4224 raise errors.OpPrereqError("Instance's disk layout is not"
4225 " network mirrored.")
4227 if len(instance.secondary_nodes) != 1:
4228 raise errors.OpPrereqError("The instance has a strange layout,"
4229 " expected one secondary but found %d" %
4230 len(instance.secondary_nodes))
4232 self.sec_node = instance.secondary_nodes[0]
4234 ia_name = getattr(self.op, "iallocator", None)
4235 if ia_name is not None:
4236 self._RunAllocator()
4238 remote_node = self.op.remote_node
4239 if remote_node is not None:
4240 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4241 assert self.remote_node_info is not None, \
4242 "Cannot retrieve locked node %s" % remote_node
4244 self.remote_node_info = None
4245 if remote_node == instance.primary_node:
4246 raise errors.OpPrereqError("The specified node is the primary node of"
4248 elif remote_node == self.sec_node:
4249 if self.op.mode == constants.REPLACE_DISK_SEC:
4250 # this is for DRBD8, where we can't execute the same mode of
4251 # replacement as for drbd7 (no different port allocated)
4252 raise errors.OpPrereqError("Same secondary given, cannot execute"
4254 if instance.disk_template == constants.DT_DRBD8:
4255 if (self.op.mode == constants.REPLACE_DISK_ALL and
4256 remote_node is not None):
4257 # switch to replace secondary mode
4258 self.op.mode = constants.REPLACE_DISK_SEC
4260 if self.op.mode == constants.REPLACE_DISK_ALL:
4261 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4262 " secondary disk replacement, not"
4264 elif self.op.mode == constants.REPLACE_DISK_PRI:
4265 if remote_node is not None:
4266 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4267 " the secondary while doing a primary"
4268 " node disk replacement")
4269 self.tgt_node = instance.primary_node
4270 self.oth_node = instance.secondary_nodes[0]
4271 elif self.op.mode == constants.REPLACE_DISK_SEC:
4272 self.new_node = remote_node # this can be None, in which case
4273 # we don't change the secondary
4274 self.tgt_node = instance.secondary_nodes[0]
4275 self.oth_node = instance.primary_node
4277 raise errors.ProgrammerError("Unhandled disk replace mode")
4279 if not self.op.disks:
4280 self.op.disks = range(len(instance.disks))
4282 for disk_idx in self.op.disks:
4283 instance.FindDisk(disk_idx)
4285 def _ExecD8DiskOnly(self, feedback_fn):
4286 """Replace a disk on the primary or secondary for dbrd8.
4288 The algorithm for replace is quite complicated:
4290 1. for each disk to be replaced:
4292 1. create new LVs on the target node with unique names
4293 1. detach old LVs from the drbd device
4294 1. rename old LVs to name_replaced.<time_t>
4295 1. rename new LVs to old LVs
4296 1. attach the new LVs (with the old names now) to the drbd device
4298 1. wait for sync across all devices
4300 1. for each modified disk:
4302 1. remove old LVs (which have the name name_replaced.<time_t>)
4304 Failures are not very well handled.
4308 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4309 instance = self.instance
4311 vgname = self.cfg.GetVGName()
4314 tgt_node = self.tgt_node
4315 oth_node = self.oth_node
4317 # Step: check device activation
4318 self.proc.LogStep(1, steps_total, "check device existence")
4319 info("checking volume groups")
4320 my_vg = cfg.GetVGName()
4321 results = self.rpc.call_vg_list([oth_node, tgt_node])
4323 raise errors.OpExecError("Can't list volume groups on the nodes")
4324 for node in oth_node, tgt_node:
4326 if res.failed or not res.data or my_vg not in res.data:
4327 raise errors.OpExecError("Volume group '%s' not found on %s" %
4329 for idx, dev in enumerate(instance.disks):
4330 if idx not in self.op.disks:
4332 for node in tgt_node, oth_node:
4333 info("checking disk/%d on %s" % (idx, node))
4334 cfg.SetDiskID(dev, node)
4335 if not self.rpc.call_blockdev_find(node, dev):
4336 raise errors.OpExecError("Can't find disk/%d on node %s" %
4339 # Step: check other node consistency
4340 self.proc.LogStep(2, steps_total, "check peer consistency")
4341 for idx, dev in enumerate(instance.disks):
4342 if idx not in self.op.disks:
4344 info("checking disk/%d consistency on %s" % (idx, oth_node))
4345 if not _CheckDiskConsistency(self, dev, oth_node,
4346 oth_node==instance.primary_node):
4347 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4348 " to replace disks on this node (%s)" %
4349 (oth_node, tgt_node))
4351 # Step: create new storage
4352 self.proc.LogStep(3, steps_total, "allocate new storage")
4353 for idx, dev in enumerate(instance.disks):
4354 if idx not in self.op.disks:
4357 cfg.SetDiskID(dev, tgt_node)
4358 lv_names = [".disk%d_%s" % (idx, suf)
4359 for suf in ["data", "meta"]]
4360 names = _GenerateUniqueNames(self, lv_names)
4361 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4362 logical_id=(vgname, names[0]))
4363 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4364 logical_id=(vgname, names[1]))
4365 new_lvs = [lv_data, lv_meta]
4366 old_lvs = dev.children
4367 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4368 info("creating new local storage on %s for %s" %
4369 (tgt_node, dev.iv_name))
4370 # since we *always* want to create this LV, we use the
4371 # _Create...OnPrimary (which forces the creation), even if we
4372 # are talking about the secondary node
4373 for new_lv in new_lvs:
4374 if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4375 _GetInstanceInfoText(instance)):
4376 raise errors.OpExecError("Failed to create new LV named '%s' on"
4378 (new_lv.logical_id[1], tgt_node))
4380 # Step: for each lv, detach+rename*2+attach
4381 self.proc.LogStep(4, steps_total, "change drbd configuration")
4382 for dev, old_lvs, new_lvs in iv_names.itervalues():
4383 info("detaching %s drbd from local storage" % dev.iv_name)
4384 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4387 raise errors.OpExecError("Can't detach drbd from local storage on node"
4388 " %s for device %s" % (tgt_node, dev.iv_name))
4390 #cfg.Update(instance)
4392 # ok, we created the new LVs, so now we know we have the needed
4393 # storage; as such, we proceed on the target node to rename
4394 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4395 # using the assumption that logical_id == physical_id (which in
4396 # turn is the unique_id on that node)
4398 # FIXME(iustin): use a better name for the replaced LVs
4399 temp_suffix = int(time.time())
4400 ren_fn = lambda d, suff: (d.physical_id[0],
4401 d.physical_id[1] + "_replaced-%s" % suff)
4402 # build the rename list based on what LVs exist on the node
4404 for to_ren in old_lvs:
4405 find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4406 if not find_res.failed and find_res.data is not None: # device exists
4407 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4409 info("renaming the old LVs on the target node")
4410 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4413 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4414 # now we rename the new LVs to the old LVs
4415 info("renaming the new LVs on the target node")
4416 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4417 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4420 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4422 for old, new in zip(old_lvs, new_lvs):
4423 new.logical_id = old.logical_id
4424 cfg.SetDiskID(new, tgt_node)
4426 for disk in old_lvs:
4427 disk.logical_id = ren_fn(disk, temp_suffix)
4428 cfg.SetDiskID(disk, tgt_node)
4430 # now that the new lvs have the old name, we can add them to the device
4431 info("adding new mirror component on %s" % tgt_node)
4432 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4433 if result.failed or not result.data:
4434 for new_lv in new_lvs:
4435 result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4436 if result.failed or not result.data:
4437 warning("Can't rollback device %s", hint="manually cleanup unused"
4439 raise errors.OpExecError("Can't add local storage to drbd")
4441 dev.children = new_lvs
4442 cfg.Update(instance)
4444 # Step: wait for sync
4446 # this can fail as the old devices are degraded and _WaitForSync
4447 # does a combined result over all disks, so we don't check its
4449 self.proc.LogStep(5, steps_total, "sync devices")
4450 _WaitForSync(self, instance, unlock=True)
4452 # so check manually all the devices
4453 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4454 cfg.SetDiskID(dev, instance.primary_node)
4455 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
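# result.data[5] is the 'degraded' flag of the device status returned by
# blockdev_find; a true value here means the DRBD pair did not reach a
# healthy state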
4456 if result.failed or result.data[5]:
4457 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4459 # Step: remove old storage
4460 self.proc.LogStep(6, steps_total, "removing old storage")
4461 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4462 info("remove logical volumes for %s" % name)
4464 cfg.SetDiskID(lv, tgt_node)
4465 result = self.rpc.call_blockdev_remove(tgt_node, lv)
4466 if result.failed or not result.data:
4467 warning("Can't remove old LV", hint="manually remove unused LVs")
4470 def _ExecD8Secondary(self, feedback_fn):
4471 """Replace the secondary node for drbd8.
4473 The algorithm for replace is quite complicated:
4474 - for all disks of the instance:
4475 - create new LVs on the new node with same names
4476 - shutdown the drbd device on the old secondary
4477 - disconnect the drbd network on the primary
4478 - create the drbd device on the new secondary
4479 - network attach the drbd on the primary, using an artifice:
4480 the drbd code for Attach() will connect to the network if it
4481 finds a device which is connected to the good local disks but
4483 - wait for sync across all devices
4484 - remove all disks from the old secondary
4486 Failures are not very well handled.
4490 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4491 instance = self.instance
4493 vgname = self.cfg.GetVGName()
4496 old_node = self.tgt_node
4497 new_node = self.new_node
4498 pri_node = instance.primary_node
4500 # Step: check device activation
4501 self.proc.LogStep(1, steps_total, "check device existence")
4502 info("checking volume groups")
4503 my_vg = cfg.GetVGName()
4504 results = self.rpc.call_vg_list([pri_node, new_node])
4505 for node in pri_node, new_node:
4507 if res.failed or not res.data or my_vg not in res.data:
4508 raise errors.OpExecError("Volume group '%s' not found on %s" %
4510 for idx, dev in enumerate(instance.disks):
4511 if idx not in self.op.disks:
4513 info("checking disk/%d on %s" % (idx, pri_node))
4514 cfg.SetDiskID(dev, pri_node)
4515 result = self.rpc.call_blockdev_find(pri_node, dev)
4518 raise errors.OpExecError("Can't find disk/%d on node %s" %
4521 # Step: check other node consistency
4522 self.proc.LogStep(2, steps_total, "check peer consistency")
4523 for idx, dev in enumerate(instance.disks):
4524 if idx not in self.op.disks:
4526 info("checking disk/%d consistency on %s" % (idx, pri_node))
4527 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4528 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4529 " unsafe to replace the secondary" %
4532 # Step: create new storage
4533 self.proc.LogStep(3, steps_total, "allocate new storage")
4534 for idx, dev in enumerate(instance.disks):
4536 info("adding new local storage on %s for disk/%d" %
4538 # since we *always* want to create this LV, we use the
4539 # _Create...OnPrimary (which forces the creation), even if we
4540 # are talking about the secondary node
4541 for new_lv in dev.children:
4542 if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4543 _GetInstanceInfoText(instance)):
4544 raise errors.OpExecError("Failed to create new LV named '%s' on"
4546 (new_lv.logical_id[1], new_node))
4548 # Step 4: drbd minors and drbd setup changes
4549 # after this, we must manually remove the drbd minors on both the
4550 # error and the success paths
4551 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4553 logging.debug("Allocated minors %s" % (minors,))
4554 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4555 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4557 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4558 # create new devices on new_node
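# the drbd logical_id is of the form (node_a, node_b, port, minor_a,
# minor_b, ...); the primary's node name and minor are kept as they are,
# while the old secondary's entries are replaced by the new node and its
# freshly allocated minor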
4559 if pri_node == dev.logical_id[0]:
4560 new_logical_id = (pri_node, new_node,
4561 dev.logical_id[2], dev.logical_id[3], new_minor,
4564 new_logical_id = (new_node, pri_node,
4565 dev.logical_id[2], new_minor, dev.logical_id[4],
4567 iv_names[idx] = (dev, dev.children, new_logical_id)
4568 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4570 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4571 logical_id=new_logical_id,
4572 children=dev.children)
4573 if not _CreateBlockDevOnSecondary(self, new_node, instance,
4575 _GetInstanceInfoText(instance)):
4576 self.cfg.ReleaseDRBDMinors(instance.name)
4577 raise errors.OpExecError("Failed to create new DRBD on"
4578 " node '%s'" % new_node)
4580 for idx, dev in enumerate(instance.disks):
4581 # we have new devices, shutdown the drbd on the old secondary
4582 info("shutting down drbd for disk/%d on old node" % idx)
4583 cfg.SetDiskID(dev, old_node)
4584 result = self.rpc.call_blockdev_shutdown(old_node, dev)
4585 if result.failed or not result.data:
4586 warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4587 hint="Please cleanup this device manually as soon as possible")
4589 info("detaching primary drbds from the network (=> standalone)")
4591 for idx, dev in enumerate(instance.disks):
4592 cfg.SetDiskID(dev, pri_node)
4593 # set the network part of the physical (unique in bdev terms) id
4594 # to None, meaning detach from network
4595 dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4596 # and 'find' the device, which will 'fix' it to match the
4598 result = self.rpc.call_blockdev_find(pri_node, dev)
4599 if not result.failed and result.data:
4602 warning("Failed to detach drbd disk/%d from network, unusual case" %
4606 # no detaches succeeded (very unlikely)
4607 self.cfg.ReleaseDRBDMinors(instance.name)
4608 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4610 # if we managed to detach at least one, we update all the disks of
4611 # the instance to point to the new secondary
4612 info("updating instance configuration")
4613 for dev, _, new_logical_id in iv_names.itervalues():
4614 dev.logical_id = new_logical_id
4615 cfg.SetDiskID(dev, pri_node)
4616 cfg.Update(instance)
4617 # we can remove now the temp minors as now the new values are
4618 # written to the config file (and therefore stable)
4619 self.cfg.ReleaseDRBDMinors(instance.name)
4621 # and now perform the drbd attach
4622 info("attaching primary drbds to new secondary (standalone => connected)")
4624 for idx, dev in enumerate(instance.disks):
4625 info("attaching primary drbd for disk/%d to new secondary node" % idx)
4626 # since the attach is smart, it's enough to 'find' the device;
4627 # it will automatically activate the network if the physical_id
4629 cfg.SetDiskID(dev, pri_node)
4630 logging.debug("Disk to attach: %s", dev)
4631 result = self.rpc.call_blockdev_find(pri_node, dev)
4632 if result.failed or not result.data:
4633 warning("can't attach drbd disk/%d to new secondary!" % idx,
4634 "please do a gnt-instance info to see the status of disks")
4636 # this can fail as the old devices are degraded and _WaitForSync
4637 # does a combined result over all disks, so we don't check its
4639 self.proc.LogStep(5, steps_total, "sync devices")
4640 _WaitForSync(self, instance, unlock=True)
4642 # so check manually all the devices
4643 for idx, (dev, old_lvs, _) in iv_names.iteritems():
4644 cfg.SetDiskID(dev, pri_node)
4645 result = self.rpc.call_blockdev_find(pri_node, dev)
4648 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4650 self.proc.LogStep(6, steps_total, "removing old storage")
4651 for idx, (dev, old_lvs, _) in iv_names.iteritems():
4652 info("remove logical volumes for disk/%d" % idx)
4654 cfg.SetDiskID(lv, old_node)
4655 result = self.rpc.call_blockdev_remove(old_node, lv)
4656 if result.failed or not result.data:
4657 warning("Can't remove LV on old secondary",
4658 hint="Cleanup stale volumes by hand")
4660 def Exec(self, feedback_fn):
4661 """Execute disk replacement.
4663 This dispatches the disk replacement to the appropriate handler.
4666 instance = self.instance
4668 # Activate the instance disks if we're replacing them on a down instance
4669 if instance.status == "down":
4670 _StartInstanceDisks(self, instance, True)
4672 if instance.disk_template == constants.DT_DRBD8:
4673 if self.op.remote_node is None:
4674 fn = self._ExecD8DiskOnly
4676 fn = self._ExecD8Secondary
4678 raise errors.ProgrammerError("Unhandled disk replacement case")
4680 ret = fn(feedback_fn)
4682 # Deactivate the instance disks if we're replacing them on a down instance
4683 if instance.status == "down":
4684 _SafeShutdownInstanceDisks(self, instance)
4689 class LUGrowDisk(LogicalUnit):
4690 """Grow a disk of an instance.
4694 HTYPE = constants.HTYPE_INSTANCE
4695 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4698 def ExpandNames(self):
4699 self._ExpandAndLockInstance()
4700 self.needed_locks[locking.LEVEL_NODE] = []
4701 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4703 def DeclareLocks(self, level):
4704 if level == locking.LEVEL_NODE:
4705 self._LockInstancesNodes()
4707 def BuildHooksEnv(self):
4710 This runs on the master, the primary and all the secondaries.
4714 "DISK": self.op.disk,
4715 "AMOUNT": self.op.amount,
4717 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4719 self.cfg.GetMasterNode(),
4720 self.instance.primary_node,
4724 def CheckPrereq(self):
4725 """Check prerequisites.
4727 This checks that the instance is in the cluster.
4730 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4731 assert instance is not None, \
4732 "Cannot retrieve locked instance %s" % self.op.instance_name
4734 self.instance = instance
4736 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4737 raise errors.OpPrereqError("Instance's disk layout does not support"
4740 self.disk = instance.FindDisk(self.op.disk)
4742 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4743 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4744 instance.hypervisor)
4745 for node in nodenames:
4746 info = nodeinfo[node]
4747 if info.failed or not info.data:
4748 raise errors.OpPrereqError("Cannot get current information"
4749 " from node '%s'" % node)
4750 vg_free = info.data.get('vg_free', None)
4751 if not isinstance(vg_free, int):
4752 raise errors.OpPrereqError("Can't compute free disk space on"
4754 if self.op.amount > vg_free:
4755 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4756 " %d MiB available, %d MiB required" %
4757 (node, vg_free, self.op.amount))
4759 def Exec(self, feedback_fn):
4760 """Execute disk grow.
4763 instance = self.instance
4765 for node in (instance.secondary_nodes + (instance.primary_node,)):
4766 self.cfg.SetDiskID(disk, node)
4767 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
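# blockdev_grow is expected to return a (status, message) pair; anything
# else is treated as a failed grow on that node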
4769 if (not result.data or not isinstance(result.data, (list, tuple)) or
4770 len(result.data) != 2):
4771 raise errors.OpExecError("Grow request failed to node %s" % node)
4772 elif not result.data[0]:
4773 raise errors.OpExecError("Grow request failed to node %s: %s" %
4774 (node, result.data[1]))
4775 disk.RecordGrow(self.op.amount)
4776 self.cfg.Update(instance)
4777 if self.op.wait_for_sync:
4778 disk_abort = not _WaitForSync(self, instance)
4780 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4781 " status.\nPlease check the instance.")
4784 class LUQueryInstanceData(NoHooksLU):
4785 """Query runtime instance data.
4788 _OP_REQP = ["instances", "static"]
4791 def ExpandNames(self):
4792 self.needed_locks = {}
4793 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4795 if not isinstance(self.op.instances, list):
4796 raise errors.OpPrereqError("Invalid argument type 'instances'")
4798 if self.op.instances:
4799 self.wanted_names = []
4800 for name in self.op.instances:
4801 full_name = self.cfg.ExpandInstanceName(name)
4802 if full_name is None:
4803 raise errors.OpPrereqError("Instance '%s' not known" %
4804 self.op.instance_name)
4805 self.wanted_names.append(full_name)
4806 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4808 self.wanted_names = None
4809 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4811 self.needed_locks[locking.LEVEL_NODE] = []
4812 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4814 def DeclareLocks(self, level):
4815 if level == locking.LEVEL_NODE:
4816 self._LockInstancesNodes()
4818 def CheckPrereq(self):
4819 """Check prerequisites.
4821 This only checks the optional instance list against the existing names.
4824 if self.wanted_names is None:
4825 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4827 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4828 in self.wanted_names]
4831 def _ComputeDiskStatus(self, instance, snode, dev):
4832 """Compute block device status.
4835 static = self.op.static
4837 self.cfg.SetDiskID(dev, instance.primary_node)
4838 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4840 dev_pstatus = dev_pstatus.data
4844 if dev.dev_type in constants.LDS_DRBD:
4845 # we change the snode then (otherwise we use the one passed in)
4846 if dev.logical_id[0] == instance.primary_node:
4847 snode = dev.logical_id[1]
4849 snode = dev.logical_id[0]
4851 if snode and not static:
4852 self.cfg.SetDiskID(dev, snode)
4853 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4855 dev_sstatus = dev_sstatus.data
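# recurse into the children so the whole device tree (e.g. the LVs backing
# a DRBD device) is reported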
4860 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4861 for child in dev.children]
4866 "iv_name": dev.iv_name,
4867 "dev_type": dev.dev_type,
4868 "logical_id": dev.logical_id,
4869 "physical_id": dev.physical_id,
4870 "pstatus": dev_pstatus,
4871 "sstatus": dev_sstatus,
4872 "children": dev_children,
4878 def Exec(self, feedback_fn):
4879 """Gather and return data"""
4882 cluster = self.cfg.GetClusterInfo()
4884 for instance in self.wanted_instances:
4885 if not self.op.static:
4886 remote_info = self.rpc.call_instance_info(instance.primary_node,
4888 instance.hypervisor)
4890 remote_info = remote_info.data
4891 if remote_info and "state" in remote_info:
4894 remote_state = "down"
4897 if instance.status == "down":
4898 config_state = "down"
4902 disks = [self._ComputeDiskStatus(instance, None, device)
4903 for device in instance.disks]
4906 "name": instance.name,
4907 "config_state": config_state,
4908 "run_state": remote_state,
4909 "pnode": instance.primary_node,
4910 "snodes": instance.secondary_nodes,
4912 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4914 "hypervisor": instance.hypervisor,
4915 "network_port": instance.network_port,
4916 "hv_instance": instance.hvparams,
4917 "hv_actual": cluster.FillHV(instance),
4918 "be_instance": instance.beparams,
4919 "be_actual": cluster.FillBE(instance),
4922 result[instance.name] = idict
4927 class LUSetInstanceParams(LogicalUnit):
4928 """Modifies an instances's parameters.
4931 HPATH = "instance-modify"
4932 HTYPE = constants.HTYPE_INSTANCE
4933 _OP_REQP = ["instance_name"]
4936 def CheckArguments(self):
4937 if not hasattr(self.op, 'nics'):
4939 if not hasattr(self.op, 'disks'):
4941 if not hasattr(self.op, 'beparams'):
4942 self.op.beparams = {}
4943 if not hasattr(self.op, 'hvparams'):
4944 self.op.hvparams = {}
4945 self.op.force = getattr(self.op, "force", False)
4946 if not (self.op.nics or self.op.disks or
4947 self.op.hvparams or self.op.beparams):
4948 raise errors.OpPrereqError("No changes submitted")
4950 utils.CheckBEParams(self.op.beparams)
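# disk modifications are (op, params) pairs: op is DDM_ADD, DDM_REMOVE or
# the index of an existing disk to modify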
4954 for disk_op, disk_dict in self.op.disks:
4955 if disk_op == constants.DDM_REMOVE:
4958 elif disk_op == constants.DDM_ADD:
4961 if not isinstance(disk_op, int):
4962 raise errors.OpPrereqError("Invalid disk index")
4963 if disk_op == constants.DDM_ADD:
4964 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
4965 if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
4966 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
4967 size = disk_dict.get('size', None)
4969 raise errors.OpPrereqError("Required disk parameter size missing")
4972 except ValueError, err:
4973 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
4975 disk_dict['size'] = size
4977 # modification of disk
4978 if 'size' in disk_dict:
4979 raise errors.OpPrereqError("Disk size change not possible, use"
4982 if disk_addremove > 1:
4983 raise errors.OpPrereqError("Only one disk add or remove operation"
4984 " supported at a time")
4988 for nic_op, nic_dict in self.op.nics:
4989 if nic_op == constants.DDM_REMOVE:
4992 elif nic_op == constants.DDM_ADD:
4995 if not isinstance(nic_op, int):
4996 raise errors.OpPrereqError("Invalid nic index")
4998 # nic_dict should be a dict
4999 nic_ip = nic_dict.get('ip', None)
5000 if nic_ip is not None:
5001 if nic_ip.lower() == "none":
5002 nic_dict['ip'] = None
5004 if not utils.IsValidIP(nic_ip):
5005 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5006 # we can only check None bridges and assign the default one
5007 nic_bridge = nic_dict.get('bridge', None)
5008 if nic_bridge is None:
5009 nic_dict['bridge'] = self.cfg.GetDefBridge()
5010 # but we can validate MACs
5011 nic_mac = nic_dict.get('mac', None)
5012 if nic_mac is not None:
5013 if self.cfg.IsMacInUse(nic_mac):
5014 raise errors.OpPrereqError("MAC address %s already in use"
5015 " in cluster" % nic_mac)
5016 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5017 if not utils.IsValidMac(nic_mac):
5018 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5019 if nic_addremove > 1:
5020 raise errors.OpPrereqError("Only one NIC add or remove operation"
5021 " supported at a time")
5023 def ExpandNames(self):
5024 self._ExpandAndLockInstance()
5025 self.needed_locks[locking.LEVEL_NODE] = []
5026 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5028 def DeclareLocks(self, level):
5029 if level == locking.LEVEL_NODE:
5030 self._LockInstancesNodes()
5032 def BuildHooksEnv(self):
5035 This runs on the master, primary and secondaries.
5039 if constants.BE_MEMORY in self.be_new:
5040 args['memory'] = self.be_new[constants.BE_MEMORY]
5041 if constants.BE_VCPUS in self.be_new:
5042 args['vcpus'] = self.be_new[constants.BE_VCPUS]
5043 # FIXME: readd disk/nic changes
5044 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5045 nl = [self.cfg.GetMasterNode(),
5046 self.instance.primary_node] + list(self.instance.secondary_nodes)
5049 def CheckPrereq(self):
5050 """Check prerequisites.
5052 This only checks the instance list against the existing names.
5055 force = self.force = self.op.force
5057 # checking the new params on the primary/secondary nodes
5059 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5060 assert self.instance is not None, \
5061 "Cannot retrieve locked instance %s" % self.op.instance_name
5062 pnode = self.instance.primary_node
5064 nodelist.extend(instance.secondary_nodes)
5066 # hvparams processing
5067 if self.op.hvparams:
5068 i_hvdict = copy.deepcopy(instance.hvparams)
5069 for key, val in self.op.hvparams.iteritems():
5070 if val == constants.VALUE_DEFAULT:
5075 elif val == constants.VALUE_NONE:
5076 i_hvdict[key] = None
5079 cluster = self.cfg.GetClusterInfo()
5080 hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5083 hypervisor.GetHypervisor(
5084 instance.hypervisor).CheckParameterSyntax(hv_new)
5085 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5086 self.hv_new = hv_new # the new actual values
5087 self.hv_inst = i_hvdict # the new dict (without defaults)
5089 self.hv_new = self.hv_inst = {}
5091 # beparams processing
5092 if self.op.beparams:
5093 i_bedict = copy.deepcopy(instance.beparams)
5094 for key, val in self.op.beparams.iteritems():
5095 if val == constants.VALUE_DEFAULT:
5102 cluster = self.cfg.GetClusterInfo()
5103 be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5105 self.be_new = be_new # the new actual values
5106 self.be_inst = i_bedict # the new dict (without defaults)
5108 self.be_new = self.be_inst = {}
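# unless the operation is forced, verify that the primary node still has
# enough free memory to start the instance with the new value and, when
# auto_balance is set, warn if a secondary could no longer host a failover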
5112 if constants.BE_MEMORY in self.op.beparams and not self.force:
5113 mem_check_list = [pnode]
5114 if be_new[constants.BE_AUTO_BALANCE]:
5115 # either we changed auto_balance to yes or it was from before
5116 mem_check_list.extend(instance.secondary_nodes)
5117 instance_info = self.rpc.call_instance_info(pnode, instance.name,
5118 instance.hypervisor)
5119 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5120 instance.hypervisor)
5121 if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5122 # Assume the primary node is unreachable and go ahead
5123 self.warn.append("Can't get info from primary node %s" % pnode)
5125 if not instance_info.failed and instance_info.data:
5126 current_mem = instance_info.data['memory']
5128 # Assume instance not running
5129 # (there is a slight race condition here, but it's not very probable,
5130 # and we have no other way to check)
5132 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5133 nodeinfo[pnode].data['memory_free'])
5135 raise errors.OpPrereqError("This change will prevent the instance"
5136 " from starting, due to %d MB of memory"
5137 " missing on its primary node" % miss_mem)
5139 if be_new[constants.BE_AUTO_BALANCE]:
5140 for node, nres in ((n, nodeinfo[n]) for n in instance.secondary_nodes):
5141 if nres.failed or not isinstance(nres.data, dict):
5142 self.warn.append("Can't get info from secondary node %s" % node)
5143 elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5144 self.warn.append("Not enough memory to failover instance to"
5145 " secondary node %s" % node)
5148 for nic_op, nic_dict in self.op.nics:
5149 if nic_op == constants.DDM_REMOVE:
5150 if not instance.nics:
5151 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5153 if nic_op != constants.DDM_ADD:
5155 if nic_op < 0 or nic_op >= len(instance.nics):
5156 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5158 (nic_op, len(instance.nics)))
5159 nic_bridge = nic_dict.get('bridge', None)
5160 if nic_bridge is not None:
5161 if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5162 msg = ("Bridge '%s' doesn't exist on one of"
5163 " the instance nodes" % nic_bridge)
5165 self.warn.append(msg)
5167 raise errors.OpPrereqError(msg)
5170 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5171 raise errors.OpPrereqError("Disk operations not supported for"
5172 " diskless instances")
5173 for disk_op, disk_dict in self.op.disks:
5174 if disk_op == constants.DDM_REMOVE:
5175 if len(instance.disks) == 1:
5176 raise errors.OpPrereqError("Cannot remove the last disk of"
5178 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5179 ins_l = ins_l[pnode]
5180 if not isinstance(ins_l, list):
5181 raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5182 if instance.name in ins_l:
5183 raise errors.OpPrereqError("Instance is running, can't remove"
5186 if (disk_op == constants.DDM_ADD and
5187 len(instance.disks) >= constants.MAX_DISKS):
5188 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5189 " add more" % constants.MAX_DISKS)
5190 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5192 if disk_op < 0 or disk_op >= len(instance.disks):
5193 raise errors.OpPrereqError("Invalid disk index %s, valid values"
5195 (disk_op, len(instance.disks)))
5199 def Exec(self, feedback_fn):
5200 """Modifies an instance.
5202 All parameters take effect only at the next restart of the instance.
5205 # Process here the warnings from CheckPrereq, as we don't have a
5206 # feedback_fn there.
5207 for warn in self.warn:
5208 feedback_fn("WARNING: %s" % warn)
5211 instance = self.instance
5213 for disk_op, disk_dict in self.op.disks:
5214 if disk_op == constants.DDM_REMOVE:
5215 # remove the last disk
5216 device = instance.disks.pop()
5217 device_idx = len(instance.disks)
5218 for node, disk in device.ComputeNodeTree(instance.primary_node):
5219 self.cfg.SetDiskID(disk, node)
5220 result = self.rpc.call_blockdev_remove(node, disk)
5221 if result.failed or not result.data:
5222 self.proc.LogWarning("Could not remove disk/%d on node %s,"
5223 " continuing anyway", device_idx, node)
5224 result.append(("disk/%d" % device_idx, "remove"))
5225 elif disk_op == constants.DDM_ADD:
5227 if instance.disk_template == constants.DT_FILE:
5228 file_driver, file_path = instance.disks[0].logical_id
5229 file_path = os.path.dirname(file_path)
5231 file_driver = file_path = None
5232 disk_idx_base = len(instance.disks)
5233 new_disk = _GenerateDiskTemplate(self,
5234 instance.disk_template,
5235 instance, instance.primary_node,
5236 instance.secondary_nodes,
5241 new_disk.mode = disk_dict['mode']
5242 instance.disks.append(new_disk)
5243 info = _GetInstanceInfoText(instance)
5245 logging.info("Creating volume %s for instance %s",
5246 new_disk.iv_name, instance.name)
5247 # Note: this needs to be kept in sync with _CreateDisks
5249 for secondary_node in instance.secondary_nodes:
5250 if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5251 new_disk, False, info):
5252 self.LogWarning("Failed to create volume %s (%s) on"
5253 " secondary node %s!",
5254 new_disk.iv_name, new_disk, secondary_node)
5256 if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5257 instance, new_disk, info):
5258 self.LogWarning("Failed to create volume %s on primary!",
5260 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5261 (new_disk.size, new_disk.mode)))
5263 # change a given disk
5264 instance.disks[disk_op].mode = disk_dict['mode']
5265 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5267 for nic_op, nic_dict in self.op.nics:
5268 if nic_op == constants.DDM_REMOVE:
5269 # remove the last nic
5270 del instance.nics[-1]
5271 result.append(("nic.%d" % len(instance.nics), "remove"))
5272 elif nic_op == constants.DDM_ADD:
5274 if 'mac' not in nic_dict:
5275 mac = constants.VALUE_GENERATE
5277 mac = nic_dict['mac']
5278 if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5279 mac = self.cfg.GenerateMAC()
5280 new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5281 bridge=nic_dict.get('bridge', None))
5282 instance.nics.append(new_nic)
5283 result.append(("nic.%d" % (len(instance.nics) - 1),
5284 "add:mac=%s,ip=%s,bridge=%s" %
5285 (new_nic.mac, new_nic.ip, new_nic.bridge)))
5287 # change a given nic
5288 for key in 'mac', 'ip', 'bridge':
5290 setattr(instance.nics[nic_op], key, nic_dict[key])
5291 result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5294 if self.op.hvparams:
5295 instance.hvparams = self.hv_new
5296 for key, val in self.op.hvparams.iteritems():
5297 result.append(("hv/%s" % key, val))
5300 if self.op.beparams:
5301 instance.beparams = self.be_inst
5302 for key, val in self.op.beparams.iteritems():
5303 result.append(("be/%s" % key, val))
5305 self.cfg.Update(instance)
5310 class LUQueryExports(NoHooksLU):
5311 """Query the exports list
5314 _OP_REQP = ['nodes']
5317 def ExpandNames(self):
5318 self.needed_locks = {}
5319 self.share_locks[locking.LEVEL_NODE] = 1
5320 if not self.op.nodes:
5321 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5323 self.needed_locks[locking.LEVEL_NODE] = \
5324 _GetWantedNodes(self, self.op.nodes)
5326 def CheckPrereq(self):
5327 """Check prerequisites.
5330 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5332 def Exec(self, feedback_fn):
5333 """Compute the list of all the exported system images.
5336 @return: a dictionary with the structure node->(export-list)
5337 where export-list is a list of the instances exported on
5341 result = self.rpc.call_export_list(self.nodes)
5346 class LUExportInstance(LogicalUnit):
5347 """Export an instance to an image in the cluster.
5350 HPATH = "instance-export"
5351 HTYPE = constants.HTYPE_INSTANCE
5352 _OP_REQP = ["instance_name", "target_node", "shutdown"]
5355 def ExpandNames(self):
5356 self._ExpandAndLockInstance()
5357 # FIXME: lock only instance primary and destination node
5359 # Sad but true, for now we have to lock all nodes, as we don't know where
5360 # the previous export might be, and in this LU we search for it and
5361 # remove it from its current node. In the future we could fix this by:
5362 # - making a tasklet to search (share-lock all), then create the new one,
5363 # then one to remove, after
5364 # - removing the removal operation altogether
5365 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5367 def DeclareLocks(self, level):
5368 """Last minute lock declaration."""
5369 # All nodes are locked anyway, so nothing to do here.
5371 def BuildHooksEnv(self):
5374 This will run on the master, primary node and target node.
5378 "EXPORT_NODE": self.op.target_node,
5379 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5381 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5382 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5383 self.op.target_node]
5386 def CheckPrereq(self):
5387 """Check prerequisites.
5389 This checks that the instance and node names are valid.
5392 instance_name = self.op.instance_name
5393 self.instance = self.cfg.GetInstanceInfo(instance_name)
5394 assert self.instance is not None, \
5395 "Cannot retrieve locked instance %s" % self.op.instance_name
5397 self.dst_node = self.cfg.GetNodeInfo(
5398 self.cfg.ExpandNodeName(self.op.target_node))
5400 if self.dst_node is None:
5401 # This is a wrong node name, not an unlocked node
5402 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5404 # instance disk type verification
5405 for disk in self.instance.disks:
5406 if disk.dev_type == constants.LD_FILE:
5407 raise errors.OpPrereqError("Export not supported for instances with"
5408 " file-based disks")
5410 def Exec(self, feedback_fn):
5411 """Export an instance to an image in the cluster.
5414 instance = self.instance
5415 dst_node = self.dst_node
5416 src_node = instance.primary_node
5417 if self.op.shutdown:
5418 # shutdown the instance, but not the disks
5419 result = self.rpc.call_instance_shutdown(src_node, instance)
5422 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5423 (instance.name, src_node))
5425 vgname = self.cfg.GetVGName()
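# snapshot every disk as an LV on the source node; a failed snapshot is
# recorded as False so that the disk indices stay aligned when exporting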
5430 for disk in instance.disks:
5431 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5432 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5433 if new_dev_name.failed or not new_dev_name.data:
5434 self.LogWarning("Could not snapshot block device %s on node %s",
5435 disk.logical_id[1], src_node)
5436 snap_disks.append(False)
5438 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5439 logical_id=(vgname, new_dev_name.data),
5440 physical_id=(vgname, new_dev_name.data),
5441 iv_name=disk.iv_name)
5442 snap_disks.append(new_dev)
5445 if self.op.shutdown and instance.status == "up":
5446 result = self.rpc.call_instance_start(src_node, instance, None)
5447 if result.failed or not result.data:
5448 _ShutdownInstanceDisks(self, instance)
5449 raise errors.OpExecError("Could not start instance")
5451 # TODO: check for size
5453 cluster_name = self.cfg.GetClusterName()
5454 for idx, dev in enumerate(snap_disks):
5456 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5457 instance, cluster_name, idx)
5458 if result.failed or not result.data:
5459 self.LogWarning("Could not export block device %s from node %s to"
5460 " node %s", dev.logical_id[1], src_node,
5462 result = self.rpc.call_blockdev_remove(src_node, dev)
5463 if result.failed or not result.data:
5464 self.LogWarning("Could not remove snapshot block device %s from node"
5465 " %s", dev.logical_id[1], src_node)
5467 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5468 if result.failed or not result.data:
5469 self.LogWarning("Could not finalize export for instance %s on node %s",
5470 instance.name, dst_node.name)
5472 nodelist = self.cfg.GetNodeList()
5473 nodelist.remove(dst_node.name)
5475 # on one-node clusters nodelist will be empty after the removal;
5476 # if we proceeded, the backup would be removed because OpQueryExports
5477 # substitutes an empty list with the full cluster node list.
5479 exportlist = self.rpc.call_export_list(nodelist)
5480 for node in exportlist:
5481 if exportlist[node].failed:
5483 if instance.name in exportlist[node].data:
5484 if not self.rpc.call_export_remove(node, instance.name):
5485 self.LogWarning("Could not remove older export for instance %s"
5486 " on node %s", instance.name, node)
5489 class LURemoveExport(NoHooksLU):
5490 """Remove exports related to the named instance.
5493 _OP_REQP = ["instance_name"]
5496 def ExpandNames(self):
5497 self.needed_locks = {}
5498 # We need all nodes to be locked in order for RemoveExport to work, but we
5499 # don't need to lock the instance itself, as nothing will happen to it (and
5500 # we can also remove exports for a removed instance)
5501 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5503 def CheckPrereq(self):
5504 """Check prerequisites.
5508 def Exec(self, feedback_fn):
5509 """Remove any export.
5512 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5513 # If the instance was not found we'll try with the name that was passed in.
5514 # This will only work if it was an FQDN, though.
5516 if not instance_name:
5518 instance_name = self.op.instance_name
5520 exportlist = self.rpc.call_export_list(self.acquired_locks[
5521 locking.LEVEL_NODE])
5523 for node in exportlist:
5524 if exportlist[node].failed:
5525 self.LogWarning("Failed to query node %s, continuing" % node)
5527 if instance_name in exportlist[node].data:
5529 result = self.rpc.call_export_remove(node, instance_name)
5530 if result.failed or not result.data:
5531 logging.error("Could not remove export for instance %s"
5532 " on node %s", instance_name, node)
5534 if fqdn_warn and not found:
5535 feedback_fn("Export not found. If trying to remove an export belonging"
5536 " to a deleted instance please use its Fully Qualified"
5540 class TagsLU(NoHooksLU):
5543 This is an abstract class which is the parent of all the other tags LUs.
5547 def ExpandNames(self):
5548 self.needed_locks = {}
5549 if self.op.kind == constants.TAG_NODE:
5550 name = self.cfg.ExpandNodeName(self.op.name)
5552 raise errors.OpPrereqError("Invalid node name (%s)" %
5555 self.needed_locks[locking.LEVEL_NODE] = name
5556 elif self.op.kind == constants.TAG_INSTANCE:
5557 name = self.cfg.ExpandInstanceName(self.op.name)
5559 raise errors.OpPrereqError("Invalid instance name (%s)" %
5562 self.needed_locks[locking.LEVEL_INSTANCE] = name
5564 def CheckPrereq(self):
5565 """Check prerequisites.
5568 if self.op.kind == constants.TAG_CLUSTER:
5569 self.target = self.cfg.GetClusterInfo()
5570 elif self.op.kind == constants.TAG_NODE:
5571 self.target = self.cfg.GetNodeInfo(self.op.name)
5572 elif self.op.kind == constants.TAG_INSTANCE:
5573 self.target = self.cfg.GetInstanceInfo(self.op.name)
5575 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5579 class LUGetTags(TagsLU):
5580 """Returns the tags of a given object.
5583 _OP_REQP = ["kind", "name"]
5586 def Exec(self, feedback_fn):
5587 """Returns the tag list.
5590 return list(self.target.GetTags())
5593 class LUSearchTags(NoHooksLU):
5594 """Searches the tags for a given pattern.
5597 _OP_REQP = ["pattern"]
5600 def ExpandNames(self):
5601 self.needed_locks = {}
5603 def CheckPrereq(self):
5604 """Check prerequisites.
5606 This checks the pattern passed for validity by compiling it.
5610 self.re = re.compile(self.op.pattern)
5611 except re.error, err:
5612 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5613 (self.op.pattern, err))
5615 def Exec(self, feedback_fn):
5616 """Returns the tag list.
5620 tgts = [("/cluster", cfg.GetClusterInfo())]
5621 ilist = cfg.GetAllInstancesInfo().values()
5622 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5623 nlist = cfg.GetAllNodesInfo().values()
5624 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5626 for path, target in tgts:
5627 for tag in target.GetTags():
5628 if self.re.search(tag):
5629 results.append((path, tag))
5633 class LUAddTags(TagsLU):
5634 """Sets a tag on a given object.
5637 _OP_REQP = ["kind", "name", "tags"]
5640 def CheckPrereq(self):
5641 """Check prerequisites.
5643 This checks the type and length of the tag name and value.
5646 TagsLU.CheckPrereq(self)
5647 for tag in self.op.tags:
5648 objects.TaggableObject.ValidateTag(tag)
5650 def Exec(self, feedback_fn):
5655 for tag in self.op.tags:
5656 self.target.AddTag(tag)
5657 except errors.TagError, err:
5658 raise errors.OpExecError("Error while setting tag: %s" % str(err))
5660 self.cfg.Update(self.target)
5661 except errors.ConfigurationError:
5662 raise errors.OpRetryError("There has been a modification to the"
5663 " config file and the operation has been"
5664 " aborted. Please retry.")
5667 class LUDelTags(TagsLU):
5668 """Delete a list of tags from a given object.
5671 _OP_REQP = ["kind", "name", "tags"]
5674 def CheckPrereq(self):
5675 """Check prerequisites.
5677 This checks that we have the given tag.
5680 TagsLU.CheckPrereq(self)
5681 for tag in self.op.tags:
5682 objects.TaggableObject.ValidateTag(tag)
5683 del_tags = frozenset(self.op.tags)
5684 cur_tags = self.target.GetTags()
5685 if not del_tags <= cur_tags:
5686 diff_tags = del_tags - cur_tags
5687 diff_names = ["'%s'" % tag for tag in diff_tags]
5689 raise errors.OpPrereqError("Tag(s) %s not found" %
5690 (",".join(diff_names)))
5692 def Exec(self, feedback_fn):
5693 """Remove the tag from the object.
5696 for tag in self.op.tags:
5697 self.target.RemoveTag(tag)
5699 self.cfg.Update(self.target)
5700 except errors.ConfigurationError:
5701 raise errors.OpRetryError("There has been a modification to the"
5702 " config file and the operation has been"
5703 " aborted. Please retry.")
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check the LogicalUnit.ExpandNames docstring
      # for more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - the LU (self.lu), through which the cluster configuration is queried
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in_text, in_data, out_text, out_data), that
      represent the input (to the external script) in text and data
      structure format, and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
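
  # Illustrative construction sketch (based on how LUTestAllocator below
  # drives this class; values are made up, not part of the original module).
  # In allocation mode all _ALLO_KEYS must be passed, in relocation mode only
  # "relocate_from":
  #
  #   ial = IAllocator(lu, constants.IALLOCATOR_MODE_ALLOC, "new.example.com",
  #                    mem_size=512,
  #                    disks=[{"size": 1024, "mode": "w"},
  #                           {"size": 4096, "mode": "w"}],
  #                    disk_template="drbd", os="debian-etch", tags=[],
  #                    nics=[{"mac": "auto", "ip": None, "bridge": "xen-br0"}],
  #                    vcpus=1, hypervisor="xen-pvm")
  #
  # Missing or unexpected keyword arguments raise ProgrammerError, and the
  # constructor immediately builds the serialized input via _BuildInputData().
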
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor_name)
    node_iinfo = self.lu.rpc.call_all_instances_info(
                   node_list, cluster_info.enabled_hypervisors)
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      node_data[nname].Raise()
      if not isinstance(node_data[nname].data, dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname].data
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.name not in node_iinfo[nname]:
            i_used_mem = 0
          else:
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
          remote_info['memory_free'] -= max(0, i_mem_diff)

          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # build the per-node result dictionary
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request
  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
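
  # Illustrative "request" section for relocation mode (values made up):
  #
  #   "request": {
  #     "type": "relocate",
  #     "name": "web1.example.com",
  #     "relocate_from": ["node2.example.com"],
  #     "disk_space_total": <computed by _ComputeDiskSize>,
  #     "required_nodes": 1,
  #   }
  #
  # Only net-mirrored disk templates can be relocated, and exactly one
  # replacement node is requested for the single secondary.
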
  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()
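
  # Usage sketch (assumption for illustration; the allocator name is made
  # up): after construction, a LU would typically call Run with the allocator
  # script name taken from the opcode, then consume the validated fields:
  #
  #   ial.Run("my-allocator")
  #   if not ial.success:
  #     raise errors.OpPrereqError("Allocator failed: %s" % ial.info)
  #   target_nodes = ial.nodes
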
  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
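
  # Illustrative allocator reply accepted by _ValidateResult (values made up):
  #
  #   {
  #     "success": true,
  #     "info": "allocation successful",
  #     "nodes": ["node1.example.com", "node3.example.com"]
  #   }
  #
  # "success", "info" and "nodes" are copied onto the IAllocator instance as
  # attributes; anything that is not a dict with those keys (and a list under
  # "nodes") raises OpExecError.
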
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text

    return result
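
# Illustrative sketch (assumption, not part of the original module): the test
# LU is meant for debugging the allocator protocol.  With direction "in" it
# only returns the generated input text; with direction "out" it runs the
# named allocator script and returns its raw output.  Assuming an opcode
# class opcodes.OpTestAllocator with matching field names, a relocation test
# would be submitted roughly as:
#
#   op = opcodes.OpTestAllocator(direction=constants.IALLOCATOR_DIR_OUT,
#                                mode=constants.IALLOCATOR_MODE_RELOC,
#                                name="web1.example.com",
#                                allocator="my-allocator")
#
# The mode/direction constants are the ones checked by CheckPrereq above;
# the opcode class and allocator name are assumptions for illustration.
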