#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
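  # A minimal concrete subclass could look like the sketch below; the class
  # name and behaviour are purely illustrative:
  #
  #   class LUNoOp(LogicalUnit):
  #     HPATH = None
  #     HTYPE = None
  #     _OP_REQP = []
  #
  #     def ExpandNames(self):
  #       self.needed_locks = {}
  #
  #     def CheckPrereq(self):
  #       pass
  #
  #     def Exec(self, feedback_fn):
  #       feedback_fn("nothing to do")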
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()
102 """Returns the SshRunner object
106 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
109 ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as a purely lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """
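
  # Illustrative only - a typical CheckArguments normalizes optional opcode
  # attributes and rejects syntactically invalid values (the "timeout"
  # attribute used here is hypothetical):
  #
  #   def CheckArguments(self):
  #     if not hasattr(self.op, "timeout"):
  #       self.op.timeout = 60
  #     if self.op.timeout <= 0:
  #       raise errors.OpPrereqError("Invalid timeout value")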

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """
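
  # The usual pattern, for illustration (see _LockInstancesNodes below):
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()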

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this will
    be handled in the hooks runner. Also note that additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None

    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result
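
  # As an illustration, LUVerifyCluster below overrides this callback to fold
  # post-phase hook failures into its Exec result; a sketch of such an
  # override:
  #
  #   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
  #     if phase == constants.HOOKS_PHASE_POST and not hook_results:
  #       feedback_fn(" - ERROR: communication failure to hooks")
  #     return lu_result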

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name
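
  # Typical use from an instance LU's ExpandNames (illustrative):
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE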

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we really have been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
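
  # Note on the two merge strategies checked above: constants.LOCKS_REPLACE
  # throws away any node locks declared earlier, while constants.LOCKS_APPEND
  # adds the instances' nodes to the already-declared set. A sketch of a
  # DeclareLocks using the helper for primary nodes only:
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes(primary_only=True)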


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes

  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
                                 " non-empty list of nodes whose name is to be"
                                 " expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances

  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()

  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))
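
# Illustrative call, mirroring the query LUs below (the field names here are
# examples only):
#
#   _CheckOutputFields(static=utils.FieldSet("name"),
#                      dynamic=utils.FieldSet("mfree"),
#                      selected=self.op.output_fields)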


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: string
  @param status: the desired status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @rtype: dict
  @return: the hook environment for this instance

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
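
# Illustrative use (the override value is hypothetical): a LU that stops an
# instance could report the target state rather than the current one:
#
#   env = _BuildInstanceHookEnvByObject(self, instance,
#                                       override={'status': "down"})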


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  if result.failed or not result.data:
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if len(instancelist) != 0:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn(" - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    vglist = node_result.get(constants.NV_VGLIST, None)
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn(" - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn(" - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to other nodes

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True

    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False

    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True

    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary
      # has enough memory to host all instances it would have to take over,
      # should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True

    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list

    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: nodelist,
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo],
      constants.NV_LVLIST: vg_name,
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VGLIST: None,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
    }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if isinstance(lvdata, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, lvdata.encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
974 feedback_fn("* Verifying orphan volumes")
975 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
979 feedback_fn("* Verifying remaining instances")
980 result = self._VerifyOrphanInstances(instancelist, node_instance,
984 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
985 feedback_fn("* Verifying N+1 Memory redundancy")
986 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
989 feedback_fn("* Other Notes")
991 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
992 % len(i_non_redundant))
995 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
996 % len(i_non_a_balanced))

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn(" - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            feedback_fn(" Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn(" Node %s:" % node_name)
                show_node_header = False
              feedback_fn(" ERROR: Script %s failed, output:" % script)
              output = indent_re.sub(' ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        self.LogWarning("Connection to node %s failed: %s" %
                        (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
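
# Illustrative behaviour (the disk layouts are examples only): a plain LV
# disk, or e.g. a DRBD disk whose children are LVs, makes this return True;
# a disk tree with no LD_LV device anywhere returns False.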


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckParameters(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate beparams changes
    if self.op.beparams:
      utils.CheckBEParams(self.op.beparams)
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size

    self.cfg.Update(self.cluster)

    # we want to update nodes after the cluster so that if any errors
    # happen, we have recorded and saved the cluster info
    if self.op.candidate_pool_size is not None:
      node_info = self.cfg.GetAllNodesInfo().values()
      num_candidates = len([node for node in node_info
                            if node.master_candidate])
      num_nodes = len(node_info)
      if num_candidates < self.op.candidate_pool_size:
        random.shuffle(node_info)
        for node in node_info:
          if num_candidates >= self.op.candidate_pool_size:
            break
          if node.master_candidate:
            continue
          node.master_candidate = True
          self.LogInfo("Promoting node %s to master candidate", node.name)
          self.cfg.Update(node)
          self.context.ReaddNode(node)
          num_candidates += 1
      elif num_candidates > self.op.candidate_pool_size:
        self.LogInfo("Note: more nodes are candidates (%d) than the new value"
                     " of candidate_pool_size (%d)" %
                     (num_candidates, self.op.candidate_pool_size))


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if rstats.failed or not rstats.data:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.data
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
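
# A typical (illustrative) call site, waiting for mirror sync and aborting
# on degraded disks:
#
#   disk_abort = not _WaitForSync(self, instance)
#   if disk_abort:
#     raise errors.OpExecError("There are some degraded disks for"
#                              " this instance")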


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if rstats.failed or not rstats.data:
      logging.warning("Node %s: disk degraded, not found or node down", node)
      result = False
    else:
      result = result and (not rstats.data[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if nr.failed or not nr.data:
        continue
      for os_obj in nr.data:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
    }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal",
    )

  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    "master_candidate",
    "master",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_locking:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.failed and nodeinfo.data:
          nodeinfo = nodeinfo.data
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
          }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          val = node.name == master_node
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or volumes[node].failed or not volumes[node].data:
        continue

      node_vols = volumes[node].data[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name or '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
    }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    if result.data:
      if constants.PROTOCOL_VERSION == result.data:
        logging.info("Communication to node %s fine, sw version %s match",
                     node, result.data)
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.data))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      keyarray.append(f.read())
      f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2], keyarray[3], keyarray[4],
                                    keyarray[5])

    if result.failed or not result.data:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2000 # Add node to our /etc/hosts, and add key to known_hosts
2001 utils.AddHostToEtcHosts(new_node.name)
2003 if new_node.secondary_ip != new_node.primary_ip:
2004 result = self.rpc.call_node_has_ip_address(new_node.name,
2005 new_node.secondary_ip)
2006 if result.failed or not result.data:
2007 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2008 " you gave (%s). Please fix and re-run this"
2009 " command." % new_node.secondary_ip)
2011 node_verify_list = [self.cfg.GetMasterNode()]
2012 node_verify_param = {
2013 'nodelist': [node],
2014 # TODO: do a node-net-test as well?
2015 }
2017 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2018 self.cfg.GetClusterName())
2019 for verifier in node_verify_list:
2020 if result[verifier].failed or not result[verifier].data:
2021 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2022 " for remote verification" % verifier)
2023 if result[verifier].data['nodelist']:
2024 for failed in result[verifier].data['nodelist']:
2025 feedback_fn("ssh/hostname verification failed %s -> %s" %
2026 (verifier, result[verifier].data['nodelist'][failed]))
2027 raise errors.OpExecError("ssh/hostname verification failed.")
2029 # Distribute updated /etc/hosts and known_hosts to all nodes,
2030 # including the node just added
2031 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2032 dist_nodes = self.cfg.GetNodeList()
2033 if not self.op.readd:
2034 dist_nodes.append(node)
2035 if myself.name in dist_nodes:
2036 dist_nodes.remove(myself.name)
2038 logging.debug("Copying hosts and known_hosts to all nodes")
2039 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2040 result = self.rpc.call_upload_file(dist_nodes, fname)
2041 for to_node, to_result in result.iteritems():
2042 if to_result.failed or not to_result.data:
2043 logging.error("Copy of file %s to node %s failed", fname, to_node)
2045 to_copy = []
2046 if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2047 to_copy.append(constants.VNC_PASSWORD_FILE)
2048 for fname in to_copy:
2049 result = self.rpc.call_upload_file([node], fname)
2050 if result[node].failed or not result[node].data:
2051 logging.error("Could not copy file %s to node %s", fname, node)
2053 if self.op.readd:
2054 self.context.ReaddNode(new_node)
2055 else:
2056 self.context.AddNode(new_node)
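# Summary of the branch above: on readd the existing Node object is
# re-registered in the cluster context (keeping its configuration entry),
# while a plain add creates a brand-new entry in the cluster context.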
2059 class LUSetNodeParams(LogicalUnit):
2060 """Modifies the parameters of a node.
2062 """
2063 HPATH = "node-modify"
2064 HTYPE = constants.HTYPE_NODE
2065 _OP_REQP = ["node_name"]
2068 def CheckArguments(self):
2069 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2070 if node_name is None:
2071 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2072 self.op.node_name = node_name
2073 if not hasattr(self.op, 'master_candidate'):
2074 raise errors.OpPrereqError("Please pass at least one modification")
2075 self.op.master_candidate = bool(self.op.master_candidate)
2077 def ExpandNames(self):
2078 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2080 def BuildHooksEnv(self):
2081 """Build hooks env.
2083 This runs on the master node.
2085 """
2086 env = {
2087 "OP_TARGET": self.op.node_name,
2088 "MASTER_CANDIDATE": str(self.op.master_candidate),
2089 }
2090 nl = [self.cfg.GetMasterNode(),
2091 self.op.node_name]
2092 return env, nl, nl
2094 def CheckPrereq(self):
2095 """Check prerequisites.
2097 This only checks the instance list against the existing names.
2099 """
2100 force = self.force = self.op.force
2102 if self.op.master_candidate == False:
2103 if self.op.node_name == self.cfg.GetMasterNode():
2104 raise errors.OpPrereqError("The master node has to be a"
2105 " master candidate")
2106 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2107 node_info = self.cfg.GetAllNodesInfo().values()
2108 num_candidates = len([node for node in node_info
2109 if node.master_candidate])
2110 if num_candidates <= cp_size:
2111 msg = ("Not enough master candidates (desired"
2112 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2113 if force:
2114 self.LogWarning(msg)
2115 else:
2116 raise errors.OpPrereqError(msg)
2120 def Exec(self, feedback_fn):
2121 """Modifies a node.
2123 """
2124 node = self.cfg.GetNodeInfo(self.op.node_name)
2126 result = []
2128 if self.op.master_candidate is not None:
2129 node.master_candidate = self.op.master_candidate
2130 result.append(("master_candidate", str(self.op.master_candidate)))
2132 # this will trigger configuration file update, if needed
2133 self.cfg.Update(node)
2134 # this will trigger job queue propagation or cleanup
2135 if self.op.node_name != self.cfg.GetMasterNode():
2136 self.context.ReaddNode(node)
2138 return result
2141 class LUQueryClusterInfo(NoHooksLU):
2142 """Query cluster configuration.
2144 """
2148 def ExpandNames(self):
2149 self.needed_locks = {}
2151 def CheckPrereq(self):
2152 """No prerequisites needed for this LU.
2154 """
2157 def Exec(self, feedback_fn):
2158 """Return cluster config.
2160 """
2161 cluster = self.cfg.GetClusterInfo()
2162 result = {
2163 "software_version": constants.RELEASE_VERSION,
2164 "protocol_version": constants.PROTOCOL_VERSION,
2165 "config_version": constants.CONFIG_VERSION,
2166 "os_api_version": constants.OS_API_VERSION,
2167 "export_version": constants.EXPORT_VERSION,
2168 "architecture": (platform.architecture()[0], platform.machine()),
2169 "name": cluster.cluster_name,
2170 "master": cluster.master_node,
2171 "default_hypervisor": cluster.default_hypervisor,
2172 "enabled_hypervisors": cluster.enabled_hypervisors,
2173 "hvparams": cluster.hvparams,
2174 "beparams": cluster.beparams,
2175 "candidate_pool_size": cluster.candidate_pool_size,
2176 }
2178 return result
2181 class LUQueryConfigValues(NoHooksLU):
2182 """Return configuration values.
2184 """
2187 _FIELDS_DYNAMIC = utils.FieldSet()
2188 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2190 def ExpandNames(self):
2191 self.needed_locks = {}
2193 _CheckOutputFields(static=self._FIELDS_STATIC,
2194 dynamic=self._FIELDS_DYNAMIC,
2195 selected=self.op.output_fields)
2197 def CheckPrereq(self):
2198 """No prerequisites.
2200 """
2203 def Exec(self, feedback_fn):
2204 """Dump a representation of the cluster config to the standard output.
2206 """
2207 values = []
2208 for field in self.op.output_fields:
2209 if field == "cluster_name":
2210 entry = self.cfg.GetClusterName()
2211 elif field == "master_node":
2212 entry = self.cfg.GetMasterNode()
2213 elif field == "drain_flag":
2214 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2215 else:
2216 raise errors.ParameterError(field)
2217 values.append(entry)
2219 return values
2221 class LUActivateInstanceDisks(NoHooksLU):
2222 """Bring up an instance's disks.
2224 """
2225 _OP_REQP = ["instance_name"]
2228 def ExpandNames(self):
2229 self._ExpandAndLockInstance()
2230 self.needed_locks[locking.LEVEL_NODE] = []
2231 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2233 def DeclareLocks(self, level):
2234 if level == locking.LEVEL_NODE:
2235 self._LockInstancesNodes()
2237 def CheckPrereq(self):
2238 """Check prerequisites.
2240 This checks that the instance is in the cluster.
2242 """
2243 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2244 assert self.instance is not None, \
2245 "Cannot retrieve locked instance %s" % self.op.instance_name
2247 def Exec(self, feedback_fn):
2248 """Activate the disks.
2250 """
2251 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2252 if not disks_ok:
2253 raise errors.OpExecError("Cannot activate block devices")
2255 return disks_info
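# The disks_info value returned above is the (node, instance-visible name,
# node-visible name) mapping produced by _AssembleInstanceDisks below.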
2258 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2259 """Prepare the block devices for an instance.
2261 This sets up the block devices on all nodes.
2263 @type lu: L{LogicalUnit}
2264 @param lu: the logical unit on whose behalf we execute
2265 @type instance: L{objects.Instance}
2266 @param instance: the instance for whose disks we assemble
2267 @type ignore_secondaries: boolean
2268 @param ignore_secondaries: if true, errors on secondary nodes
2269 won't result in an error return from the function
2270 @return: False if the operation failed, otherwise a list of
2271 (host, instance_visible_name, node_visible_name)
2272 with the mapping from node devices to instance devices
2273 """
2275 device_info = []
2276 disks_ok = True
2277 iname = instance.name
2278 # With the two passes mechanism we try to reduce the window of
2279 # opportunity for the race condition of switching DRBD to primary
2280 # before handshaking occurred, but we do not eliminate it
2282 # The proper fix would be to wait (with some limits) until the
2283 # connection has been made and drbd transitions from WFConnection
2284 # into any other network-connected state (Connected, SyncTarget,
2285 # SyncSource, etc.)
2287 # 1st pass, assemble on all nodes in secondary mode
2288 for inst_disk in instance.disks:
2289 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2290 lu.cfg.SetDiskID(node_disk, node)
2291 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2292 if result.failed or not result:
2293 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2294 " (is_primary=False, pass=1)",
2295 inst_disk.iv_name, node)
2296 if not ignore_secondaries:
2297 disks_ok = False
2299 # FIXME: race condition on drbd migration to primary
2301 # 2nd pass, do only the primary node
2302 for inst_disk in instance.disks:
2303 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2304 if node != instance.primary_node:
2305 continue
2306 lu.cfg.SetDiskID(node_disk, node)
2307 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2308 if result.failed or not result:
2309 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2310 " (is_primary=True, pass=2)",
2311 inst_disk.iv_name, node)
2312 disks_ok = False
2313 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2315 # leave the disks configured for the primary node
2316 # this is a workaround that would be fixed better by
2317 # improving the logical/physical id handling
2318 for disk in instance.disks:
2319 lu.cfg.SetDiskID(disk, instance.primary_node)
2321 return disks_ok, device_info
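# A minimal usage sketch (assuming an already-locked instance object):
#   disks_ok, info = _AssembleInstanceDisks(self, instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
# disks_ok turns False as soon as a non-ignored node fails pass 1 or 2, and
# info holds one entry per disk as described in the docstring above.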
2324 def _StartInstanceDisks(lu, instance, force):
2325 """Start the disks of an instance.
2327 """
2328 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2329 ignore_secondaries=force)
2330 if not disks_ok:
2331 _ShutdownInstanceDisks(lu, instance)
2332 if force is not None and not force:
2333 lu.proc.LogWarning("", hint="If the message above refers to a"
2334 " secondary node,"
2335 " you can retry the operation using '--force'.")
2336 raise errors.OpExecError("Disk consistency error")
2339 class LUDeactivateInstanceDisks(NoHooksLU):
2340 """Shutdown an instance's disks.
2342 """
2343 _OP_REQP = ["instance_name"]
2346 def ExpandNames(self):
2347 self._ExpandAndLockInstance()
2348 self.needed_locks[locking.LEVEL_NODE] = []
2349 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2351 def DeclareLocks(self, level):
2352 if level == locking.LEVEL_NODE:
2353 self._LockInstancesNodes()
2355 def CheckPrereq(self):
2356 """Check prerequisites.
2358 This checks that the instance is in the cluster.
2360 """
2361 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2362 assert self.instance is not None, \
2363 "Cannot retrieve locked instance %s" % self.op.instance_name
2365 def Exec(self, feedback_fn):
2366 """Deactivate the disks.
2368 """
2369 instance = self.instance
2370 _SafeShutdownInstanceDisks(self, instance)
2373 def _SafeShutdownInstanceDisks(lu, instance):
2374 """Shutdown block devices of an instance.
2376 This function checks if an instance is running, before calling
2377 _ShutdownInstanceDisks.
2379 """
2380 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2381 [instance.hypervisor])
2382 ins_l = ins_l[instance.primary_node]
2383 if ins_l.failed or not isinstance(ins_l.data, list):
2384 raise errors.OpExecError("Can't contact node '%s'" %
2385 instance.primary_node)
2387 if instance.name in ins_l.data:
2388 raise errors.OpExecError("Instance is running, can't shutdown"
2389 " block devices.")
2391 _ShutdownInstanceDisks(lu, instance)
2394 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2395 """Shutdown block devices of an instance.
2397 This does the shutdown on all nodes of the instance.
2399 If the ignore_primary is false, errors on the primary node are
2400 taken into account, otherwise they are ignored.
2402 """
2404 for disk in instance.disks:
2405 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2406 lu.cfg.SetDiskID(top_disk, node)
2407 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2408 if result.failed or not result.data:
2409 logging.error("Could not shutdown block device %s on node %s",
2410 disk.iv_name, node)
2411 if not ignore_primary or node != instance.primary_node:
2412 return False
2414 return True
2416 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2417 """Checks if a node has enough free memory.
2419 This function checks if a given node has the needed amount of free
2420 memory. In case the node has less memory or we cannot get the
2421 information from the node, this function raises an OpPrereqError
2422 exception.
2424 @type lu: C{LogicalUnit}
2425 @param lu: a logical unit from which we get configuration data
2426 @type node: C{str}
2427 @param node: the node to check
2428 @type reason: C{str}
2429 @param reason: string to use in the error message
2430 @type requested: C{int}
2431 @param requested: the amount of memory in MiB to check for
2432 @type hypervisor: C{str}
2433 @param hypervisor: the hypervisor to ask for memory stats
2434 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2435 we cannot check the node
2437 """
2438 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2439 nodeinfo[node].Raise()
2440 free_mem = nodeinfo[node].data.get('memory_free')
2441 if not isinstance(free_mem, int):
2442 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2443 " was '%s'" % (node, free_mem))
2444 if requested > free_mem:
2445 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2446 " needed %s MiB, available %s MiB" %
2447 (node, reason, requested, free_mem))
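# Usage sketch (hypothetical values): before starting an instance that
# needs 4096 MiB on its primary node one would call
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        4096, instance.hypervisor)
# which raises OpPrereqError if the node reports less than 4096 MiB free.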
2450 class LUStartupInstance(LogicalUnit):
2451 """Starts an instance.
2453 """
2454 HPATH = "instance-start"
2455 HTYPE = constants.HTYPE_INSTANCE
2456 _OP_REQP = ["instance_name", "force"]
2459 def ExpandNames(self):
2460 self._ExpandAndLockInstance()
2462 def BuildHooksEnv(self):
2463 """Build hooks env.
2465 This runs on master, primary and secondary nodes of the instance.
2467 """
2468 env = {
2469 "FORCE": self.op.force,
2470 }
2471 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2472 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2473 list(self.instance.secondary_nodes))
2474 return env, nl, nl
2476 def CheckPrereq(self):
2477 """Check prerequisites.
2479 This checks that the instance is in the cluster.
2481 """
2482 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2483 assert self.instance is not None, \
2484 "Cannot retrieve locked instance %s" % self.op.instance_name
2486 bep = self.cfg.GetClusterInfo().FillBE(instance)
2487 # check bridges existence
2488 _CheckInstanceBridgesExist(self, instance)
2490 _CheckNodeFreeMemory(self, instance.primary_node,
2491 "starting instance %s" % instance.name,
2492 bep[constants.BE_MEMORY], instance.hypervisor)
2494 def Exec(self, feedback_fn):
2495 """Start the instance.
2497 """
2498 instance = self.instance
2499 force = self.op.force
2500 extra_args = getattr(self.op, "extra_args", "")
2502 self.cfg.MarkInstanceUp(instance.name)
2504 node_current = instance.primary_node
2506 _StartInstanceDisks(self, instance, force)
2508 result = self.rpc.call_instance_start(node_current, instance, extra_args)
2509 if result.failed or not result.data:
2510 _ShutdownInstanceDisks(self, instance)
2511 raise errors.OpExecError("Could not start instance")
2514 class LURebootInstance(LogicalUnit):
2515 """Reboot an instance.
2517 """
2518 HPATH = "instance-reboot"
2519 HTYPE = constants.HTYPE_INSTANCE
2520 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2523 def ExpandNames(self):
2524 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2525 constants.INSTANCE_REBOOT_HARD,
2526 constants.INSTANCE_REBOOT_FULL]:
2527 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2528 (constants.INSTANCE_REBOOT_SOFT,
2529 constants.INSTANCE_REBOOT_HARD,
2530 constants.INSTANCE_REBOOT_FULL))
2531 self._ExpandAndLockInstance()
2533 def BuildHooksEnv(self):
2534 """Build hooks env.
2536 This runs on master, primary and secondary nodes of the instance.
2538 """
2539 env = {
2540 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2541 }
2542 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2543 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2544 list(self.instance.secondary_nodes))
2545 return env, nl, nl
2547 def CheckPrereq(self):
2548 """Check prerequisites.
2550 This checks that the instance is in the cluster.
2552 """
2553 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2554 assert self.instance is not None, \
2555 "Cannot retrieve locked instance %s" % self.op.instance_name
2557 # check bridges existence
2558 _CheckInstanceBridgesExist(self, instance)
2560 def Exec(self, feedback_fn):
2561 """Reboot the instance.
2563 """
2564 instance = self.instance
2565 ignore_secondaries = self.op.ignore_secondaries
2566 reboot_type = self.op.reboot_type
2567 extra_args = getattr(self.op, "extra_args", "")
2569 node_current = instance.primary_node
2571 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2572 constants.INSTANCE_REBOOT_HARD]:
2573 result = self.rpc.call_instance_reboot(node_current, instance,
2574 reboot_type, extra_args)
2575 if result.failed or not result.data:
2576 raise errors.OpExecError("Could not reboot instance")
2577 else:
2578 if not self.rpc.call_instance_shutdown(node_current, instance):
2579 raise errors.OpExecError("could not shutdown instance for full reboot")
2580 _ShutdownInstanceDisks(self, instance)
2581 _StartInstanceDisks(self, instance, ignore_secondaries)
2582 result = self.rpc.call_instance_start(node_current, instance, extra_args)
2583 if result.failed or not result.data:
2584 _ShutdownInstanceDisks(self, instance)
2585 raise errors.OpExecError("Could not start instance for full reboot")
2587 self.cfg.MarkInstanceUp(instance.name)
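# Note on the reboot types handled above: soft and hard reboots are a
# single call_instance_reboot RPC carried out on the node, while a full
# reboot is emulated from the master as shutdown + disk restart + start.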
2590 class LUShutdownInstance(LogicalUnit):
2591 """Shutdown an instance.
2593 """
2594 HPATH = "instance-stop"
2595 HTYPE = constants.HTYPE_INSTANCE
2596 _OP_REQP = ["instance_name"]
2599 def ExpandNames(self):
2600 self._ExpandAndLockInstance()
2602 def BuildHooksEnv(self):
2603 """Build hooks env.
2605 This runs on master, primary and secondary nodes of the instance.
2607 """
2608 env = _BuildInstanceHookEnvByObject(self, self.instance)
2609 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2610 list(self.instance.secondary_nodes))
2611 return env, nl, nl
2613 def CheckPrereq(self):
2614 """Check prerequisites.
2616 This checks that the instance is in the cluster.
2618 """
2619 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2620 assert self.instance is not None, \
2621 "Cannot retrieve locked instance %s" % self.op.instance_name
2623 def Exec(self, feedback_fn):
2624 """Shutdown the instance.
2626 """
2627 instance = self.instance
2628 node_current = instance.primary_node
2629 self.cfg.MarkInstanceDown(instance.name)
2630 result = self.rpc.call_instance_shutdown(node_current, instance)
2631 if result.failed or not result.data:
2632 self.proc.LogWarning("Could not shutdown instance")
2634 _ShutdownInstanceDisks(self, instance)
2637 class LUReinstallInstance(LogicalUnit):
2638 """Reinstall an instance.
2640 """
2641 HPATH = "instance-reinstall"
2642 HTYPE = constants.HTYPE_INSTANCE
2643 _OP_REQP = ["instance_name"]
2646 def ExpandNames(self):
2647 self._ExpandAndLockInstance()
2649 def BuildHooksEnv(self):
2650 """Build hooks env.
2652 This runs on master, primary and secondary nodes of the instance.
2654 """
2655 env = _BuildInstanceHookEnvByObject(self, self.instance)
2656 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2657 list(self.instance.secondary_nodes))
2658 return env, nl, nl
2660 def CheckPrereq(self):
2661 """Check prerequisites.
2663 This checks that the instance is in the cluster and is not running.
2665 """
2666 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2667 assert instance is not None, \
2668 "Cannot retrieve locked instance %s" % self.op.instance_name
2670 if instance.disk_template == constants.DT_DISKLESS:
2671 raise errors.OpPrereqError("Instance '%s' has no disks" %
2672 self.op.instance_name)
2673 if instance.status != "down":
2674 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2675 self.op.instance_name)
2676 remote_info = self.rpc.call_instance_info(instance.primary_node,
2677 instance.name,
2678 instance.hypervisor)
2679 if remote_info.failed or remote_info.data:
2680 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2681 (self.op.instance_name,
2682 instance.primary_node))
2684 self.op.os_type = getattr(self.op, "os_type", None)
2685 if self.op.os_type is not None:
2687 pnode = self.cfg.GetNodeInfo(
2688 self.cfg.ExpandNodeName(instance.primary_node))
2689 if pnode is None:
2690 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2691 instance.primary_node)
2692 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2693 result.Raise()
2694 if not isinstance(result.data, objects.OS):
2695 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2696 " primary node" % self.op.os_type)
2698 self.instance = instance
2700 def Exec(self, feedback_fn):
2701 """Reinstall the instance.
2703 """
2704 inst = self.instance
2706 if self.op.os_type is not None:
2707 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2708 inst.os = self.op.os_type
2709 self.cfg.Update(inst)
2711 _StartInstanceDisks(self, inst, None)
2712 try:
2713 feedback_fn("Running the instance OS create scripts...")
2714 result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2715 result.Raise()
2716 if not result.data:
2717 raise errors.OpExecError("Could not install OS for instance %s"
2718 " on node %s" %
2719 (inst.name, inst.primary_node))
2720 finally:
2721 _ShutdownInstanceDisks(self, inst)
2724 class LURenameInstance(LogicalUnit):
2725 """Rename an instance.
2727 """
2728 HPATH = "instance-rename"
2729 HTYPE = constants.HTYPE_INSTANCE
2730 _OP_REQP = ["instance_name", "new_name"]
2732 def BuildHooksEnv(self):
2733 """Build hooks env.
2735 This runs on master, primary and secondary nodes of the instance.
2737 """
2738 env = _BuildInstanceHookEnvByObject(self, self.instance)
2739 env["INSTANCE_NEW_NAME"] = self.op.new_name
2740 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2741 list(self.instance.secondary_nodes))
2742 return env, nl, nl
2744 def CheckPrereq(self):
2745 """Check prerequisites.
2747 This checks that the instance is in the cluster and is not running.
2749 """
2750 instance = self.cfg.GetInstanceInfo(
2751 self.cfg.ExpandInstanceName(self.op.instance_name))
2752 if instance is None:
2753 raise errors.OpPrereqError("Instance '%s' not known" %
2754 self.op.instance_name)
2755 if instance.status != "down":
2756 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2757 self.op.instance_name)
2758 remote_info = self.rpc.call_instance_info(instance.primary_node,
2759 instance.name,
2760 instance.hypervisor)
2761 remote_info.Raise()
2762 if remote_info.data:
2763 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2764 (self.op.instance_name,
2765 instance.primary_node))
2766 self.instance = instance
2768 # new name verification
2769 name_info = utils.HostInfo(self.op.new_name)
2771 self.op.new_name = new_name = name_info.name
2772 instance_list = self.cfg.GetInstanceList()
2773 if new_name in instance_list:
2774 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2775 new_name)
2777 if not getattr(self.op, "ignore_ip", False):
2778 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2779 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2780 (name_info.ip, new_name))
2783 def Exec(self, feedback_fn):
2784 """Rename the instance.
2786 """
2787 inst = self.instance
2788 old_name = inst.name
2790 if inst.disk_template == constants.DT_FILE:
2791 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2793 self.cfg.RenameInstance(inst.name, self.op.new_name)
2794 # Change the instance lock. This is definitely safe while we hold the BGL
2795 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2796 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2798 # re-read the instance from the configuration after rename
2799 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2801 if inst.disk_template == constants.DT_FILE:
2802 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2803 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2804 old_file_storage_dir,
2805 new_file_storage_dir)
2807 if result.failed:
2808 raise errors.OpExecError("Could not connect to node '%s' to rename"
2809 " directory '%s' to '%s' (but the instance"
2810 " has been renamed in Ganeti)" % (
2811 inst.primary_node, old_file_storage_dir,
2812 new_file_storage_dir))
2814 if not result.data[0]:
2815 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2816 " (but the instance has been renamed in"
2817 " Ganeti)" % (old_file_storage_dir,
2818 new_file_storage_dir))
2820 _StartInstanceDisks(self, inst, None)
2821 try:
2822 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2823 old_name)
2824 if result.failed or not result.data:
2825 msg = ("Could not run OS rename script for instance %s on node %s"
2826 " (but the instance has been renamed in Ganeti)" %
2827 (inst.name, inst.primary_node))
2828 self.proc.LogWarning(msg)
2829 finally:
2830 _ShutdownInstanceDisks(self, inst)
2833 class LURemoveInstance(LogicalUnit):
2834 """Remove an instance.
2836 """
2837 HPATH = "instance-remove"
2838 HTYPE = constants.HTYPE_INSTANCE
2839 _OP_REQP = ["instance_name", "ignore_failures"]
2842 def ExpandNames(self):
2843 self._ExpandAndLockInstance()
2844 self.needed_locks[locking.LEVEL_NODE] = []
2845 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2847 def DeclareLocks(self, level):
2848 if level == locking.LEVEL_NODE:
2849 self._LockInstancesNodes()
2851 def BuildHooksEnv(self):
2852 """Build hooks env.
2854 This runs on master, primary and secondary nodes of the instance.
2856 """
2857 env = _BuildInstanceHookEnvByObject(self, self.instance)
2858 nl = [self.cfg.GetMasterNode()]
2859 return env, nl, nl
2861 def CheckPrereq(self):
2862 """Check prerequisites.
2864 This checks that the instance is in the cluster.
2866 """
2867 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2868 assert self.instance is not None, \
2869 "Cannot retrieve locked instance %s" % self.op.instance_name
2871 def Exec(self, feedback_fn):
2872 """Remove the instance.
2874 """
2875 instance = self.instance
2876 logging.info("Shutting down instance %s on node %s",
2877 instance.name, instance.primary_node)
2879 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2880 if result.failed or not result.data:
2881 if self.op.ignore_failures:
2882 feedback_fn("Warning: can't shutdown instance")
2883 else:
2884 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2885 (instance.name, instance.primary_node))
2887 logging.info("Removing block devices for instance %s", instance.name)
2889 if not _RemoveDisks(self, instance):
2890 if self.op.ignore_failures:
2891 feedback_fn("Warning: can't remove instance's disks")
2892 else:
2893 raise errors.OpExecError("Can't remove instance's disks")
2895 logging.info("Removing instance %s out of cluster config", instance.name)
2897 self.cfg.RemoveInstance(instance.name)
2898 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2901 class LUQueryInstances(NoHooksLU):
2902 """Logical unit for querying instances.
2904 """
2905 _OP_REQP = ["output_fields", "names"]
2907 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2908 "admin_state", "admin_ram",
2909 "disk_template", "ip", "mac", "bridge",
2910 "sda_size", "sdb_size", "vcpus", "tags",
2911 "network_port", "beparams",
2912 "(disk).(size)/([0-9]+)",
2913 "(disk).(sizes)",
2914 "(nic).(mac|ip|bridge)/([0-9]+)",
2915 "(nic).(macs|ips|bridges)",
2916 "(disk|nic).(count)",
2917 "serial_no", "hypervisor", "hvparams",] +
2918 ["hv/%s" % name
2919 for name in constants.HVS_PARAMETERS] +
2920 ["be/%s" % name
2921 for name in constants.BES_PARAMETERS])
2922 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2925 def ExpandNames(self):
2926 _CheckOutputFields(static=self._FIELDS_STATIC,
2927 dynamic=self._FIELDS_DYNAMIC,
2928 selected=self.op.output_fields)
2930 self.needed_locks = {}
2931 self.share_locks[locking.LEVEL_INSTANCE] = 1
2932 self.share_locks[locking.LEVEL_NODE] = 1
2934 if self.op.names:
2935 self.wanted = _GetWantedInstances(self, self.op.names)
2936 else:
2937 self.wanted = locking.ALL_SET
2939 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2940 if self.do_locking:
2941 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2942 self.needed_locks[locking.LEVEL_NODE] = []
2943 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2945 def DeclareLocks(self, level):
2946 if level == locking.LEVEL_NODE and self.do_locking:
2947 self._LockInstancesNodes()
2949 def CheckPrereq(self):
2950 """Check prerequisites.
2952 """
2955 def Exec(self, feedback_fn):
2956 """Computes the list of nodes and their attributes.
2958 """
2959 all_info = self.cfg.GetAllInstancesInfo()
2960 if self.do_locking:
2961 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2962 elif self.wanted != locking.ALL_SET:
2963 instance_names = self.wanted
2964 missing = set(instance_names).difference(all_info.keys())
2965 if missing:
2966 raise errors.OpExecError(
2967 "Some instances were removed before retrieving their data: %s"
2968 % missing)
2969 else:
2970 instance_names = all_info.keys()
2972 instance_names = utils.NiceSort(instance_names)
2973 instance_list = [all_info[iname] for iname in instance_names]
2975 # begin data gathering
2977 nodes = frozenset([inst.primary_node for inst in instance_list])
2978 hv_list = list(set([inst.hypervisor for inst in instance_list]))
2980 bad_nodes = []
2981 if self.do_locking:
2982 live_data = {}
2983 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2984 for name in nodes:
2985 result = node_data[name]
2986 if result.failed:
2987 bad_nodes.append(name)
2988 elif result.data:
2990 live_data.update(result.data)
2991 # else no instance is alive
2992 else:
2993 live_data = dict([(name, {}) for name in instance_names])
2995 # end data gathering
2997 HVPREFIX = "hv/"
2998 BEPREFIX = "be/"
2999 output = []
3000 for instance in instance_list:
3001 iout = []
3002 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3003 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3004 for field in self.op.output_fields:
3005 st_match = self._FIELDS_STATIC.Matches(field)
3006 if field == "name":
3007 val = instance.name
3008 elif field == "os":
3009 val = instance.os
3010 elif field == "pnode":
3011 val = instance.primary_node
3012 elif field == "snodes":
3013 val = list(instance.secondary_nodes)
3014 elif field == "admin_state":
3015 val = (instance.status != "down")
3016 elif field == "oper_state":
3017 if instance.primary_node in bad_nodes:
3018 val = None
3019 else:
3020 val = bool(live_data.get(instance.name))
3021 elif field == "status":
3022 if instance.primary_node in bad_nodes:
3023 val = "ERROR_nodedown"
3024 else:
3025 running = bool(live_data.get(instance.name))
3026 if running:
3027 if instance.status != "down":
3028 val = "running"
3029 else:
3030 val = "ERROR_up"
3031 else:
3032 if instance.status != "down":
3033 val = "ERROR_down"
3034 else:
3035 val = "ADMIN_down"
3036 elif field == "oper_ram":
3037 if instance.primary_node in bad_nodes:
3038 val = None
3039 elif instance.name in live_data:
3040 val = live_data[instance.name].get("memory", "?")
3041 else:
3042 val = "-"
3043 elif field == "disk_template":
3044 val = instance.disk_template
3045 elif field == "ip":
3046 val = instance.nics[0].ip
3047 elif field == "bridge":
3048 val = instance.nics[0].bridge
3049 elif field == "mac":
3050 val = instance.nics[0].mac
3051 elif field == "sda_size" or field == "sdb_size":
3052 idx = ord(field[2]) - ord('a')
3053 try:
3054 val = instance.FindDisk(idx).size
3055 except errors.OpPrereqError:
3056 val = None
3057 elif field == "tags":
3058 val = list(instance.GetTags())
3059 elif field == "serial_no":
3060 val = instance.serial_no
3061 elif field == "network_port":
3062 val = instance.network_port
3063 elif field == "hypervisor":
3064 val = instance.hypervisor
3065 elif field == "hvparams":
3066 val = i_hv
3067 elif (field.startswith(HVPREFIX) and
3068 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3069 val = i_hv.get(field[len(HVPREFIX):], None)
3070 elif field == "beparams":
3071 val = i_be
3072 elif (field.startswith(BEPREFIX) and
3073 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3074 val = i_be.get(field[len(BEPREFIX):], None)
3075 elif st_match and st_match.groups():
3076 # matches a variable list
3077 st_groups = st_match.groups()
3078 if st_groups and st_groups[0] == "disk":
3079 if st_groups[1] == "count":
3080 val = len(instance.disks)
3081 elif st_groups[1] == "sizes":
3082 val = [disk.size for disk in instance.disks]
3083 elif st_groups[1] == "size":
3084 try:
3085 val = instance.FindDisk(st_groups[2]).size
3086 except errors.OpPrereqError:
3087 val = None
3088 else:
3089 assert False, "Unhandled disk parameter"
3090 elif st_groups[0] == "nic":
3091 if st_groups[1] == "count":
3092 val = len(instance.nics)
3093 elif st_groups[1] == "macs":
3094 val = [nic.mac for nic in instance.nics]
3095 elif st_groups[1] == "ips":
3096 val = [nic.ip for nic in instance.nics]
3097 elif st_groups[1] == "bridges":
3098 val = [nic.bridge for nic in instance.nics]
3099 else:
3101 nic_idx = int(st_groups[2])
3102 if nic_idx >= len(instance.nics):
3103 val = None
3104 else:
3105 if st_groups[1] == "mac":
3106 val = instance.nics[nic_idx].mac
3107 elif st_groups[1] == "ip":
3108 val = instance.nics[nic_idx].ip
3109 elif st_groups[1] == "bridge":
3110 val = instance.nics[nic_idx].bridge
3111 else:
3112 assert False, "Unhandled NIC parameter"
3113 else:
3114 assert False, "Unhandled variable parameter"
3115 else:
3116 raise errors.ParameterError(field)
3117 iout.append(val)
3118 output.append(iout)
3121 return output
3123 class LUFailoverInstance(LogicalUnit):
3124 """Failover an instance.
3126 """
3127 HPATH = "instance-failover"
3128 HTYPE = constants.HTYPE_INSTANCE
3129 _OP_REQP = ["instance_name", "ignore_consistency"]
3132 def ExpandNames(self):
3133 self._ExpandAndLockInstance()
3134 self.needed_locks[locking.LEVEL_NODE] = []
3135 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3137 def DeclareLocks(self, level):
3138 if level == locking.LEVEL_NODE:
3139 self._LockInstancesNodes()
3141 def BuildHooksEnv(self):
3142 """Build hooks env.
3144 This runs on master, primary and secondary nodes of the instance.
3146 """
3147 env = {
3148 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3149 }
3150 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3151 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3152 return env, nl, nl
3154 def CheckPrereq(self):
3155 """Check prerequisites.
3157 This checks that the instance is in the cluster.
3159 """
3160 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3161 assert self.instance is not None, \
3162 "Cannot retrieve locked instance %s" % self.op.instance_name
3164 bep = self.cfg.GetClusterInfo().FillBE(instance)
3165 if instance.disk_template not in constants.DTS_NET_MIRROR:
3166 raise errors.OpPrereqError("Instance's disk layout is not"
3167 " network mirrored, cannot failover.")
3169 secondary_nodes = instance.secondary_nodes
3170 if not secondary_nodes:
3171 raise errors.ProgrammerError("no secondary node but using "
3172 "a mirrored disk template")
3174 target_node = secondary_nodes[0]
3175 # check memory requirements on the secondary node
3176 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3177 instance.name, bep[constants.BE_MEMORY],
3178 instance.hypervisor)
3180 # check bridge existence
3181 brlist = [nic.bridge for nic in instance.nics]
3182 result = self.rpc.call_bridges_exist(target_node, brlist)
3183 result.Raise()
3184 if not result.data:
3185 raise errors.OpPrereqError("One or more target bridges %s does not"
3186 " exist on destination node '%s'" %
3187 (brlist, target_node))
3189 def Exec(self, feedback_fn):
3190 """Failover an instance.
3192 The failover is done by shutting it down on its present node and
3193 starting it on the secondary.
3195 """
3196 instance = self.instance
3198 source_node = instance.primary_node
3199 target_node = instance.secondary_nodes[0]
3201 feedback_fn("* checking disk consistency between source and target")
3202 for dev in instance.disks:
3203 # for drbd, these are drbd over lvm
3204 if not _CheckDiskConsistency(self, dev, target_node, False):
3205 if instance.status == "up" and not self.op.ignore_consistency:
3206 raise errors.OpExecError("Disk %s is degraded on target node,"
3207 " aborting failover." % dev.iv_name)
3209 feedback_fn("* shutting down instance on source node")
3210 logging.info("Shutting down instance %s on node %s",
3211 instance.name, source_node)
3213 result = self.rpc.call_instance_shutdown(source_node, instance)
3214 if result.failed or not result.data:
3215 if self.op.ignore_consistency:
3216 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3217 " Proceeding"
3218 " anyway. Please make sure node %s is down",
3219 instance.name, source_node, source_node)
3220 else:
3221 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3222 (instance.name, source_node))
3224 feedback_fn("* deactivating the instance's disks on source node")
3225 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3226 raise errors.OpExecError("Can't shut down the instance's disks.")
3228 instance.primary_node = target_node
3229 # distribute new instance config to the other nodes
3230 self.cfg.Update(instance)
3232 # Only start the instance if it's marked as up
3233 if instance.status == "up":
3234 feedback_fn("* activating the instance's disks on target node")
3235 logging.info("Starting instance %s on node %s",
3236 instance.name, target_node)
3238 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3239 ignore_secondaries=True)
3240 if not disks_ok:
3241 _ShutdownInstanceDisks(self, instance)
3242 raise errors.OpExecError("Can't activate the instance's disks")
3244 feedback_fn("* starting the instance on the target node")
3245 result = self.rpc.call_instance_start(target_node, instance, None)
3246 if result.failed or not result.data:
3247 _ShutdownInstanceDisks(self, instance)
3248 raise errors.OpExecError("Could not start instance %s on node %s." %
3249 (instance.name, target_node))
3252 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3253 """Create a tree of block devices on the primary node.
3255 This always creates all devices.
3257 """
3258 if device.children:
3259 for child in device.children:
3260 if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3261 return False
3263 lu.cfg.SetDiskID(device, node)
3264 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3265 instance.name, True, info)
3266 if new_id.failed or not new_id.data:
3267 return False
3268 if device.physical_id is None:
3269 device.physical_id = new_id
3270 return True
3273 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3274 """Create a tree of block devices on a secondary node.
3276 If this device type has to be created on secondaries, create it and
3277 all its children.
3279 If not, just recurse to children keeping the same 'force' value.
3281 """
3282 if device.CreateOnSecondary():
3283 force = True
3284 if device.children:
3285 for child in device.children:
3286 if not _CreateBlockDevOnSecondary(lu, node, instance,
3287 child, force, info):
3288 return False
3290 if not force:
3291 return True
3292 lu.cfg.SetDiskID(device, node)
3293 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3294 instance.name, False, info)
3295 if new_id.failed or not new_id.data:
3296 return False
3297 if device.physical_id is None:
3298 device.physical_id = new_id
3299 return True
3302 def _GenerateUniqueNames(lu, exts):
3303 """Generate a suitable LV name.
3305 This will generate a logical volume name for the given instance.
3307 """
3308 results = []
3309 for val in exts:
3310 new_id = lu.cfg.GenerateUniqueID()
3311 results.append("%s%s" % (new_id, val))
3312 return results
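# Illustrative output (made-up ids): _GenerateUniqueNames(lu,
# [".disk0_data", ".disk0_meta"]) returns something like
# ["<uuid-1>.disk0_data", "<uuid-2>.disk0_meta"]; one fresh unique id is
# drawn from the configuration for every extension.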
3315 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3316 p_minor, s_minor):
3317 """Generate a drbd8 device complete with its children.
3319 """
3320 port = lu.cfg.AllocatePort()
3321 vgname = lu.cfg.GetVGName()
3322 shared_secret = lu.cfg.GenerateDRBDSecret()
3323 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3324 logical_id=(vgname, names[0]))
3325 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3326 logical_id=(vgname, names[1]))
3327 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3328 logical_id=(primary, secondary, port,
3329 p_minor, s_minor,
3330 shared_secret),
3331 children=[dev_data, dev_meta],
3332 iv_name=iv_name)
3333 return drbd_dev
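# The tree built above is one DRBD8 device per instance disk: its children
# are the data LV (size MiB) and a fixed 128 MiB metadata LV, and its
# logical_id carries both node names, the allocated port, the two minors
# and the shared secret, everything needed to assemble the DRBD pair later.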
3336 def _GenerateDiskTemplate(lu, template_name,
3337 instance_name, primary_node,
3338 secondary_nodes, disk_info,
3339 file_storage_dir, file_driver,
3340 base_index):
3341 """Generate the entire disk layout for a given template type.
3343 """
3344 #TODO: compute space requirements
3346 vgname = lu.cfg.GetVGName()
3347 disk_count = len(disk_info)
3348 disks = []
3349 if template_name == constants.DT_DISKLESS:
3350 pass
3351 elif template_name == constants.DT_PLAIN:
3352 if len(secondary_nodes) != 0:
3353 raise errors.ProgrammerError("Wrong template configuration")
3355 names = _GenerateUniqueNames(lu, [".disk%d" % i
3356 for i in range(disk_count)])
3357 for idx, disk in enumerate(disk_info):
3358 disk_index = idx + base_index
3359 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3360 logical_id=(vgname, names[idx]),
3361 iv_name="disk/%d" % disk_index)
3362 disks.append(disk_dev)
3363 elif template_name == constants.DT_DRBD8:
3364 if len(secondary_nodes) != 1:
3365 raise errors.ProgrammerError("Wrong template configuration")
3366 remote_node = secondary_nodes[0]
3367 minors = lu.cfg.AllocateDRBDMinor(
3368 [primary_node, remote_node] * len(disk_info), instance_name)
3370 names = _GenerateUniqueNames(lu,
3371 [".disk%d_%s" % (i, s)
3372 for i in range(disk_count)
3373 for s in ("data", "meta")
3374 ])
3375 for idx, disk in enumerate(disk_info):
3376 disk_index = idx + base_index
3377 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3378 disk["size"], names[idx*2:idx*2+2],
3379 "disk/%d" % disk_index,
3380 minors[idx*2], minors[idx*2+1])
3381 disks.append(disk_dev)
3382 elif template_name == constants.DT_FILE:
3383 if len(secondary_nodes) != 0:
3384 raise errors.ProgrammerError("Wrong template configuration")
3386 for idx, disk in enumerate(disk_info):
3387 disk_index = idx + base_index
3388 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3389 iv_name="disk/%d" % disk_index,
3390 logical_id=(file_driver,
3391 "%s/disk%d" % (file_storage_dir,
3392 disk_index)))
3393 disks.append(disk_dev)
3394 else:
3395 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3397 return disks
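# Illustrative result (assumed input of two disks): under DT_DRBD8 this
# returns two _GenerateDRBD8Branch trees named "disk/0" and "disk/1"; under
# DT_PLAIN the same input yields two plain LVs in the cluster volume group.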
3399 def _GetInstanceInfoText(instance):
3400 """Compute the text that should be added to the disk's metadata.
3402 """
3403 return "originstname+%s" % instance.name
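# e.g. for a (hypothetical) instance named "inst1.example.com" this returns
# "originstname+inst1.example.com".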
3406 def _CreateDisks(lu, instance):
3407 """Create all disks for an instance.
3409 This abstracts away some work from AddInstance.
3411 @type lu: L{LogicalUnit}
3412 @param lu: the logical unit on whose behalf we execute
3413 @type instance: L{objects.Instance}
3414 @param instance: the instance whose disks we should create
3415 @rtype: boolean
3416 @return: the success of the creation
3418 """
3419 info = _GetInstanceInfoText(instance)
3421 if instance.disk_template == constants.DT_FILE:
3422 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3423 result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3424 file_storage_dir)
3426 if result.failed or not result.data:
3427 logging.error("Could not connect to node '%s'", instance.primary_node)
3428 return False
3430 if not result.data[0]:
3431 logging.error("Failed to create directory '%s'", file_storage_dir)
3432 return False
3434 # Note: this needs to be kept in sync with adding of disks in
3435 # LUSetInstanceParams
3436 for device in instance.disks:
3437 logging.info("Creating volume %s for instance %s",
3438 device.iv_name, instance.name)
3440 for secondary_node in instance.secondary_nodes:
3441 if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3442 device, False, info):
3443 logging.error("Failed to create volume %s (%s) on secondary node %s!",
3444 device.iv_name, device, secondary_node)
3445 return False
3447 if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3448 instance, device, info):
3449 logging.error("Failed to create volume %s on primary!", device.iv_name)
3450 return False
3453 return True
3455 def _RemoveDisks(lu, instance):
3456 """Remove all disks for an instance.
3458 This abstracts away some work from `AddInstance()` and
3459 `RemoveInstance()`. Note that in case some of the devices couldn't
3460 be removed, the removal will continue with the other ones (compare
3461 with `_CreateDisks()`).
3463 @type lu: L{LogicalUnit}
3464 @param lu: the logical unit on whose behalf we execute
3465 @type instance: L{objects.Instance}
3466 @param instance: the instance whose disks we should remove
3467 @rtype: boolean
3468 @return: the success of the removal
3470 """
3471 logging.info("Removing block devices for instance %s", instance.name)
3473 result = True
3474 for device in instance.disks:
3475 for node, disk in device.ComputeNodeTree(instance.primary_node):
3476 lu.cfg.SetDiskID(disk, node)
3477 result = lu.rpc.call_blockdev_remove(node, disk)
3478 if result.failed or not result.data:
3479 lu.proc.LogWarning("Could not remove block device %s on node %s,"
3480 " continuing anyway", device.iv_name, node)
3481 result = False
3483 if instance.disk_template == constants.DT_FILE:
3484 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3485 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3486 file_storage_dir)
3487 if result.failed or not result.data:
3488 logging.error("Could not remove directory '%s'", file_storage_dir)
3489 result = False
3491 return result
3494 def _ComputeDiskSize(disk_template, disks):
3495 """Compute disk size requirements in the volume group.
3497 """
3498 # Required free disk space as a function of disk and swap space
3499 req_size_dict = {
3500 constants.DT_DISKLESS: None,
3501 constants.DT_PLAIN: sum(d["size"] for d in disks),
3502 # 128 MB are added for drbd metadata for each disk
3503 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3504 constants.DT_FILE: None,
3505 }
3507 if disk_template not in req_size_dict:
3508 raise errors.ProgrammerError("Disk template '%s' size requirement"
3509 " is unknown" % disk_template)
3511 return req_size_dict[disk_template]
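# Worked example (hypothetical sizes): for two disks of 512 MiB each,
# DT_PLAIN requires 512 + 512 = 1024 MiB of free VG space, while DT_DRBD8
# requires (512 + 128) + (512 + 128) = 1280 MiB, the extra 128 MiB per disk
# being the DRBD metadata volume generated in _GenerateDRBD8Branch.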
3514 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3515 """Hypervisor parameter validation.
3517 This function abstracts the hypervisor parameter validation to be
3518 used in both instance create and instance modify.
3520 @type lu: L{LogicalUnit}
3521 @param lu: the logical unit for which we check
3522 @type nodenames: list
3523 @param nodenames: the list of nodes on which we should check
3524 @type hvname: string
3525 @param hvname: the name of the hypervisor we should use
3526 @type hvparams: dict
3527 @param hvparams: the parameters which we need to check
3528 @raise errors.OpPrereqError: if the parameters are not valid
3530 """
3531 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3532 hvname,
3533 hvparams)
3534 for node in nodenames:
3535 info = hvinfo[node]
3536 info.Raise()
3537 if not info.data or not isinstance(info.data, (tuple, list)):
3538 raise errors.OpPrereqError("Cannot get current information"
3539 " from node '%s' (%s)" % (node, info.data))
3540 if not info.data[0]:
3541 raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3542 " %s" % info.data[1])
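# Used by LUCreateInstance.CheckPrereq below, so that syntactically valid
# but node-incompatible hypervisor parameters are rejected on every target
# node before any cluster state is modified.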
3545 class LUCreateInstance(LogicalUnit):
3546 """Create an instance.
3548 """
3549 HPATH = "instance-add"
3550 HTYPE = constants.HTYPE_INSTANCE
3551 _OP_REQP = ["instance_name", "disks", "disk_template",
3552 "mode", "start",
3553 "wait_for_sync", "ip_check", "nics",
3554 "hvparams", "beparams"]
3557 def _ExpandNode(self, node):
3558 """Expands and checks one node name.
3560 """
3561 node_full = self.cfg.ExpandNodeName(node)
3562 if node_full is None:
3563 raise errors.OpPrereqError("Unknown node %s" % node)
3564 return node_full
3566 def ExpandNames(self):
3567 """ExpandNames for CreateInstance.
3569 Figure out the right locks for instance creation.
3571 """
3572 self.needed_locks = {}
3574 # set optional parameters to none if they don't exist
3575 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3576 if not hasattr(self.op, attr):
3577 setattr(self.op, attr, None)
3579 # cheap checks, mostly valid constants given
3581 # verify creation mode
3582 if self.op.mode not in (constants.INSTANCE_CREATE,
3583 constants.INSTANCE_IMPORT):
3584 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3585 self.op.mode)
3587 # disk template and mirror node verification
3588 if self.op.disk_template not in constants.DISK_TEMPLATES:
3589 raise errors.OpPrereqError("Invalid disk template name")
3591 if self.op.hypervisor is None:
3592 self.op.hypervisor = self.cfg.GetHypervisorType()
3594 cluster = self.cfg.GetClusterInfo()
3595 enabled_hvs = cluster.enabled_hypervisors
3596 if self.op.hypervisor not in enabled_hvs:
3597 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3598 " cluster (%s)" % (self.op.hypervisor,
3599 ",".join(enabled_hvs)))
3601 # check hypervisor parameter syntax (locally)
3603 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3604 self.op.hvparams)
3605 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3606 hv_type.CheckParameterSyntax(filled_hvp)
3608 # fill and remember the beparams dict
3609 utils.CheckBEParams(self.op.beparams)
3610 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3611 self.op.beparams)
3613 #### instance parameters check
3615 # instance name verification
3616 hostname1 = utils.HostInfo(self.op.instance_name)
3617 self.op.instance_name = instance_name = hostname1.name
3619 # this is just a preventive check, but someone might still add this
3620 # instance in the meantime, and creation will fail at lock-add time
3621 if instance_name in self.cfg.GetInstanceList():
3622 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3623 instance_name)
3625 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3627 # NIC buildup
3628 self.nics = []
3629 for nic in self.op.nics:
3630 # ip validity checks
3631 ip = nic.get("ip", None)
3632 if ip is None or ip.lower() == "none":
3633 nic_ip = None
3634 elif ip.lower() == constants.VALUE_AUTO:
3635 nic_ip = hostname1.ip
3636 else:
3637 if not utils.IsValidIP(ip):
3638 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3639 " like a valid IP" % ip)
3640 nic_ip = ip
3642 # MAC address verification
3643 mac = nic.get("mac", constants.VALUE_AUTO)
3644 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3645 if not utils.IsValidMac(mac.lower()):
3646 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3647 mac)
3648 # bridge verification
3649 bridge = nic.get("bridge", self.cfg.GetDefBridge())
3650 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3652 # disk checks/pre-build
3653 self.disks = []
3654 for disk in self.op.disks:
3655 mode = disk.get("mode", constants.DISK_RDWR)
3656 if mode not in constants.DISK_ACCESS_SET:
3657 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3658 mode)
3659 size = disk.get("size", None)
3660 if size is None:
3661 raise errors.OpPrereqError("Missing disk size")
3662 try:
3663 size = int(size)
3664 except ValueError:
3665 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3666 self.disks.append({"size": size, "mode": mode})
3668 # used in CheckPrereq for ip ping check
3669 self.check_ip = hostname1.ip
3671 # file storage checks
3672 if (self.op.file_driver and
3673 not self.op.file_driver in constants.FILE_DRIVER):
3674 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3675 self.op.file_driver)
3677 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3678 raise errors.OpPrereqError("File storage directory path must be relative")
3680 ### Node/iallocator related checks
3681 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3682 raise errors.OpPrereqError("One and only one of iallocator and primary"
3683 " node must be given")
3685 if self.op.iallocator:
3686 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3688 self.op.pnode = self._ExpandNode(self.op.pnode)
3689 nodelist = [self.op.pnode]
3690 if self.op.snode is not None:
3691 self.op.snode = self._ExpandNode(self.op.snode)
3692 nodelist.append(self.op.snode)
3693 self.needed_locks[locking.LEVEL_NODE] = nodelist
3695 # in case of import lock the source node too
3696 if self.op.mode == constants.INSTANCE_IMPORT:
3697 src_node = getattr(self.op, "src_node", None)
3698 src_path = getattr(self.op, "src_path", None)
3700 if src_path is None:
3701 self.op.src_path = src_path = self.op.instance_name
3703 if src_node is None:
3704 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3705 self.op.src_node = None
3706 if os.path.isabs(src_path):
3707 raise errors.OpPrereqError("Importing an instance from an absolute"
3708 " path requires a source node option.")
3710 self.op.src_node = src_node = self._ExpandNode(src_node)
3711 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3712 self.needed_locks[locking.LEVEL_NODE].append(src_node)
3713 if not os.path.isabs(src_path):
3714 self.op.src_path = src_path = \
3715 os.path.join(constants.EXPORT_DIR, src_path)
3717 else: # INSTANCE_CREATE
3718 if getattr(self.op, "os_type", None) is None:
3719 raise errors.OpPrereqError("No guest OS specified")
3721 def _RunAllocator(self):
3722 """Run the allocator based on input opcode.
3724 """
3725 nics = [n.ToDict() for n in self.nics]
3726 ial = IAllocator(self,
3727 mode=constants.IALLOCATOR_MODE_ALLOC,
3728 name=self.op.instance_name,
3729 disk_template=self.op.disk_template,
3730 tags=[],
3731 os=self.op.os_type,
3732 vcpus=self.be_full[constants.BE_VCPUS],
3733 mem_size=self.be_full[constants.BE_MEMORY],
3734 disks=self.disks,
3735 nics=nics,
3736 hypervisor=self.op.hypervisor,
3737 )
3739 ial.Run(self.op.iallocator)
3741 if not ial.success:
3742 raise errors.OpPrereqError("Can't compute nodes using"
3743 " iallocator '%s': %s" % (self.op.iallocator,
3744 ial.info))
3745 if len(ial.nodes) != ial.required_nodes:
3746 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3747 " of nodes (%s), required %s" %
3748 (self.op.iallocator, len(ial.nodes),
3749 ial.required_nodes))
3750 self.op.pnode = ial.nodes[0]
3751 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3752 self.op.instance_name, self.op.iallocator,
3753 ", ".join(ial.nodes))
3754 if ial.required_nodes == 2:
3755 self.op.snode = ial.nodes[1]
3757 def BuildHooksEnv(self):
3758 """Build hooks env.
3760 This runs on master, primary and secondary nodes of the instance.
3762 """
3763 env = {
3764 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3765 "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3766 "INSTANCE_ADD_MODE": self.op.mode,
3767 }
3768 if self.op.mode == constants.INSTANCE_IMPORT:
3769 env["INSTANCE_SRC_NODE"] = self.op.src_node
3770 env["INSTANCE_SRC_PATH"] = self.op.src_path
3771 env["INSTANCE_SRC_IMAGES"] = self.src_images
3773 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3774 primary_node=self.op.pnode,
3775 secondary_nodes=self.secondaries,
3776 status=self.instance_status,
3777 os_type=self.op.os_type,
3778 memory=self.be_full[constants.BE_MEMORY],
3779 vcpus=self.be_full[constants.BE_VCPUS],
3780 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3781 ))
3783 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3784 self.secondaries)
3785 return env, nl, nl
3788 def CheckPrereq(self):
3789 """Check prerequisites.
3791 """
3792 if (not self.cfg.GetVGName() and
3793 self.op.disk_template not in constants.DTS_NOT_LVM):
3794 raise errors.OpPrereqError("Cluster does not support lvm-based"
3795 " instances")
3798 if self.op.mode == constants.INSTANCE_IMPORT:
3799 src_node = self.op.src_node
3800 src_path = self.op.src_path
3802 if src_node is None:
3803 exp_list = self.rpc.call_export_list(
3804 self.acquired_locks[locking.LEVEL_NODE])
3805 found = False
3806 for node in exp_list:
3807 if not exp_list[node].failed and src_path in exp_list[node].data:
3808 found = True
3809 self.op.src_node = src_node = node
3810 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3811 src_path)
3812 break
3813 if not found:
3814 raise errors.OpPrereqError("No export found for relative path %s" %
3815 src_path)
3817 result = self.rpc.call_export_info(src_node, src_path)
3818 result.Raise()
3819 if not result.data:
3820 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3822 export_info = result.data
3823 if not export_info.has_section(constants.INISECT_EXP):
3824 raise errors.ProgrammerError("Corrupted export config")
3826 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3827 if (int(ei_version) != constants.EXPORT_VERSION):
3828 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3829 (ei_version, constants.EXPORT_VERSION))
3831 # Check that the new instance doesn't have less disks than the export
3832 instance_disks = len(self.disks)
3833 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3834 if instance_disks < export_disks:
3835 raise errors.OpPrereqError("Not enough disks to import."
3836 " (instance: %d, export: %d)" %
3837 (instance_disks, export_disks))
3839 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3840 disk_images = []
3841 for idx in range(export_disks):
3842 option = 'disk%d_dump' % idx
3843 if export_info.has_option(constants.INISECT_INS, option):
3844 # FIXME: are the old os-es, disk sizes, etc. useful?
3845 export_name = export_info.get(constants.INISECT_INS, option)
3846 image = os.path.join(src_path, export_name)
3847 disk_images.append(image)
3848 else:
3849 disk_images.append(False)
3851 self.src_images = disk_images
3853 old_name = export_info.get(constants.INISECT_INS, 'name')
3854 # FIXME: int() here could throw a ValueError on broken exports
3855 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3856 if self.op.instance_name == old_name:
3857 for idx, nic in enumerate(self.nics):
3858 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3859 nic_mac_ini = 'nic%d_mac' % idx
3860 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3862 # ip ping checks (we use the same ip that was resolved in ExpandNames)
3863 if self.op.start and not self.op.ip_check:
3864 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3865 " adding an instance in start mode")
3867 if self.op.ip_check:
3868 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3869 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3870 (self.check_ip, self.op.instance_name))
3874 if self.op.iallocator is not None:
3875 self._RunAllocator()
3877 #### node related checks
3879 # check primary node
3880 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3881 assert self.pnode is not None, \
3882 "Cannot retrieve locked node %s" % self.op.pnode
3883 self.secondaries = []
3885 # mirror node verification
3886 if self.op.disk_template in constants.DTS_NET_MIRROR:
3887 if self.op.snode is None:
3888 raise errors.OpPrereqError("The networked disk templates need"
3889 " a mirror node")
3890 if self.op.snode == pnode.name:
3891 raise errors.OpPrereqError("The secondary node cannot be"
3892 " the primary node.")
3893 self.secondaries.append(self.op.snode)
3895 nodenames = [pnode.name] + self.secondaries
3897 req_size = _ComputeDiskSize(self.op.disk_template,
3898 self.disks)
3900 # Check lv size requirements
3901 if req_size is not None:
3902 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3903 self.op.hypervisor)
3904 for node in nodenames:
3905 info = nodeinfo[node]
3906 info.Raise()
3907 info = info.data
3908 if not info:
3909 raise errors.OpPrereqError("Cannot get current information"
3910 " from node '%s'" % node)
3911 vg_free = info.get('vg_free', None)
3912 if not isinstance(vg_free, int):
3913 raise errors.OpPrereqError("Can't compute free disk space on"
3914 " node %s" % node)
3915 if req_size > info['vg_free']:
3916 raise errors.OpPrereqError("Not enough disk space on target node %s."
3917 " %d MB available, %d MB required" %
3918 (node, info['vg_free'], req_size))
3920 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3923 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3925 if not isinstance(result.data, objects.OS):
3926 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3927 " primary node" % self.op.os_type)
3929 # bridge check on primary node
3930 bridges = [n.bridge for n in self.nics]
3931 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
3934 raise errors.OpPrereqError("One of the target bridges '%s' does not"
3935 " exist on destination node '%s'" %
3936 (",".join(bridges), pnode.name))
3938 # memory check on primary node
3940 _CheckNodeFreeMemory(self, self.pnode.name,
3941 "creating instance %s" % self.op.instance_name,
3942 self.be_full[constants.BE_MEMORY],
3946 self.instance_status = 'up'
3948 self.instance_status = 'down'
  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temporary assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)
    # Unlock all the nodes
    if self.op.mode == constants.INSTANCE_IMPORT:
      nodes_keep = [self.op.src_node]
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
                       if node != self.op.src_node]
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
    else:
      self.context.glm.release(locking.LEVEL_NODE)
      del self.acquired_locks[locking.LEVEL_NODE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
        result.Raise()
        if not result.data:
          raise errors.OpExecError("Could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        import_result.Raise()
        for idx, result in enumerate(import_result.data):
          if not result:
            self.LogWarning("Could not import the image %s for instance"
                            " %s, disk %d, on node %s" %
                            (src_images[idx], instance, idx, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      result = self.rpc.call_instance_start(pnode_name, iobj, None)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    node_insts.Raise()

    if instance.name not in node_insts.data:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    console_cmd = hyper.GetShellCommandForConsole(instance)
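    # The hypervisor supplies the actual console command; for a hypothetical
    # Xen-based instance this could be something like "xm console <instance>",
    # which is then wrapped below in an ssh invocation so that it can be run
    # from the master node.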
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks:
      instance.FindDisk(disk_idx)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
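    # Illustrative sketch (hypothetical names): replacing disk/0 in volume
    # group "xenvg" creates fresh "<uuid>.disk0_data"/"<uuid>.disk0_meta"
    # LVs, renames the old ones to "..._replaced-<time_t>", renames the new
    # ones to the old names and reattaches them, so the drbd device keeps
    # its original LV names throughout.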
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking disk/%d on %s" % (idx, node))
        cfg.SetDiskID(dev, node)
        result = self.rpc.call_blockdev_find(node, dev)
        if result.failed or not result.data:
          raise errors.OpExecError("Can't find disk/%d on node %s" %
                                   (idx, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".disk%d_%s" % (idx, suf)
                  for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if not find_res.failed and find_res.data is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
      if result.failed or not result.data:
        for new_lv in new_lvs:
          result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
          if result.failed or not result.data:
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
      if result.failed or result.data[5]:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        result = self.rpc.call_blockdev_remove(tgt_node, lv)
        if result.failed or not result.data:
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    for node in pri_node, new_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d on %s" % (idx, pri_node))
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't find disk/%d on node %s" %
                                 (idx, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      size = dev.size
      info("adding new local storage on %s for disk/%d" %
           (new_node, idx))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
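    # Assumed layout of a drbd8 logical_id at this version, inferred from the
    # index usage below: (nodeA, nodeB, port, minorA, minorB, shared_secret).
    # We only swap in the new node name and its freshly allocated minor,
    # keeping the port and the secret unchanged.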
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
      size = dev.size
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
      # create new devices on new_node
      if pri_node == dev.logical_id[0]:
        new_logical_id = (pri_node, new_node,
                          dev.logical_id[2], dev.logical_id[3], new_minor,
                          dev.logical_id[5])
      else:
        new_logical_id = (new_node, pri_node,
                          dev.logical_id[2], new_minor, dev.logical_id[4],
                          dev.logical_id[5])
      iv_names[idx] = (dev, dev.children, new_logical_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_logical_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_logical_id,
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for idx, dev in enumerate(instance.disks):
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for disk/%d on old node" % idx)
      cfg.SetDiskID(dev, old_node)
      result = self.rpc.call_blockdev_shutdown(old_node, dev)
      if result.failed or not result.data:
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for idx, dev in enumerate(instance.disks):
      cfg.SetDiskID(dev, pri_node)
      # set the network part of the physical (unique in bdev terms) id
      # to None, meaning detach from network
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      result = self.rpc.call_blockdev_find(pri_node, dev)
      if not result.failed and result.data:
        done += 1
      else:
        warning("Failed to detach drbd disk/%d from network, unusual case" %
                idx)

    if not done:
      # no detaches succeeded (very unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)
    # we can remove now the temp minors as now the new values are
    # written to the config file (and therefore stable)
    self.cfg.ReleaseDRBDMinors(instance.name)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    for idx, dev in enumerate(instance.disks):
      info("attaching primary drbd for disk/%d to new secondary node" % idx)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      logging.debug("Disk to attach: %s", dev)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      if result.failed or not result.data:
        warning("can't attach drbd disk/%d to new secondary!" % idx,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      result.Raise()
      if result.data[5]:
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for disk/%d" % idx)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        result = self.rpc.call_blockdev_remove(old_node, lv)
        if result.failed or not result.data:
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _StartInstanceDisks(self, instance, True)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(self, instance)

    return ret
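
# A sketch of an opcode that would be dispatched to LUReplaceDisks (field
# names as declared in _OP_REQP above; the exact opcode class lives in
# opcodes.py):
#   opcodes.OpReplaceDisks(instance_name="instance1.example.com",
#                          mode=constants.REPLACE_DISK_SEC,
#                          disks=[], remote_node="node3.example.com")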


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    self.disk = instance.FindDisk(self.op.disk)

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo[node]
      if info.failed or not info.data:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.data.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      result.Raise()
      if (not result.data or not isinstance(result.data, (list, tuple)) or
          len(result.data) != 2):
        raise errors.OpExecError("Grow request failed to node %s" % node)
      elif not result.data[0]:
        raise errors.OpExecError("Grow request failed to node %s: %s" %
                                 (node, result.data[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
      dev_pstatus.Raise()
      dev_pstatus = dev_pstatus.data
    else:
      dev_pstatus = None
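    # Note: a successful blockdev_find returns the device status as a tuple
    # which (as used elsewhere in this module, e.g. the "result.data[5]"
    # degraded checks in LUReplaceDisks) carries the sync state, with index 5
    # being the is_degraded flag.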

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
      dev_sstatus.Raise()
      dev_sstatus = dev_sstatus.data
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise()
        remote_info = remote_info.data
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")

    utils.CheckBEParams(self.op.beparams)

    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")
    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == "none":
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
      # we can only check None bridges and assign the default one
      nic_bridge = nic_dict.get('bridge', None)
      if nic_bridge is None:
        nic_dict['bridge'] = self.cfg.GetDefBridge()
      # but we can validate MACs
      nic_mac = nic_dict.get('mac', None)
      if nic_mac is not None:
        if self.cfg.IsMacInUse(nic_mac):
          raise errors.OpPrereqError("MAC address %s already in use"
                                     " in cluster" % nic_mac)
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")
  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # FIXME: readd disk/nic changes
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    force = self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = self.instance.primary_node
    nodelist = [pnode]
    nodelist.extend(instance.secondary_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        elif val == constants.VALUE_NONE:
          i_hvdict[key] = None
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if not instance_info.failed and instance_info.data:
          current_mem = instance_info.data['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode].data['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.iteritems():
          if node not in instance.secondary_nodes:
            continue
          if nres.failed or not isinstance(nres.data, dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
      nic_bridge = nic_dict.get('bridge', None)
      if nic_bridge is not None:
        result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
        if result.failed or not result.data:
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if ins_l.failed or not isinstance(ins_l.data, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l.data:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)
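    # "result" collects (parameter, new value) pairs that the caller can show
    # as feedback, e.g. (sketch):
    #   [("disk/1", "add:size=1024,mode=rw"), ("be/memory", 512)]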
    result = []
    instance = self.instance

    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          rpc_result = self.rpc.call_blockdev_remove(node, disk)
          if rpc_result.failed or not rpc_result.data:
            self.proc.LogWarning("Could not remove disk/%d on node %s,"
                                 " continuing anyway", device_idx, node)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        new_disk.mode = disk_dict['mode']
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        for secondary_node in instance.secondary_nodes:
          if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
                                            new_disk, False, info):
            self.LogWarning("Failed to create volume %s (%s) on"
                            " secondary node %s!",
                            new_disk.iv_name, new_disk, secondary_node)
        if not _CreateBlockDevOnPrimary(self, instance.primary_node,
                                        instance, new_disk, info):
          self.LogWarning("Failed to create volume %s on primary!",
                          new_disk.iv_name)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))

    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # add a new nic
        if 'mac' not in nic_dict:
          mac = constants.VALUE_GENERATE
        else:
          mac = nic_dict['mac']
        if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          mac = self.cfg.GenerateMAC()
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
                              bridge=nic_dict.get('bridge', None))
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,bridge=%s" %
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
      else:
        # change a given nic
        for key in 'mac', 'ip', 'bridge':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_new
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    result = self.rpc.call_export_list(self.nodes)
    return result


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
        if new_dev_name.failed or not new_dev_name.data:
          self.LogWarning("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name.data),
                                 physical_id=(vgname, new_dev_name.data),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        result = self.rpc.call_instance_start(src_node, instance, None)
        if result.failed or not result.data:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        if result.failed or not result.data:
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        result = self.rpc.call_blockdev_remove(src_node, dev)
        if result.failed or not result.data:
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s", dev.logical_id[1], src_node)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
          continue
        if instance.name in exportlist[node].data:
          result = self.rpc.call_export_remove(node, instance.name)
          if result.failed or not result.data:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
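    # "tgts" now maps pseudo-paths to taggable objects; a search for e.g. the
    # pattern "^staging$" would yield results such as (sketch)
    # [("/instances/inst1.example.com", "staging")].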
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")

class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))

class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has three sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
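
  # Illustrative sketch (names and values are examples, not from the original
  # module): an allocation request is built by instantiating the class with
  # all _ALLO_KEYS fields, e.g.:
  #
  #   ial = IAllocator(self, mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com", mem_size=512,
  #                    disks=[{"size": 1024, "mode": "w"},
  #                           {"size": 4096, "mode": "w"}],
  #                    disk_template=constants.DT_DRBD8, os="debian-etch",
  #                    tags=[], nics=[], vcpus=1, hypervisor=None)
  #
  # __init__ validates the keyword set against the mode and then calls
  # _BuildInputData, so self.in_text is ready as soon as construction ends.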

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor_name)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                                         cluster_info.enabled_hypervisors)
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      node_data[nname].Raise()
      if not isinstance(node_data[nname].data, dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname].data
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo, beinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.name not in node_iinfo[nname]:
            i_used_mem = 0
          else:
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
          i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
          remote_info['memory_free'] -= max(0, i_mem_diff)

          if iinfo.status == "up":
            i_p_up_mem += beinfo[constants.BE_MEMORY]

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request
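
  # For example (illustrative, with invented values), a two-disk DRBD
  # allocation would produce a request shaped like:
  #
  #   data["request"] = {
  #     "type": "allocate",
  #     "name": "inst1.example.com",
  #     "disk_template": "drbd",
  #     "disks": [{"size": 1024, "mode": "w"}, {"size": 4096, "mode": "w"}],
  #     "disk_space_total": disk_space,  # as returned by _ComputeDiskSize
  #     "required_nodes": 2,  # DTS_NET_MIRROR templates need two nodes
  #     ...
  #   }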

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
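
  # For example (illustrative values), relocating a DRBD instance away from
  # its current secondary produces:
  #
  #   self.in_data["request"] = {
  #     "type": "relocate",
  #     "name": "inst1.example.com",
  #     "disk_space_total": disk_space,
  #     "required_nodes": 1,  # only the replacement secondary is requested
  #     "relocate_from": ["node2.example.com"],
  #   }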

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout + stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will parse the result and, if successful, save it in
    self.out_data and the result attributes (success, info, nodes).

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
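
  # Illustrative sketch of a well-formed allocator reply (JSON text, values
  # invented):
  #
  #   {"success": true, "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node2.example.com"]}
  #
  # After validation, self.success, self.info and self.nodes mirror these
  # keys and self.out_data holds the whole parsed dict.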

class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result
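
  # Illustrative sketch, assuming a test-allocator opcode mirroring the
  # attributes checked in CheckPrereq: direction "in" returns the generated
  # allocator input (ial.in_text) without running anything, while "out"
  # actually runs the named allocator script and returns its raw output:
  #
  #   op = opcodes.OpTestAllocator(direction=constants.IALLOCATOR_DIR_IN,
  #                                mode=constants.IALLOCATOR_MODE_RELOC,
  #                                name="inst1.example.com", allocator=None)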