# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201
import logging
import random
import re
import time

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.
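
  As an illustration only (this example is not part of the original file and
  the opcode name is made up), a minimal concurrent LU skeleton following the
  rules above could look like::

    class LUMyOperation(LogicalUnit):
      HPATH = "my-operation"
      HTYPE = constants.HTYPE_CLUSTER
      _OP_REQP = ["target_name"]
      REQ_BGL = False

      def ExpandNames(self):
        # no locks are needed for this sketch
        self.needed_locks = {}

      def CheckPrereq(self):
        pass

      def BuildHooksEnv(self):
        env = {"OP_TARGET": self.op.target_name}
        return env, [], [self.cfg.GetMasterNode()]

      def Exec(self, feedback_fn):
        feedback_fn("Running on %s" % self.op.target_name)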
  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.cfg = context.cfg
    self.context = context
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)
  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods need not worry about missing parameters.
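
    As a purely illustrative sketch (the "force" attribute here is
    hypothetical, not taken from the original code), an LU could default an
    optional boolean parameter at this stage::

      def CheckArguments(self):
        if not hasattr(self.op, "force"):
          self.op.force = False
        self.op.force = bool(self.op.force)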
  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError
  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS
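
    A typical override (an illustrative sketch, not from the original file)
    recomputes the node locks once the instance locks have been acquired::

      def DeclareLocks(self, level):
        if level == locking.LEVEL_NODE:
          self._LockInstancesNodes()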
  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError
  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError
  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes to return, use an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.
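
    As an illustration only (the values are made up, not from the original
    file), a cluster-wide LU might build its hooks environment like this::

      def BuildHooksEnv(self):
        env = {
          "OP_TARGET": self.cfg.GetClusterName(),
          }
        mn = self.cfg.GetMasterNode()
        return env, [mn], [mn]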
    """
    raise NotImplementedError
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged, but any LU can override it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None

    @return: the new Exec result, based on the previous result
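
    A trivial override (a sketch only, not from the original file) that keeps
    the default pass-through behaviour could look like::

      def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
        if phase == constants.HOOKS_PHASE_POST:
          feedback_fn("* Hooks have run")
        return lu_result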
  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    assert locking.LEVEL_INSTANCE not in self.needed_locks, \
      "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name
  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primary or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is of the wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose names are to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is of the wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance-related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @param status: the desired status of the instance
  @param memory: the memory size of the instance
  @param vcpus: the count of VCPUs the instance has
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @rtype: dict
  @return: the hook environment for this instance

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  nic_count = len(nics)
  for idx, (ip, bridge, mac) in enumerate(nics):
    env["INSTANCE_NIC%d_IP" % idx] = ip
    env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
    env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
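
# As an illustration (the values below are made up), for an instance named
# "inst1" with a single NIC, _BuildInstanceHookEnv returns a dict along these
# lines:
#
#   {
#     "INSTANCE_NAME": "inst1",
#     "INSTANCE_PRIMARY": "node1.example.tld",
#     "INSTANCE_SECONDARIES": "node2.example.tld",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 128,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC0_IP": "192.0.2.10",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
#     "INSTANCE_NIC_COUNT": 1,
#   }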
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  if result.failed or not result.data:
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))
  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master
class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn(" - ERROR: unable to verify node %s." % (node,))

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
    # checks vg existence and size > 20G
    vglist = node_result.get(constants.NV_VGLIST, None)
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
    # checks config file checksum
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            feedback_fn(" - ERROR: non master-candidate has old/wrong file"
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn(" - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)
    if constants.NV_NODELIST not in node_result:
      feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          instance not in node_instance[node_current]):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {}
    # TODO: populate the environment with useful information for verify hooks
    return env, [], all_nodes
  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non-redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}
    # FIXME: verify OS list

    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: nodelist,
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo],
      constants.NV_LVLIST: vg_name,
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VGLIST: None,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
    }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data
      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        continue

      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files)

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if isinstance(lvdata, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, lvdata.encode('string_escape')))
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
        continue
      else:
        node_volume[node] = lvdata

      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        continue
      node_instance[node] = idata
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
        continue
    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn(" - ERROR: general communication failure")
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            feedback_fn(" Communication failure in hooks execution")
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn(" Node %s:" % node_name)
                show_node_header = False
              feedback_fn(" ERROR: Script %s failed, output:" % script)
              output = indent_re.sub(' ', output)
              feedback_fn("%s" % output)
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      lvs = node_lvs[node]
      if lvs.failed:
        self.LogWarning("Connection to node %s failed: %s" %
                        (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]
  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name
  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER

  def CheckParameters(self):
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")
  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]
  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))
    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate beparams changes
    if self.op.beparams:
      utils.CheckBEParams(self.op.beparams)
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)
  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size

    self.cfg.Update(self.cluster)

    # we want to update nodes after the cluster so that if any errors
    # happen, we have recorded and saved the cluster info
    if self.op.candidate_pool_size is not None:
      node_info = self.cfg.GetAllNodesInfo().values()
      num_candidates = len([node for node in node_info
                            if node.master_candidate])
      num_nodes = len(node_info)
      if num_candidates < self.op.candidate_pool_size:
        random.shuffle(node_info)
        for node in node_info:
          if num_candidates >= self.op.candidate_pool_size:
            break
          if node.master_candidate:
            continue
          node.master_candidate = True
          self.LogInfo("Promoting node %s to master candidate", node.name)
          self.cfg.Update(node)
          self.context.ReaddNode(node)
          num_candidates += 1
      elif num_candidates > self.op.candidate_pool_size:
        self.LogInfo("Note: more nodes are candidates (%d) than the new value"
                     " of candidate_pool_size (%d)" %
                     (num_candidates, self.op.candidate_pool_size))
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  cumul_degraded = False
  rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
  if rstats.failed or not rstats.data:
    lu.LogWarning("Can't get any data from node %s", node)
    raise errors.RemoteError("Can't contact node %s for mirror data,"
                             " aborting." % node)
  rstats = rstats.data

  for i in range(len(rstats)):
    mstat = rstats[i]
    if mstat is None:
      lu.LogWarning("Can't compute data for node %s/%s",
                    node, instance.disks[i].iv_name)
      continue
    # we ignore the ldisk parameter
    perc_done, est_time, is_degraded, _ = mstat
    cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
    if perc_done is not None:
      if est_time is not None:
        rem_time = "%d estimated seconds remaining" % est_time
      else:
        rem_time = "no time estimate"
      lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                      (instance.disks[i].iv_name, perc_done, rem_time))

  time.sleep(min(60, max_time))

  lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if rstats.failed or not rstats.data:
      logging.warning("Node %s: disk degraded, not found or node down", node)
      result = False
    else:
      result = result and (not rstats.data[idx])

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @returns: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if nr.failed or not nr.data:
        continue
      for os_obj in nr.data:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os
  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
    }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes
  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the node exists in the configuration
      - it does not have primary or secondary instances
      - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node
  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)

    # Promote nodes to master candidate as needed
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    node_info = self.cfg.GetAllNodesInfo().values()
    num_candidates = len([n for n in node_info
                          if n.master_candidate])
    num_nodes = len(node_info)
    random.shuffle(node_info)
    for node in node_info:
      if num_candidates >= cp_size or num_candidates >= num_nodes:
        break
      if node.master_candidate:
        continue
      node.master_candidate = True
      self.LogInfo("Promoting node %s to master candidate", node.name)
      self.cfg.Update(node)
      self.context.ReaddNode(node)
      num_candidates += 1
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  _FIELDS_DYNAMIC = utils.FieldSet(
    "mtotal", "mnode", "mfree",
    "dtotal", "dfree", "ctotal", "bootid",
    )
  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no", "master_candidate", "master",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]
    # begin data gathering

    if self.do_locking:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.failed and nodeinfo.data:
          nodeinfo = nodeinfo.data
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
          }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})
    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering
    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          val = node.name == master_node
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or volumes[node].failed or not volumes[node].data:
        continue

      node_vols = volumes[node].data[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
    }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1
  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the new node is not already in the config
      - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    cfg = self.cfg
    node_name = self.op.node_name

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    node_info = self.cfg.GetAllNodesInfo().values()
    num_candidates = len([n for n in node_info
                          if n.master_candidate])
    master_candidate = num_candidates < cp_size

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip,
                                 master_candidate=master_candidate)
1982 def Exec(self, feedback_fn):
1983 """Adds the new node to the cluster.
1986 new_node = self.new_node
1987 node = new_node.name
1989 # check connectivity
1990 result = self.rpc.call_version([node])[node]
1992 if result.data:
1993   if constants.PROTOCOL_VERSION == result.data:
1994     logging.info("Communication to node %s fine, sw version %s match",
1995                  node, result.data)
1996   else:
1997     raise errors.OpExecError("Version mismatch master version %s,"
1998                              " node version %s" %
1999                              (constants.PROTOCOL_VERSION, result.data))
2000 else:
2001   raise errors.OpExecError("Cannot get version from the new node")
2004 logging.info("Copy ssh key to node %s", node)
2005 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2007 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2008 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2014 keyarray.append(f.read())
2018 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2020 keyarray[3], keyarray[4], keyarray[5])
2022 if result.failed or not result.data:
2023 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2025 # Add node to our /etc/hosts, and add key to known_hosts
2026 utils.AddHostToEtcHosts(new_node.name)
2028 if new_node.secondary_ip != new_node.primary_ip:
2029 result = self.rpc.call_node_has_ip_address(new_node.name,
2030 new_node.secondary_ip)
2031 if result.failed or not result.data:
2032 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2033 " you gave (%s). Please fix and re-run this"
2034 " command." % new_node.secondary_ip)
2036 node_verify_list = [self.cfg.GetMasterNode()]
2037 node_verify_param = {
2039 # TODO: do a node-net-test as well?
2042 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2043 self.cfg.GetClusterName())
2044 for verifier in node_verify_list:
2045 if result[verifier].failed or not result[verifier].data:
2046 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2047 " for remote verification" % verifier)
2048 if result[verifier].data['nodelist']:
2049 for failed in result[verifier].data['nodelist']:
2050 feedback_fn("ssh/hostname verification failed %s -> %s" %
2051 (verifier, result[verifier]['nodelist'][failed]))
2052 raise errors.OpExecError("ssh/hostname verification failed.")
2054 # Distribute updated /etc/hosts and known_hosts to all nodes,
2055 # including the node just added
2056 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2057 dist_nodes = self.cfg.GetNodeList()
2058 if not self.op.readd:
2059 dist_nodes.append(node)
2060 if myself.name in dist_nodes:
2061 dist_nodes.remove(myself.name)
2063 logging.debug("Copying hosts and known_hosts to all nodes")
2064 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2065 result = self.rpc.call_upload_file(dist_nodes, fname)
2066 for to_node, to_result in result.iteritems():
2067 if to_result.failed or not to_result.data:
2068 logging.error("Copy of file %s to node %s failed", fname, to_node)
2070 to_copy = []
2071 if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2072 to_copy.append(constants.VNC_PASSWORD_FILE)
2073 for fname in to_copy:
2074 result = self.rpc.call_upload_file([node], fname)
2075 if result[node].failed or not result[node]:
2076 logging.error("Could not copy file %s to node %s", fname, node)
2078 if self.op.readd:
2079   self.context.ReaddNode(new_node)
2080 else:
2081   self.context.AddNode(new_node)
2084 class LUSetNodeParams(LogicalUnit):
2085 """Modifies the parameters of a node.
2088 HPATH = "node-modify"
2089 HTYPE = constants.HTYPE_NODE
2090 _OP_REQP = ["node_name"]
2093 def CheckArguments(self):
2094 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2095 if node_name is None:
2096 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2097 self.op.node_name = node_name
2098 if not hasattr(self.op, 'master_candidate'):
2099 raise errors.OpPrereqError("Please pass at least one modification")
2100 self.op.master_candidate = bool(self.op.master_candidate)
2102 def ExpandNames(self):
2103 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2105 def BuildHooksEnv(self):
2108 This runs on the master node.
2111 env = {
2112 "OP_TARGET": self.op.node_name,
2113 "MASTER_CANDIDATE": str(self.op.master_candidate),
2114 }
2115 nl = [self.cfg.GetMasterNode(),
2116       self.op.node_name]
2118 return env, nl, nl
2119 def CheckPrereq(self):
2120 """Check prerequisites.
2122 This only checks the instance list against the existing names.
2125 force = self.force = self.op.force
2127 if self.op.master_candidate == False:
2128 if self.op.node_name == self.cfg.GetMasterNode():
2129 raise errors.OpPrereqError("The master node has to be a"
2130 " master candidate")
2131 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2132 node_info = self.cfg.GetAllNodesInfo().values()
2133 num_candidates = len([node for node in node_info
2134 if node.master_candidate])
2135 if num_candidates <= cp_size:
2136 msg = ("Not enough master candidates (desired"
2137 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2138 if force:
2139   self.LogWarning(msg)
2140 else:
2141   raise errors.OpPrereqError(msg)
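# Editor's worked example (hypothetical values): with candidate_pool_size 10
# and exactly 10 current master candidates, demoting one would leave 9, so
# the check above refuses (or, with force, only warns):
#
#   >>> cp_size, num_candidates = 10, 10
#   >>> num_candidates <= cp_size
#   True
#   >>> "desired %d, new value will be %d" % (cp_size, num_candidates - 1)
#   'desired 10, new value will be 9'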
2145 def Exec(self, feedback_fn):
2149 node = self.cfg.GetNodeInfo(self.op.node_name)
2151 result = []
2153 if self.op.master_candidate is not None:
2154 node.master_candidate = self.op.master_candidate
2155 result.append(("master_candidate", str(self.op.master_candidate)))
2157 # this will trigger configuration file update, if needed
2158 self.cfg.Update(node)
2159 # this will trigger job queue propagation or cleanup
2160 if self.op.node_name != self.cfg.GetMasterNode():
2161 self.context.ReaddNode(node)
2166 class LUQueryClusterInfo(NoHooksLU):
2167 """Query cluster configuration.
2173 def ExpandNames(self):
2174 self.needed_locks = {}
2176 def CheckPrereq(self):
2177 """No prerequisites needed for this LU.
2182 def Exec(self, feedback_fn):
2183 """Return cluster config.
2186 cluster = self.cfg.GetClusterInfo()
2187 result = {
2188 "software_version": constants.RELEASE_VERSION,
2189 "protocol_version": constants.PROTOCOL_VERSION,
2190 "config_version": constants.CONFIG_VERSION,
2191 "os_api_version": constants.OS_API_VERSION,
2192 "export_version": constants.EXPORT_VERSION,
2193 "architecture": (platform.architecture()[0], platform.machine()),
2194 "name": cluster.cluster_name,
2195 "master": cluster.master_node,
2196 "default_hypervisor": cluster.default_hypervisor,
2197 "enabled_hypervisors": cluster.enabled_hypervisors,
2198 "hvparams": cluster.hvparams,
2199 "beparams": cluster.beparams,
2200 "candidate_pool_size": cluster.candidate_pool_size,
2206 class LUQueryConfigValues(NoHooksLU):
2207 """Return configuration values.
2212 _FIELDS_DYNAMIC = utils.FieldSet()
2213 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2215 def ExpandNames(self):
2216 self.needed_locks = {}
2218 _CheckOutputFields(static=self._FIELDS_STATIC,
2219 dynamic=self._FIELDS_DYNAMIC,
2220 selected=self.op.output_fields)
2222 def CheckPrereq(self):
2223 """No prerequisites.
2228 def Exec(self, feedback_fn):
2229 """Dump a representation of the cluster config to the standard output.
2232 values = []
2233 for field in self.op.output_fields:
2234 if field == "cluster_name":
2235 entry = self.cfg.GetClusterName()
2236 elif field == "master_node":
2237 entry = self.cfg.GetMasterNode()
2238 elif field == "drain_flag":
2239 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2240 else:
2241   raise errors.ParameterError(field)
2242 values.append(entry)
2244 return values
2246 class LUActivateInstanceDisks(NoHooksLU):
2247 """Bring up an instance's disks.
2250 _OP_REQP = ["instance_name"]
2253 def ExpandNames(self):
2254 self._ExpandAndLockInstance()
2255 self.needed_locks[locking.LEVEL_NODE] = []
2256 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2258 def DeclareLocks(self, level):
2259 if level == locking.LEVEL_NODE:
2260 self._LockInstancesNodes()
2262 def CheckPrereq(self):
2263 """Check prerequisites.
2265 This checks that the instance is in the cluster.
2268 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2269 assert self.instance is not None, \
2270 "Cannot retrieve locked instance %s" % self.op.instance_name
2272 def Exec(self, feedback_fn):
2273 """Activate the disks.
2276 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2277 if not disks_ok:
2278   raise errors.OpExecError("Cannot activate block devices")
2280 return disks_info
2283 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2284 """Prepare the block devices for an instance.
2286 This sets up the block devices on all nodes.
2288 @type lu: L{LogicalUnit}
2289 @param lu: the logical unit on whose behalf we execute
2290 @type instance: L{objects.Instance}
2291 @param instance: the instance for whose disks we assemble
2292 @type ignore_secondaries: boolean
2293 @param ignore_secondaries: if true, errors on secondary nodes
2294 won't result in an error return from the function
2295 @return: False if the operation failed, otherwise a list of
2296 (host, instance_visible_name, node_visible_name)
2297 with the mapping from node devices to instance devices
2300 device_info = []
2301 disks_ok = True
2302 iname = instance.name
2303 # With the two passes mechanism we try to reduce the window of
2304 # opportunity for the race condition of switching DRBD to primary
2305 # before handshaking occurred, but we do not eliminate it
2307 # The proper fix would be to wait (with some limits) until the
2308 # connection has been made and drbd transitions from WFConnection
2309 # into any other network-connected state (Connected, SyncTarget,
2312 # 1st pass, assemble on all nodes in secondary mode
2313 for inst_disk in instance.disks:
2314 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2315 lu.cfg.SetDiskID(node_disk, node)
2316 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2317 if result.failed or not result:
2318 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2319 " (is_primary=False, pass=1)",
2320 inst_disk.iv_name, node)
2321 if not ignore_secondaries:
2322   disks_ok = False
2324 # FIXME: race condition on drbd migration to primary
2326 # 2nd pass, do only the primary node
2327 for inst_disk in instance.disks:
2328 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2329 if node != instance.primary_node:
2330   continue
2331 lu.cfg.SetDiskID(node_disk, node)
2332 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2333 if result.failed or not result:
2334 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2335 " (is_primary=True, pass=2)",
2336 " (is_primary=True, pass=2)",
2337 inst_disk.iv_name, node)
2338 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2340 # leave the disks configured for the primary node
2341 # this is a workaround that would be fixed better by
2342 # improving the logical/physical id handling
2343 for disk in instance.disks:
2344 lu.cfg.SetDiskID(disk, instance.primary_node)
2346 return disks_ok, device_info
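# Editor's sketch of the two-pass scheme used above, reduced to its control
# flow (assemble() here stands for a hypothetical wrapper around
# lu.rpc.call_blockdev_assemble):
#
#   # pass 1: every disk on every node, in secondary (non-primary) mode
#   for disk in instance.disks:
#     for node, node_disk in disk.ComputeNodeTree(instance.primary_node):
#       assemble(node, node_disk, as_primary=False)
#   # pass 2: only the primary node, now in primary mode
#   for disk in instance.disks:
#     for node, node_disk in disk.ComputeNodeTree(instance.primary_node):
#       if node == instance.primary_node:
#         assemble(node, node_disk, as_primary=True)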
2349 def _StartInstanceDisks(lu, instance, force):
2350 """Start the disks of an instance.
2353 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2354 ignore_secondaries=force)
2355 if not disks_ok:
2356   _ShutdownInstanceDisks(lu, instance)
2357 if force is not None and not force:
2358 lu.proc.LogWarning("", hint="If the message above refers to a"
2360 " you can retry the operation using '--force'.")
2361 raise errors.OpExecError("Disk consistency error")
2364 class LUDeactivateInstanceDisks(NoHooksLU):
2365 """Shutdown an instance's disks.
2368 _OP_REQP = ["instance_name"]
2371 def ExpandNames(self):
2372 self._ExpandAndLockInstance()
2373 self.needed_locks[locking.LEVEL_NODE] = []
2374 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2376 def DeclareLocks(self, level):
2377 if level == locking.LEVEL_NODE:
2378 self._LockInstancesNodes()
2380 def CheckPrereq(self):
2381 """Check prerequisites.
2383 This checks that the instance is in the cluster.
2386 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2387 assert self.instance is not None, \
2388 "Cannot retrieve locked instance %s" % self.op.instance_name
2390 def Exec(self, feedback_fn):
2391 """Deactivate the disks
2394 instance = self.instance
2395 _SafeShutdownInstanceDisks(self, instance)
2398 def _SafeShutdownInstanceDisks(lu, instance):
2399 """Shutdown block devices of an instance.
2401 This function checks if an instance is running, before calling
2402 _ShutdownInstanceDisks.
2405 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2406 [instance.hypervisor])
2407 ins_l = ins_l[instance.primary_node]
2408 if ins_l.failed or not isinstance(ins_l.data, list):
2409 raise errors.OpExecError("Can't contact node '%s'" %
2410 instance.primary_node)
2412 if instance.name in ins_l.data:
2413 raise errors.OpExecError("Instance is running, can't shutdown"
2414                          " block devices.")
2416 _ShutdownInstanceDisks(lu, instance)
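# Editor's usage note (lu and instance are whatever the calling LU already
# holds): use the safe variant when the instance might still be running, and
# the plain one only when that has already been ruled out.
#
#   _SafeShutdownInstanceDisks(lu, instance)   # refuses if instance is up
#   _ShutdownInstanceDisks(lu, instance)       # unconditional shutdown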
2419 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2420 """Shutdown block devices of an instance.
2422 This does the shutdown on all nodes of the instance.
2424 If the ignore_primary is false, errors on the primary node are
2429 for disk in instance.disks:
2430 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2431 lu.cfg.SetDiskID(top_disk, node)
2432 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2433 if result.failed or not result.data:
2434 logging.error("Could not shutdown block device %s on node %s",
2436 if not ignore_primary or node != instance.primary_node:
2441 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2442 """Checks if a node has enough free memory.
2444 This function checks if a given node has the needed amount of free
2445 memory. In case the node has less memory or we cannot get the
2446 information from the node, this function raises an OpPrereqError.
2449 @type lu: C{LogicalUnit}
2450 @param lu: a logical unit from which we get configuration data
2452 @param node: the node to check
2453 @type reason: C{str}
2454 @param reason: string to use in the error message
2455 @type requested: C{int}
2456 @param requested: the amount of memory in MiB to check for
2457 @type hypervisor: C{str}
2458 @param hypervisor: the hypervisor to ask for memory stats
2459 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2460 we cannot check the node
2463 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2464 nodeinfo[node].Raise()
2465 free_mem = nodeinfo[node].data.get('memory_free')
2466 if not isinstance(free_mem, int):
2467 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2468 " was '%s'" % (node, free_mem))
2469 if requested > free_mem:
2470 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2471 " needed %s MiB, available %s MiB" %
2472 (node, reason, requested, free_mem))
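# Editor's usage sketch (hypothetical node name, message and sizes); this
# mirrors the call made by LUStartupInstance further down:
#
#   _CheckNodeFreeMemory(self, "node1.example.com",
#                        "starting instance inst1.example.com",
#                        512, "xen-pvm")
#
# It raises OpPrereqError if the node reports less than 512 MiB free, or if
# the free-memory figure cannot be obtained at all.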
2475 class LUStartupInstance(LogicalUnit):
2476 """Starts an instance.
2479 HPATH = "instance-start"
2480 HTYPE = constants.HTYPE_INSTANCE
2481 _OP_REQP = ["instance_name", "force"]
2484 def ExpandNames(self):
2485 self._ExpandAndLockInstance()
2487 def BuildHooksEnv(self):
2490 This runs on master, primary and secondary nodes of the instance.
2493 env = {
2494 "FORCE": self.op.force,
2495 }
2496 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2497 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2498 list(self.instance.secondary_nodes))
2499 return env, nl, nl
2501 def CheckPrereq(self):
2502 """Check prerequisites.
2504 This checks that the instance is in the cluster.
2507 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2508 assert self.instance is not None, \
2509 "Cannot retrieve locked instance %s" % self.op.instance_name
2511 bep = self.cfg.GetClusterInfo().FillBE(instance)
2512 # check bridges existence
2513 _CheckInstanceBridgesExist(self, instance)
2515 _CheckNodeFreeMemory(self, instance.primary_node,
2516 "starting instance %s" % instance.name,
2517 bep[constants.BE_MEMORY], instance.hypervisor)
2519 def Exec(self, feedback_fn):
2520 """Start the instance.
2523 instance = self.instance
2524 force = self.op.force
2525 extra_args = getattr(self.op, "extra_args", "")
2527 self.cfg.MarkInstanceUp(instance.name)
2529 node_current = instance.primary_node
2531 _StartInstanceDisks(self, instance, force)
2533 result = self.rpc.call_instance_start(node_current, instance, extra_args)
2534 if result.failed or not result.data:
2535 _ShutdownInstanceDisks(self, instance)
2536 raise errors.OpExecError("Could not start instance")
2539 class LURebootInstance(LogicalUnit):
2540 """Reboot an instance.
2543 HPATH = "instance-reboot"
2544 HTYPE = constants.HTYPE_INSTANCE
2545 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2548 def ExpandNames(self):
2549 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2550 constants.INSTANCE_REBOOT_HARD,
2551 constants.INSTANCE_REBOOT_FULL]:
2552 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2553 (constants.INSTANCE_REBOOT_SOFT,
2554 constants.INSTANCE_REBOOT_HARD,
2555 constants.INSTANCE_REBOOT_FULL))
2556 self._ExpandAndLockInstance()
2558 def BuildHooksEnv(self):
2561 This runs on master, primary and secondary nodes of the instance.
2564 env = {
2565 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2566 }
2567 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2568 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2569 list(self.instance.secondary_nodes))
2570 return env, nl, nl
2572 def CheckPrereq(self):
2573 """Check prerequisites.
2575 This checks that the instance is in the cluster.
2578 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2579 assert self.instance is not None, \
2580 "Cannot retrieve locked instance %s" % self.op.instance_name
2582 # check bridges existence
2583 _CheckInstanceBridgesExist(self, instance)
2585 def Exec(self, feedback_fn):
2586 """Reboot the instance.
2589 instance = self.instance
2590 ignore_secondaries = self.op.ignore_secondaries
2591 reboot_type = self.op.reboot_type
2592 extra_args = getattr(self.op, "extra_args", "")
2594 node_current = instance.primary_node
2596 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2597 constants.INSTANCE_REBOOT_HARD]:
2598 result = self.rpc.call_instance_reboot(node_current, instance,
2599 reboot_type, extra_args)
2600 if result.failed or not result.data:
2601 raise errors.OpExecError("Could not reboot instance")
2602 else:
2603   if not self.rpc.call_instance_shutdown(node_current, instance):
2604 raise errors.OpExecError("could not shutdown instance for full reboot")
2605 _ShutdownInstanceDisks(self, instance)
2606 _StartInstanceDisks(self, instance, ignore_secondaries)
2607 result = self.rpc.call_instance_start(node_current, instance, extra_args)
2608 if result.failed or not result.data:
2609 _ShutdownInstanceDisks(self, instance)
2610 raise errors.OpExecError("Could not start instance for full reboot")
2612 self.cfg.MarkInstanceUp(instance.name)
2615 class LUShutdownInstance(LogicalUnit):
2616 """Shutdown an instance.
2619 HPATH = "instance-stop"
2620 HTYPE = constants.HTYPE_INSTANCE
2621 _OP_REQP = ["instance_name"]
2624 def ExpandNames(self):
2625 self._ExpandAndLockInstance()
2627 def BuildHooksEnv(self):
2630 This runs on master, primary and secondary nodes of the instance.
2633 env = _BuildInstanceHookEnvByObject(self, self.instance)
2634 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2635 list(self.instance.secondary_nodes))
2636 return env, nl, nl
2638 def CheckPrereq(self):
2639 """Check prerequisites.
2641 This checks that the instance is in the cluster.
2644 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2645 assert self.instance is not None, \
2646 "Cannot retrieve locked instance %s" % self.op.instance_name
2648 def Exec(self, feedback_fn):
2649 """Shutdown the instance.
2652 instance = self.instance
2653 node_current = instance.primary_node
2654 self.cfg.MarkInstanceDown(instance.name)
2655 result = self.rpc.call_instance_shutdown(node_current, instance)
2656 if result.failed or not result.data:
2657 self.proc.LogWarning("Could not shutdown instance")
2659 _ShutdownInstanceDisks(self, instance)
2662 class LUReinstallInstance(LogicalUnit):
2663 """Reinstall an instance.
2666 HPATH = "instance-reinstall"
2667 HTYPE = constants.HTYPE_INSTANCE
2668 _OP_REQP = ["instance_name"]
2671 def ExpandNames(self):
2672 self._ExpandAndLockInstance()
2674 def BuildHooksEnv(self):
2677 This runs on master, primary and secondary nodes of the instance.
2680 env = _BuildInstanceHookEnvByObject(self, self.instance)
2681 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2682 list(self.instance.secondary_nodes))
2683 return env, nl, nl
2685 def CheckPrereq(self):
2686 """Check prerequisites.
2688 This checks that the instance is in the cluster and is not running.
2691 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2692 assert instance is not None, \
2693 "Cannot retrieve locked instance %s" % self.op.instance_name
2695 if instance.disk_template == constants.DT_DISKLESS:
2696 raise errors.OpPrereqError("Instance '%s' has no disks" %
2697 self.op.instance_name)
2698 if instance.status != "down":
2699 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2700 self.op.instance_name)
2701 remote_info = self.rpc.call_instance_info(instance.primary_node,
2703 instance.hypervisor)
2704 if remote_info.failed or remote_info.data:
2705 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2706 (self.op.instance_name,
2707 instance.primary_node))
2709 self.op.os_type = getattr(self.op, "os_type", None)
2710 if self.op.os_type is not None:
2712 pnode = self.cfg.GetNodeInfo(
2713 self.cfg.ExpandNodeName(instance.primary_node))
2715 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2717 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2719 if not isinstance(result.data, objects.OS):
2720 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2721 " primary node" % self.op.os_type)
2723 self.instance = instance
2725 def Exec(self, feedback_fn):
2726 """Reinstall the instance.
2729 inst = self.instance
2731 if self.op.os_type is not None:
2732 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2733 inst.os = self.op.os_type
2734 self.cfg.Update(inst)
2736 _StartInstanceDisks(self, inst, None)
2738 feedback_fn("Running the instance OS create scripts...")
2739 result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2742 raise errors.OpExecError("Could not install OS for instance %s"
2744 (inst.name, inst.primary_node))
2746 _ShutdownInstanceDisks(self, inst)
2749 class LURenameInstance(LogicalUnit):
2750 """Rename an instance.
2753 HPATH = "instance-rename"
2754 HTYPE = constants.HTYPE_INSTANCE
2755 _OP_REQP = ["instance_name", "new_name"]
2757 def BuildHooksEnv(self):
2760 This runs on master, primary and secondary nodes of the instance.
2763 env = _BuildInstanceHookEnvByObject(self, self.instance)
2764 env["INSTANCE_NEW_NAME"] = self.op.new_name
2765 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2766 list(self.instance.secondary_nodes))
2767 return env, nl, nl
2769 def CheckPrereq(self):
2770 """Check prerequisites.
2772 This checks that the instance is in the cluster and is not running.
2775 instance = self.cfg.GetInstanceInfo(
2776 self.cfg.ExpandInstanceName(self.op.instance_name))
2777 if instance is None:
2778 raise errors.OpPrereqError("Instance '%s' not known" %
2779 self.op.instance_name)
2780 if instance.status != "down":
2781 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2782 self.op.instance_name)
2783 remote_info = self.rpc.call_instance_info(instance.primary_node,
2785 instance.hypervisor)
2787 if remote_info.data:
2788 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2789 (self.op.instance_name,
2790 instance.primary_node))
2791 self.instance = instance
2793 # new name verification
2794 name_info = utils.HostInfo(self.op.new_name)
2796 self.op.new_name = new_name = name_info.name
2797 instance_list = self.cfg.GetInstanceList()
2798 if new_name in instance_list:
2799 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2802 if not getattr(self.op, "ignore_ip", False):
2803 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2804 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2805 (name_info.ip, new_name))
2808 def Exec(self, feedback_fn):
2809 """Reinstall the instance.
2812 inst = self.instance
2813 old_name = inst.name
2815 if inst.disk_template == constants.DT_FILE:
2816 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2818 self.cfg.RenameInstance(inst.name, self.op.new_name)
2819 # Change the instance lock. This is definitely safe while we hold the BGL
2820 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2821 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2823 # re-read the instance from the configuration after rename
2824 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2826 if inst.disk_template == constants.DT_FILE:
2827 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2828 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2829 old_file_storage_dir,
2830 new_file_storage_dir)
2833 raise errors.OpExecError("Could not connect to node '%s' to rename"
2834 " directory '%s' to '%s' (but the instance"
2835 " has been renamed in Ganeti)" % (
2836 inst.primary_node, old_file_storage_dir,
2837 new_file_storage_dir))
2839 if not result.data[0]:
2840 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2841 " (but the instance has been renamed in"
2842 " Ganeti)" % (old_file_storage_dir,
2843 new_file_storage_dir))
2845 _StartInstanceDisks(self, inst, None)
2847 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2848                                            old_name)
2849 if result.failed or not result.data:
2850 msg = ("Could not run OS rename script for instance %s on node %s"
2851 " (but the instance has been renamed in Ganeti)" %
2852 (inst.name, inst.primary_node))
2853 self.proc.LogWarning(msg)
2855 _ShutdownInstanceDisks(self, inst)
2858 class LURemoveInstance(LogicalUnit):
2859 """Remove an instance.
2862 HPATH = "instance-remove"
2863 HTYPE = constants.HTYPE_INSTANCE
2864 _OP_REQP = ["instance_name", "ignore_failures"]
2867 def ExpandNames(self):
2868 self._ExpandAndLockInstance()
2869 self.needed_locks[locking.LEVEL_NODE] = []
2870 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2872 def DeclareLocks(self, level):
2873 if level == locking.LEVEL_NODE:
2874 self._LockInstancesNodes()
2876 def BuildHooksEnv(self):
2879 This runs on master, primary and secondary nodes of the instance.
2882 env = _BuildInstanceHookEnvByObject(self, self.instance)
2883 nl = [self.cfg.GetMasterNode()]
2884 return env, nl, nl
2886 def CheckPrereq(self):
2887 """Check prerequisites.
2889 This checks that the instance is in the cluster.
2892 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2893 assert self.instance is not None, \
2894 "Cannot retrieve locked instance %s" % self.op.instance_name
2896 def Exec(self, feedback_fn):
2897 """Remove the instance.
2900 instance = self.instance
2901 logging.info("Shutting down instance %s on node %s",
2902 instance.name, instance.primary_node)
2904 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2905 if result.failed or not result.data:
2906 if self.op.ignore_failures:
2907 feedback_fn("Warning: can't shutdown instance")
2909 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2910 (instance.name, instance.primary_node))
2912 logging.info("Removing block devices for instance %s", instance.name)
2914 if not _RemoveDisks(self, instance):
2915 if self.op.ignore_failures:
2916 feedback_fn("Warning: can't remove instance's disks")
2918 raise errors.OpExecError("Can't remove instance's disks")
2920 logging.info("Removing instance %s out of cluster config", instance.name)
2922 self.cfg.RemoveInstance(instance.name)
2923 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2926 class LUQueryInstances(NoHooksLU):
2927 """Logical unit for querying instances.
2930 _OP_REQP = ["output_fields", "names"]
2932 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2933 "admin_state", "admin_ram",
2934 "disk_template", "ip", "mac", "bridge",
2935 "sda_size", "sdb_size", "vcpus", "tags",
2936 "network_port", "beparams",
2937 "(disk).(size)/([0-9]+)",
2939 "(nic).(mac|ip|bridge)/([0-9]+)",
2940 "(nic).(macs|ips|bridges)",
2941 "(disk|nic).(count)",
2942 "serial_no", "hypervisor", "hvparams",] +
2944 for name in constants.HVS_PARAMETERS] +
2946 for name in constants.BES_PARAMETERS])
2947 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
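# Editor's note: the parenthesised entries above are regex-style patterns;
# utils.FieldSet.Matches() returns a match object whose groups drive the
# per-index lookups in Exec below. A hypothetical example:
#
#   >>> st_match = self._FIELDS_STATIC.Matches("nic.mac/1")
#   >>> st_match.groups()
#   ('nic', 'mac', '1')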
2950 def ExpandNames(self):
2951 _CheckOutputFields(static=self._FIELDS_STATIC,
2952 dynamic=self._FIELDS_DYNAMIC,
2953 selected=self.op.output_fields)
2955 self.needed_locks = {}
2956 self.share_locks[locking.LEVEL_INSTANCE] = 1
2957 self.share_locks[locking.LEVEL_NODE] = 1
2959 if self.op.names:
2960   self.wanted = _GetWantedInstances(self, self.op.names)
2961 else:
2962   self.wanted = locking.ALL_SET
2964 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2965 if self.do_locking:
2966   self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2967 self.needed_locks[locking.LEVEL_NODE] = []
2968 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2970 def DeclareLocks(self, level):
2971 if level == locking.LEVEL_NODE and self.do_locking:
2972 self._LockInstancesNodes()
2974 def CheckPrereq(self):
2975 """Check prerequisites.
2980 def Exec(self, feedback_fn):
2981 """Computes the list of nodes and their attributes.
2984 all_info = self.cfg.GetAllInstancesInfo()
2985 if self.do_locking:
2986   instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2987 elif self.wanted != locking.ALL_SET:
2988 instance_names = self.wanted
2989 missing = set(instance_names).difference(all_info.keys())
2991 raise errors.OpExecError(
2992 "Some instances were removed before retrieving their data: %s"
2995 instance_names = all_info.keys()
2997 instance_names = utils.NiceSort(instance_names)
2998 instance_list = [all_info[iname] for iname in instance_names]
3000 # begin data gathering
3002 nodes = frozenset([inst.primary_node for inst in instance_list])
3003 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3008 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3010 result = node_data[name]
3012 bad_nodes.append(name)
3015 live_data.update(result.data)
3016 # else no instance is alive
3018 live_data = dict([(name, {}) for name in instance_names])
3020 # end data gathering
3025 for instance in instance_list:
3027 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3028 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3029 for field in self.op.output_fields:
3030 st_match = self._FIELDS_STATIC.Matches(field)
3035 elif field == "pnode":
3036 val = instance.primary_node
3037 elif field == "snodes":
3038 val = list(instance.secondary_nodes)
3039 elif field == "admin_state":
3040 val = (instance.status != "down")
3041 elif field == "oper_state":
3042 if instance.primary_node in bad_nodes:
3045 val = bool(live_data.get(instance.name))
3046 elif field == "status":
3047 if instance.primary_node in bad_nodes:
3048 val = "ERROR_nodedown"
3050 running = bool(live_data.get(instance.name))
3052 if instance.status != "down":
3057 if instance.status != "down":
3061 elif field == "oper_ram":
3062 if instance.primary_node in bad_nodes:
3064 elif instance.name in live_data:
3065 val = live_data[instance.name].get("memory", "?")
3068 elif field == "disk_template":
3069 val = instance.disk_template
3071 val = instance.nics[0].ip
3072 elif field == "bridge":
3073 val = instance.nics[0].bridge
3074 elif field == "mac":
3075 val = instance.nics[0].mac
3076 elif field == "sda_size" or field == "sdb_size":
3077 idx = ord(field[2]) - ord('a')
3079 val = instance.FindDisk(idx).size
3080 except errors.OpPrereqError:
3082 elif field == "tags":
3083 val = list(instance.GetTags())
3084 elif field == "serial_no":
3085 val = instance.serial_no
3086 elif field == "network_port":
3087 val = instance.network_port
3088 elif field == "hypervisor":
3089 val = instance.hypervisor
3090 elif field == "hvparams":
3091   val = i_hv
3092 elif (field.startswith(HVPREFIX) and
3093 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3094 val = i_hv.get(field[len(HVPREFIX):], None)
3095 elif field == "beparams":
3096   val = i_be
3097 elif (field.startswith(BEPREFIX) and
3098 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3099 val = i_be.get(field[len(BEPREFIX):], None)
3100 elif st_match and st_match.groups():
3101 # matches a variable list
3102 st_groups = st_match.groups()
3103 if st_groups and st_groups[0] == "disk":
3104 if st_groups[1] == "count":
3105 val = len(instance.disks)
3106 elif st_groups[1] == "sizes":
3107 val = [disk.size for disk in instance.disks]
3108 elif st_groups[1] == "size":
3110 val = instance.FindDisk(st_groups[2]).size
3111 except errors.OpPrereqError:
3114 assert False, "Unhandled disk parameter"
3115 elif st_groups[0] == "nic":
3116 if st_groups[1] == "count":
3117 val = len(instance.nics)
3118 elif st_groups[1] == "macs":
3119 val = [nic.mac for nic in instance.nics]
3120 elif st_groups[1] == "ips":
3121 val = [nic.ip for nic in instance.nics]
3122 elif st_groups[1] == "bridges":
3123 val = [nic.bridge for nic in instance.nics]
3126 nic_idx = int(st_groups[2])
3127 if nic_idx >= len(instance.nics):
3130 if st_groups[1] == "mac":
3131 val = instance.nics[nic_idx].mac
3132 elif st_groups[1] == "ip":
3133 val = instance.nics[nic_idx].ip
3134 elif st_groups[1] == "bridge":
3135 val = instance.nics[nic_idx].bridge
3137 assert False, "Unhandled NIC parameter"
3139 assert False, "Unhandled variable parameter"
3141 raise errors.ParameterError(field)
3148 class LUFailoverInstance(LogicalUnit):
3149 """Failover an instance.
3152 HPATH = "instance-failover"
3153 HTYPE = constants.HTYPE_INSTANCE
3154 _OP_REQP = ["instance_name", "ignore_consistency"]
3157 def ExpandNames(self):
3158 self._ExpandAndLockInstance()
3159 self.needed_locks[locking.LEVEL_NODE] = []
3160 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3162 def DeclareLocks(self, level):
3163 if level == locking.LEVEL_NODE:
3164 self._LockInstancesNodes()
3166 def BuildHooksEnv(self):
3169 This runs on master, primary and secondary nodes of the instance.
3172 env = {
3173 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3174 }
3175 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3176 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3177 return env, nl, nl
3179 def CheckPrereq(self):
3180 """Check prerequisites.
3182 This checks that the instance is in the cluster.
3185 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3186 assert self.instance is not None, \
3187 "Cannot retrieve locked instance %s" % self.op.instance_name
3189 bep = self.cfg.GetClusterInfo().FillBE(instance)
3190 if instance.disk_template not in constants.DTS_NET_MIRROR:
3191 raise errors.OpPrereqError("Instance's disk layout is not"
3192 " network mirrored, cannot failover.")
3194 secondary_nodes = instance.secondary_nodes
3195 if not secondary_nodes:
3196 raise errors.ProgrammerError("no secondary node but using "
3197 "a mirrored disk template")
3199 target_node = secondary_nodes[0]
3200 # check memory requirements on the secondary node
3201 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3202 instance.name, bep[constants.BE_MEMORY],
3203 instance.hypervisor)
3205 # check bridge existence
3206 brlist = [nic.bridge for nic in instance.nics]
3207 result = self.rpc.call_bridges_exist(target_node, brlist)
3210 raise errors.OpPrereqError("One or more target bridges %s does not"
3211 " exist on destination node '%s'" %
3212 (brlist, target_node))
3214 def Exec(self, feedback_fn):
3215 """Failover an instance.
3217 The failover is done by shutting it down on its present node and
3218 starting it on the secondary.
3221 instance = self.instance
3223 source_node = instance.primary_node
3224 target_node = instance.secondary_nodes[0]
3226 feedback_fn("* checking disk consistency between source and target")
3227 for dev in instance.disks:
3228 # for drbd, these are drbd over lvm
3229 if not _CheckDiskConsistency(self, dev, target_node, False):
3230 if instance.status == "up" and not self.op.ignore_consistency:
3231 raise errors.OpExecError("Disk %s is degraded on target node,"
3232 " aborting failover." % dev.iv_name)
3234 feedback_fn("* shutting down instance on source node")
3235 logging.info("Shutting down instance %s on node %s",
3236 instance.name, source_node)
3238 result = self.rpc.call_instance_shutdown(source_node, instance)
3239 if result.failed or not result.data:
3240 if self.op.ignore_consistency:
3241 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3243 " anyway. Please make sure node %s is down",
3244 instance.name, source_node, source_node)
3246 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3247 (instance.name, source_node))
3249 feedback_fn("* deactivating the instance's disks on source node")
3250 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3251 raise errors.OpExecError("Can't shut down the instance's disks.")
3253 instance.primary_node = target_node
3254 # distribute new instance config to the other nodes
3255 self.cfg.Update(instance)
3257 # Only start the instance if it's marked as up
3258 if instance.status == "up":
3259 feedback_fn("* activating the instance's disks on target node")
3260 logging.info("Starting instance %s on node %s",
3261 instance.name, target_node)
3263 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3264 ignore_secondaries=True)
3266 _ShutdownInstanceDisks(self, instance)
3267 raise errors.OpExecError("Can't activate the instance's disks")
3269 feedback_fn("* starting the instance on the target node")
3270 result = self.rpc.call_instance_start(target_node, instance, None)
3271 if result.failed or not result.data:
3272 _ShutdownInstanceDisks(self, instance)
3273 raise errors.OpExecError("Could not start instance %s on node %s." %
3274 (instance.name, target_node))
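# Editor's summary of the failover sequence implemented above (no new
# behaviour, just the order of operations in sketch form):
#
#   _CheckDiskConsistency(self, dev, target_node, False)     # per disk
#   self.rpc.call_instance_shutdown(source_node, instance)
#   _ShutdownInstanceDisks(self, instance, ignore_primary=True)
#   instance.primary_node = target_node; self.cfg.Update(instance)
#   if instance.status == "up":
#       _AssembleInstanceDisks(self, instance, ignore_secondaries=True)
#       self.rpc.call_instance_start(target_node, instance, None)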
3277 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3278 """Create a tree of block devices on the primary node.
3280 This always creates all devices.
3284 for child in device.children:
3285 if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3286   return False
3288 lu.cfg.SetDiskID(device, node)
3289 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3290 instance.name, True, info)
3291 if new_id.failed or not new_id.data:
3292   return False
3293 if device.physical_id is None:
3294 device.physical_id = new_id
3298 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3299 """Create a tree of block devices on a secondary node.
3301 If this device type has to be created on secondaries, create it and
3304 If not, just recurse to children keeping the same 'force' value.
3307 if device.CreateOnSecondary():
3308   force = True
3310 for child in device.children:
3311 if not _CreateBlockDevOnSecondary(lu, node, instance,
3312 child, force, info):
3317 lu.cfg.SetDiskID(device, node)
3318 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3319 instance.name, False, info)
3320 if new_id.failed or not new_id.data:
3322 if device.physical_id is None:
3323 device.physical_id = new_id
3327 def _GenerateUniqueNames(lu, exts):
3328 """Generate a suitable LV name.
3330 This will generate a logical volume name for the given instance.
3333 results = []
3334 for val in exts:
3335   new_id = lu.cfg.GenerateUniqueID()
3336   results.append("%s%s" % (new_id, val))
3337 return results
3340 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3341                          p_minor, s_minor):
3342 """Generate a drbd8 device complete with its children.
3345 port = lu.cfg.AllocatePort()
3346 vgname = lu.cfg.GetVGName()
3347 shared_secret = lu.cfg.GenerateDRBDSecret()
3348 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3349 logical_id=(vgname, names[0]))
3350 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3351 logical_id=(vgname, names[1]))
3352 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3353 logical_id=(primary, secondary, port,
3354             p_minor, s_minor,
3355             shared_secret),
3356 children=[dev_data, dev_meta],
3357 iv_name=iv_name)
3358 return drbd_dev
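# Editor's illustration: for a 1024 MB disk the helper above builds a tree
# roughly like this (a sketch, not literal repr() output; names[0]/names[1]
# are the data/meta LV names):
#
#   Disk(LD_DRBD8, size=1024,
#        children=[Disk(LD_LV, size=1024, logical_id=(vgname, names[0])),
#                  Disk(LD_LV, size=128,  logical_id=(vgname, names[1]))])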
3361 def _GenerateDiskTemplate(lu, template_name,
3362 instance_name, primary_node,
3363 secondary_nodes, disk_info,
3364 file_storage_dir, file_driver,
3366 """Generate the entire disk layout for a given template type.
3369 #TODO: compute space requirements
3371 vgname = lu.cfg.GetVGName()
3372 disk_count = len(disk_info)
3373 disks = []
3374 if template_name == constants.DT_DISKLESS:
3375   pass
3376 elif template_name == constants.DT_PLAIN:
3377 if len(secondary_nodes) != 0:
3378 raise errors.ProgrammerError("Wrong template configuration")
3380 names = _GenerateUniqueNames(lu, [".disk%d" % i
3381 for i in range(disk_count)])
3382 for idx, disk in enumerate(disk_info):
3383 disk_index = idx + base_index
3384 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3385 logical_id=(vgname, names[idx]),
3386 iv_name="disk/%d" % disk_index)
3387 disks.append(disk_dev)
3388 elif template_name == constants.DT_DRBD8:
3389 if len(secondary_nodes) != 1:
3390 raise errors.ProgrammerError("Wrong template configuration")
3391 remote_node = secondary_nodes[0]
3392 minors = lu.cfg.AllocateDRBDMinor(
3393 [primary_node, remote_node] * len(disk_info), instance_name)
3395 names = _GenerateUniqueNames(lu,
3396 [".disk%d_%s" % (i, s)
3397 for i in range(disk_count)
3398 for s in ("data", "meta")
3399 ])
3400 for idx, disk in enumerate(disk_info):
3401 disk_index = idx + base_index
3402 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3403 disk["size"], names[idx*2:idx*2+2],
3404 "disk/%d" % disk_index,
3405 minors[idx*2], minors[idx*2+1])
3406 disks.append(disk_dev)
3407 elif template_name == constants.DT_FILE:
3408 if len(secondary_nodes) != 0:
3409 raise errors.ProgrammerError("Wrong template configuration")
3411 for idx, disk in enumerate(disk_info):
3412 disk_index = idx + base_index
3413 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3414 iv_name="disk/%d" % disk_index,
3415 logical_id=(file_driver,
3416 "%s/disk%d" % (file_storage_dir,
3417                disk_index)))
3418 disks.append(disk_dev)
3420 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3422 return disks
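# Editor's example call (hypothetical names; assumes the trailing base_index
# parameter used in the loop bodies above): a plain template with two disks
# yields two LD_LV devices named "disk/0" and "disk/1" on the volume group.
#
#   disk_info = [{"size": 512, "mode": constants.DISK_RDWR},
#                {"size": 1024, "mode": constants.DISK_RDWR}]
#   disks = _GenerateDiskTemplate(lu, constants.DT_PLAIN,
#                                 "inst1.example.com", "node1.example.com",
#                                 [], disk_info, None, None, 0)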
3424 def _GetInstanceInfoText(instance):
3425 """Compute the text that should be added to the disk's metadata.
3428 return "originstname+%s" % instance.name
3431 def _CreateDisks(lu, instance):
3432 """Create all disks for an instance.
3434 This abstracts away some work from AddInstance.
3436 @type lu: L{LogicalUnit}
3437 @param lu: the logical unit on whose behalf we execute
3438 @type instance: L{objects.Instance}
3439 @param instance: the instance whose disks we should create
3441 @return: the success of the creation
3444 info = _GetInstanceInfoText(instance)
3446 if instance.disk_template == constants.DT_FILE:
3447 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3448 result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3451 if result.failed or not result.data:
3452 logging.error("Could not connect to node '%s'", instance.primary_node)
3453 return False
3455 if not result.data[0]:
3456 logging.error("Failed to create directory '%s'", file_storage_dir)
3457 return False
3459 # Note: this needs to be kept in sync with adding of disks in
3460 # LUSetInstanceParams
3461 for device in instance.disks:
3462 logging.info("Creating volume %s for instance %s",
3463 device.iv_name, instance.name)
3465 for secondary_node in instance.secondary_nodes:
3466 if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3467 device, False, info):
3468 logging.error("Failed to create volume %s (%s) on secondary node %s!",
3469 device.iv_name, device, secondary_node)
3470 return False
3472 if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3473 instance, device, info):
3474 logging.error("Failed to create volume %s on primary!", device.iv_name)
3475 return False
3477 return True
3480 def _RemoveDisks(lu, instance):
3481 """Remove all disks for an instance.
3483 This abstracts away some work from `AddInstance()` and
3484 `RemoveInstance()`. Note that in case some of the devices couldn't
3485 be removed, the removal will continue with the other ones (compare
3486 with `_CreateDisks()`).
3488 @type lu: L{LogicalUnit}
3489 @param lu: the logical unit on whose behalf we execute
3490 @type instance: L{objects.Instance}
3491 @param instance: the instance whose disks we should remove
3493 @return: the success of the removal
3496 logging.info("Removing block devices for instance %s", instance.name)
3499 for device in instance.disks:
3500 for node, disk in device.ComputeNodeTree(instance.primary_node):
3501 lu.cfg.SetDiskID(disk, node)
3502 result = lu.rpc.call_blockdev_remove(node, disk)
3503 if result.failed or not result.data:
3504 lu.proc.LogWarning("Could not remove block device %s on node %s,"
3505 " continuing anyway", device.iv_name, node)
3508 if instance.disk_template == constants.DT_FILE:
3509 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3510 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3512 if result.failed or not result.data:
3513 logging.error("Could not remove directory '%s'", file_storage_dir)
3519 def _ComputeDiskSize(disk_template, disks):
3520 """Compute disk size requirements in the volume group
3523 # Required free disk space as a function of disk and swap space
3524 req_size_dict = {
3525 constants.DT_DISKLESS: None,
3526 constants.DT_PLAIN: sum(d["size"] for d in disks),
3527 # 128 MB are added for drbd metadata for each disk
3528 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3529 constants.DT_FILE: None,
3530 }
3532 if disk_template not in req_size_dict:
3533 raise errors.ProgrammerError("Disk template '%s' size requirement"
3534 " is unknown" % disk_template)
3536 return req_size_dict[disk_template]
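# Editor's worked example: two DRBD8 disks of 1024 MB and 256 MB need
# (1024 + 128) + (256 + 128) = 1536 MB of volume group space, the same disks
# as DT_PLAIN need 1280 MB, and DT_FILE / DT_DISKLESS need none (None).
#
#   >>> _ComputeDiskSize(constants.DT_DRBD8, [{"size": 1024}, {"size": 256}])
#   1536
#   >>> _ComputeDiskSize(constants.DT_PLAIN, [{"size": 1024}, {"size": 256}])
#   1280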
3539 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3540 """Hypervisor parameter validation.
3542 This function abstracts the hypervisor parameter validation to be
3543 used in both instance create and instance modify.
3545 @type lu: L{LogicalUnit}
3546 @param lu: the logical unit for which we check
3547 @type nodenames: list
3548 @param nodenames: the list of nodes on which we should check
3549 @type hvname: string
3550 @param hvname: the name of the hypervisor we should use
3551 @type hvparams: dict
3552 @param hvparams: the parameters which we need to check
3553 @raise errors.OpPrereqError: if the parameters are not valid
3556 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3559 for node in nodenames:
3562 if not info.data or not isinstance(info.data, (tuple, list)):
3563 raise errors.OpPrereqError("Cannot get current information"
3564 " from node '%s' (%s)" % (node, info.data))
3565 if not info.data[0]:
3566 raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3567 " %s" % info.data[1])
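# Editor's usage sketch (hypothetical node names): this is the same shape of
# call that LUCreateInstance.CheckPrereq makes further down, validating the
# chosen hypervisor's parameters on every node that will host the instance.
#
#   _CheckHVParams(self, ["node1.example.com", "node2.example.com"],
#                  self.op.hypervisor, self.op.hvparams)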
3570 class LUCreateInstance(LogicalUnit):
3571 """Create an instance.
3574 HPATH = "instance-add"
3575 HTYPE = constants.HTYPE_INSTANCE
3576 _OP_REQP = ["instance_name", "disks", "disk_template",
3578 "wait_for_sync", "ip_check", "nics",
3579 "hvparams", "beparams"]
3582 def _ExpandNode(self, node):
3583 """Expands and checks one node name.
3586 node_full = self.cfg.ExpandNodeName(node)
3587 if node_full is None:
3588 raise errors.OpPrereqError("Unknown node %s" % node)
3589 return node_full
3591 def ExpandNames(self):
3592 """ExpandNames for CreateInstance.
3594 Figure out the right locks for instance creation.
3597 self.needed_locks = {}
3599 # set optional parameters to none if they don't exist
3600 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3601 if not hasattr(self.op, attr):
3602 setattr(self.op, attr, None)
3604 # cheap checks, mostly valid constants given
3606 # verify creation mode
3607 if self.op.mode not in (constants.INSTANCE_CREATE,
3608 constants.INSTANCE_IMPORT):
3609 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3610                            self.op.mode)
3612 # disk template and mirror node verification
3613 if self.op.disk_template not in constants.DISK_TEMPLATES:
3614 raise errors.OpPrereqError("Invalid disk template name")
3616 if self.op.hypervisor is None:
3617 self.op.hypervisor = self.cfg.GetHypervisorType()
3619 cluster = self.cfg.GetClusterInfo()
3620 enabled_hvs = cluster.enabled_hypervisors
3621 if self.op.hypervisor not in enabled_hvs:
3622 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3623 " cluster (%s)" % (self.op.hypervisor,
3624 ",".join(enabled_hvs)))
3626 # check hypervisor parameter syntax (locally)
3628 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3629                               self.op.hvparams)
3630 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3631 hv_type.CheckParameterSyntax(filled_hvp)
3633 # fill and remember the beparams dict
3634 utils.CheckBEParams(self.op.beparams)
3635 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3636                                 self.op.beparams)
3638 #### instance parameters check
3640 # instance name verification
3641 hostname1 = utils.HostInfo(self.op.instance_name)
3642 self.op.instance_name = instance_name = hostname1.name
3644 # this is just a preventive check, but someone might still add this
3645 # instance in the meantime, and creation will fail at lock-add time
3646 if instance_name in self.cfg.GetInstanceList():
3647 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3648                            instance_name)
3650 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3653 self.nics = []
3654 for nic in self.op.nics:
3655 # ip validity checks
3656 ip = nic.get("ip", None)
3657 if ip is None or ip.lower() == "none":
3658   nic_ip = None
3659 elif ip.lower() == constants.VALUE_AUTO:
3660 nic_ip = hostname1.ip
3661 else:
3662   if not utils.IsValidIP(ip):
3663     raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3664                                " like a valid IP" % ip)
3665   nic_ip = ip
3667 # MAC address verification
3668 mac = nic.get("mac", constants.VALUE_AUTO)
3669 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3670 if not utils.IsValidMac(mac.lower()):
3671 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3672                            mac)
3673 # bridge verification
3674 bridge = nic.get("bridge", self.cfg.GetDefBridge())
3675 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3677 # disk checks/pre-build
3678 self.disks = []
3679 for disk in self.op.disks:
3680 mode = disk.get("mode", constants.DISK_RDWR)
3681 if mode not in constants.DISK_ACCESS_SET:
3682 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3684 size = disk.get("size", None)
3686 raise errors.OpPrereqError("Missing disk size")
3690 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3691 self.disks.append({"size": size, "mode": mode})
3693 # used in CheckPrereq for ip ping check
3694 self.check_ip = hostname1.ip
3696 # file storage checks
3697 if (self.op.file_driver and
3698 not self.op.file_driver in constants.FILE_DRIVER):
3699 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3700 self.op.file_driver)
3702 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3703 raise errors.OpPrereqError("File storage directory path not absolute")
3705 ### Node/iallocator related checks
3706 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3707 raise errors.OpPrereqError("One and only one of iallocator and primary"
3708 " node must be given")
3710 if self.op.iallocator:
3711 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3713 self.op.pnode = self._ExpandNode(self.op.pnode)
3714 nodelist = [self.op.pnode]
3715 if self.op.snode is not None:
3716 self.op.snode = self._ExpandNode(self.op.snode)
3717 nodelist.append(self.op.snode)
3718 self.needed_locks[locking.LEVEL_NODE] = nodelist
3720 # in case of import lock the source node too
3721 if self.op.mode == constants.INSTANCE_IMPORT:
3722 src_node = getattr(self.op, "src_node", None)
3723 src_path = getattr(self.op, "src_path", None)
3725 if src_path is None:
3726 self.op.src_path = src_path = self.op.instance_name
3728 if src_node is None:
3729 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3730 self.op.src_node = None
3731 if os.path.isabs(src_path):
3732 raise errors.OpPrereqError("Importing an instance from an absolute"
3733 " path requires a source node option.")
3735 self.op.src_node = src_node = self._ExpandNode(src_node)
3736 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3737 self.needed_locks[locking.LEVEL_NODE].append(src_node)
3738 if not os.path.isabs(src_path):
3739 self.op.src_path = src_path = \
3740 os.path.join(constants.EXPORT_DIR, src_path)
3742 else: # INSTANCE_CREATE
3743 if getattr(self.op, "os_type", None) is None:
3744 raise errors.OpPrereqError("No guest OS specified")
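# Editor's illustration of the opcode shapes ExpandNames accepts above
# (hypothetical values; "xen-br0" is just an example bridge name):
#
#   self.op.nics  = [{"ip": constants.VALUE_AUTO,
#                     "mac": constants.VALUE_AUTO,
#                     "bridge": "xen-br0"}]
#   self.op.disks = [{"size": 1024, "mode": constants.DISK_RDWR}]
#
# A NIC may omit "bridge" (the cluster default is used) and every disk must
# carry an integer "size" in MB.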
3746 def _RunAllocator(self):
3747 """Run the allocator based on input opcode.
3750 nics = [n.ToDict() for n in self.nics]
3751 ial = IAllocator(self,
3752 mode=constants.IALLOCATOR_MODE_ALLOC,
3753 name=self.op.instance_name,
3754 disk_template=self.op.disk_template,
3757 vcpus=self.be_full[constants.BE_VCPUS],
3758 mem_size=self.be_full[constants.BE_MEMORY],
3761 hypervisor=self.op.hypervisor,
3764 ial.Run(self.op.iallocator)
3767 raise errors.OpPrereqError("Can't compute nodes using"
3768 " iallocator '%s': %s" % (self.op.iallocator,
3770 if len(ial.nodes) != ial.required_nodes:
3771 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3772 " of nodes (%s), required %s" %
3773 (self.op.iallocator, len(ial.nodes),
3774 ial.required_nodes))
3775 self.op.pnode = ial.nodes[0]
3776 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3777 self.op.instance_name, self.op.iallocator,
3778 ", ".join(ial.nodes))
3779 if ial.required_nodes == 2:
3780 self.op.snode = ial.nodes[1]
3782 def BuildHooksEnv(self):
3785 This runs on master, primary and secondary nodes of the instance.
3788 env = {
3789 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3790 "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3791 "INSTANCE_ADD_MODE": self.op.mode,
3792 }
3793 if self.op.mode == constants.INSTANCE_IMPORT:
3794 env["INSTANCE_SRC_NODE"] = self.op.src_node
3795 env["INSTANCE_SRC_PATH"] = self.op.src_path
3796 env["INSTANCE_SRC_IMAGES"] = self.src_images
3798 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3799 primary_node=self.op.pnode,
3800 secondary_nodes=self.secondaries,
3801 status=self.instance_status,
3802 os_type=self.op.os_type,
3803 memory=self.be_full[constants.BE_MEMORY],
3804 vcpus=self.be_full[constants.BE_VCPUS],
3805 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3808 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3813 def CheckPrereq(self):
3814 """Check prerequisites.
3817 if (not self.cfg.GetVGName() and
3818 self.op.disk_template not in constants.DTS_NOT_LVM):
3819 raise errors.OpPrereqError("Cluster does not support lvm-based"
3823 if self.op.mode == constants.INSTANCE_IMPORT:
3824 src_node = self.op.src_node
3825 src_path = self.op.src_path
3827 if src_node is None:
3828 exp_list = self.rpc.call_export_list(
3829 self.acquired_locks[locking.LEVEL_NODE])
3831 for node in exp_list:
3832 if not exp_list[node].failed and src_path in exp_list[node].data:
3834 self.op.src_node = src_node = node
3835 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3839 raise errors.OpPrereqError("No export found for relative path %s" %
3842 result = self.rpc.call_export_info(src_node, src_path)
3845 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3847 export_info = result.data
3848 if not export_info.has_section(constants.INISECT_EXP):
3849 raise errors.ProgrammerError("Corrupted export config")
3851 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3852 if (int(ei_version) != constants.EXPORT_VERSION):
3853 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3854 (ei_version, constants.EXPORT_VERSION))
3856 # Check that the new instance doesn't have less disks than the export
3857 instance_disks = len(self.disks)
3858 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3859 if instance_disks < export_disks:
3860 raise errors.OpPrereqError("Not enough disks to import."
3861 " (instance: %d, export: %d)" %
3862 (instance_disks, export_disks))
3864 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3865 disk_images = []
3866 for idx in range(export_disks):
3867 option = 'disk%d_dump' % idx
3868 if export_info.has_option(constants.INISECT_INS, option):
3869 # FIXME: are the old os-es, disk sizes, etc. useful?
3870 export_name = export_info.get(constants.INISECT_INS, option)
3871 image = os.path.join(src_path, export_name)
3872 disk_images.append(image)
3873 else:
3874   disk_images.append(False)
3876 self.src_images = disk_images
3878 old_name = export_info.get(constants.INISECT_INS, 'name')
3879 # FIXME: int() here could throw a ValueError on broken exports
3880 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3881 if self.op.instance_name == old_name:
3882 for idx, nic in enumerate(self.nics):
3883 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3884 nic_mac_ini = 'nic%d_mac' % idx
3885 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3887 # ip ping checks (we use the same ip that was resolved in ExpandNames)
3888 if self.op.start and not self.op.ip_check:
3889 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3890 " adding an instance in start mode")
3892 if self.op.ip_check:
3893 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3894 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3895 (self.check_ip, self.op.instance_name))
3899 if self.op.iallocator is not None:
3900 self._RunAllocator()
3902 #### node related checks
3904 # check primary node
3905 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3906 assert self.pnode is not None, \
3907 "Cannot retrieve locked node %s" % self.op.pnode
3908 self.secondaries = []
3910 # mirror node verification
3911 if self.op.disk_template in constants.DTS_NET_MIRROR:
3912 if self.op.snode is None:
3913 raise errors.OpPrereqError("The networked disk templates need"
3915 if self.op.snode == pnode.name:
3916 raise errors.OpPrereqError("The secondary node cannot be"
3917 " the primary node.")
3918 self.secondaries.append(self.op.snode)
3920 nodenames = [pnode.name] + self.secondaries
3922 req_size = _ComputeDiskSize(self.op.disk_template,
3925 # Check lv size requirements
3926 if req_size is not None:
3927 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3929 for node in nodenames:
3930 info = nodeinfo[node]
3934 raise errors.OpPrereqError("Cannot get current information"
3935 " from node '%s'" % node)
3936 vg_free = info.get('vg_free', None)
3937 if not isinstance(vg_free, int):
3938 raise errors.OpPrereqError("Can't compute free disk space on"
3940 if req_size > info['vg_free']:
3941 raise errors.OpPrereqError("Not enough disk space on target node %s."
3942 " %d MB available, %d MB required" %
3943 (node, info['vg_free'], req_size))
3945 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3948 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3950 if not isinstance(result.data, objects.OS):
3951 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3952 " primary node" % self.op.os_type)
3954 # bridge check on primary node
3955 bridges = [n.bridge for n in self.nics]
3956 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
3959 raise errors.OpPrereqError("One of the target bridges '%s' does not"
3960 " exist on destination node '%s'" %
3961 (",".join(bridges), pnode.name))
3963 # memory check on primary node
3965 _CheckNodeFreeMemory(self, self.pnode.name,
3966 "creating instance %s" % self.op.instance_name,
3967 self.be_full[constants.BE_MEMORY],
3971 self.instance_status = 'up'
3973 self.instance_status = 'down'
3975 def Exec(self, feedback_fn):
3976 """Create and add the instance to the cluster.
3979 instance = self.op.instance_name
3980 pnode_name = self.pnode.name
3982 for nic in self.nics:
3983 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3984 nic.mac = self.cfg.GenerateMAC()
3986 ht_kind = self.op.hypervisor
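# Hypervisors listed in constants.HTS_REQ_PORT get a network port
# allocated from the cluster-wide pool.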
3987 if ht_kind in constants.HTS_REQ_PORT:
3988 network_port = self.cfg.AllocatePort()
3992 ##if self.op.vnc_bind_address is None:
3993 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3995 # this is needed because os.path.join does not accept None arguments
3996 if self.op.file_storage_dir is None:
3997 string_file_storage_dir = ""
3999 string_file_storage_dir = self.op.file_storage_dir
4001 # build the full file storage dir path
4002 file_storage_dir = os.path.normpath(os.path.join(
4003 self.cfg.GetFileStorageDir(),
4004 string_file_storage_dir, instance))
4007 disks = _GenerateDiskTemplate(self,
4008 self.op.disk_template,
4009 instance, pnode_name,
4013 self.op.file_driver,
4016 iobj = objects.Instance(name=instance, os=self.op.os_type,
4017 primary_node=pnode_name,
4018 nics=self.nics, disks=disks,
4019 disk_template=self.op.disk_template,
4020 status=self.instance_status,
4021 network_port=network_port,
4022 beparams=self.op.beparams,
4023 hvparams=self.op.hvparams,
4024 hypervisor=self.op.hypervisor,
4027 feedback_fn("* creating instance disks...")
4028 if not _CreateDisks(self, iobj):
4029 _RemoveDisks(self, iobj)
4030 self.cfg.ReleaseDRBDMinors(instance)
4031 raise errors.OpExecError("Device creation failed, reverting...")
4033 feedback_fn("adding instance %s to cluster config" % instance)
4035 self.cfg.AddInstance(iobj)
4036 # Declare that we don't want to remove the instance lock anymore, as we've
4037 # added the instance to the config
4038 del self.remove_locks[locking.LEVEL_INSTANCE]
4039 # Remove the temp. assignments for the instance's drbds
4040 self.cfg.ReleaseDRBDMinors(instance)
4041 # Unlock all the nodes
4042 if self.op.mode == constants.INSTANCE_IMPORT:
4043 nodes_keep = [self.op.src_node]
4044 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4045 if node != self.op.src_node]
4046 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4047 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4049 self.context.glm.release(locking.LEVEL_NODE)
4050 del self.acquired_locks[locking.LEVEL_NODE]
4052 if self.op.wait_for_sync:
4053 disk_abort = not _WaitForSync(self, iobj)
4054 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4055 # make sure the disks are not degraded (still sync-ing is ok)
4057 feedback_fn("* checking mirrors status")
4058 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4063 _RemoveDisks(self, iobj)
4064 self.cfg.RemoveInstance(iobj.name)
4065 # Make sure the instance lock gets removed
4066 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4067 raise errors.OpExecError("There are some degraded disks for"
4070 feedback_fn("creating os for instance %s on node %s" %
4071 (instance, pnode_name))
4073 if iobj.disk_template != constants.DT_DISKLESS:
4074 if self.op.mode == constants.INSTANCE_CREATE:
4075 feedback_fn("* running the instance OS create scripts...")
4076 result = self.rpc.call_instance_os_add(pnode_name, iobj)
4079 raise errors.OpExecError("Could not add os for instance %s"
4081 (instance, pnode_name))
4083 elif self.op.mode == constants.INSTANCE_IMPORT:
4084 feedback_fn("* running the instance OS import scripts...")
4085 src_node = self.op.src_node
4086 src_images = self.src_images
4087 cluster_name = self.cfg.GetClusterName()
4088 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4089 src_node, src_images,
4091 import_result.Raise()
4092 for idx, result in enumerate(import_result.data):
4094 self.LogWarning("Could not import the image %s for instance"
4095 " %s, disk %d, on node %s" %
4096 (src_images[idx], instance, idx, pnode_name))
4098 # also checked in the prereq part
4099 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4103 logging.info("Starting instance %s on node %s", instance, pnode_name)
4104 feedback_fn("* starting instance...")
4105 result = self.rpc.call_instance_start(pnode_name, iobj, None)
4108 raise errors.OpExecError("Could not start instance")
4111 class LUConnectConsole(NoHooksLU):
4112 """Connect to an instance's console.
4114 This is somewhat special in that it returns the command line that
4115 you need to run on the master node in order to connect to the console.
4119 _OP_REQP = ["instance_name"]
4122 def ExpandNames(self):
4123 self._ExpandAndLockInstance()
4125 def CheckPrereq(self):
4126 """Check prerequisites.
4128 This checks that the instance is in the cluster.
4131 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4132 assert self.instance is not None, \
4133 "Cannot retrieve locked instance %s" % self.op.instance_name
4135 def Exec(self, feedback_fn):
4136 """Connect to the console of an instance
4139 instance = self.instance
4140 node = instance.primary_node
4142 node_insts = self.rpc.call_instance_list([node],
4143 [instance.hypervisor])[node]
4146 if instance.name not in node_insts.data:
4147 raise errors.OpExecError("Instance %s is not running." % instance.name)
4149 logging.debug("Connecting to console of %s on %s", instance.name, node)
4151 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4152 console_cmd = hyper.GetShellCommandForConsole(instance)
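# The LU does not attach to the console itself; it only builds the ssh
# command line that has to be run on the master node.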
4155 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4158 class LUReplaceDisks(LogicalUnit):
4159 """Replace the disks of an instance.
4162 HPATH = "mirrors-replace"
4163 HTYPE = constants.HTYPE_INSTANCE
4164 _OP_REQP = ["instance_name", "mode", "disks"]
4167 def ExpandNames(self):
4168 self._ExpandAndLockInstance()
4170 if not hasattr(self.op, "remote_node"):
4171 self.op.remote_node = None
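# Locking strategy: with an iallocator we must lock all nodes (the new
# secondary is not known yet); with an explicit remote node we lock it and
# later append the instance's own nodes; otherwise only the instance's
# nodes are locked.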
4173 ia_name = getattr(self.op, "iallocator", None)
4174 if ia_name is not None:
4175 if self.op.remote_node is not None:
4176 raise errors.OpPrereqError("Give either the iallocator or the new"
4177 " secondary, not both")
4178 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4179 elif self.op.remote_node is not None:
4180 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4181 if remote_node is None:
4182 raise errors.OpPrereqError("Node '%s' not known" %
4183 self.op.remote_node)
4184 self.op.remote_node = remote_node
4185 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4186 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4188 self.needed_locks[locking.LEVEL_NODE] = []
4189 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4191 def DeclareLocks(self, level):
4192 # If we're not already locking all nodes in the set we have to declare the
4193 # instance's primary/secondary nodes.
4194 if (level == locking.LEVEL_NODE and
4195 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4196 self._LockInstancesNodes()
4198 def _RunAllocator(self):
4199 """Compute a new secondary node using an IAllocator.
4202 ial = IAllocator(self,
4203 mode=constants.IALLOCATOR_MODE_RELOC,
4204 name=self.op.instance_name,
4205 relocate_from=[self.sec_node])
4207 ial.Run(self.op.iallocator)
4210 raise errors.OpPrereqError("Can't compute nodes using"
4211 " iallocator '%s': %s" % (self.op.iallocator,
4213 if len(ial.nodes) != ial.required_nodes:
4214 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4215 " of nodes (%s), required %s" %
4216 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
4217 self.op.remote_node = ial.nodes[0]
4218 self.LogInfo("Selected new secondary for the instance: %s",
4219 self.op.remote_node)
4221 def BuildHooksEnv(self):
4224 This runs on the master, the primary and all the secondaries.
4228 "MODE": self.op.mode,
4229 "NEW_SECONDARY": self.op.remote_node,
4230 "OLD_SECONDARY": self.instance.secondary_nodes[0],
4232 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4234 self.cfg.GetMasterNode(),
4235 self.instance.primary_node,
4237 if self.op.remote_node is not None:
4238 nl.append(self.op.remote_node)
4241 def CheckPrereq(self):
4242 """Check prerequisites.
4244 This checks that the instance is in the cluster.
4247 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4248 assert instance is not None, \
4249 "Cannot retrieve locked instance %s" % self.op.instance_name
4250 self.instance = instance
4252 if instance.disk_template not in constants.DTS_NET_MIRROR:
4253 raise errors.OpPrereqError("Instance's disk layout is not"
4254 " network mirrored.")
4256 if len(instance.secondary_nodes) != 1:
4257 raise errors.OpPrereqError("The instance has a strange layout,"
4258 " expected one secondary but found %d" %
4259 len(instance.secondary_nodes))
4261 self.sec_node = instance.secondary_nodes[0]
4263 ia_name = getattr(self.op, "iallocator", None)
4264 if ia_name is not None:
4265 self._RunAllocator()
4267 remote_node = self.op.remote_node
4268 if remote_node is not None:
4269 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4270 assert self.remote_node_info is not None, \
4271 "Cannot retrieve locked node %s" % remote_node
4273 self.remote_node_info = None
4274 if remote_node == instance.primary_node:
4275 raise errors.OpPrereqError("The specified node is the primary node of"
4277 elif remote_node == self.sec_node:
4278 if self.op.mode == constants.REPLACE_DISK_SEC:
4279 # this is for DRBD8, where we can't execute the same mode of
4280 # replacement as for drbd7 (no different port allocated)
4281 raise errors.OpPrereqError("Same secondary given, cannot execute"
4283 if instance.disk_template == constants.DT_DRBD8:
4284 if (self.op.mode == constants.REPLACE_DISK_ALL and
4285 remote_node is not None):
4286 # switch to replace secondary mode
4287 self.op.mode = constants.REPLACE_DISK_SEC
4289 if self.op.mode == constants.REPLACE_DISK_ALL:
4290 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4291 " secondary disk replacement, not"
4293 elif self.op.mode == constants.REPLACE_DISK_PRI:
4294 if remote_node is not None:
4295 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4296 " the secondary while doing a primary"
4297 " node disk replacement")
4298 self.tgt_node = instance.primary_node
4299 self.oth_node = instance.secondary_nodes[0]
4300 elif self.op.mode == constants.REPLACE_DISK_SEC:
4301 self.new_node = remote_node # this can be None, in which case
4302 # we don't change the secondary
4303 self.tgt_node = instance.secondary_nodes[0]
4304 self.oth_node = instance.primary_node
4306 raise errors.ProgrammerError("Unhandled disk replace mode")
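# An empty disk list means "replace all disks"; otherwise each requested
# index is validated against the instance below.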
4308 if not self.op.disks:
4309 self.op.disks = range(len(instance.disks))
4311 for disk_idx in self.op.disks:
4312 instance.FindDisk(disk_idx)
4314 def _ExecD8DiskOnly(self, feedback_fn):
4315 """Replace a disk on the primary or secondary for dbrd8.
4317 The algorithm for replace is quite complicated:
4319 1. for each disk to be replaced:
4321 1. create new LVs on the target node with unique names
4322 1. detach old LVs from the drbd device
4323 1. rename old LVs to name_replaced.<time_t>
4324 1. rename new LVs to old LVs
4325 1. attach the new LVs (with the old names now) to the drbd device
4327 1. wait for sync across all devices
4329 1. for each modified disk:
4331 1. remove old LVs (which have the name name_replaced.<time_t>)
4333 Failures are not very well handled.
4337 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4338 instance = self.instance
4340 vgname = self.cfg.GetVGName()
4343 tgt_node = self.tgt_node
4344 oth_node = self.oth_node
4346 # Step: check device activation
4347 self.proc.LogStep(1, steps_total, "check device existence")
4348 info("checking volume groups")
4349 my_vg = cfg.GetVGName()
4350 results = self.rpc.call_vg_list([oth_node, tgt_node])
4352 raise errors.OpExecError("Can't list volume groups on the nodes")
4353 for node in oth_node, tgt_node:
4355 if res.failed or not res.data or my_vg not in res.data:
4356 raise errors.OpExecError("Volume group '%s' not found on %s" %
4358 for idx, dev in enumerate(instance.disks):
4359 if idx not in self.op.disks:
4361 for node in tgt_node, oth_node:
4362 info("checking disk/%d on %s" % (idx, node))
4363 cfg.SetDiskID(dev, node)
4364 if not self.rpc.call_blockdev_find(node, dev):
4365 raise errors.OpExecError("Can't find disk/%d on node %s" %
4368 # Step: check other node consistency
4369 self.proc.LogStep(2, steps_total, "check peer consistency")
4370 for idx, dev in enumerate(instance.disks):
4371 if idx not in self.op.disks:
4373 info("checking disk/%d consistency on %s" % (idx, oth_node))
4374 if not _CheckDiskConsistency(self, dev, oth_node,
4375 oth_node==instance.primary_node):
4376 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4377 " to replace disks on this node (%s)" %
4378 (oth_node, tgt_node))
4380 # Step: create new storage
4381 self.proc.LogStep(3, steps_total, "allocate new storage")
4382 for idx, dev in enumerate(instance.disks):
4383 if idx not in self.op.disks:
4386 cfg.SetDiskID(dev, tgt_node)
4387 lv_names = [".disk%d_%s" % (idx, suf)
4388 for suf in ["data", "meta"]]
4389 names = _GenerateUniqueNames(self, lv_names)
4390 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4391 logical_id=(vgname, names[0]))
4392 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4393 logical_id=(vgname, names[1]))
4394 new_lvs = [lv_data, lv_meta]
4395 old_lvs = dev.children
4396 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4397 info("creating new local storage on %s for %s" %
4398 (tgt_node, dev.iv_name))
4399 # since we *always* want to create this LV, we use the
4400 # _Create...OnPrimary (which forces the creation), even if we
4401 # are talking about the secondary node
4402 for new_lv in new_lvs:
4403 if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4404 _GetInstanceInfoText(instance)):
4405 raise errors.OpExecError("Failed to create new LV named '%s' on"
4407 (new_lv.logical_id[1], tgt_node))
4409 # Step: for each lv, detach+rename*2+attach
4410 self.proc.LogStep(4, steps_total, "change drbd configuration")
4411 for dev, old_lvs, new_lvs in iv_names.itervalues():
4412 info("detaching %s drbd from local storage" % dev.iv_name)
4413 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4416 raise errors.OpExecError("Can't detach drbd from local storage on node"
4417 " %s for device %s" % (tgt_node, dev.iv_name))
4419 #cfg.Update(instance)
4421 # ok, we created the new LVs, so now we know we have the needed
4422 # storage; as such, we proceed on the target node to rename
4423 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4424 # using the assumption that logical_id == physical_id (which in
4425 # turn is the unique_id on that node)
4427 # FIXME(iustin): use a better name for the replaced LVs
4428 temp_suffix = int(time.time())
4429 ren_fn = lambda d, suff: (d.physical_id[0],
4430 d.physical_id[1] + "_replaced-%s" % suff)
4431 # build the rename list based on what LVs exist on the node
4433 for to_ren in old_lvs:
4434 find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4435 if not find_res.failed and find_res.data is not None: # device exists
4436 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4438 info("renaming the old LVs on the target node")
4439 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4442 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4443 # now we rename the new LVs to the old LVs
4444 info("renaming the new LVs on the target node")
4445 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4446 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4449 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4451 for old, new in zip(old_lvs, new_lvs):
4452 new.logical_id = old.logical_id
4453 cfg.SetDiskID(new, tgt_node)
4455 for disk in old_lvs:
4456 disk.logical_id = ren_fn(disk, temp_suffix)
4457 cfg.SetDiskID(disk, tgt_node)
4459 # now that the new lvs have the old name, we can add them to the device
4460 info("adding new mirror component on %s" % tgt_node)
4461 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4462 if result.failed or not result.data:
4463 for new_lv in new_lvs:
4464 result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4465 if result.failed or not result.data:
4466 warning("Can't rollback device %s", hint="manually cleanup unused"
4468 raise errors.OpExecError("Can't add local storage to drbd")
4470 dev.children = new_lvs
4471 cfg.Update(instance)
4473 # Step: wait for sync
4475 # this can fail as the old devices are degraded and _WaitForSync
4476 # does a combined result over all disks, so we don't check its
4478 self.proc.LogStep(5, steps_total, "sync devices")
4479 _WaitForSync(self, instance, unlock=True)
4481 # so check manually all the devices
4482 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4483 cfg.SetDiskID(dev, instance.primary_node)
4484 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4485 if result.failed or result.data[5]:
4486 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4488 # Step: remove old storage
4489 self.proc.LogStep(6, steps_total, "removing old storage")
4490 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4491 info("remove logical volumes for %s" % name)
4493 cfg.SetDiskID(lv, tgt_node)
4494 result = self.rpc.call_blockdev_remove(tgt_node, lv)
4495 if result.failed or not result.data:
4496 warning("Can't remove old LV", hint="manually remove unused LVs")
4499 def _ExecD8Secondary(self, feedback_fn):
4500 """Replace the secondary node for drbd8.
4502 The algorithm for replace is quite complicated:
4503 - for all disks of the instance:
4504 - create new LVs on the new node with same names
4505 - shutdown the drbd device on the old secondary
4506 - disconnect the drbd network on the primary
4507 - create the drbd device on the new secondary
4508 - network attach the drbd on the primary, using an artifice:
4509 the drbd code for Attach() will connect to the network if it
4510 finds a device which is connected to the good local disks but
4512 - wait for sync across all devices
4513 - remove all disks from the old secondary
4515 Failures are not very well handled.
4519 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4520 instance = self.instance
4522 vgname = self.cfg.GetVGName()
4525 old_node = self.tgt_node
4526 new_node = self.new_node
4527 pri_node = instance.primary_node
4529 # Step: check device activation
4530 self.proc.LogStep(1, steps_total, "check device existence")
4531 info("checking volume groups")
4532 my_vg = cfg.GetVGName()
4533 results = self.rpc.call_vg_list([pri_node, new_node])
4534 for node in pri_node, new_node:
4536 if res.failed or not res.data or my_vg not in res.data:
4537 raise errors.OpExecError("Volume group '%s' not found on %s" %
4539 for idx, dev in enumerate(instance.disks):
4540 if idx not in self.op.disks:
4542 info("checking disk/%d on %s" % (idx, pri_node))
4543 cfg.SetDiskID(dev, pri_node)
4544 result = self.rpc.call_blockdev_find(pri_node, dev)
4547 raise errors.OpExecError("Can't find disk/%d on node %s" %
4550 # Step: check other node consistency
4551 self.proc.LogStep(2, steps_total, "check peer consistency")
4552 for idx, dev in enumerate(instance.disks):
4553 if idx not in self.op.disks:
4555 info("checking disk/%d consistency on %s" % (idx, pri_node))
4556 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4557 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4558 " unsafe to replace the secondary" %
4561 # Step: create new storage
4562 self.proc.LogStep(3, steps_total, "allocate new storage")
4563 for idx, dev in enumerate(instance.disks):
4565 info("adding new local storage on %s for disk/%d" %
4567 # since we *always* want to create this LV, we use the
4568 # _Create...OnPrimary (which forces the creation), even if we
4569 # are talking about the secondary node
4570 for new_lv in dev.children:
4571 if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4572 _GetInstanceInfoText(instance)):
4573 raise errors.OpExecError("Failed to create new LV named '%s' on"
4575 (new_lv.logical_id[1], new_node))
4577 # Step 4: drbd minors and drbd setup changes
4578 # after this, we must manually remove the drbd minors on both the
4579 # error and the success paths
4580 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4582 logging.debug("Allocated minors %s", minors)
4583 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4584 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4586 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4587 # create new devices on new_node
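# The DRBD logical_id keeps its (node_a, node_b, port, minor_a, minor_b,
# ...) layout: only the old secondary node and its minor are replaced by
# the new node and the freshly allocated minor.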
4588 if pri_node == dev.logical_id[0]:
4589 new_logical_id = (pri_node, new_node,
4590 dev.logical_id[2], dev.logical_id[3], new_minor,
4593 new_logical_id = (new_node, pri_node,
4594 dev.logical_id[2], new_minor, dev.logical_id[4],
4596 iv_names[idx] = (dev, dev.children, new_logical_id)
4597 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4599 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4600 logical_id=new_logical_id,
4601 children=dev.children)
4602 if not _CreateBlockDevOnSecondary(self, new_node, instance,
4604 _GetInstanceInfoText(instance)):
4605 self.cfg.ReleaseDRBDMinors(instance.name)
4606 raise errors.OpExecError("Failed to create new DRBD on"
4607 " node '%s'" % new_node)
4609 for idx, dev in enumerate(instance.disks):
4610 # we have new devices, shutdown the drbd on the old secondary
4611 info("shutting down drbd for disk/%d on old node" % idx)
4612 cfg.SetDiskID(dev, old_node)
4613 result = self.rpc.call_blockdev_shutdown(old_node, dev)
4614 if result.failed or not result.data:
4615 warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4616 hint="Please cleanup this device manually as soon as possible")
4618 info("detaching primary drbds from the network (=> standalone)")
4620 for idx, dev in enumerate(instance.disks):
4621 cfg.SetDiskID(dev, pri_node)
4622 # set the network part of the physical (unique in bdev terms) id
4623 # to None, meaning detach from network
4624 dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4625 # and 'find' the device, which will 'fix' it to match the
4627 result = self.rpc.call_blockdev_find(pri_node, dev)
4628 if not result.failed and result.data:
4631 warning("Failed to detach drbd disk/%d from network, unusual case" %
4635 # no detaches succeeded (very unlikely)
4636 self.cfg.ReleaseDRBDMinors(instance.name)
4637 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4639 # if we managed to detach at least one, we update all the disks of
4640 # the instance to point to the new secondary
4641 info("updating instance configuration")
4642 for dev, _, new_logical_id in iv_names.itervalues():
4643 dev.logical_id = new_logical_id
4644 cfg.SetDiskID(dev, pri_node)
4645 cfg.Update(instance)
4646 # we can now remove the temp minors, as the new values are
4647 # written to the config file (and therefore stable)
4648 self.cfg.ReleaseDRBDMinors(instance.name)
4650 # and now perform the drbd attach
4651 info("attaching primary drbds to new secondary (standalone => connected)")
4653 for idx, dev in enumerate(instance.disks):
4654 info("attaching primary drbd for disk/%d to new secondary node" % idx)
4655 # since the attach is smart, it's enough to 'find' the device,
4656 # it will automatically activate the network, if the physical_id
4658 cfg.SetDiskID(dev, pri_node)
4659 logging.debug("Disk to attach: %s", dev)
4660 result = self.rpc.call_blockdev_find(pri_node, dev)
4661 if result.failed or not result.data:
4662 warning("can't attach drbd disk/%d to new secondary!" % idx,
4663 "please do a gnt-instance info to see the status of disks")
4665 # this can fail as the old devices are degraded and _WaitForSync
4666 # does a combined result over all disks, so we don't check its
4668 self.proc.LogStep(5, steps_total, "sync devices")
4669 _WaitForSync(self, instance, unlock=True)
4671 # so check manually all the devices
4672 for idx, (dev, old_lvs, _) in iv_names.iteritems():
4673 cfg.SetDiskID(dev, pri_node)
4674 result = self.rpc.call_blockdev_find(pri_node, dev)
4677 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4679 self.proc.LogStep(6, steps_total, "removing old storage")
4680 for idx, (dev, old_lvs, _) in iv_names.iteritems():
4681 info("remove logical volumes for disk/%d" % idx)
4683 cfg.SetDiskID(lv, old_node)
4684 result = self.rpc.call_blockdev_remove(old_node, lv)
4685 if result.failed or not result.data:
4686 warning("Can't remove LV on old secondary",
4687 hint="Cleanup stale volumes by hand")
4689 def Exec(self, feedback_fn):
4690 """Execute disk replacement.
4692 This dispatches the disk replacement to the appropriate handler.
4695 instance = self.instance
4697 # Activate the instance disks if we're replacing them on a down instance
4698 if instance.status == "down":
4699 _StartInstanceDisks(self, instance, True)
4701 if instance.disk_template == constants.DT_DRBD8:
4702 if self.op.remote_node is None:
4703 fn = self._ExecD8DiskOnly
4705 fn = self._ExecD8Secondary
4707 raise errors.ProgrammerError("Unhandled disk replacement case")
4709 ret = fn(feedback_fn)
4711 # Deactivate the instance disks if we're replacing them on a down instance
4712 if instance.status == "down":
4713 _SafeShutdownInstanceDisks(self, instance)
4718 class LUGrowDisk(LogicalUnit):
4719 """Grow a disk of an instance.
4723 HTYPE = constants.HTYPE_INSTANCE
4724 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4727 def ExpandNames(self):
4728 self._ExpandAndLockInstance()
4729 self.needed_locks[locking.LEVEL_NODE] = []
4730 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4732 def DeclareLocks(self, level):
4733 if level == locking.LEVEL_NODE:
4734 self._LockInstancesNodes()
4736 def BuildHooksEnv(self):
4739 This runs on the master, the primary and all the secondaries.
4743 "DISK": self.op.disk,
4744 "AMOUNT": self.op.amount,
4746 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4748 self.cfg.GetMasterNode(),
4749 self.instance.primary_node,
4753 def CheckPrereq(self):
4754 """Check prerequisites.
4756 This checks that the instance is in the cluster.
4759 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4760 assert instance is not None, \
4761 "Cannot retrieve locked instance %s" % self.op.instance_name
4763 self.instance = instance
4765 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4766 raise errors.OpPrereqError("Instance's disk layout does not support"
4769 self.disk = instance.FindDisk(self.op.disk)
4771 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4772 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4773 instance.hypervisor)
4774 for node in nodenames:
4775 info = nodeinfo[node]
4776 if info.failed or not info.data:
4777 raise errors.OpPrereqError("Cannot get current information"
4778 " from node '%s'" % node)
4779 vg_free = info.data.get('vg_free', None)
4780 if not isinstance(vg_free, int):
4781 raise errors.OpPrereqError("Can't compute free disk space on"
4783 if self.op.amount > vg_free:
4784 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4785 " %d MiB available, %d MiB required" %
4786 (node, vg_free, self.op.amount))
4788 def Exec(self, feedback_fn):
4789 """Execute disk grow.
4792 instance = self.instance
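# The grow request is sent to every node that holds the disk (all
# secondaries first, then the primary); only afterwards is the new size
# recorded in the configuration.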
4794 for node in (instance.secondary_nodes + (instance.primary_node,)):
4795 self.cfg.SetDiskID(disk, node)
4796 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4798 if (not result.data or not isinstance(result.data, (list, tuple)) or
4799 len(result.data) != 2):
4800 raise errors.OpExecError("Grow request failed to node %s" % node)
4801 elif not result.data[0]:
4802 raise errors.OpExecError("Grow request failed to node %s: %s" %
4803 (node, result.data[1]))
4804 disk.RecordGrow(self.op.amount)
4805 self.cfg.Update(instance)
4806 if self.op.wait_for_sync:
4807 disk_abort = not _WaitForSync(self, instance)
4809 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4810 " status.\nPlease check the instance.")
4813 class LUQueryInstanceData(NoHooksLU):
4814 """Query runtime instance data.
4817 _OP_REQP = ["instances", "static"]
4820 def ExpandNames(self):
4821 self.needed_locks = {}
4822 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4824 if not isinstance(self.op.instances, list):
4825 raise errors.OpPrereqError("Invalid argument type 'instances'")
4827 if self.op.instances:
4828 self.wanted_names = []
4829 for name in self.op.instances:
4830 full_name = self.cfg.ExpandInstanceName(name)
4831 if full_name is None:
4832 raise errors.OpPrereqError("Instance '%s' not known" %
4833 name)
4834 self.wanted_names.append(full_name)
4835 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4837 self.wanted_names = None
4838 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4840 self.needed_locks[locking.LEVEL_NODE] = []
4841 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4843 def DeclareLocks(self, level):
4844 if level == locking.LEVEL_NODE:
4845 self._LockInstancesNodes()
4847 def CheckPrereq(self):
4848 """Check prerequisites.
4850 This only checks the optional instance list against the existing names.
4853 if self.wanted_names is None:
4854 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4856 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4857 in self.wanted_names]
4860 def _ComputeDiskStatus(self, instance, snode, dev):
4861 """Compute block device status.
4864 static = self.op.static
4866 self.cfg.SetDiskID(dev, instance.primary_node)
4867 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4869 dev_pstatus = dev_pstatus.data
4873 if dev.dev_type in constants.LDS_DRBD:
4874 # we change the snode then (otherwise we use the one passed in)
4875 if dev.logical_id[0] == instance.primary_node:
4876 snode = dev.logical_id[1]
4878 snode = dev.logical_id[0]
4880 if snode and not static:
4881 self.cfg.SetDiskID(dev, snode)
4882 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4884 dev_sstatus = dev_sstatus.data
4889 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4890 for child in dev.children]
4895 "iv_name": dev.iv_name,
4896 "dev_type": dev.dev_type,
4897 "logical_id": dev.logical_id,
4898 "physical_id": dev.physical_id,
4899 "pstatus": dev_pstatus,
4900 "sstatus": dev_sstatus,
4901 "children": dev_children,
4907 def Exec(self, feedback_fn):
4908 """Gather and return data"""
4911 cluster = self.cfg.GetClusterInfo()
4913 for instance in self.wanted_instances:
4914 if not self.op.static:
4915 remote_info = self.rpc.call_instance_info(instance.primary_node,
4917 instance.hypervisor)
4919 remote_info = remote_info.data
4920 if remote_info and "state" in remote_info:
4923 remote_state = "down"
4926 if instance.status == "down":
4927 config_state = "down"
4931 disks = [self._ComputeDiskStatus(instance, None, device)
4932 for device in instance.disks]
4935 "name": instance.name,
4936 "config_state": config_state,
4937 "run_state": remote_state,
4938 "pnode": instance.primary_node,
4939 "snodes": instance.secondary_nodes,
4941 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4943 "hypervisor": instance.hypervisor,
4944 "network_port": instance.network_port,
4945 "hv_instance": instance.hvparams,
4946 "hv_actual": cluster.FillHV(instance),
4947 "be_instance": instance.beparams,
4948 "be_actual": cluster.FillBE(instance),
4951 result[instance.name] = idict
4956 class LUSetInstanceParams(LogicalUnit):
4957 """Modifies an instances's parameters.
4960 HPATH = "instance-modify"
4961 HTYPE = constants.HTYPE_INSTANCE
4962 _OP_REQP = ["instance_name"]
4965 def CheckArguments(self):
4966 if not hasattr(self.op, 'nics'):
4968 if not hasattr(self.op, 'disks'):
4970 if not hasattr(self.op, 'beparams'):
4971 self.op.beparams = {}
4972 if not hasattr(self.op, 'hvparams'):
4973 self.op.hvparams = {}
4974 self.op.force = getattr(self.op, "force", False)
4975 if not (self.op.nics or self.op.disks or
4976 self.op.hvparams or self.op.beparams):
4977 raise errors.OpPrereqError("No changes submitted")
4979 utils.CheckBEParams(self.op.beparams)
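# Disk (and NIC) changes are given as (op, params) pairs, where op is
# constants.DDM_ADD, constants.DDM_REMOVE, or the integer index of an
# existing device to modify.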
4983 for disk_op, disk_dict in self.op.disks:
4984 if disk_op == constants.DDM_REMOVE:
4987 elif disk_op == constants.DDM_ADD:
4990 if not isinstance(disk_op, int):
4991 raise errors.OpPrereqError("Invalid disk index")
4992 if disk_op == constants.DDM_ADD:
4993 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
4994 if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
4995 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
4996 size = disk_dict.get('size', None)
4998 raise errors.OpPrereqError("Required disk parameter size missing")
5001 except ValueError, err:
5002 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5004 disk_dict['size'] = size
5006 # modification of disk
5007 if 'size' in disk_dict:
5008 raise errors.OpPrereqError("Disk size change not possible, use"
5011 if disk_addremove > 1:
5012 raise errors.OpPrereqError("Only one disk add or remove operation"
5013 " supported at a time")
5017 for nic_op, nic_dict in self.op.nics:
5018 if nic_op == constants.DDM_REMOVE:
5021 elif nic_op == constants.DDM_ADD:
5024 if not isinstance(nic_op, int):
5025 raise errors.OpPrereqError("Invalid nic index")
5027 # nic_dict should be a dict
5028 nic_ip = nic_dict.get('ip', None)
5029 if nic_ip is not None:
5030 if nic_ip.lower() == "none":
5031 nic_dict['ip'] = None
5033 if not utils.IsValidIP(nic_ip):
5034 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5035 # we can only check None bridges and assign the default one
5036 nic_bridge = nic_dict.get('bridge', None)
5037 if nic_bridge is None:
5038 nic_dict['bridge'] = self.cfg.GetDefBridge()
5039 # but we can validate MACs
5040 nic_mac = nic_dict.get('mac', None)
5041 if nic_mac is not None:
5042 if self.cfg.IsMacInUse(nic_mac):
5043 raise errors.OpPrereqError("MAC address %s already in use"
5044 " in cluster" % nic_mac)
5045 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5046 if not utils.IsValidMac(nic_mac):
5047 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5048 if nic_addremove > 1:
5049 raise errors.OpPrereqError("Only one NIC add or remove operation"
5050 " supported at a time")
5052 def ExpandNames(self):
5053 self._ExpandAndLockInstance()
5054 self.needed_locks[locking.LEVEL_NODE] = []
5055 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5057 def DeclareLocks(self, level):
5058 if level == locking.LEVEL_NODE:
5059 self._LockInstancesNodes()
5061 def BuildHooksEnv(self):
5064 This runs on the master, primary and secondaries.
5068 if constants.BE_MEMORY in self.be_new:
5069 args['memory'] = self.be_new[constants.BE_MEMORY]
5070 if constants.BE_VCPUS in self.be_new:
5071 args['vcpus'] = self.be_new[constants.BE_VCPUS]
5072 # FIXME: readd disk/nic changes
5073 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5074 nl = [self.cfg.GetMasterNode(),
5075 self.instance.primary_node] + list(self.instance.secondary_nodes)
5078 def CheckPrereq(self):
5079 """Check prerequisites.
5081 This only checks the instance list against the existing names.
5084 force = self.force = self.op.force
5086 # checking the new params on the primary/secondary nodes
5088 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5089 assert self.instance is not None, \
5090 "Cannot retrieve locked instance %s" % self.op.instance_name
5091 pnode = self.instance.primary_node
5093 nodelist.extend(instance.secondary_nodes)
5095 # hvparams processing
5096 if self.op.hvparams:
5097 i_hvdict = copy.deepcopy(instance.hvparams)
5098 for key, val in self.op.hvparams.iteritems():
5099 if val == constants.VALUE_DEFAULT:
5104 elif val == constants.VALUE_NONE:
5105 i_hvdict[key] = None
5108 cluster = self.cfg.GetClusterInfo()
5109 hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5112 hypervisor.GetHypervisor(
5113 instance.hypervisor).CheckParameterSyntax(hv_new)
5114 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5115 self.hv_new = hv_new # the new actual values
5116 self.hv_inst = i_hvdict # the new dict (without defaults)
5118 self.hv_new = self.hv_inst = {}
5120 # beparams processing
5121 if self.op.beparams:
5122 i_bedict = copy.deepcopy(instance.beparams)
5123 for key, val in self.op.beparams.iteritems():
5124 if val == constants.VALUE_DEFAULT:
5131 cluster = self.cfg.GetClusterInfo()
5132 be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5134 self.be_new = be_new # the new actual values
5135 self.be_inst = i_bedict # the new dict (without defaults)
5137 self.be_new = self.be_inst = {}
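# When the memory size is changed and force was not requested, verify that
# the primary node (and, with auto_balance, the secondaries) still have
# enough free memory for the instance.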
5141 if constants.BE_MEMORY in self.op.beparams and not self.force:
5142 mem_check_list = [pnode]
5143 if be_new[constants.BE_AUTO_BALANCE]:
5144 # either we changed auto_balance to yes or it was from before
5145 mem_check_list.extend(instance.secondary_nodes)
5146 instance_info = self.rpc.call_instance_info(pnode, instance.name,
5147 instance.hypervisor)
5148 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5149 instance.hypervisor)
5150 if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5151 # Assume the primary node is unreachable and go ahead
5152 self.warn.append("Can't get info from primary node %s" % pnode)
5154 if not instance_info.failed and instance_info.data:
5155 current_mem = instance_info.data['memory']
5157 # Assume instance not running
5158 # (there is a slight race condition here, but it's not very probable,
5159 # and we have no other way to check)
5161 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5162 nodeinfo[pnode].data['memory_free'])
5164 raise errors.OpPrereqError("This change will prevent the instance"
5165 " from starting, due to %d MB of memory"
5166 " missing on its primary node" % miss_mem)
5168 if be_new[constants.BE_AUTO_BALANCE]:
5169 for node in instance.secondary_nodes:
nres = nodeinfo[node]
5170 if nres.failed or not isinstance(nres.data, dict):
5171 self.warn.append("Can't get info from secondary node %s" % node)
5172 elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5173 self.warn.append("Not enough memory to failover instance to"
5174 " secondary node %s" % node)
5177 for nic_op, nic_dict in self.op.nics:
5178 if nic_op == constants.DDM_REMOVE:
5179 if not instance.nics:
5180 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5182 if nic_op != constants.DDM_ADD:
5184 if nic_op < 0 or nic_op >= len(instance.nics):
5185 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5187 (nic_op, len(instance.nics)))
5188 nic_bridge = nic_dict.get('bridge', None)
5189 if nic_bridge is not None:
5190 if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5191 msg = ("Bridge '%s' doesn't exist on one of"
5192 " the instance nodes" % nic_bridge)
5194 self.warn.append(msg)
5196 raise errors.OpPrereqError(msg)
5199 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5200 raise errors.OpPrereqError("Disk operations not supported for"
5201 " diskless instances")
5202 for disk_op, disk_dict in self.op.disks:
5203 if disk_op == constants.DDM_REMOVE:
5204 if len(instance.disks) == 1:
5205 raise errors.OpPrereqError("Cannot remove the last disk of"
5207 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5208 ins_l = ins_l[pnode]
5209 if not isinstance(ins_l, list):
5210 raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5211 if instance.name in ins_l:
5212 raise errors.OpPrereqError("Instance is running, can't remove"
5215 if (disk_op == constants.DDM_ADD and
5216 len(instance.disks) >= constants.MAX_DISKS):
5217 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5218 " add more" % constants.MAX_DISKS)
5219 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5221 if disk_op < 0 or disk_op >= len(instance.disks):
5222 raise errors.OpPrereqError("Invalid disk index %s, valid values"
5224 (disk_op, len(instance.disks)))
5228 def Exec(self, feedback_fn):
5229 """Modifies an instance.
5231 All parameters take effect only at the next restart of the instance.
5234 # Process here the warnings from CheckPrereq, as we don't have a
5235 # feedback_fn there.
5236 for warn in self.warn:
5237 feedback_fn("WARNING: %s" % warn)
5240 instance = self.instance
5242 for disk_op, disk_dict in self.op.disks:
5243 if disk_op == constants.DDM_REMOVE:
5244 # remove the last disk
5245 device = instance.disks.pop()
5246 device_idx = len(instance.disks)
5247 for node, disk in device.ComputeNodeTree(instance.primary_node):
5248 self.cfg.SetDiskID(disk, node)
5249 rpc_result = self.rpc.call_blockdev_remove(node, disk)
5250 if rpc_result.failed or not rpc_result.data:
5251 self.proc.LogWarning("Could not remove disk/%d on node %s,"
5252 " continuing anyway", device_idx, node)
5253 result.append(("disk/%d" % device_idx, "remove"))
5254 elif disk_op == constants.DDM_ADD:
5256 if instance.disk_template == constants.DT_FILE:
5257 file_driver, file_path = instance.disks[0].logical_id
5258 file_path = os.path.dirname(file_path)
5260 file_driver = file_path = None
5261 disk_idx_base = len(instance.disks)
5262 new_disk = _GenerateDiskTemplate(self,
5263 instance.disk_template,
5264 instance, instance.primary_node,
5265 instance.secondary_nodes,
5270 new_disk.mode = disk_dict['mode']
5271 instance.disks.append(new_disk)
5272 info = _GetInstanceInfoText(instance)
5274 logging.info("Creating volume %s for instance %s",
5275 new_disk.iv_name, instance.name)
5276 # Note: this needs to be kept in sync with _CreateDisks
5278 for secondary_node in instance.secondary_nodes:
5279 if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5280 new_disk, False, info):
5281 self.LogWarning("Failed to create volume %s (%s) on"
5282 " secondary node %s!",
5283 new_disk.iv_name, new_disk, secondary_node)
5285 if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5286 instance, new_disk, info):
5287 self.LogWarning("Failed to create volume %s on primary!",
5289 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5290 (new_disk.size, new_disk.mode)))
5292 # change a given disk
5293 instance.disks[disk_op].mode = disk_dict['mode']
5294 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5296 for nic_op, nic_dict in self.op.nics:
5297 if nic_op == constants.DDM_REMOVE:
5298 # remove the last nic
5299 del instance.nics[-1]
5300 result.append(("nic.%d" % len(instance.nics), "remove"))
5301 elif nic_op == constants.DDM_ADD:
5303 if 'mac' not in nic_dict:
5304 mac = constants.VALUE_GENERATE
5306 mac = nic_dict['mac']
5307 if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5308 mac = self.cfg.GenerateMAC()
5309 new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5310 bridge=nic_dict.get('bridge', None))
5311 instance.nics.append(new_nic)
5312 result.append(("nic.%d" % (len(instance.nics) - 1),
5313 "add:mac=%s,ip=%s,bridge=%s" %
5314 (new_nic.mac, new_nic.ip, new_nic.bridge)))
5316 # change a given nic
5317 for key in 'mac', 'ip', 'bridge':
5319 setattr(instance.nics[nic_op], key, nic_dict[key])
5320 result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5323 if self.op.hvparams:
5324 instance.hvparams = self.hv_new
5325 for key, val in self.op.hvparams.iteritems():
5326 result.append(("hv/%s" % key, val))
5329 if self.op.beparams:
5330 instance.beparams = self.be_inst
5331 for key, val in self.op.beparams.iteritems():
5332 result.append(("be/%s" % key, val))
5334 self.cfg.Update(instance)
5339 class LUQueryExports(NoHooksLU):
5340 """Query the exports list
5343 _OP_REQP = ['nodes']
5346 def ExpandNames(self):
5347 self.needed_locks = {}
5348 self.share_locks[locking.LEVEL_NODE] = 1
5349 if not self.op.nodes:
5350 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5352 self.needed_locks[locking.LEVEL_NODE] = \
5353 _GetWantedNodes(self, self.op.nodes)
5355 def CheckPrereq(self):
5356 """Check prerequisites.
5359 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5361 def Exec(self, feedback_fn):
5362 """Compute the list of all the exported system images.
5365 @return: a dictionary with the structure node->(export-list)
5366 where export-list is a list of the instances exported on
5370 rpcresult = self.rpc.call_export_list(self.nodes)
5372 for node in rpcresult:
5373 if rpcresult[node].failed:
5374 result[node] = False
5376 result[node] = rpcresult[node].data
5381 class LUExportInstance(LogicalUnit):
5382 """Export an instance to an image in the cluster.
5385 HPATH = "instance-export"
5386 HTYPE = constants.HTYPE_INSTANCE
5387 _OP_REQP = ["instance_name", "target_node", "shutdown"]
5390 def ExpandNames(self):
5391 self._ExpandAndLockInstance()
5392 # FIXME: lock only instance primary and destination node
5394 # Sad but true, for now we have to lock all nodes, as we don't know where
5395 # the previous export might be, and in this LU we search for it and
5396 # remove it from its current node. In the future we could fix this by:
5397 # - making a tasklet to search (share-lock all), then create the new one,
5398 # then one to remove, after
5399 # - removing the removal operation altogether
5400 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5402 def DeclareLocks(self, level):
5403 """Last minute lock declaration."""
5404 # All nodes are locked anyway, so nothing to do here.
5406 def BuildHooksEnv(self):
5409 This will run on the master, primary node and target node.
5413 "EXPORT_NODE": self.op.target_node,
5414 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5416 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5417 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5418 self.op.target_node]
5421 def CheckPrereq(self):
5422 """Check prerequisites.
5424 This checks that the instance and node names are valid.
5427 instance_name = self.op.instance_name
5428 self.instance = self.cfg.GetInstanceInfo(instance_name)
5429 assert self.instance is not None, \
5430 "Cannot retrieve locked instance %s" % self.op.instance_name
5432 self.dst_node = self.cfg.GetNodeInfo(
5433 self.cfg.ExpandNodeName(self.op.target_node))
5435 if self.dst_node is None:
5436 # This is a wrong node name, not a non-locked node
5437 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5439 # instance disk type verification
5440 for disk in self.instance.disks:
5441 if disk.dev_type == constants.LD_FILE:
5442 raise errors.OpPrereqError("Export not supported for instances with"
5443 " file-based disks")
5445 def Exec(self, feedback_fn):
5446 """Export an instance to an image in the cluster.
5449 instance = self.instance
5450 dst_node = self.dst_node
5451 src_node = instance.primary_node
5452 if self.op.shutdown:
5453 # shutdown the instance, but not the disks
5454 result = self.rpc.call_instance_shutdown(src_node, instance)
5457 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5458 (instance.name, src_node))
5460 vgname = self.cfg.GetVGName()
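# Export strategy: snapshot every disk into a temporary LV, copy each
# snapshot to the target node, remove the snapshots, finalize the export
# there, and finally drop any older export of this instance on other nodes.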
5465 for disk in instance.disks:
5466 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5467 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5468 if new_dev_name.failed or not new_dev_name.data:
5469 self.LogWarning("Could not snapshot block device %s on node %s",
5470 disk.logical_id[1], src_node)
5471 snap_disks.append(False)
5473 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5474 logical_id=(vgname, new_dev_name.data),
5475 physical_id=(vgname, new_dev_name.data),
5476 iv_name=disk.iv_name)
5477 snap_disks.append(new_dev)
5480 if self.op.shutdown and instance.status == "up":
5481 result = self.rpc.call_instance_start(src_node, instance, None)
5482 if result.failed or not result.data:
5483 _ShutdownInstanceDisks(self, instance)
5484 raise errors.OpExecError("Could not start instance")
5486 # TODO: check for size
5488 cluster_name = self.cfg.GetClusterName()
5489 for idx, dev in enumerate(snap_disks):
5491 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5492 instance, cluster_name, idx)
5493 if result.failed or not result.data:
5494 self.LogWarning("Could not export block device %s from node %s to"
5495 " node %s", dev.logical_id[1], src_node,
5497 result = self.rpc.call_blockdev_remove(src_node, dev)
5498 if result.failed or not result.data:
5499 self.LogWarning("Could not remove snapshot block device %s from node"
5500 " %s", dev.logical_id[1], src_node)
5502 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5503 if result.failed or not result.data:
5504 self.LogWarning("Could not finalize export for instance %s on node %s",
5505 instance.name, dst_node.name)
5507 nodelist = self.cfg.GetNodeList()
5508 nodelist.remove(dst_node.name)
5510 # on one-node clusters nodelist will be empty after the removal;
5511 # if we proceed, the backup would be removed because OpQueryExports
5512 # substitutes an empty list with the full cluster node list.
5514 exportlist = self.rpc.call_export_list(nodelist)
5515 for node in exportlist:
5516 if exportlist[node].failed:
5518 if instance.name in exportlist[node].data:
5519 if not self.rpc.call_export_remove(node, instance.name):
5520 self.LogWarning("Could not remove older export for instance %s"
5521 " on node %s", instance.name, node)
5524 class LURemoveExport(NoHooksLU):
5525 """Remove exports related to the named instance.
5528 _OP_REQP = ["instance_name"]
5531 def ExpandNames(self):
5532 self.needed_locks = {}
5533 # We need all nodes to be locked in order for RemoveExport to work, but we
5534 # don't need to lock the instance itself, as nothing will happen to it (and
5535 # we can remove exports also for a removed instance)
5536 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5538 def CheckPrereq(self):
5539 """Check prerequisites.
5543 def Exec(self, feedback_fn):
5544 """Remove any export.
5547 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5548 # If the instance was not found we'll try with the name that was passed in.
5549 # This will only work if it was an FQDN, though.
5551 if not instance_name:
5553 instance_name = self.op.instance_name
5555 exportlist = self.rpc.call_export_list(self.acquired_locks[
5556 locking.LEVEL_NODE])
5558 for node in exportlist:
5559 if exportlist[node].failed:
5560 self.LogWarning("Failed to query node %s, continuing" % node)
5562 if instance_name in exportlist[node].data:
5564 result = self.rpc.call_export_remove(node, instance_name)
5565 if result.failed or not result.data:
5566 logging.error("Could not remove export for instance %s"
5567 " on node %s", instance_name, node)
5569 if fqdn_warn and not found:
5570 feedback_fn("Export not found. If trying to remove an export belonging"
5571 " to a deleted instance please use its Fully Qualified"
5575 class TagsLU(NoHooksLU):
5578 This is an abstract class which is the parent of all the other tags LUs.
5582 def ExpandNames(self):
5583 self.needed_locks = {}
5584 if self.op.kind == constants.TAG_NODE:
5585 name = self.cfg.ExpandNodeName(self.op.name)
5587 raise errors.OpPrereqError("Invalid node name (%s)" %
5590 self.needed_locks[locking.LEVEL_NODE] = name
5591 elif self.op.kind == constants.TAG_INSTANCE:
5592 name = self.cfg.ExpandInstanceName(self.op.name)
5594 raise errors.OpPrereqError("Invalid instance name (%s)" %
5597 self.needed_locks[locking.LEVEL_INSTANCE] = name
5599 def CheckPrereq(self):
5600 """Check prerequisites.
5603 if self.op.kind == constants.TAG_CLUSTER:
5604 self.target = self.cfg.GetClusterInfo()
5605 elif self.op.kind == constants.TAG_NODE:
5606 self.target = self.cfg.GetNodeInfo(self.op.name)
5607 elif self.op.kind == constants.TAG_INSTANCE:
5608 self.target = self.cfg.GetInstanceInfo(self.op.name)
5610 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5614 class LUGetTags(TagsLU):
5615 """Returns the tags of a given object.
5618 _OP_REQP = ["kind", "name"]
5621 def Exec(self, feedback_fn):
5622 """Returns the tag list.
5625 return list(self.target.GetTags())
5628 class LUSearchTags(NoHooksLU):
5629 """Searches the tags for a given pattern.
5632 _OP_REQP = ["pattern"]
5635 def ExpandNames(self):
5636 self.needed_locks = {}
5638 def CheckPrereq(self):
5639 """Check prerequisites.
5641 This checks the pattern passed for validity by compiling it.
5645 self.re = re.compile(self.op.pattern)
5646 except re.error, err:
5647 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5648 (self.op.pattern, err))
5650 def Exec(self, feedback_fn):
5651 """Returns the tag list.
5655 tgts = [("/cluster", cfg.GetClusterInfo())]
5656 ilist = cfg.GetAllInstancesInfo().values()
5657 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5658 nlist = cfg.GetAllNodesInfo().values()
5659 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5661 for path, target in tgts:
5662 for tag in target.GetTags():
5663 if self.re.search(tag):
5664 results.append((path, tag))
5668 class LUAddTags(TagsLU):
5669 """Sets a tag on a given object.
5672 _OP_REQP = ["kind", "name", "tags"]
5675 def CheckPrereq(self):
5676 """Check prerequisites.
5678 This checks the type and length of the tag name and value.
5681 TagsLU.CheckPrereq(self)
5682 for tag in self.op.tags:
5683 objects.TaggableObject.ValidateTag(tag)
5685 def Exec(self, feedback_fn):
5690 for tag in self.op.tags:
5691 self.target.AddTag(tag)
5692 except errors.TagError, err:
5693 raise errors.OpExecError("Error while setting tag: %s" % str(err))
5695 self.cfg.Update(self.target)
5696 except errors.ConfigurationError:
5697 raise errors.OpRetryError("There has been a modification to the"
5698 " config file and the operation has been"
5699 " aborted. Please retry.")
5702 class LUDelTags(TagsLU):
5703 """Delete a list of tags from a given object.
5706 _OP_REQP = ["kind", "name", "tags"]
5709 def CheckPrereq(self):
5710 """Check prerequisites.
5712 This checks that we have the given tag.
5715 TagsLU.CheckPrereq(self)
5716 for tag in self.op.tags:
5717 objects.TaggableObject.ValidateTag(tag)
5718 del_tags = frozenset(self.op.tags)
5719 cur_tags = self.target.GetTags()
5720 if not del_tags <= cur_tags:
5721 diff_tags = del_tags - cur_tags
5722 diff_names = ["'%s'" % tag for tag in diff_tags]
5724 raise errors.OpPrereqError("Tag(s) %s not found" %
5725 (",".join(diff_names)))
5727 def Exec(self, feedback_fn):
5728 """Remove the tag from the object.
5731 for tag in self.op.tags:
5732 self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
5735 except errors.ConfigurationError:
5736 raise errors.OpRetryError("There has been a modification to the"
5737 " config file and the operation has been"
5738 " aborted. Please retry.")
5741 class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
5748 _OP_REQP = ["duration", "on_master", "on_nodes"]
5751 def ExpandNames(self):
5752 """Expand names and set required locks.
5754 This expands the node list, if any.
5757 self.needed_locks = {}
5758 if self.op.on_nodes:
5759 # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
5762 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5763 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5765 def CheckPrereq(self):
5766 """Check prerequisites.
5770 def Exec(self, feedback_fn):
5771 """Do the actual sleep.
5774 if self.op.on_master:
5775 if not utils.TestDelay(self.op.duration):
5776 raise errors.OpExecError("Error during master delay test")
5777 if self.op.on_nodes:
5778 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
5781 for node, node_result in result.items():
5783 if not node_result.data:
5784 raise errors.OpExecError("Failure during rpc call to node %s,"
5785 " result: %s" % (node, node_result.data))
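# Illustrative sketch only: a delay test driving both branches of Exec() above,
# assuming the OpTestDelay opcode from opcodes.py; the duration and node name
# are hypothetical example values.
#
#   op = opcodes.OpTestDelay(duration=5.0, on_master=True,
#                            on_nodes=["node1.example.com"])
#   # Exec() then sleeps five seconds on the master via utils.TestDelay and
#   # asks node1.example.com to do the same via rpc.call_test_delay.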
5788 class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]
  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    self.mode = mode
    self.name = name
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mem_size = self.disks = self.disk_template = None
5817 self.os = self.tags = self.nics = self.vcpus = None
5818 self.relocate_from = None
5820 self.required_nodes = None
5821 # init result fields
5822 self.success = self.info = self.nodes = None
5823 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5824 keyset = self._ALLO_KEYS
5825 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5826 keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
5841 def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
5864 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5865 hypervisor = self.hypervisor
5866 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5867 hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor)
5871 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
5872 cluster_info.enabled_hypervisors)
5873 for nname in node_list:
5874 ninfo = cfg.GetNodeInfo(nname)
5875 node_data[nname].Raise()
5876 if not isinstance(node_data[nname].data, dict):
5877 raise errors.OpExecError("Can't get data for node %s" % nname)
5878 remote_info = node_data[nname].data
5879 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5880 'vg_size', 'vg_free', 'cpu_total']:
5881 if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
5886 except ValueError, err:
5887 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5888 " %s" % (nname, attr, str(err)))
5889 # compute memory used by primary instances
5890 i_p_mem = i_p_up_mem = 0
5891 for iinfo, beinfo in i_list:
5892 if iinfo.primary_node == nname:
5893 i_p_mem += beinfo[constants.BE_MEMORY]
          if iinfo.name not in node_iinfo[nname]:
            i_used_mem = 0
          else:
            i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
5898 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
5899 remote_info['memory_free'] -= max(0, i_mem_diff)
5901 if iinfo.status == "up":
5902 i_p_up_mem += beinfo[constants.BE_MEMORY]
      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
5919 data["nodes"] = node_results
    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
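  # A trimmed, hypothetical example of the structure built by
  # _ComputeClusterData (only keys that are actually set above are shown;
  # all values are made up for illustration):
  #
  #   self.in_data = {
  #     "cluster_name": "cluster.example.com",
  #     "cluster_tags": [],
  #     "enable_hypervisors": ["xen-pvm"],
  #     "nodes": {
  #       "node1.example.com": {"total_memory": 4096, "free_memory": 1024,
  #                             "total_cpus": 4, ...},
  #       },
  #     "instances": {
  #       "inst1.example.com": {"memory": 512, "vcpus": 1,
  #                             "disk_template": "drbd", ...},
  #       },
  #     }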
5944 def _AddNewInstance(self):
5945 """Add new instance data to allocator structure.
    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "disk_template": self.disk_template,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request
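  # Hypothetical example of the allocation request built above, restricted to
  # the keys visible in this method; the sizes (in MiB) are made up and the
  # disk_space_total value is only illustrative:
  #
  #   data["request"] = {
  #     "disk_template": "drbd",
  #     "vcpus": 1,
  #     "memory": 512,
  #     "disks": [{"size": 1024, "mode": "w"}, {"size": 512, "mode": "w"}],
  #     "disk_space_total": 1792,
  #     "required_nodes": 2,
  #     }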
5979 def _AddRelocateInstance(self):
5980 """Add relocate instance data to allocator structure.
    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
5989 instance = self.lu.cfg.GetInstanceInfo(self.name)
5990 if instance is None:
5991 raise errors.ProgrammerError("Unknown instance '%s' passed to"
5992 " IAllocator" % self.name)
5994 if instance.disk_template not in constants.DTS_NET_MIRROR:
5995 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5997 if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")
6000 self.required_nodes = 1
6001 disk_sizes = [{'size': disk.size} for disk in instance.disks]
6002 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
    request = {
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
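  # Hypothetical example of the relocation request built above, again limited
  # to the keys visible in this method; values are illustrative only:
  #
  #   self.in_data["request"] = {
  #     "disk_space_total": 1152,
  #     "required_nodes": 1,
  #     "relocate_from": ["node2.example.com"],
  #     }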
6013 def _BuildInputData(self):
6014 """Build input data structures.
6017 self._ComputeClusterData()
6019 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6020 self._AddNewInstance()
    else:
      self._AddRelocateInstance()
6024 self.in_text = serializer.Dump(self.in_data)
6026 def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
6034 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6037 if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6038 raise errors.OpExecError("Invalid result from master iallocator runner")
6040 rcode, stdout, stderr, fail = result.data
6042 if rcode == constants.IARUN_NOTFOUND:
6043 raise errors.OpExecError("Can't find allocator '%s'" % name)
6044 elif rcode == constants.IARUN_FAILURE:
6045 raise errors.OpExecError("Instance allocator call failed: %s,"
6046 " output: %s" % (fail, stdout+stderr))
6047 self.out_text = stdout
    if validate:
      self._ValidateResult()
6051 def _ValidateResult(self):
6052 """Process the allocator results.
6054 This will process and if successful save the result in
6055 self.out_data and the other parameters.
6059 rdict = serializer.Load(self.out_text)
6060 except Exception, err:
6061 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6063 if not isinstance(rdict, dict):
6064 raise errors.OpExecError("Can't parse iallocator results: not a dict")
6066 for key in "success", "info", "nodes":
6067 if key not in rdict:
6068 raise errors.OpExecError("Can't parse iallocator results:"
6069 " missing key '%s'" % key)
6070 setattr(self, key, rdict[key])
6072 if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " not a list")
6075 self.out_data = rdict
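# A minimal usage sketch of the IAllocator framework, assuming an external
# allocator script installed under the hypothetical name "my-allocator"; the
# instance parameters are made-up example values and the calling LU is any
# LogicalUnit exposing .cfg and .rpc (LUTestAllocator below shows the real
# in-tree usage):
#
#   ial = IAllocator(self,
#                    mode=constants.IALLOCATOR_MODE_ALLOC,
#                    name="inst1.example.com",
#                    mem_size=512,
#                    disks=[{"size": 1024, "mode": "w"},
#                           {"size": 512, "mode": "w"}],
#                    disk_template="drbd",
#                    os="debian-etch",
#                    tags=[],
#                    nics=[{"mac": "aa:00:00:12:34:56", "ip": None,
#                           "bridge": "xen-br0"}],
#                    vcpus=1,
#                    hypervisor="xen-pvm")
#   ial.Run("my-allocator")
#   # After a validated run the script's answer is available as:
#   #   ial.success  -- boolean verdict
#   #   ial.info     -- free-form message from the script
#   #   ial.nodes    -- list of node names chosen by the script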
6078 class LUTestAllocator(NoHooksLU):
6079 """Run allocator tests.
6081 This LU runs the allocator tests
6084 _OP_REQP = ["direction", "mode", "name"]
6086 def CheckPrereq(self):
6087 """Check prerequisites.
    This checks the opcode parameters depending on the direction and mode of
    the test.

    """
6092 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6093 for attr in ["name", "mem_size", "disks", "disk_template",
6094 "os", "tags", "nics", "vcpus"]:
6095 if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
6098 iname = self.cfg.ExpandInstanceName(self.op.name)
6099 if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
6102 if not isinstance(self.op.nics, list):
6103 raise errors.OpPrereqError("Invalid parameter 'nics'")
6104 for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
6109 raise errors.OpPrereqError("Invalid contents of the"
6110 " 'nics' parameter")
6111 if not isinstance(self.op.disks, list):
6112 raise errors.OpPrereqError("Invalid parameter 'disks'")
6113 if len(self.op.disks) != 2:
6114 raise errors.OpPrereqError("Only two-disk configurations supported")
6115 for row in self.op.disks:
6116 if (not isinstance(row, dict) or
6117 "size" not in row or
6118 not isinstance(row["size"], int) or
6119 "mode" not in row or
6120 row["mode"] not in ['r', 'w']):
6121 raise errors.OpPrereqError("Invalid contents of the"
6122 " 'disks' parameter")
6123 if self.op.hypervisor is None:
6124 self.op.hypervisor = self.cfg.GetHypervisorType()
6125 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6126 if not hasattr(self.op, "name"):
6127 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6128 fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
6132 self.op.name = fname
6133 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)
6138 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6139 if not hasattr(self.op, "allocator") or self.op.allocator is None:
6140 raise errors.OpPrereqError("Missing allocator name")
6141 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)
6145 def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result
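# Illustrative sketch only: the two test directions handled above. The opcode
# class name (OpTestAllocator) and all values are assumptions for the example.
#
#   # direction "in": only build and return the allocator input text
#   op = opcodes.OpTestAllocator(direction=constants.IALLOCATOR_DIR_IN,
#                                mode=constants.IALLOCATOR_MODE_RELOC,
#                                name="inst1.example.com")
#
#   # direction "out": also run the named allocator and return its raw output
#   op = opcodes.OpTestAllocator(direction=constants.IALLOCATOR_DIR_OUT,
#                                mode=constants.IALLOCATOR_MODE_RELOC,
#                                name="inst1.example.com",
#                                allocator="my-allocator")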