4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
37 from ganeti import ssh
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46 from ganeti import ssconf
49 class LogicalUnit(object):
50 """Logical Unit base class.
52 Subclasses must follow these rules:
53 - implement ExpandNames
54 - implement CheckPrereq
56 - implement BuildHooksEnv
57 - redefine HPATH and HTYPE
58 - optionally redefine their run requirements:
59 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61 Note that all commands require root permissions.
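A minimal sketch of a concurrent LU following these rules (the class name,
opcode parameters and behaviour here are purely illustrative)::

  class LUExampleNoop(NoHooksLU):
    _OP_REQP = []

    def ExpandNames(self):
      # this trivial example needs no locks at all
      self.needed_locks = {}

    def CheckPrereq(self):
      pass

    def Exec(self, feedback_fn):
      feedback_fn("doing nothing")
      return True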
69 def __init__(self, processor, op, context, rpc):
70 """Constructor for LogicalUnit.
72 This needs to be overridden in derived classes in order to check op
78 self.cfg = context.cfg
79 self.context = context
81 # Dicts used to declare locking needs to mcpu
82 self.needed_locks = None
83 self.acquired_locks = {}
84 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86 self.remove_locks = {}
87 # Used to force good behavior when calling helper functions
88 self.recalculate_locks = {}
91 self.LogWarning = processor.LogWarning
92 self.LogInfo = processor.LogInfo
94 for attr_name in self._OP_REQP:
95 attr_val = getattr(op, attr_name, None)
97 raise errors.OpPrereqError("Required parameter '%s' missing" %
102 """Returns the SshRunner object
106 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
109 ssh = property(fget=__GetSSH)
111 def CheckArguments(self):
112 """Check syntactic validity for the opcode arguments.
114 This method is for doing a simple syntactic check and ensuring the
115 validity of opcode parameters, without any cluster-related
116 checks. While the same can be accomplished in ExpandNames and/or
117 CheckPrereq, doing these separately is better because:
119 - ExpandNames is left as purely a lock-related function
120 - CheckPrereq is run after we have acquired locks (and possible
123 The function is allowed to change the self.op attribute so that
124 later methods can no longer worry about missing parameters.
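A sketch of a typical implementation (this mirrors LUSetNodeParams further
below; 'master_candidate' is simply the parameter that particular LU expects)::

  def CheckArguments(self):
    if not hasattr(self.op, 'master_candidate'):
      raise errors.OpPrereqError("Please pass at least one modification")
    self.op.master_candidate = bool(self.op.master_candidate)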
129 def ExpandNames(self):
130 """Expand names for this LU.
132 This method is called before starting to execute the opcode, and it should
133 update all the parameters of the opcode to their canonical form (e.g. a
134 short node name must be fully expanded after this method has successfully
135 completed). This way locking, hooks, logging, etc. can work correctly.
137 LUs which implement this method must also populate the self.needed_locks
138 member, as a dict with lock levels as keys, and a list of needed lock names
141 - use an empty dict if you don't need any lock
142 - if you don't need any lock at a particular level omit that level
143 - don't put anything for the BGL level
144 - if you want all locks at a level use locking.ALL_SET as a value
146 If you need to share locks (rather than acquire them exclusively) at one
147 level you can modify self.share_locks, setting a true value (usually 1) for
148 that level. By default locks are not shared.
152 # Acquire all nodes and one instance
153 self.needed_locks = {
154 locking.LEVEL_NODE: locking.ALL_SET,
155 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
157 # Acquire just two nodes
158 self.needed_locks = {
159 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
162 self.needed_locks = {} # No, you can't leave it to the default value None
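# Acquire all node locks, but in shared mode (a sketch; whether sharing
# is safe depends on what the LU actually does with the nodes)
self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
self.share_locks[locking.LEVEL_NODE] = 1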
165 # The implementation of this method is mandatory only if the new LU is
166 # concurrent, so that old LUs don't need to be changed all at the same
169 self.needed_locks = {} # Exclusive LUs don't need locks.
171 raise NotImplementedError
173 def DeclareLocks(self, level):
174 """Declare LU locking needs for a level
176 While most LUs can just declare their locking needs at ExpandNames time,
177 sometimes there's the need to calculate some locks after having acquired
178 the ones before. This function is called just before acquiring locks at a
179 particular level, but after acquiring the ones at lower levels, and permits
180 such calculations. It can be used to modify self.needed_locks, and by
181 default it does nothing.
183 This function is only called if you have something already set in
184 self.needed_locks for the level.
186 @param level: Locking level which is going to be locked
187 @type level: member of ganeti.locking.LEVELS
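A typical implementation simply delegates to the _LockInstancesNodes helper
documented below (sketch)::

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()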
191 def CheckPrereq(self):
192 """Check prerequisites for this LU.
194 This method should check that the prerequisites for the execution
195 of this LU are fulfilled. It can do internode communication, but
196 it should be idempotent - no cluster or system changes are
199 The method should raise errors.OpPrereqError in case something is
200 not fulfilled. Its return value is ignored.
202 This method should also update all the parameters of the opcode to
203 their canonical form if it hasn't been done by ExpandNames before.
206 raise NotImplementedError
208 def Exec(self, feedback_fn):
211 This method should implement the actual work. It should raise
212 errors.OpExecError for failures that are somewhat dealt with in
216 raise NotImplementedError
218 def BuildHooksEnv(self):
219 """Build hooks environment for this LU.
221 This method should return a three-element tuple consisting of: a dict
222 containing the environment that will be used for running the
223 specific hook for this LU, a list of node names on which the hook
224 should run before the execution, and a list of node names on which
225 the hook should run after the execution.
227 The keys of the dict must not have the 'GANETI_' prefix as this will
228 be handled in the hooks runner. Also note additional keys will be
229 added by the hooks runner. If the LU doesn't define any
230 environment, an empty dict (and not None) should be returned.
232 If there are no nodes, an empty list (and not None) should be returned.
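A minimal sketch, modelled on the cluster-level LUs below (real LUs usually
export more environment keys)::

  env = {"OP_TARGET": self.cfg.GetClusterName()}
  mn = self.cfg.GetMasterNode()
  return env, [mn], [mn]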
234 Note that if the HPATH for a LU class is None, this function will
238 raise NotImplementedError
240 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241 """Notify the LU about the results of its hooks.
243 This method is called every time a hooks phase is executed, and notifies
244 the Logical Unit about the hooks' result. The LU can then use it to alter
245 its result based on the hooks. By default the method does nothing and the
246 previous result is passed back unchanged but any LU can define it if it
247 wants to use the local cluster hook-scripts somehow.
249 @param phase: one of L{constants.HOOKS_PHASE_POST} or
250 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251 @param hook_results: the results of the multi-node hooks rpc call
252 @param feedback_fn: function used to send feedback back to the caller
253 @param lu_result: the previous Exec result this LU had, or None
255 @return: the new Exec result, based on the previous result
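The default behaviour is therefore equivalent to (sketch)::

  return lu_result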
261 def _ExpandAndLockInstance(self):
262 """Helper function to expand and lock an instance.
264 Many LUs that work on an instance take its name in self.op.instance_name
265 and need to expand it and then declare the expanded name for locking. This
266 function does it, and then updates self.op.instance_name to the expanded
267 name. It also initializes needed_locks as a dict, if this hasn't been done
271 if self.needed_locks is None:
272 self.needed_locks = {}
274 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275 "_ExpandAndLockInstance called with instance-level locks set"
276 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277 if expanded_name is None:
278 raise errors.OpPrereqError("Instance '%s' not known" %
279 self.op.instance_name)
280 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281 self.op.instance_name = expanded_name
283 def _LockInstancesNodes(self, primary_only=False):
284 """Helper function to declare instances' nodes for locking.
286 This function should be called after locking one or more instances to lock
287 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288 with all primary or secondary nodes for instances already locked and
289 present in self.needed_locks[locking.LEVEL_INSTANCE].
291 It should be called from DeclareLocks, and for safety only works if
292 self.recalculate_locks[locking.LEVEL_NODE] is set.
294 In the future it may grow parameters to just lock some instance's nodes, or
295 to just lock primaries or secondary nodes, if needed.
297 It should be called in DeclareLocks in a way similar to::
299 if level == locking.LEVEL_NODE:
300 self._LockInstancesNodes()
302 @type primary_only: boolean
303 @param primary_only: only lock primary nodes of locked instances
306 assert locking.LEVEL_NODE in self.recalculate_locks, \
307 "_LockInstancesNodes helper function called with no nodes to recalculate"
309 # TODO: check if we've really been called with the instance locks held
311 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312 # future we might want to have different behaviors depending on the value
313 # of self.recalculate_locks[locking.LEVEL_NODE]
315 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316 instance = self.context.cfg.GetInstanceInfo(instance_name)
317 wanted_nodes.append(instance.primary_node)
319 wanted_nodes.extend(instance.secondary_nodes)
321 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
326 del self.recalculate_locks[locking.LEVEL_NODE]
329 class NoHooksLU(LogicalUnit):
330 """Simple LU which runs no hooks.
332 This LU is intended as a parent for other LogicalUnits which will
333 run no hooks, in order to reduce duplicate code.
340 def _GetWantedNodes(lu, nodes):
341 """Returns list of checked and expanded node names.
343 @type lu: L{LogicalUnit}
344 @param lu: the logical unit on whose behalf we execute
346 @param nodes: non-empty list of node names to check and expand
348 @return: the list of nodes, sorted
349 @raise errors.OpPrereqError: if the nodes parameter is wrong type
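Typical usage from an LU's ExpandNames (see for example LUQueryNodes below)::

  self.wanted = _GetWantedNodes(self, self.op.names)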
352 if not isinstance(nodes, list):
353 raise errors.OpPrereqError("Invalid argument type 'nodes'")
356 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357 " non-empty list of nodes whose name is to be expanded.")
361 node = lu.cfg.ExpandNodeName(name)
363 raise errors.OpPrereqError("No such node name '%s'" % name)
366 return utils.NiceSort(wanted)
369 def _GetWantedInstances(lu, instances):
370 """Returns list of checked and expanded instance names.
372 @type lu: L{LogicalUnit}
373 @param lu: the logical unit on whose behalf we execute
374 @type instances: list
375 @param instances: list of instance names or None for all instances
377 @return: the list of instances, sorted
378 @raise errors.OpPrereqError: if the instances parameter is wrong type
379 @raise errors.OpPrereqError: if any of the passed instances is not found
382 if not isinstance(instances, list):
383 raise errors.OpPrereqError("Invalid argument type 'instances'")
388 for name in instances:
389 instance = lu.cfg.ExpandInstanceName(name)
391 raise errors.OpPrereqError("No such instance name '%s'" % name)
392 wanted.append(instance)
395 wanted = lu.cfg.GetInstanceList()
396 return utils.NiceSort(wanted)
399 def _CheckOutputFields(static, dynamic, selected):
400 """Checks whether all selected fields are valid.
402 @type static: L{utils.FieldSet}
403 @param static: static fields set
404 @type dynamic: L{utils.FieldSet}
405 @param dynamic: dynamic fields set
412 delta = f.NonMatching(selected)
414 raise errors.OpPrereqError("Unknown output fields selected: %s"
418 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
419 memory, vcpus, nics):
420 """Builds instance related env variables for hooks
422 This builds the hook environment from individual variables.
425 @param name: the name of the instance
426 @type primary_node: string
427 @param primary_node: the name of the instance's primary node
428 @type secondary_nodes: list
429 @param secondary_nodes: list of secondary nodes as strings
430 @type os_type: string
431 @param os_type: the name of the instance's OS
433 @param status: the desired status of the instance
435 @param memory: the memory size of the instance
437 @param vcpus: the count of VCPUs the instance has
439 @param nics: list of tuples (ip, bridge, mac) representing
440 the NICs the instance has
442 @return: the hook environment for this instance
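For a single-NIC instance the resulting dict looks roughly like the following
(all values are illustrative only)::

  {
    "INSTANCE_NAME": "instance1.example.tld",
    "INSTANCE_PRIMARY": "node1.example.tld",
    "INSTANCE_SECONDARIES": "node2.example.tld",
    "INSTANCE_OS_TYPE": "debian-etch",
    "INSTANCE_STATUS": "up",
    "INSTANCE_MEMORY": 128,
    "INSTANCE_VCPUS": 1,
    "INSTANCE_NIC_COUNT": 1,
    "INSTANCE_NIC0_IP": "192.0.2.10",
    "INSTANCE_NIC0_BRIDGE": "xen-br0",
    "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01",
  }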
447 "INSTANCE_NAME": name,
448 "INSTANCE_PRIMARY": primary_node,
449 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
450 "INSTANCE_OS_TYPE": os_type,
451 "INSTANCE_STATUS": status,
452 "INSTANCE_MEMORY": memory,
453 "INSTANCE_VCPUS": vcpus,
457 nic_count = len(nics)
458 for idx, (ip, bridge, mac) in enumerate(nics):
461 env["INSTANCE_NIC%d_IP" % idx] = ip
462 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
463 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
467 env["INSTANCE_NIC_COUNT"] = nic_count
472 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
473 """Builds instance related env variables for hooks from an object.
475 @type lu: L{LogicalUnit}
476 @param lu: the logical unit on whose behalf we execute
477 @type instance: L{objects.Instance}
478 @param instance: the instance for which we should build the
481 @param override: dictionary with key/values that will override
484 @return: the hook environment dictionary
487 bep = lu.cfg.GetClusterInfo().FillBE(instance)
489 'name': instance.name,
490 'primary_node': instance.primary_node,
491 'secondary_nodes': instance.secondary_nodes,
492 'os_type': instance.os,
493 'status': instance.status,
494 'memory': bep[constants.BE_MEMORY],
495 'vcpus': bep[constants.BE_VCPUS],
496 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
499 args.update(override)
500 return _BuildInstanceHookEnv(**args)
503 def _CheckInstanceBridgesExist(lu, instance):
504 """Check that the bridges needed by an instance exist.
507 # check bridges existence
508 brlist = [nic.bridge for nic in instance.nics]
509 result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
512 raise errors.OpPrereqError("One or more target bridges %s does not"
513 " exist on destination node '%s'" %
514 (brlist, instance.primary_node))
517 class LUDestroyCluster(NoHooksLU):
518 """Logical unit for destroying the cluster.
523 def CheckPrereq(self):
524 """Check prerequisites.
526 This checks whether the cluster is empty.
528 Any errors are signalled by raising errors.OpPrereqError.
531 master = self.cfg.GetMasterNode()
533 nodelist = self.cfg.GetNodeList()
534 if len(nodelist) != 1 or nodelist[0] != master:
535 raise errors.OpPrereqError("There are still %d node(s) in"
536 " this cluster." % (len(nodelist) - 1))
537 instancelist = self.cfg.GetInstanceList()
539 raise errors.OpPrereqError("There are still %d instance(s) in"
540 " this cluster." % len(instancelist))
542 def Exec(self, feedback_fn):
543 """Destroys the cluster.
546 master = self.cfg.GetMasterNode()
547 result = self.rpc.call_node_stop_master(master, False)
550 raise errors.OpExecError("Could not disable the master role")
551 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
552 utils.CreateBackup(priv_key)
553 utils.CreateBackup(pub_key)
557 class LUVerifyCluster(LogicalUnit):
558 """Verifies the cluster status.
561 HPATH = "cluster-verify"
562 HTYPE = constants.HTYPE_CLUSTER
563 _OP_REQP = ["skip_checks"]
566 def ExpandNames(self):
567 self.needed_locks = {
568 locking.LEVEL_NODE: locking.ALL_SET,
569 locking.LEVEL_INSTANCE: locking.ALL_SET,
571 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
573 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
574 node_result, feedback_fn, master_files):
575 """Run multiple tests against a node.
579 - compares ganeti version
580 - checks vg existence and size > 20G
581 - checks config file checksum
582 - checks ssh to other nodes
584 @type nodeinfo: L{objects.Node}
585 @param nodeinfo: the node to check
586 @param file_list: required list of files
587 @param local_cksum: dictionary of local files and their checksums
588 @param node_result: the results from the node
589 @param feedback_fn: function used to accumulate results
590 @param master_files: list of files that only masters should have
595 # main result, node_result should be a non-empty dict
596 if not node_result or not isinstance(node_result, dict):
597 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
600 # compares ganeti version
601 local_version = constants.PROTOCOL_VERSION
602 remote_version = node_result.get('version', None)
603 if not remote_version:
604 feedback_fn(" - ERROR: connection to %s failed" % (node))
607 if local_version != remote_version:
608 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
609 (local_version, node, remote_version))
612 # checks vg existence and size > 20G
615 vglist = node_result.get(constants.NV_VGLIST, None)
617 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
621 vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
622 constants.MIN_VG_SIZE)
624 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
627 # checks config file checksum
629 remote_cksum = node_result.get(constants.NV_FILELIST, None)
630 if not isinstance(remote_cksum, dict):
632 feedback_fn(" - ERROR: node hasn't returned file checksum data")
634 for file_name in file_list:
635 node_is_mc = nodeinfo.master_candidate
636 must_have_file = file_name not in master_files
637 if file_name not in remote_cksum:
638 if node_is_mc or must_have_file:
640 feedback_fn(" - ERROR: file '%s' missing" % file_name)
641 elif remote_cksum[file_name] != local_cksum[file_name]:
642 if node_is_mc or must_have_file:
644 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
646 # not candidate and this is not a must-have file
648 feedback_fn(" - ERROR: non master-candidate has old/wrong file"
651 # all good, except non-master/non-must have combination
652 if not node_is_mc and not must_have_file:
653 feedback_fn(" - ERROR: file '%s' should not exist on non master"
654 " candidates" % file_name)
658 if constants.NV_NODELIST not in node_result:
660 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
662 if node_result[constants.NV_NODELIST]:
664 for node in node_result[constants.NV_NODELIST]:
665 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
666 (node, node_result[constants.NV_NODELIST][node]))
668 if constants.NV_NODENETTEST not in node_result:
670 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
672 if node_result[constants.NV_NODENETTEST]:
674 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
676 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
677 (node, node_result[constants.NV_NODENETTEST][node]))
679 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
680 if isinstance(hyp_result, dict):
681 for hv_name, hv_result in hyp_result.iteritems():
682 if hv_result is not None:
683 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
684 (hv_name, hv_result))
687 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
688 node_instance, feedback_fn):
689 """Verify an instance.
691 This function checks to see if the required block devices are
692 available on the instance's node.
697 node_current = instanceconfig.primary_node
700 instanceconfig.MapLVsByNode(node_vol_should)
702 for node in node_vol_should:
703 for volume in node_vol_should[node]:
704 if node not in node_vol_is or volume not in node_vol_is[node]:
705 feedback_fn(" - ERROR: volume %s missing on node %s" %
709 if instanceconfig.status != 'down':
710 if (node_current not in node_instance or
711 not instance in node_instance[node_current]):
712 feedback_fn(" - ERROR: instance %s not running on node %s" %
713 (instance, node_current))
716 for node in node_instance:
717 if node != node_current:
718 if instance in node_instance[node]:
719 feedback_fn(" - ERROR: instance %s should not run on node %s" %
725 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
726 """Verify if there are any unknown volumes in the cluster.
728 The .os, .swap and backup volumes are ignored. All other volumes are
734 for node in node_vol_is:
735 for volume in node_vol_is[node]:
736 if node not in node_vol_should or volume not in node_vol_should[node]:
737 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
742 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
743 """Verify the list of running instances.
745 This checks what instances are running but unknown to the cluster.
749 for node in node_instance:
750 for runninginstance in node_instance[node]:
751 if runninginstance not in instancelist:
752 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
753 (runninginstance, node))
757 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
758 """Verify N+1 Memory Resilience.
760 Check that if one single node dies we can still start all the instances it
766 for node, nodeinfo in node_info.iteritems():
767 # This code checks that every node which is now listed as secondary has
768 # enough memory to host all instances it is supposed to, should a single
769 # other node in the cluster fail.
770 # FIXME: not ready for failover to an arbitrary node
771 # FIXME: does not support file-backed instances
772 # WARNING: we currently take into account down instances as well as up
773 # ones, considering that even if they're down someone might want to start
774 # them even in the event of a node failure.
775 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
777 for instance in instances:
778 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
779 if bep[constants.BE_AUTO_BALANCE]:
780 needed_mem += bep[constants.BE_MEMORY]
781 if nodeinfo['mfree'] < needed_mem:
782 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
783 " failovers should node %s fail" % (node, prinode))
787 def CheckPrereq(self):
788 """Check prerequisites.
790 Transform the list of checks we're going to skip into a set and check that
791 all its members are valid.
794 self.skip_set = frozenset(self.op.skip_checks)
795 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
796 raise errors.OpPrereqError("Invalid checks to be skipped specified")
798 def BuildHooksEnv(self):
801 Cluster-Verify hooks are run only in the post phase; if they fail, their
802 output is logged in the verify output and the verification fails.
805 all_nodes = self.cfg.GetNodeList()
806 # TODO: populate the environment with useful information for verify hooks
808 return env, [], all_nodes
810 def Exec(self, feedback_fn):
811 """Verify integrity of cluster, performing various tests on nodes.
815 feedback_fn("* Verifying global settings")
816 for msg in self.cfg.VerifyConfig():
817 feedback_fn(" - ERROR: %s" % msg)
819 vg_name = self.cfg.GetVGName()
820 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
821 nodelist = utils.NiceSort(self.cfg.GetNodeList())
822 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
823 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
824 i_non_redundant = [] # Non redundant instances
825 i_non_a_balanced = [] # Non auto-balanced instances
831 # FIXME: verify OS list
833 master_files = [constants.CLUSTER_CONF_FILE]
835 file_names = ssconf.SimpleStore().GetFileList()
836 file_names.append(constants.SSL_CERT_FILE)
837 file_names.extend(master_files)
839 local_checksums = utils.FingerprintFiles(file_names)
841 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
842 node_verify_param = {
843 constants.NV_FILELIST: file_names,
844 constants.NV_NODELIST: nodelist,
845 constants.NV_HYPERVISOR: hypervisors,
846 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
847 node.secondary_ip) for node in nodeinfo],
848 constants.NV_LVLIST: vg_name,
849 constants.NV_INSTANCELIST: hypervisors,
850 constants.NV_VGLIST: None,
851 constants.NV_VERSION: None,
852 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
854 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
855 self.cfg.GetClusterName())
857 cluster = self.cfg.GetClusterInfo()
858 master_node = self.cfg.GetMasterNode()
859 for node_i in nodeinfo:
861 nresult = all_nvinfo[node].data
863 if node == master_node:
865 elif node_i.master_candidate:
866 ntype = "master candidate"
869 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
871 if all_nvinfo[node].failed or not isinstance(nresult, dict):
872 feedback_fn(" - ERROR: connection to %s failed" % (node,))
876 result = self._VerifyNode(node_i, file_names, local_checksums,
877 nresult, feedback_fn, master_files)
880 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
881 if isinstance(lvdata, basestring):
882 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
883 (node, lvdata.encode('string_escape')))
885 node_volume[node] = {}
886 elif not isinstance(lvdata, dict):
887 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
891 node_volume[node] = lvdata
894 idata = nresult.get(constants.NV_INSTANCELIST, None)
895 if not isinstance(idata, list):
896 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
901 node_instance[node] = idata
904 nodeinfo = nresult.get(constants.NV_HVINFO, None)
905 if not isinstance(nodeinfo, dict):
906 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
912 "mfree": int(nodeinfo['memory_free']),
913 "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
916 # dictionary holding all instances this node is secondary for,
917 # grouped by their primary node. Each key is a cluster node, and each
918 # value is a list of instances which have the key as primary and the
919 # current node as secondary. This is handy to calculate N+1 memory
920 # availability if you can only failover from a primary to its
922 "sinst-by-pnode": {},
925 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
931 for instance in instancelist:
932 feedback_fn("* Verifying instance %s" % instance)
933 inst_config = self.cfg.GetInstanceInfo(instance)
934 result = self._VerifyInstance(instance, inst_config, node_volume,
935 node_instance, feedback_fn)
938 inst_config.MapLVsByNode(node_vol_should)
940 instance_cfg[instance] = inst_config
942 pnode = inst_config.primary_node
943 if pnode in node_info:
944 node_info[pnode]['pinst'].append(instance)
946 feedback_fn(" - ERROR: instance %s, connection to primary node"
947 " %s failed" % (instance, pnode))
950 # If the instance is non-redundant we cannot survive losing its primary
951 # node, so we are not N+1 compliant. On the other hand we have no disk
952 # templates with more than one secondary so that situation is not well
954 # FIXME: does not support file-backed instances
955 if len(inst_config.secondary_nodes) == 0:
956 i_non_redundant.append(instance)
957 elif len(inst_config.secondary_nodes) > 1:
958 feedback_fn(" - WARNING: multiple secondaries for instance %s"
961 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
962 i_non_a_balanced.append(instance)
964 for snode in inst_config.secondary_nodes:
965 if snode in node_info:
966 node_info[snode]['sinst'].append(instance)
967 if pnode not in node_info[snode]['sinst-by-pnode']:
968 node_info[snode]['sinst-by-pnode'][pnode] = []
969 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
971 feedback_fn(" - ERROR: instance %s, connection to secondary node"
972 " %s failed" % (instance, snode))
974 feedback_fn("* Verifying orphan volumes")
975 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
979 feedback_fn("* Verifying remaining instances")
980 result = self._VerifyOrphanInstances(instancelist, node_instance,
984 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
985 feedback_fn("* Verifying N+1 Memory redundancy")
986 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
989 feedback_fn("* Other Notes")
991 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
992 % len(i_non_redundant))
995 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
996 % len(i_non_a_balanced))
1000 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1001 """Analyze the post-hooks' result
1003 This method analyses the hook result, handles it, and sends some
1004 nicely-formatted feedback back to the user.
1006 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1007 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1008 @param hooks_results: the results of the multi-node hooks rpc call
1009 @param feedback_fn: function used to send feedback back to the caller
1010 @param lu_result: previous Exec result
1011 @return: the new Exec result, based on the previous result
1015 # We only really run POST phase hooks, and are only interested in
1017 if phase == constants.HOOKS_PHASE_POST:
1018 # Used to change hooks' output to proper indentation
1019 indent_re = re.compile('^', re.M)
1020 feedback_fn("* Hooks Results")
1021 if not hooks_results:
1022 feedback_fn(" - ERROR: general communication failure")
1025 for node_name in hooks_results:
1026 show_node_header = True
1027 res = hooks_results[node_name]
1028 if res.failed or res.data is False or not isinstance(res.data, list):
1029 feedback_fn(" Communication failure in hooks execution")
1032 for script, hkr, output in res.data:
1033 if hkr == constants.HKR_FAIL:
1034 # The node header is only shown once, if there are
1035 # failing hooks on that node
1036 if show_node_header:
1037 feedback_fn(" Node %s:" % node_name)
1038 show_node_header = False
1039 feedback_fn(" ERROR: Script %s failed, output:" % script)
1040 output = indent_re.sub(' ', output)
1041 feedback_fn("%s" % output)
1047 class LUVerifyDisks(NoHooksLU):
1048 """Verifies the cluster disks status.
1054 def ExpandNames(self):
1055 self.needed_locks = {
1056 locking.LEVEL_NODE: locking.ALL_SET,
1057 locking.LEVEL_INSTANCE: locking.ALL_SET,
1059 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1061 def CheckPrereq(self):
1062 """Check prerequisites.
1064 This has no prerequisites.
1069 def Exec(self, feedback_fn):
1070 """Verify integrity of cluster disks.
1073 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
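# A sketch of what each element of the result tuple collects (based on how
# they are filled in below): res_nodes - nodes that could not be queried,
# res_nlvm - per-node LVM error messages, res_instances - names of instances
# with non-active disks, res_missing - per-instance lists of missing
# (node, logical volume) pairs.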
1075 vg_name = self.cfg.GetVGName()
1076 nodes = utils.NiceSort(self.cfg.GetNodeList())
1077 instances = [self.cfg.GetInstanceInfo(name)
1078 for name in self.cfg.GetInstanceList()]
1081 for inst in instances:
1083 if (inst.status != "up" or
1084 inst.disk_template not in constants.DTS_NET_MIRROR):
1086 inst.MapLVsByNode(inst_lvs)
1087 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1088 for node, vol_list in inst_lvs.iteritems():
1089 for vol in vol_list:
1090 nv_dict[(node, vol)] = inst
1095 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1100 lvs = node_lvs[node]
1102 self.LogWarning("Connection to node %s failed: %s" %
1106 if isinstance(lvs, basestring):
1107 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1108 res_nlvm[node] = lvs
1109 elif not isinstance(lvs, dict):
1110 logging.warning("Connection to node %s failed or invalid data"
1112 res_nodes.append(node)
1115 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1116 inst = nv_dict.pop((node, lv_name), None)
1117 if (not lv_online and inst is not None
1118 and inst.name not in res_instances):
1119 res_instances.append(inst.name)
1121 # any leftover items in nv_dict are missing LVs, let's arrange the
1123 for key, inst in nv_dict.iteritems():
1124 if inst.name not in res_missing:
1125 res_missing[inst.name] = []
1126 res_missing[inst.name].append(key)
1131 class LURenameCluster(LogicalUnit):
1132 """Rename the cluster.
1135 HPATH = "cluster-rename"
1136 HTYPE = constants.HTYPE_CLUSTER
1139 def BuildHooksEnv(self):
1144 "OP_TARGET": self.cfg.GetClusterName(),
1145 "NEW_NAME": self.op.name,
1147 mn = self.cfg.GetMasterNode()
1148 return env, [mn], [mn]
1150 def CheckPrereq(self):
1151 """Verify that the passed name is a valid one.
1154 hostname = utils.HostInfo(self.op.name)
1156 new_name = hostname.name
1157 self.ip = new_ip = hostname.ip
1158 old_name = self.cfg.GetClusterName()
1159 old_ip = self.cfg.GetMasterIP()
1160 if new_name == old_name and new_ip == old_ip:
1161 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1162 " cluster has changed")
1163 if new_ip != old_ip:
1164 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1165 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1166 " reachable on the network. Aborting." %
1169 self.op.name = new_name
1171 def Exec(self, feedback_fn):
1172 """Rename the cluster.
1175 clustername = self.op.name
1178 # shutdown the master IP
1179 master = self.cfg.GetMasterNode()
1180 result = self.rpc.call_node_stop_master(master, False)
1181 if result.failed or not result.data:
1182 raise errors.OpExecError("Could not disable the master role")
1185 cluster = self.cfg.GetClusterInfo()
1186 cluster.cluster_name = clustername
1187 cluster.master_ip = ip
1188 self.cfg.Update(cluster)
1190 # update the known hosts file
1191 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1192 node_list = self.cfg.GetNodeList()
1194 node_list.remove(master)
1197 result = self.rpc.call_upload_file(node_list,
1198 constants.SSH_KNOWN_HOSTS_FILE)
1199 for to_node, to_result in result.iteritems():
1200 if to_result.failed or not to_result.data:
1201 logging.error("Copy of file %s to node %s failed", fname, to_node)
1204 result = self.rpc.call_node_start_master(master, False)
1205 if result.failed or not result.data:
1206 self.LogWarning("Could not re-enable the master role on"
1207 " the master, please restart manually.")
1210 def _RecursiveCheckIfLVMBased(disk):
1211 """Check if the given disk or its children are lvm-based.
1213 @type disk: L{objects.Disk}
1214 @param disk: the disk to check
1216 @return: boolean indicating whether a LD_LV dev_type was found or not
1220 for chdisk in disk.children:
1221 if _RecursiveCheckIfLVMBased(chdisk):
1223 return disk.dev_type == constants.LD_LV
1226 class LUSetClusterParams(LogicalUnit):
1227 """Change the parameters of the cluster.
1230 HPATH = "cluster-modify"
1231 HTYPE = constants.HTYPE_CLUSTER
1235 def CheckParameters(self):
1239 if not hasattr(self.op, "candidate_pool_size"):
1240 self.op.candidate_pool_size = None
1241 if self.op.candidate_pool_size is not None:
1243 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1244 except ValueError, err:
1245 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1247 if self.op.candidate_pool_size < 1:
1248 raise errors.OpPrereqError("At least one master candidate needed")
1250 def ExpandNames(self):
1251 # FIXME: in the future maybe other cluster params won't require checking on
1252 # all nodes to be modified.
1253 self.needed_locks = {
1254 locking.LEVEL_NODE: locking.ALL_SET,
1256 self.share_locks[locking.LEVEL_NODE] = 1
1258 def BuildHooksEnv(self):
1263 "OP_TARGET": self.cfg.GetClusterName(),
1264 "NEW_VG_NAME": self.op.vg_name,
1266 mn = self.cfg.GetMasterNode()
1267 return env, [mn], [mn]
1269 def CheckPrereq(self):
1270 """Check prerequisites.
1272 This checks whether the given params don't conflict and
1273 if the given volume group is valid.
1276 # FIXME: This only works because there is only one parameter that can be
1277 # changed or removed.
1278 if self.op.vg_name is not None and not self.op.vg_name:
1279 instances = self.cfg.GetAllInstancesInfo().values()
1280 for inst in instances:
1281 for disk in inst.disks:
1282 if _RecursiveCheckIfLVMBased(disk):
1283 raise errors.OpPrereqError("Cannot disable lvm storage while"
1284 " lvm-based instances exist")
1286 node_list = self.acquired_locks[locking.LEVEL_NODE]
1288 # if vg_name not None, checks given volume group on all nodes
1290 vglist = self.rpc.call_vg_list(node_list)
1291 for node in node_list:
1292 if vglist[node].failed:
1293 # ignoring down node
1294 self.LogWarning("Node %s unreachable/error, ignoring" % node)
1296 vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1298 constants.MIN_VG_SIZE)
1300 raise errors.OpPrereqError("Error on node '%s': %s" %
1303 self.cluster = cluster = self.cfg.GetClusterInfo()
1304 # validate beparams changes
1305 if self.op.beparams:
1306 utils.CheckBEParams(self.op.beparams)
1307 self.new_beparams = cluster.FillDict(
1308 cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1310 # hypervisor list/parameters
1311 self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1312 if self.op.hvparams:
1313 if not isinstance(self.op.hvparams, dict):
1314 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1315 for hv_name, hv_dict in self.op.hvparams.items():
1316 if hv_name not in self.new_hvparams:
1317 self.new_hvparams[hv_name] = hv_dict
1319 self.new_hvparams[hv_name].update(hv_dict)
1321 if self.op.enabled_hypervisors is not None:
1322 self.hv_list = self.op.enabled_hypervisors
1324 self.hv_list = cluster.enabled_hypervisors
1326 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1327 # either the enabled list has changed, or the parameters have, validate
1328 for hv_name, hv_params in self.new_hvparams.items():
1329 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1330 (self.op.enabled_hypervisors and
1331 hv_name in self.op.enabled_hypervisors)):
1332 # either this is a new hypervisor, or its parameters have changed
1333 hv_class = hypervisor.GetHypervisor(hv_name)
1334 hv_class.CheckParameterSyntax(hv_params)
1335 _CheckHVParams(self, node_list, hv_name, hv_params)
1337 def Exec(self, feedback_fn):
1338 """Change the parameters of the cluster.
1341 if self.op.vg_name is not None:
1342 if self.op.vg_name != self.cfg.GetVGName():
1343 self.cfg.SetVGName(self.op.vg_name)
1345 feedback_fn("Cluster LVM configuration already in desired"
1346 " state, not changing")
1347 if self.op.hvparams:
1348 self.cluster.hvparams = self.new_hvparams
1349 if self.op.enabled_hypervisors is not None:
1350 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1351 if self.op.beparams:
1352 self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1353 if self.op.candidate_pool_size is not None:
1354 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1356 self.cfg.Update(self.cluster)
1358 # we want to update nodes after the cluster so that if any errors
1359 # happen, we have recorded and saved the cluster info
1360 if self.op.candidate_pool_size is not None:
1361 node_info = self.cfg.GetAllNodesInfo().values()
1362 num_candidates = len([node for node in node_info
1363 if node.master_candidate])
1364 num_nodes = len(node_info)
1365 if num_candidates < self.op.candidate_pool_size:
1366 random.shuffle(node_info)
1367 for node in node_info:
1368 if num_candidates >= self.op.candidate_pool_size:
1370 if node.master_candidate:
1372 node.master_candidate = True
1373 self.LogInfo("Promoting node %s to master candidate", node.name)
1374 self.cfg.Update(node)
1375 self.context.ReaddNode(node)
1377 elif num_candidates > self.op.candidate_pool_size:
1378 self.LogInfo("Note: more nodes are candidates (%d) than the new value"
1379 " of candidate_pool_size (%d)" %
1380 (num_candidates, self.op.candidate_pool_size))
1383 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1384 """Sleep and poll for an instance's disk to sync.
1387 if not instance.disks:
1391 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1393 node = instance.primary_node
1395 for dev in instance.disks:
1396 lu.cfg.SetDiskID(dev, node)
1402 cumul_degraded = False
1403 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1404 if rstats.failed or not rstats.data:
1405 lu.LogWarning("Can't get any data from node %s", node)
1408 raise errors.RemoteError("Can't contact node %s for mirror data,"
1409 " aborting." % node)
1412 rstats = rstats.data
1414 for i in range(len(rstats)):
1417 lu.LogWarning("Can't compute data for node %s/%s",
1418 node, instance.disks[i].iv_name)
1420 # we ignore the ldisk parameter
1421 perc_done, est_time, is_degraded, _ = mstat
1422 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1423 if perc_done is not None:
1425 if est_time is not None:
1426 rem_time = "%d estimated seconds remaining" % est_time
1429 rem_time = "no time estimate"
1430 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1431 (instance.disks[i].iv_name, perc_done, rem_time))
1435 time.sleep(min(60, max_time))
1438 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1439 return not cumul_degraded
1442 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1443 """Check that mirrors are not degraded.
1445 The ldisk parameter, if True, will change the test from the
1446 is_degraded attribute (which represents overall non-ok status for
1447 the device(s)) to the ldisk (representing the local storage status).
1450 lu.cfg.SetDiskID(dev, node)
1457 if on_primary or dev.AssembleOnSecondary():
1458 rstats = lu.rpc.call_blockdev_find(node, dev)
1459 if rstats.failed or not rstats.data:
1460 logging.warning("Node %s: disk degraded, not found or node down", node)
1463 result = result and (not rstats.data[idx])
1465 for child in dev.children:
1466 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1471 class LUDiagnoseOS(NoHooksLU):
1472 """Logical unit for OS diagnose/query.
1475 _OP_REQP = ["output_fields", "names"]
1477 _FIELDS_STATIC = utils.FieldSet()
1478 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1480 def ExpandNames(self):
1482 raise errors.OpPrereqError("Selective OS query not supported")
1484 _CheckOutputFields(static=self._FIELDS_STATIC,
1485 dynamic=self._FIELDS_DYNAMIC,
1486 selected=self.op.output_fields)
1488 # Lock all nodes, in shared mode
1489 self.needed_locks = {}
1490 self.share_locks[locking.LEVEL_NODE] = 1
1491 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1493 def CheckPrereq(self):
1494 """Check prerequisites.
1499 def _DiagnoseByOS(node_list, rlist):
1500 """Remaps a per-node return list into a per-os per-node dictionary
1502 @param node_list: a list with the names of all nodes
1503 @param rlist: a map with node names as keys and OS objects as values
1506 @return: a dictionary with osnames as keys and as values another map, with
1507 nodes as keys and list of OS objects as values, eg::
1509 {"debian-etch": {"node1": [<object>,...],
1510 "node2": [<object>,]}
1515 for node_name, nr in rlist.iteritems():
1516 if nr.failed or not nr.data:
1518 for os_obj in nr.data:
1519 if os_obj.name not in all_os:
1520 # build a list of nodes for this os containing empty lists
1521 # for each node in node_list
1522 all_os[os_obj.name] = {}
1523 for nname in node_list:
1524 all_os[os_obj.name][nname] = []
1525 all_os[os_obj.name][node_name].append(os_obj)
1528 def Exec(self, feedback_fn):
1529 """Compute the list of OSes.
1532 node_list = self.acquired_locks[locking.LEVEL_NODE]
1533 node_data = self.rpc.call_os_diagnose(node_list)
1534 if node_data == False:
1535 raise errors.OpExecError("Can't gather the list of OSes")
1536 pol = self._DiagnoseByOS(node_list, node_data)
1538 for os_name, os_data in pol.iteritems():
1540 for field in self.op.output_fields:
1543 elif field == "valid":
1544 val = utils.all([osl and osl[0] for osl in os_data.values()])
1545 elif field == "node_status":
1547 for node_name, nos_list in os_data.iteritems():
1548 val[node_name] = [(v.status, v.path) for v in nos_list]
1550 raise errors.ParameterError(field)
1557 class LURemoveNode(LogicalUnit):
1558 """Logical unit for removing a node.
1561 HPATH = "node-remove"
1562 HTYPE = constants.HTYPE_NODE
1563 _OP_REQP = ["node_name"]
1565 def BuildHooksEnv(self):
1568 This doesn't run on the target node in the pre phase as a failed
1569 node would then be impossible to remove.
1573 "OP_TARGET": self.op.node_name,
1574 "NODE_NAME": self.op.node_name,
1576 all_nodes = self.cfg.GetNodeList()
1577 all_nodes.remove(self.op.node_name)
1578 return env, all_nodes, all_nodes
1580 def CheckPrereq(self):
1581 """Check prerequisites.
1584 - the node exists in the configuration
1585 - it does not have primary or secondary instances
1586 - it's not the master
1588 Any errors are signalled by raising errors.OpPrereqError.
1591 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1593 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1595 instance_list = self.cfg.GetInstanceList()
1597 masternode = self.cfg.GetMasterNode()
1598 if node.name == masternode:
1599 raise errors.OpPrereqError("Node is the master node,"
1600 " you need to failover first.")
1602 for instance_name in instance_list:
1603 instance = self.cfg.GetInstanceInfo(instance_name)
1604 if node.name == instance.primary_node:
1605 raise errors.OpPrereqError("Instance %s still running on the node,"
1606 " please remove first." % instance_name)
1607 if node.name in instance.secondary_nodes:
1608 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1609 " please remove first." % instance_name)
1610 self.op.node_name = node.name
1613 def Exec(self, feedback_fn):
1614 """Removes the node from the cluster.
1618 logging.info("Stopping the node daemon and removing configs from node %s",
1621 self.context.RemoveNode(node.name)
1623 self.rpc.call_node_leave_cluster(node.name)
1625 # Promote nodes to master candidate as needed
1626 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
1627 node_info = self.cfg.GetAllNodesInfo().values()
1628 num_candidates = len([n for n in node_info
1629 if n.master_candidate])
1630 num_nodes = len(node_info)
1631 random.shuffle(node_info)
1632 for node in node_info:
1633 if num_candidates >= cp_size or num_candidates >= num_nodes:
1635 if node.master_candidate:
1637 node.master_candidate = True
1638 self.LogInfo("Promoting node %s to master candidate", node.name)
1639 self.cfg.Update(node)
1640 self.context.ReaddNode(node)
1644 class LUQueryNodes(NoHooksLU):
1645 """Logical unit for querying nodes.
1648 _OP_REQP = ["output_fields", "names"]
1650 _FIELDS_DYNAMIC = utils.FieldSet(
1652 "mtotal", "mnode", "mfree",
1657 _FIELDS_STATIC = utils.FieldSet(
1658 "name", "pinst_cnt", "sinst_cnt",
1659 "pinst_list", "sinst_list",
1660 "pip", "sip", "tags",
1667 def ExpandNames(self):
1668 _CheckOutputFields(static=self._FIELDS_STATIC,
1669 dynamic=self._FIELDS_DYNAMIC,
1670 selected=self.op.output_fields)
1672 self.needed_locks = {}
1673 self.share_locks[locking.LEVEL_NODE] = 1
1676 self.wanted = _GetWantedNodes(self, self.op.names)
1678 self.wanted = locking.ALL_SET
1680 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1682 # if we don't request only static fields, we need to lock the nodes
1683 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1686 def CheckPrereq(self):
1687 """Check prerequisites.
1690 # The validation of the node list is done in the _GetWantedNodes,
1691 # if non-empty, and if empty, there's no validation to do
1694 def Exec(self, feedback_fn):
1695 """Computes the list of nodes and their attributes.
1698 all_info = self.cfg.GetAllNodesInfo()
1700 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1701 elif self.wanted != locking.ALL_SET:
1702 nodenames = self.wanted
1703 missing = set(nodenames).difference(all_info.keys())
1705 raise errors.OpExecError(
1706 "Some nodes were removed before retrieving their data: %s" % missing)
1708 nodenames = all_info.keys()
1710 nodenames = utils.NiceSort(nodenames)
1711 nodelist = [all_info[name] for name in nodenames]
1713 # begin data gathering
1717 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1718 self.cfg.GetHypervisorType())
1719 for name in nodenames:
1720 nodeinfo = node_data[name]
1721 if not nodeinfo.failed and nodeinfo.data:
1722 nodeinfo = nodeinfo.data
1723 fn = utils.TryConvert
1725 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1726 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1727 "mfree": fn(int, nodeinfo.get('memory_free', None)),
1728 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1729 "dfree": fn(int, nodeinfo.get('vg_free', None)),
1730 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1731 "bootid": nodeinfo.get('bootid', None),
1734 live_data[name] = {}
1736 live_data = dict.fromkeys(nodenames, {})
1738 node_to_primary = dict([(name, set()) for name in nodenames])
1739 node_to_secondary = dict([(name, set()) for name in nodenames])
1741 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1742 "sinst_cnt", "sinst_list"))
1743 if inst_fields & frozenset(self.op.output_fields):
1744 instancelist = self.cfg.GetInstanceList()
1746 for instance_name in instancelist:
1747 inst = self.cfg.GetInstanceInfo(instance_name)
1748 if inst.primary_node in node_to_primary:
1749 node_to_primary[inst.primary_node].add(inst.name)
1750 for secnode in inst.secondary_nodes:
1751 if secnode in node_to_secondary:
1752 node_to_secondary[secnode].add(inst.name)
1754 master_node = self.cfg.GetMasterNode()
1756 # end data gathering
1759 for node in nodelist:
1761 for field in self.op.output_fields:
1764 elif field == "pinst_list":
1765 val = list(node_to_primary[node.name])
1766 elif field == "sinst_list":
1767 val = list(node_to_secondary[node.name])
1768 elif field == "pinst_cnt":
1769 val = len(node_to_primary[node.name])
1770 elif field == "sinst_cnt":
1771 val = len(node_to_secondary[node.name])
1772 elif field == "pip":
1773 val = node.primary_ip
1774 elif field == "sip":
1775 val = node.secondary_ip
1776 elif field == "tags":
1777 val = list(node.GetTags())
1778 elif field == "serial_no":
1779 val = node.serial_no
1780 elif field == "master_candidate":
1781 val = node.master_candidate
1782 elif field == "master":
1783 val = node.name == master_node
1784 elif field == "offline":
1786 elif self._FIELDS_DYNAMIC.Matches(field):
1787 val = live_data[node.name].get(field, None)
1789 raise errors.ParameterError(field)
1790 node_output.append(val)
1791 output.append(node_output)
1796 class LUQueryNodeVolumes(NoHooksLU):
1797 """Logical unit for getting volumes on node(s).
1800 _OP_REQP = ["nodes", "output_fields"]
1802 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1803 _FIELDS_STATIC = utils.FieldSet("node")
1805 def ExpandNames(self):
1806 _CheckOutputFields(static=self._FIELDS_STATIC,
1807 dynamic=self._FIELDS_DYNAMIC,
1808 selected=self.op.output_fields)
1810 self.needed_locks = {}
1811 self.share_locks[locking.LEVEL_NODE] = 1
1812 if not self.op.nodes:
1813 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1815 self.needed_locks[locking.LEVEL_NODE] = \
1816 _GetWantedNodes(self, self.op.nodes)
1818 def CheckPrereq(self):
1819 """Check prerequisites.
1821 This checks that the fields required are valid output fields.
1824 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1826 def Exec(self, feedback_fn):
1827 """Computes the list of nodes and their attributes.
1830 nodenames = self.nodes
1831 volumes = self.rpc.call_node_volumes(nodenames)
1833 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1834 in self.cfg.GetInstanceList()]
1836 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1839 for node in nodenames:
1840 if node not in volumes or volumes[node].failed or not volumes[node].data:
1843 node_vols = volumes[node].data[:]
1844 node_vols.sort(key=lambda vol: vol['dev'])
1846 for vol in node_vols:
1848 for field in self.op.output_fields:
1851 elif field == "phys":
1855 elif field == "name":
1857 elif field == "size":
1858 val = int(float(vol['size']))
1859 elif field == "instance":
1861 if node not in lv_by_node[inst]:
1863 if vol['name'] in lv_by_node[inst][node]:
1869 raise errors.ParameterError(field)
1870 node_output.append(str(val))
1872 output.append(node_output)
1877 class LUAddNode(LogicalUnit):
1878 """Logical unit for adding node to the cluster.
1882 HTYPE = constants.HTYPE_NODE
1883 _OP_REQP = ["node_name"]
1885 def BuildHooksEnv(self):
1888 This will run on all nodes before, and on all nodes + the new node after.
1892 "OP_TARGET": self.op.node_name,
1893 "NODE_NAME": self.op.node_name,
1894 "NODE_PIP": self.op.primary_ip,
1895 "NODE_SIP": self.op.secondary_ip,
1897 nodes_0 = self.cfg.GetNodeList()
1898 nodes_1 = nodes_0 + [self.op.node_name, ]
1899 return env, nodes_0, nodes_1
1901 def CheckPrereq(self):
1902 """Check prerequisites.
1905 - the new node is not already in the config
1907 - its parameters (single/dual homed) match the cluster
1909 Any errors are signalled by raising errors.OpPrereqError.
1912 node_name = self.op.node_name
1915 dns_data = utils.HostInfo(node_name)
1917 node = dns_data.name
1918 primary_ip = self.op.primary_ip = dns_data.ip
1919 secondary_ip = getattr(self.op, "secondary_ip", None)
1920 if secondary_ip is None:
1921 secondary_ip = primary_ip
1922 if not utils.IsValidIP(secondary_ip):
1923 raise errors.OpPrereqError("Invalid secondary IP given")
1924 self.op.secondary_ip = secondary_ip
1926 node_list = cfg.GetNodeList()
1927 if not self.op.readd and node in node_list:
1928 raise errors.OpPrereqError("Node %s is already in the configuration" %
1930 elif self.op.readd and node not in node_list:
1931 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1933 for existing_node_name in node_list:
1934 existing_node = cfg.GetNodeInfo(existing_node_name)
1936 if self.op.readd and node == existing_node_name:
1937 if (existing_node.primary_ip != primary_ip or
1938 existing_node.secondary_ip != secondary_ip):
1939 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1940 " address configuration as before")
1943 if (existing_node.primary_ip == primary_ip or
1944 existing_node.secondary_ip == primary_ip or
1945 existing_node.primary_ip == secondary_ip or
1946 existing_node.secondary_ip == secondary_ip):
1947 raise errors.OpPrereqError("New node ip address(es) conflict with"
1948 " existing node %s" % existing_node.name)
1950 # check that the type of the node (single versus dual homed) is the
1951 # same as for the master
1952 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1953 master_singlehomed = myself.secondary_ip == myself.primary_ip
1954 newbie_singlehomed = secondary_ip == primary_ip
1955 if master_singlehomed != newbie_singlehomed:
1956 if master_singlehomed:
1957 raise errors.OpPrereqError("The master has no private ip but the"
1958 " new node has one")
1960 raise errors.OpPrereqError("The master has a private ip but the"
1961 " new node doesn't have one")
1963 # checks reachability
1964 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1965 raise errors.OpPrereqError("Node not reachable by ping")
1967 if not newbie_singlehomed:
1968 # check reachability from my secondary ip to newbie's secondary ip
1969 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1970 source=myself.secondary_ip):
1971 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1972 " based ping to noded port")
1974 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
1975 node_info = self.cfg.GetAllNodesInfo().values()
1976 num_candidates = len([n for n in node_info
1977 if n.master_candidate])
1978 master_candidate = num_candidates < cp_size
1980 self.new_node = objects.Node(name=node,
1981 primary_ip=primary_ip,
1982 secondary_ip=secondary_ip,
1983 master_candidate=master_candidate,
1986 def Exec(self, feedback_fn):
1987 """Adds the new node to the cluster.
1990 new_node = self.new_node
1991 node = new_node.name
1993 # check connectivity
1994 result = self.rpc.call_version([node])[node]
1997 if constants.PROTOCOL_VERSION == result.data:
1998 logging.info("Communication to node %s fine, sw version %s match",
2001 raise errors.OpExecError("Version mismatch master version %s,"
2002 " node version %s" %
2003 (constants.PROTOCOL_VERSION, result.data))
2005 raise errors.OpExecError("Cannot get version from the new node")
2008 logging.info("Copy ssh key to node %s", node)
2009 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2011 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2012 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2018 keyarray.append(f.read())
2022 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2024 keyarray[2], keyarray[3], keyarray[4], keyarray[5])
2026 if result.failed or not result.data:
2027 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
2029 # Add node to our /etc/hosts, and add key to known_hosts
2030 utils.AddHostToEtcHosts(new_node.name)
2032 if new_node.secondary_ip != new_node.primary_ip:
2033 result = self.rpc.call_node_has_ip_address(new_node.name,
2034 new_node.secondary_ip)
2035 if result.failed or not result.data:
2036 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2037 " you gave (%s). Please fix and re-run this"
2038 " command." % new_node.secondary_ip)
2040 node_verify_list = [self.cfg.GetMasterNode()]
2041 node_verify_param = {
2043 # TODO: do a node-net-test as well?
2046 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2047 self.cfg.GetClusterName())
2048 for verifier in node_verify_list:
2049 if result[verifier].failed or not result[verifier].data:
2050 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2051 " for remote verification" % verifier)
2052 if result[verifier].data['nodelist']:
2053 for failed in result[verifier].data['nodelist']:
2054 feedback_fn("ssh/hostname verification failed %s -> %s" %
2055 (verifier, result[verifier].data['nodelist'][failed]))
2056 raise errors.OpExecError("ssh/hostname verification failed.")
2058 # Distribute updated /etc/hosts and known_hosts to all nodes,
2059 # including the node just added
2060 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2061 dist_nodes = self.cfg.GetNodeList()
2062 if not self.op.readd:
2063 dist_nodes.append(node)
2064 if myself.name in dist_nodes:
2065 dist_nodes.remove(myself.name)
2067 logging.debug("Copying hosts and known_hosts to all nodes")
2068 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2069 result = self.rpc.call_upload_file(dist_nodes, fname)
2070 for to_node, to_result in result.iteritems():
2071 if to_result.failed or not to_result.data:
2072 logging.error("Copy of file %s to node %s failed", fname, to_node)
2075 if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
2076 to_copy.append(constants.VNC_PASSWORD_FILE)
2077 for fname in to_copy:
2078 result = self.rpc.call_upload_file([node], fname)
2079 if result[node].failed or not result[node].data:
2080 logging.error("Could not copy file %s to node %s", fname, node)
2083 self.context.ReaddNode(new_node)
2085 self.context.AddNode(new_node)
2088 class LUSetNodeParams(LogicalUnit):
2089 """Modifies the parameters of a node.
2092 HPATH = "node-modify"
2093 HTYPE = constants.HTYPE_NODE
2094 _OP_REQP = ["node_name"]
2097 def CheckArguments(self):
2098 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2099 if node_name is None:
2100 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2101 self.op.node_name = node_name
2102 if not hasattr(self.op, 'master_candidate'):
2103 raise errors.OpPrereqError("Please pass at least one modification")
2104 self.op.master_candidate = bool(self.op.master_candidate)
2106 def ExpandNames(self):
2107 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2109 def BuildHooksEnv(self):
2112 This runs on the master node.
2116 "OP_TARGET": self.op.node_name,
2117 "MASTER_CANDIDATE": str(self.op.master_candidate),
2119 nl = [self.cfg.GetMasterNode(),
2123 def CheckPrereq(self):
2124 """Check prerequisites.
2126 This checks that the requested modification is valid against the current cluster state (e.g. that the master candidate pool is not reduced below candidate_pool_size).
2129 force = self.force = self.op.force
2131 if self.op.master_candidate == False:
2132 if self.op.node_name == self.cfg.GetMasterNode():
2133 raise errors.OpPrereqError("The master node has to be a"
2134 " master candidate")
2135 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2136 node_info = self.cfg.GetAllNodesInfo().values()
2137 num_candidates = len([node for node in node_info
2138 if node.master_candidate])
2139 if num_candidates <= cp_size:
2140 msg = ("Not enough master candidates (desired"
2141 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2143 self.LogWarning(msg)
2145 raise errors.OpPrereqError(msg)
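# Illustrative example: with candidate_pool_size = 10 and exactly 10 current
# candidates, demoting one would leave 9 (below the desired 10), so the
# request is refused unless force is set, in which case only the warning
# above is emitted.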
2149 def Exec(self, feedback_fn):
2153 node = self.cfg.GetNodeInfo(self.op.node_name)
2157 if self.op.master_candidate is not None:
2158 node.master_candidate = self.op.master_candidate
2159 result.append(("master_candidate", str(self.op.master_candidate)))
2160 if self.op.master_candidate == False:
2161 rrc = self.rpc.call_node_demote_from_mc(node.name)
2162 if (rrc.failed or not isinstance(rrc.data, (tuple, list))
2163 or len(rrc.data) != 2):
2164 self.LogWarning("Node rpc error: %s" % rrc.error)
2165 elif not rrc.data[0]:
2166 self.LogWarning("Node failed to demote itself: %s" % rrc.data[1])
2168 # this will trigger configuration file update, if needed
2169 self.cfg.Update(node)
2170 # this will trigger job queue propagation or cleanup
2171 if self.op.node_name != self.cfg.GetMasterNode():
2172 self.context.ReaddNode(node)
2177 class LUQueryClusterInfo(NoHooksLU):
2178 """Query cluster configuration.
2184 def ExpandNames(self):
2185 self.needed_locks = {}
2187 def CheckPrereq(self):
2188 """No prerequsites needed for this LU.
2193 def Exec(self, feedback_fn):
2194 """Return cluster config.
2197 cluster = self.cfg.GetClusterInfo()
2199 "software_version": constants.RELEASE_VERSION,
2200 "protocol_version": constants.PROTOCOL_VERSION,
2201 "config_version": constants.CONFIG_VERSION,
2202 "os_api_version": constants.OS_API_VERSION,
2203 "export_version": constants.EXPORT_VERSION,
2204 "architecture": (platform.architecture()[0], platform.machine()),
2205 "name": cluster.cluster_name,
2206 "master": cluster.master_node,
2207 "default_hypervisor": cluster.default_hypervisor,
2208 "enabled_hypervisors": cluster.enabled_hypervisors,
2209 "hvparams": cluster.hvparams,
2210 "beparams": cluster.beparams,
2211 "candidate_pool_size": cluster.candidate_pool_size,
2217 class LUQueryConfigValues(NoHooksLU):
2218 """Return configuration values.
2223 _FIELDS_DYNAMIC = utils.FieldSet()
2224 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2226 def ExpandNames(self):
2227 self.needed_locks = {}
2229 _CheckOutputFields(static=self._FIELDS_STATIC,
2230 dynamic=self._FIELDS_DYNAMIC,
2231 selected=self.op.output_fields)
2233 def CheckPrereq(self):
2234 """No prerequisites.
2239 def Exec(self, feedback_fn):
2240 """Dump a representation of the cluster config to the standard output.
2244 for field in self.op.output_fields:
2245 if field == "cluster_name":
2246 entry = self.cfg.GetClusterName()
2247 elif field == "master_node":
2248 entry = self.cfg.GetMasterNode()
2249 elif field == "drain_flag":
2250 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2252 raise errors.ParameterError(field)
2253 values.append(entry)
2257 class LUActivateInstanceDisks(NoHooksLU):
2258 """Bring up an instance's disks.
2261 _OP_REQP = ["instance_name"]
2264 def ExpandNames(self):
2265 self._ExpandAndLockInstance()
2266 self.needed_locks[locking.LEVEL_NODE] = []
2267 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2269 def DeclareLocks(self, level):
2270 if level == locking.LEVEL_NODE:
2271 self._LockInstancesNodes()
2273 def CheckPrereq(self):
2274 """Check prerequisites.
2276 This checks that the instance is in the cluster.
2279 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2280 assert self.instance is not None, \
2281 "Cannot retrieve locked instance %s" % self.op.instance_name
2283 def Exec(self, feedback_fn):
2284 """Activate the disks.
2287 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2289 raise errors.OpExecError("Cannot activate block devices")
2294 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2295 """Prepare the block devices for an instance.
2297 This sets up the block devices on all nodes.
2299 @type lu: L{LogicalUnit}
2300 @param lu: the logical unit on whose behalf we execute
2301 @type instance: L{objects.Instance}
2302 @param instance: the instance for whose disks we assemble
2303 @type ignore_secondaries: boolean
2304 @param ignore_secondaries: if true, errors on secondary nodes
2305 won't result in an error return from the function
2306 @return: a tuple (disks_ok, device_info), where device_info is a list of
2307 (node, instance_visible_name, node_visible_name) tuples
2308 with the mapping from node devices to instance devices
2313 iname = instance.name
2314 # With the two-pass mechanism we try to reduce the window of
2315 # opportunity for the race condition of switching DRBD to primary
2316 # before handshaking occurred, but we do not eliminate it
2318 # The proper fix would be to wait (with some limits) until the
2319 # connection has been made and drbd transitions from WFConnection
2320 # into any other network-connected state (Connected, SyncTarget,
2323 # 1st pass, assemble on all nodes in secondary mode
2324 for inst_disk in instance.disks:
2325 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2326 lu.cfg.SetDiskID(node_disk, node)
2327 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2328 if result.failed or not result:
2329 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2330 " (is_primary=False, pass=1)",
2331 inst_disk.iv_name, node)
2332 if not ignore_secondaries:
2335 # FIXME: race condition on drbd migration to primary
2337 # 2nd pass, do only the primary node
2338 for inst_disk in instance.disks:
2339 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2340 if node != instance.primary_node:
2342 lu.cfg.SetDiskID(node_disk, node)
2343 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2344 if result.failed or not result:
2345 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2346 " (is_primary=True, pass=2)",
2347 inst_disk.iv_name, node)
2349 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2351 # leave the disks configured for the primary node
2352 # this is a workaround that would be fixed better by
2353 # improving the logical/physical id handling
2354 for disk in instance.disks:
2355 lu.cfg.SetDiskID(disk, instance.primary_node)
2357 return disks_ok, device_info
2360 def _StartInstanceDisks(lu, instance, force):
2361 """Start the disks of an instance.
2364 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2365 ignore_secondaries=force)
2367 _ShutdownInstanceDisks(lu, instance)
2368 if force is not None and not force:
2369 lu.proc.LogWarning("", hint="If the message above refers to a"
2371 " you can retry the operation using '--force'.")
2372 raise errors.OpExecError("Disk consistency error")
2375 class LUDeactivateInstanceDisks(NoHooksLU):
2376 """Shutdown an instance's disks.
2379 _OP_REQP = ["instance_name"]
2382 def ExpandNames(self):
2383 self._ExpandAndLockInstance()
2384 self.needed_locks[locking.LEVEL_NODE] = []
2385 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2387 def DeclareLocks(self, level):
2388 if level == locking.LEVEL_NODE:
2389 self._LockInstancesNodes()
2391 def CheckPrereq(self):
2392 """Check prerequisites.
2394 This checks that the instance is in the cluster.
2397 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2398 assert self.instance is not None, \
2399 "Cannot retrieve locked instance %s" % self.op.instance_name
2401 def Exec(self, feedback_fn):
2402 """Deactivate the disks
2405 instance = self.instance
2406 _SafeShutdownInstanceDisks(self, instance)
2409 def _SafeShutdownInstanceDisks(lu, instance):
2410 """Shutdown block devices of an instance.
2412 This function checks if an instance is running, before calling
2413 _ShutdownInstanceDisks.
2416 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2417 [instance.hypervisor])
2418 ins_l = ins_l[instance.primary_node]
2419 if ins_l.failed or not isinstance(ins_l.data, list):
2420 raise errors.OpExecError("Can't contact node '%s'" %
2421 instance.primary_node)
2423 if instance.name in ins_l.data:
2424 raise errors.OpExecError("Instance is running, can't shutdown"
2427 _ShutdownInstanceDisks(lu, instance)
2430 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2431 """Shutdown block devices of an instance.
2433 This does the shutdown on all nodes of the instance.
2435 Errors on the primary node are ignored only if ignore_primary is true.
2440 for disk in instance.disks:
2441 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2442 lu.cfg.SetDiskID(top_disk, node)
2443 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2444 if result.failed or not result.data:
2445 logging.error("Could not shutdown block device %s on node %s",
2447 if not ignore_primary or node != instance.primary_node:
2452 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2453 """Checks if a node has enough free memory.
2455 This function checks if a given node has the needed amount of free
2456 memory. In case the node has less memory or we cannot get the
2457 information from the node, this function raises an OpPrereqError
2460 @type lu: C{LogicalUnit}
2461 @param lu: a logical unit from which we get configuration data
2463 @param node: the node to check
2464 @type reason: C{str}
2465 @param reason: string to use in the error message
2466 @type requested: C{int}
2467 @param requested: the amount of memory in MiB to check for
2468 @type hypervisor: C{str}
2469 @param hypervisor: the hypervisor to ask for memory stats
2470 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2471 we cannot check the node
2474 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2475 nodeinfo[node].Raise()
2476 free_mem = nodeinfo[node].data.get('memory_free')
2477 if not isinstance(free_mem, int):
2478 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2479 " was '%s'" % (node, free_mem))
2480 if requested > free_mem:
2481 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2482 " needed %s MiB, available %s MiB" %
2483 (node, reason, requested, free_mem))
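# Worked example: asking for 2048 MiB on a node that reports
# memory_free = 1024 raises OpPrereqError("Not enough memory on node ...:
# needed 2048 MiB, available 1024 MiB"); a non-integer memory_free (e.g. when
# the node query failed) is rejected as well.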
2486 class LUStartupInstance(LogicalUnit):
2487 """Starts an instance.
2490 HPATH = "instance-start"
2491 HTYPE = constants.HTYPE_INSTANCE
2492 _OP_REQP = ["instance_name", "force"]
2495 def ExpandNames(self):
2496 self._ExpandAndLockInstance()
2498 def BuildHooksEnv(self):
2501 This runs on master, primary and secondary nodes of the instance.
2505 "FORCE": self.op.force,
2507 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2508 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2509 list(self.instance.secondary_nodes))
2512 def CheckPrereq(self):
2513 """Check prerequisites.
2515 This checks that the instance is in the cluster.
2518 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2519 assert self.instance is not None, \
2520 "Cannot retrieve locked instance %s" % self.op.instance_name
2522 bep = self.cfg.GetClusterInfo().FillBE(instance)
2523 # check bridge existence
2524 _CheckInstanceBridgesExist(self, instance)
2526 _CheckNodeFreeMemory(self, instance.primary_node,
2527 "starting instance %s" % instance.name,
2528 bep[constants.BE_MEMORY], instance.hypervisor)
2530 def Exec(self, feedback_fn):
2531 """Start the instance.
2534 instance = self.instance
2535 force = self.op.force
2536 extra_args = getattr(self.op, "extra_args", "")
2538 self.cfg.MarkInstanceUp(instance.name)
2540 node_current = instance.primary_node
2542 _StartInstanceDisks(self, instance, force)
2544 result = self.rpc.call_instance_start(node_current, instance, extra_args)
2545 if result.failed or not result.data:
2546 _ShutdownInstanceDisks(self, instance)
2547 raise errors.OpExecError("Could not start instance")
2550 class LURebootInstance(LogicalUnit):
2551 """Reboot an instance.
2554 HPATH = "instance-reboot"
2555 HTYPE = constants.HTYPE_INSTANCE
2556 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2559 def ExpandNames(self):
2560 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2561 constants.INSTANCE_REBOOT_HARD,
2562 constants.INSTANCE_REBOOT_FULL]:
2563 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2564 (constants.INSTANCE_REBOOT_SOFT,
2565 constants.INSTANCE_REBOOT_HARD,
2566 constants.INSTANCE_REBOOT_FULL))
2567 self._ExpandAndLockInstance()
2569 def BuildHooksEnv(self):
2572 This runs on master, primary and secondary nodes of the instance.
2576 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2578 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2579 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2580 list(self.instance.secondary_nodes))
2583 def CheckPrereq(self):
2584 """Check prerequisites.
2586 This checks that the instance is in the cluster.
2589 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2590 assert self.instance is not None, \
2591 "Cannot retrieve locked instance %s" % self.op.instance_name
2593 # check bridge existence
2594 _CheckInstanceBridgesExist(self, instance)
2596 def Exec(self, feedback_fn):
2597 """Reboot the instance.
2600 instance = self.instance
2601 ignore_secondaries = self.op.ignore_secondaries
2602 reboot_type = self.op.reboot_type
2603 extra_args = getattr(self.op, "extra_args", "")
2605 node_current = instance.primary_node
2607 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2608 constants.INSTANCE_REBOOT_HARD]:
2609 result = self.rpc.call_instance_reboot(node_current, instance,
2610 reboot_type, extra_args)
2611 if result.failed or not result.data:
2612 raise errors.OpExecError("Could not reboot instance")
2614 if not self.rpc.call_instance_shutdown(node_current, instance):
2615 raise errors.OpExecError("could not shutdown instance for full reboot")
2616 _ShutdownInstanceDisks(self, instance)
2617 _StartInstanceDisks(self, instance, ignore_secondaries)
2618 result = self.rpc.call_instance_start(node_current, instance, extra_args)
2619 if result.failed or not result.data:
2620 _ShutdownInstanceDisks(self, instance)
2621 raise errors.OpExecError("Could not start instance for full reboot")
2623 self.cfg.MarkInstanceUp(instance.name)
2626 class LUShutdownInstance(LogicalUnit):
2627 """Shutdown an instance.
2630 HPATH = "instance-stop"
2631 HTYPE = constants.HTYPE_INSTANCE
2632 _OP_REQP = ["instance_name"]
2635 def ExpandNames(self):
2636 self._ExpandAndLockInstance()
2638 def BuildHooksEnv(self):
2641 This runs on master, primary and secondary nodes of the instance.
2644 env = _BuildInstanceHookEnvByObject(self, self.instance)
2645 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2646 list(self.instance.secondary_nodes))
2649 def CheckPrereq(self):
2650 """Check prerequisites.
2652 This checks that the instance is in the cluster.
2655 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2656 assert self.instance is not None, \
2657 "Cannot retrieve locked instance %s" % self.op.instance_name
2659 def Exec(self, feedback_fn):
2660 """Shutdown the instance.
2663 instance = self.instance
2664 node_current = instance.primary_node
2665 self.cfg.MarkInstanceDown(instance.name)
2666 result = self.rpc.call_instance_shutdown(node_current, instance)
2667 if result.failed or not result.data:
2668 self.proc.LogWarning("Could not shutdown instance")
2670 _ShutdownInstanceDisks(self, instance)
2673 class LUReinstallInstance(LogicalUnit):
2674 """Reinstall an instance.
2677 HPATH = "instance-reinstall"
2678 HTYPE = constants.HTYPE_INSTANCE
2679 _OP_REQP = ["instance_name"]
2682 def ExpandNames(self):
2683 self._ExpandAndLockInstance()
2685 def BuildHooksEnv(self):
2688 This runs on master, primary and secondary nodes of the instance.
2691 env = _BuildInstanceHookEnvByObject(self, self.instance)
2692 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2693 list(self.instance.secondary_nodes))
2696 def CheckPrereq(self):
2697 """Check prerequisites.
2699 This checks that the instance is in the cluster and is not running.
2702 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2703 assert instance is not None, \
2704 "Cannot retrieve locked instance %s" % self.op.instance_name
2706 if instance.disk_template == constants.DT_DISKLESS:
2707 raise errors.OpPrereqError("Instance '%s' has no disks" %
2708 self.op.instance_name)
2709 if instance.status != "down":
2710 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2711 self.op.instance_name)
2712 remote_info = self.rpc.call_instance_info(instance.primary_node,
2714 instance.hypervisor)
2715 if remote_info.failed or remote_info.data:
2716 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2717 (self.op.instance_name,
2718 instance.primary_node))
2720 self.op.os_type = getattr(self.op, "os_type", None)
2721 if self.op.os_type is not None:
2723 pnode = self.cfg.GetNodeInfo(
2724 self.cfg.ExpandNodeName(instance.primary_node))
2726 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2728 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2730 if not isinstance(result.data, objects.OS):
2731 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2732 " primary node" % self.op.os_type)
2734 self.instance = instance
2736 def Exec(self, feedback_fn):
2737 """Reinstall the instance.
2740 inst = self.instance
2742 if self.op.os_type is not None:
2743 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2744 inst.os = self.op.os_type
2745 self.cfg.Update(inst)
2747 _StartInstanceDisks(self, inst, None)
2749 feedback_fn("Running the instance OS create scripts...")
2750 result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2753 raise errors.OpExecError("Could not install OS for instance %s"
2755 (inst.name, inst.primary_node))
2757 _ShutdownInstanceDisks(self, inst)
2760 class LURenameInstance(LogicalUnit):
2761 """Rename an instance.
2764 HPATH = "instance-rename"
2765 HTYPE = constants.HTYPE_INSTANCE
2766 _OP_REQP = ["instance_name", "new_name"]
2768 def BuildHooksEnv(self):
2771 This runs on master, primary and secondary nodes of the instance.
2774 env = _BuildInstanceHookEnvByObject(self, self.instance)
2775 env["INSTANCE_NEW_NAME"] = self.op.new_name
2776 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2777 list(self.instance.secondary_nodes))
2780 def CheckPrereq(self):
2781 """Check prerequisites.
2783 This checks that the instance is in the cluster and is not running.
2786 instance = self.cfg.GetInstanceInfo(
2787 self.cfg.ExpandInstanceName(self.op.instance_name))
2788 if instance is None:
2789 raise errors.OpPrereqError("Instance '%s' not known" %
2790 self.op.instance_name)
2791 if instance.status != "down":
2792 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2793 self.op.instance_name)
2794 remote_info = self.rpc.call_instance_info(instance.primary_node,
2796 instance.hypervisor)
2798 if remote_info.data:
2799 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2800 (self.op.instance_name,
2801 instance.primary_node))
2802 self.instance = instance
2804 # new name verification
2805 name_info = utils.HostInfo(self.op.new_name)
2807 self.op.new_name = new_name = name_info.name
2808 instance_list = self.cfg.GetInstanceList()
2809 if new_name in instance_list:
2810 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2813 if not getattr(self.op, "ignore_ip", False):
2814 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2815 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2816 (name_info.ip, new_name))
2819 def Exec(self, feedback_fn):
2820 """Reinstall the instance.
2823 inst = self.instance
2824 old_name = inst.name
2826 if inst.disk_template == constants.DT_FILE:
2827 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2829 self.cfg.RenameInstance(inst.name, self.op.new_name)
2830 # Change the instance lock. This is definitely safe while we hold the BGL
2831 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2832 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2834 # re-read the instance from the configuration after rename
2835 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2837 if inst.disk_template == constants.DT_FILE:
2838 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2839 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2840 old_file_storage_dir,
2841 new_file_storage_dir)
2844 raise errors.OpExecError("Could not connect to node '%s' to rename"
2845 " directory '%s' to '%s' (but the instance"
2846 " has been renamed in Ganeti)" % (
2847 inst.primary_node, old_file_storage_dir,
2848 new_file_storage_dir))
2850 if not result.data[0]:
2851 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2852 " (but the instance has been renamed in"
2853 " Ganeti)" % (old_file_storage_dir,
2854 new_file_storage_dir))
2856 _StartInstanceDisks(self, inst, None)
2858 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2860 if result.failed or not result.data:
2861 msg = ("Could not run OS rename script for instance %s on node %s"
2862 " (but the instance has been renamed in Ganeti)" %
2863 (inst.name, inst.primary_node))
2864 self.proc.LogWarning(msg)
2866 _ShutdownInstanceDisks(self, inst)
2869 class LURemoveInstance(LogicalUnit):
2870 """Remove an instance.
2873 HPATH = "instance-remove"
2874 HTYPE = constants.HTYPE_INSTANCE
2875 _OP_REQP = ["instance_name", "ignore_failures"]
2878 def ExpandNames(self):
2879 self._ExpandAndLockInstance()
2880 self.needed_locks[locking.LEVEL_NODE] = []
2881 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2883 def DeclareLocks(self, level):
2884 if level == locking.LEVEL_NODE:
2885 self._LockInstancesNodes()
2887 def BuildHooksEnv(self):
2890 This runs on master, primary and secondary nodes of the instance.
2893 env = _BuildInstanceHookEnvByObject(self, self.instance)
2894 nl = [self.cfg.GetMasterNode()]
2897 def CheckPrereq(self):
2898 """Check prerequisites.
2900 This checks that the instance is in the cluster.
2903 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2904 assert self.instance is not None, \
2905 "Cannot retrieve locked instance %s" % self.op.instance_name
2907 def Exec(self, feedback_fn):
2908 """Remove the instance.
2911 instance = self.instance
2912 logging.info("Shutting down instance %s on node %s",
2913 instance.name, instance.primary_node)
2915 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
2916 if result.failed or not result.data:
2917 if self.op.ignore_failures:
2918 feedback_fn("Warning: can't shutdown instance")
2920 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2921 (instance.name, instance.primary_node))
2923 logging.info("Removing block devices for instance %s", instance.name)
2925 if not _RemoveDisks(self, instance):
2926 if self.op.ignore_failures:
2927 feedback_fn("Warning: can't remove instance's disks")
2929 raise errors.OpExecError("Can't remove instance's disks")
2931 logging.info("Removing instance %s out of cluster config", instance.name)
2933 self.cfg.RemoveInstance(instance.name)
2934 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2937 class LUQueryInstances(NoHooksLU):
2938 """Logical unit for querying instances.
2941 _OP_REQP = ["output_fields", "names"]
2943 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2944 "admin_state", "admin_ram",
2945 "disk_template", "ip", "mac", "bridge",
2946 "sda_size", "sdb_size", "vcpus", "tags",
2947 "network_port", "beparams",
2948 "(disk).(size)/([0-9]+)",
2950 "(nic).(mac|ip|bridge)/([0-9]+)",
2951 "(nic).(macs|ips|bridges)",
2952 "(disk|nic).(count)",
2953 "serial_no", "hypervisor", "hvparams",] +
2955 for name in constants.HVS_PARAMETERS] +
2957 for name in constants.BES_PARAMETERS])
2958 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2961 def ExpandNames(self):
2962 _CheckOutputFields(static=self._FIELDS_STATIC,
2963 dynamic=self._FIELDS_DYNAMIC,
2964 selected=self.op.output_fields)
2966 self.needed_locks = {}
2967 self.share_locks[locking.LEVEL_INSTANCE] = 1
2968 self.share_locks[locking.LEVEL_NODE] = 1
2971 self.wanted = _GetWantedInstances(self, self.op.names)
2973 self.wanted = locking.ALL_SET
2975 self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2977 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2978 self.needed_locks[locking.LEVEL_NODE] = []
2979 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2981 def DeclareLocks(self, level):
2982 if level == locking.LEVEL_NODE and self.do_locking:
2983 self._LockInstancesNodes()
2985 def CheckPrereq(self):
2986 """Check prerequisites.
2991 def Exec(self, feedback_fn):
2992 """Computes the list of nodes and their attributes.
2995 all_info = self.cfg.GetAllInstancesInfo()
2997 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2998 elif self.wanted != locking.ALL_SET:
2999 instance_names = self.wanted
3000 missing = set(instance_names).difference(all_info.keys())
3002 raise errors.OpExecError(
3003 "Some instances were removed before retrieving their data: %s"
3006 instance_names = all_info.keys()
3008 instance_names = utils.NiceSort(instance_names)
3009 instance_list = [all_info[iname] for iname in instance_names]
3011 # begin data gathering
3013 nodes = frozenset([inst.primary_node for inst in instance_list])
3014 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3020 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3022 result = node_data[name]
3024 # offline nodes will be in both lists
3025 off_nodes.append(name)
3027 bad_nodes.append(name)
3030 live_data.update(result.data)
3031 # else no instance is alive
3033 live_data = dict([(name, {}) for name in instance_names])
3035 # end data gathering
3040 for instance in instance_list:
3042 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3043 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3044 for field in self.op.output_fields:
3045 st_match = self._FIELDS_STATIC.Matches(field)
3050 elif field == "pnode":
3051 val = instance.primary_node
3052 elif field == "snodes":
3053 val = list(instance.secondary_nodes)
3054 elif field == "admin_state":
3055 val = (instance.status != "down")
3056 elif field == "oper_state":
3057 if instance.primary_node in bad_nodes:
3060 val = bool(live_data.get(instance.name))
3061 elif field == "status":
3062 if instance.primary_node in off_nodes:
3063 val = "ERROR_nodeoffline"
3064 elif instance.primary_node in bad_nodes:
3065 val = "ERROR_nodedown"
3067 running = bool(live_data.get(instance.name))
3069 if instance.status != "down":
3074 if instance.status != "down":
3078 elif field == "oper_ram":
3079 if instance.primary_node in bad_nodes:
3081 elif instance.name in live_data:
3082 val = live_data[instance.name].get("memory", "?")
3085 elif field == "disk_template":
3086 val = instance.disk_template
3088 val = instance.nics[0].ip
3089 elif field == "bridge":
3090 val = instance.nics[0].bridge
3091 elif field == "mac":
3092 val = instance.nics[0].mac
3093 elif field == "sda_size" or field == "sdb_size":
3094 idx = ord(field[2]) - ord('a')
3096 val = instance.FindDisk(idx).size
3097 except errors.OpPrereqError:
3099 elif field == "tags":
3100 val = list(instance.GetTags())
3101 elif field == "serial_no":
3102 val = instance.serial_no
3103 elif field == "network_port":
3104 val = instance.network_port
3105 elif field == "hypervisor":
3106 val = instance.hypervisor
3107 elif field == "hvparams":
3109 elif (field.startswith(HVPREFIX) and
3110 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3111 val = i_hv.get(field[len(HVPREFIX):], None)
3112 elif field == "beparams":
3114 elif (field.startswith(BEPREFIX) and
3115 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3116 val = i_be.get(field[len(BEPREFIX):], None)
3117 elif st_match and st_match.groups():
3118 # matches a variable list
3119 st_groups = st_match.groups()
3120 if st_groups and st_groups[0] == "disk":
3121 if st_groups[1] == "count":
3122 val = len(instance.disks)
3123 elif st_groups[1] == "sizes":
3124 val = [disk.size for disk in instance.disks]
3125 elif st_groups[1] == "size":
3127 val = instance.FindDisk(st_groups[2]).size
3128 except errors.OpPrereqError:
3131 assert False, "Unhandled disk parameter"
3132 elif st_groups[0] == "nic":
3133 if st_groups[1] == "count":
3134 val = len(instance.nics)
3135 elif st_groups[1] == "macs":
3136 val = [nic.mac for nic in instance.nics]
3137 elif st_groups[1] == "ips":
3138 val = [nic.ip for nic in instance.nics]
3139 elif st_groups[1] == "bridges":
3140 val = [nic.bridge for nic in instance.nics]
3143 nic_idx = int(st_groups[2])
3144 if nic_idx >= len(instance.nics):
3147 if st_groups[1] == "mac":
3148 val = instance.nics[nic_idx].mac
3149 elif st_groups[1] == "ip":
3150 val = instance.nics[nic_idx].ip
3151 elif st_groups[1] == "bridge":
3152 val = instance.nics[nic_idx].bridge
3154 assert False, "Unhandled NIC parameter"
3156 assert False, "Unhandled variable parameter"
3158 raise errors.ParameterError(field)
3165 class LUFailoverInstance(LogicalUnit):
3166 """Failover an instance.
3169 HPATH = "instance-failover"
3170 HTYPE = constants.HTYPE_INSTANCE
3171 _OP_REQP = ["instance_name", "ignore_consistency"]
3174 def ExpandNames(self):
3175 self._ExpandAndLockInstance()
3176 self.needed_locks[locking.LEVEL_NODE] = []
3177 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3179 def DeclareLocks(self, level):
3180 if level == locking.LEVEL_NODE:
3181 self._LockInstancesNodes()
3183 def BuildHooksEnv(self):
3186 This runs on master, primary and secondary nodes of the instance.
3190 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3192 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3193 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3196 def CheckPrereq(self):
3197 """Check prerequisites.
3199 This checks that the instance is in the cluster.
3202 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3203 assert self.instance is not None, \
3204 "Cannot retrieve locked instance %s" % self.op.instance_name
3206 bep = self.cfg.GetClusterInfo().FillBE(instance)
3207 if instance.disk_template not in constants.DTS_NET_MIRROR:
3208 raise errors.OpPrereqError("Instance's disk layout is not"
3209 " network mirrored, cannot failover.")
3211 secondary_nodes = instance.secondary_nodes
3212 if not secondary_nodes:
3213 raise errors.ProgrammerError("no secondary node but using "
3214 "a mirrored disk template")
3216 target_node = secondary_nodes[0]
3217 # check memory requirements on the secondary node
3218 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3219 instance.name, bep[constants.BE_MEMORY],
3220 instance.hypervisor)
3222 # check bridge existence
3223 brlist = [nic.bridge for nic in instance.nics]
3224 result = self.rpc.call_bridges_exist(target_node, brlist)
3227 raise errors.OpPrereqError("One or more target bridges %s does not"
3228 " exist on destination node '%s'" %
3229 (brlist, target_node))
3231 def Exec(self, feedback_fn):
3232 """Failover an instance.
3234 The failover is done by shutting it down on its present node and
3235 starting it on the secondary.
3238 instance = self.instance
3240 source_node = instance.primary_node
3241 target_node = instance.secondary_nodes[0]
3243 feedback_fn("* checking disk consistency between source and target")
3244 for dev in instance.disks:
3245 # for drbd, these are drbd over lvm
3246 if not _CheckDiskConsistency(self, dev, target_node, False):
3247 if instance.status == "up" and not self.op.ignore_consistency:
3248 raise errors.OpExecError("Disk %s is degraded on target node,"
3249 " aborting failover." % dev.iv_name)
3251 feedback_fn("* shutting down instance on source node")
3252 logging.info("Shutting down instance %s on node %s",
3253 instance.name, source_node)
3255 result = self.rpc.call_instance_shutdown(source_node, instance)
3256 if result.failed or not result.data:
3257 if self.op.ignore_consistency:
3258 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3260 " anyway. Please make sure node %s is down",
3261 instance.name, source_node, source_node)
3263 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3264 (instance.name, source_node))
3266 feedback_fn("* deactivating the instance's disks on source node")
3267 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3268 raise errors.OpExecError("Can't shut down the instance's disks.")
3270 instance.primary_node = target_node
3271 # distribute new instance config to the other nodes
3272 self.cfg.Update(instance)
3274 # Only start the instance if it's marked as up
3275 if instance.status == "up":
3276 feedback_fn("* activating the instance's disks on target node")
3277 logging.info("Starting instance %s on node %s",
3278 instance.name, target_node)
3280 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3281 ignore_secondaries=True)
3283 _ShutdownInstanceDisks(self, instance)
3284 raise errors.OpExecError("Can't activate the instance's disks")
3286 feedback_fn("* starting the instance on the target node")
3287 result = self.rpc.call_instance_start(target_node, instance, None)
3288 if result.failed or not result.data:
3289 _ShutdownInstanceDisks(self, instance)
3290 raise errors.OpExecError("Could not start instance %s on node %s." %
3291 (instance.name, target_node))
3294 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3295 """Create a tree of block devices on the primary node.
3297 This always creates all devices.
3301 for child in device.children:
3302 if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3305 lu.cfg.SetDiskID(device, node)
3306 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3307 instance.name, True, info)
3308 if new_id.failed or not new_id.data:
3310 if device.physical_id is None:
3311 device.physical_id = new_id
3315 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3316 """Create a tree of block devices on a secondary node.
3318 If this device type has to be created on secondaries, create it and
3321 If not, just recurse to children keeping the same 'force' value.
3324 if device.CreateOnSecondary():
3327 for child in device.children:
3328 if not _CreateBlockDevOnSecondary(lu, node, instance,
3329 child, force, info):
3334 lu.cfg.SetDiskID(device, node)
3335 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3336 instance.name, False, info)
3337 if new_id.failed or not new_id.data:
3339 if device.physical_id is None:
3340 device.physical_id = new_id
3344 def _GenerateUniqueNames(lu, exts):
3345 """Generate a suitable LV name.
3347 This will generate a logical volume name for the given instance.
3352 new_id = lu.cfg.GenerateUniqueID()
3353 results.append("%s%s" % (new_id, val))
3357 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3359 """Generate a drbd8 device complete with its children.
3362 port = lu.cfg.AllocatePort()
3363 vgname = lu.cfg.GetVGName()
3364 shared_secret = lu.cfg.GenerateDRBDSecret()
3365 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3366 logical_id=(vgname, names[0]))
3367 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3368 logical_id=(vgname, names[1]))
3369 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3370 logical_id=(primary, secondary, port,
3373 children=[dev_data, dev_meta],
3378 def _GenerateDiskTemplate(lu, template_name,
3379 instance_name, primary_node,
3380 secondary_nodes, disk_info,
3381 file_storage_dir, file_driver,
3383 """Generate the entire disk layout for a given template type.
3386 #TODO: compute space requirements
3388 vgname = lu.cfg.GetVGName()
3389 disk_count = len(disk_info)
3391 if template_name == constants.DT_DISKLESS:
3393 elif template_name == constants.DT_PLAIN:
3394 if len(secondary_nodes) != 0:
3395 raise errors.ProgrammerError("Wrong template configuration")
3397 names = _GenerateUniqueNames(lu, [".disk%d" % i
3398 for i in range(disk_count)])
3399 for idx, disk in enumerate(disk_info):
3400 disk_index = idx + base_index
3401 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3402 logical_id=(vgname, names[idx]),
3403 iv_name="disk/%d" % disk_index)
3404 disks.append(disk_dev)
3405 elif template_name == constants.DT_DRBD8:
3406 if len(secondary_nodes) != 1:
3407 raise errors.ProgrammerError("Wrong template configuration")
3408 remote_node = secondary_nodes[0]
3409 minors = lu.cfg.AllocateDRBDMinor(
3410 [primary_node, remote_node] * len(disk_info), instance_name)
3412 names = _GenerateUniqueNames(lu,
3413 [".disk%d_%s" % (i, s)
3414 for i in range(disk_count)
3415 for s in ("data", "meta")
3417 for idx, disk in enumerate(disk_info):
3418 disk_index = idx + base_index
3419 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3420 disk["size"], names[idx*2:idx*2+2],
3421 "disk/%d" % disk_index,
3422 minors[idx*2], minors[idx*2+1])
3423 disks.append(disk_dev)
3424 elif template_name == constants.DT_FILE:
3425 if len(secondary_nodes) != 0:
3426 raise errors.ProgrammerError("Wrong template configuration")
3428 for idx, disk in enumerate(disk_info):
3429 disk_index = idx + base_index
3430 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3431 iv_name="disk/%d" % disk_index,
3432 logical_id=(file_driver,
3433 "%s/disk%d" % (file_storage_dir,
3435 disks.append(disk_dev)
3437 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
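# Illustrative naming sketch (assuming GenerateUniqueID returns a UUID-like
# string): a DRBD8 instance with two disks asks _GenerateUniqueNames for
# ['.disk0_data', '.disk0_meta', '.disk1_data', '.disk1_meta'], so the LVs
# end up as '<unique-id>.disk0_data', '<unique-id>.disk0_meta', ... while the
# instance-visible names are 'disk/0' and 'disk/1'.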
3441 def _GetInstanceInfoText(instance):
3442 """Compute that text that should be added to the disk's metadata.
3445 return "originstname+%s" % instance.name
3448 def _CreateDisks(lu, instance):
3449 """Create all disks for an instance.
3451 This abstracts away some work from AddInstance.
3453 @type lu: L{LogicalUnit}
3454 @param lu: the logical unit on whose behalf we execute
3455 @type instance: L{objects.Instance}
3456 @param instance: the instance whose disks we should create
3458 @return: the success of the creation
3461 info = _GetInstanceInfoText(instance)
3463 if instance.disk_template == constants.DT_FILE:
3464 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3465 result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3468 if result.failed or not result.data:
3469 logging.error("Could not connect to node '%s'", instance.primary_node)
3472 if not result.data[0]:
3473 logging.error("Failed to create directory '%s'", file_storage_dir)
3476 # Note: this needs to be kept in sync with adding of disks in
3477 # LUSetInstanceParams
3478 for device in instance.disks:
3479 logging.info("Creating volume %s for instance %s",
3480 device.iv_name, instance.name)
3482 for secondary_node in instance.secondary_nodes:
3483 if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3484 device, False, info):
3485 logging.error("Failed to create volume %s (%s) on secondary node %s!",
3486 device.iv_name, device, secondary_node)
3489 if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3490 instance, device, info):
3491 logging.error("Failed to create volume %s on primary!", device.iv_name)
3497 def _RemoveDisks(lu, instance):
3498 """Remove all disks for an instance.
3500 This abstracts away some work from `AddInstance()` and
3501 `RemoveInstance()`. Note that in case some of the devices couldn't
3502 be removed, the removal will continue with the other ones (compare
3503 with `_CreateDisks()`).
3505 @type lu: L{LogicalUnit}
3506 @param lu: the logical unit on whose behalf we execute
3507 @type instance: L{objects.Instance}
3508 @param instance: the instance whose disks we should remove
3510 @return: the success of the removal
3513 logging.info("Removing block devices for instance %s", instance.name)
3516 for device in instance.disks:
3517 for node, disk in device.ComputeNodeTree(instance.primary_node):
3518 lu.cfg.SetDiskID(disk, node)
3519 result = lu.rpc.call_blockdev_remove(node, disk)
3520 if result.failed or not result.data:
3521 lu.proc.LogWarning("Could not remove block device %s on node %s,"
3522 " continuing anyway", device.iv_name, node)
3525 if instance.disk_template == constants.DT_FILE:
3526 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3527 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3529 if result.failed or not result.data:
3530 logging.error("Could not remove directory '%s'", file_storage_dir)
3536 def _ComputeDiskSize(disk_template, disks):
3537 """Compute disk size requirements in the volume group
3540 # Required free disk space as a function of the disk template and disk sizes
3542 constants.DT_DISKLESS: None,
3543 constants.DT_PLAIN: sum(d["size"] for d in disks),
3544 # 128 MB are added for drbd metadata for each disk
3545 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3546 constants.DT_FILE: None,
3549 if disk_template not in req_size_dict:
3550 raise errors.ProgrammerError("Disk template '%s' size requirement"
3551 " is unknown" % disk_template)
3553 return req_size_dict[disk_template]
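# Worked example: for disks = [{"size": 1024}, {"size": 2048}] (sizes in MiB)
# DT_PLAIN needs 1024 + 2048 = 3072 MiB of free VG space, DT_DRBD8 needs
# (1024 + 128) + (2048 + 128) = 3328 MiB because of the per-disk metadata,
# and DT_DISKLESS/DT_FILE need no VG space at all (None).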
3556 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3557 """Hypervisor parameter validation.
3559 This function abstracts the hypervisor parameter validation to be
3560 used in both instance create and instance modify.
3562 @type lu: L{LogicalUnit}
3563 @param lu: the logical unit for which we check
3564 @type nodenames: list
3565 @param nodenames: the list of nodes on which we should check
3566 @type hvname: string
3567 @param hvname: the name of the hypervisor we should use
3568 @type hvparams: dict
3569 @param hvparams: the parameters which we need to check
3570 @raise errors.OpPrereqError: if the parameters are not valid
3573 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3576 for node in nodenames:
3579 if not info.data or not isinstance(info.data, (tuple, list)):
3580 raise errors.OpPrereqError("Cannot get current information"
3581 " from node '%s' (%s)" % (node, info.data))
3582 if not info.data[0]:
3583 raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3584 " %s" % info.data[1])
3587 class LUCreateInstance(LogicalUnit):
3588 """Create an instance.
3591 HPATH = "instance-add"
3592 HTYPE = constants.HTYPE_INSTANCE
3593 _OP_REQP = ["instance_name", "disks", "disk_template",
3595 "wait_for_sync", "ip_check", "nics",
3596 "hvparams", "beparams"]
3599 def _ExpandNode(self, node):
3600 """Expands and checks one node name.
3603 node_full = self.cfg.ExpandNodeName(node)
3604 if node_full is None:
3605 raise errors.OpPrereqError("Unknown node %s" % node)
3608 def ExpandNames(self):
3609 """ExpandNames for CreateInstance.
3611 Figure out the right locks for instance creation.
3614 self.needed_locks = {}
3616 # set optional parameters to none if they don't exist
3617 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3618 if not hasattr(self.op, attr):
3619 setattr(self.op, attr, None)
3621 # cheap checks, mostly valid constants given
3623 # verify creation mode
3624 if self.op.mode not in (constants.INSTANCE_CREATE,
3625 constants.INSTANCE_IMPORT):
3626 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3629 # disk template and mirror node verification
3630 if self.op.disk_template not in constants.DISK_TEMPLATES:
3631 raise errors.OpPrereqError("Invalid disk template name")
3633 if self.op.hypervisor is None:
3634 self.op.hypervisor = self.cfg.GetHypervisorType()
3636 cluster = self.cfg.GetClusterInfo()
3637 enabled_hvs = cluster.enabled_hypervisors
3638 if self.op.hypervisor not in enabled_hvs:
3639 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3640 " cluster (%s)" % (self.op.hypervisor,
3641 ",".join(enabled_hvs)))
3643 # check hypervisor parameter syntax (locally)
3645 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3647 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3648 hv_type.CheckParameterSyntax(filled_hvp)
3650 # fill and remember the beparams dict
3651 utils.CheckBEParams(self.op.beparams)
3652 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3655 #### instance parameters check
3657 # instance name verification
3658 hostname1 = utils.HostInfo(self.op.instance_name)
3659 self.op.instance_name = instance_name = hostname1.name
3661 # this is just a preventive check, but someone might still add this
3662 # instance in the meantime, and creation will fail at lock-add time
3663 if instance_name in self.cfg.GetInstanceList():
3664 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3667 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3671 for nic in self.op.nics:
3672 # ip validity checks
3673 ip = nic.get("ip", None)
3674 if ip is None or ip.lower() == "none":
3676 elif ip.lower() == constants.VALUE_AUTO:
3677 nic_ip = hostname1.ip
3679 if not utils.IsValidIP(ip):
3680 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3681 " like a valid IP" % ip)
3684 # MAC address verification
3685 mac = nic.get("mac", constants.VALUE_AUTO)
3686 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3687 if not utils.IsValidMac(mac.lower()):
3688 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3690 # bridge verification
3691 bridge = nic.get("bridge", self.cfg.GetDefBridge())
3692 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3694 # disk checks/pre-build
3696 for disk in self.op.disks:
3697 mode = disk.get("mode", constants.DISK_RDWR)
3698 if mode not in constants.DISK_ACCESS_SET:
3699 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3701 size = disk.get("size", None)
3703 raise errors.OpPrereqError("Missing disk size")
3707 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3708 self.disks.append({"size": size, "mode": mode})
3710 # used in CheckPrereq for ip ping check
3711 self.check_ip = hostname1.ip
3713 # file storage checks
3714 if (self.op.file_driver and
3715 not self.op.file_driver in constants.FILE_DRIVER):
3716 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3717 self.op.file_driver)
3719 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3720 raise errors.OpPrereqError("File storage directory path not absolute")
3722 ### Node/iallocator related checks
3723 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3724 raise errors.OpPrereqError("One and only one of iallocator and primary"
3725 " node must be given")
3727 if self.op.iallocator:
3728 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3730 self.op.pnode = self._ExpandNode(self.op.pnode)
3731 nodelist = [self.op.pnode]
3732 if self.op.snode is not None:
3733 self.op.snode = self._ExpandNode(self.op.snode)
3734 nodelist.append(self.op.snode)
3735 self.needed_locks[locking.LEVEL_NODE] = nodelist
3737 # in case of import lock the source node too
3738 if self.op.mode == constants.INSTANCE_IMPORT:
3739 src_node = getattr(self.op, "src_node", None)
3740 src_path = getattr(self.op, "src_path", None)
3742 if src_path is None:
3743 self.op.src_path = src_path = self.op.instance_name
3745 if src_node is None:
3746 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3747 self.op.src_node = None
3748 if os.path.isabs(src_path):
3749 raise errors.OpPrereqError("Importing an instance from an absolute"
3750 " path requires a source node option.")
3752 self.op.src_node = src_node = self._ExpandNode(src_node)
3753 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3754 self.needed_locks[locking.LEVEL_NODE].append(src_node)
3755 if not os.path.isabs(src_path):
3756 self.op.src_path = src_path = \
3757 os.path.join(constants.EXPORT_DIR, src_path)
3759 else: # INSTANCE_CREATE
3760 if getattr(self.op, "os_type", None) is None:
3761 raise errors.OpPrereqError("No guest OS specified")
3763 def _RunAllocator(self):
3764 """Run the allocator based on input opcode.
3767 nics = [n.ToDict() for n in self.nics]
3768 ial = IAllocator(self,
3769 mode=constants.IALLOCATOR_MODE_ALLOC,
3770 name=self.op.instance_name,
3771 disk_template=self.op.disk_template,
3774 vcpus=self.be_full[constants.BE_VCPUS],
3775 mem_size=self.be_full[constants.BE_MEMORY],
3778 hypervisor=self.op.hypervisor,
3781 ial.Run(self.op.iallocator)
3784 raise errors.OpPrereqError("Can't compute nodes using"
3785 " iallocator '%s': %s" % (self.op.iallocator,
3787 if len(ial.nodes) != ial.required_nodes:
3788 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3789 " of nodes (%s), required %s" %
3790 (self.op.iallocator, len(ial.nodes),
3791 ial.required_nodes))
3792 self.op.pnode = ial.nodes[0]
3793 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3794 self.op.instance_name, self.op.iallocator,
3795 ", ".join(ial.nodes))
3796 if ial.required_nodes == 2:
3797 self.op.snode = ial.nodes[1]
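# Illustrative flow: a mirrored (e.g. DRBD8) instance makes the allocator
# return two node names; ial.nodes[0] is then used as the primary
# (self.op.pnode) and ial.nodes[1] as the secondary (self.op.snode), just as
# if the user had passed them explicitly.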
3799 def BuildHooksEnv(self):
3802 This runs on master, primary and secondary nodes of the instance.
3806 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3807 "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3808 "INSTANCE_ADD_MODE": self.op.mode,
3810 if self.op.mode == constants.INSTANCE_IMPORT:
3811 env["INSTANCE_SRC_NODE"] = self.op.src_node
3812 env["INSTANCE_SRC_PATH"] = self.op.src_path
3813 env["INSTANCE_SRC_IMAGES"] = self.src_images
3815 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3816 primary_node=self.op.pnode,
3817 secondary_nodes=self.secondaries,
3818 status=self.instance_status,
3819 os_type=self.op.os_type,
3820 memory=self.be_full[constants.BE_MEMORY],
3821 vcpus=self.be_full[constants.BE_VCPUS],
3822 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3825 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3830 def CheckPrereq(self):
3831 """Check prerequisites.
3834 if (not self.cfg.GetVGName() and
3835 self.op.disk_template not in constants.DTS_NOT_LVM):
3836 raise errors.OpPrereqError("Cluster does not support lvm-based"
3840 if self.op.mode == constants.INSTANCE_IMPORT:
3841 src_node = self.op.src_node
3842 src_path = self.op.src_path
3844 if src_node is None:
3845 exp_list = self.rpc.call_export_list(
3846 self.acquired_locks[locking.LEVEL_NODE])
3848 for node in exp_list:
3849 if not exp_list[node].failed and src_path in exp_list[node].data:
3851 self.op.src_node = src_node = node
3852 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
3856 raise errors.OpPrereqError("No export found for relative path %s" %
3859 result = self.rpc.call_export_info(src_node, src_path)
3862 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3864 export_info = result.data
3865 if not export_info.has_section(constants.INISECT_EXP):
3866 raise errors.ProgrammerError("Corrupted export config")
3868 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3869 if (int(ei_version) != constants.EXPORT_VERSION):
3870 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3871 (ei_version, constants.EXPORT_VERSION))
3873 # Check that the new instance doesn't have fewer disks than the export
3874 instance_disks = len(self.disks)
3875 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3876 if instance_disks < export_disks:
3877 raise errors.OpPrereqError("Not enough disks to import."
3878 " (instance: %d, export: %d)" %
3879 (instance_disks, export_disks))
3881 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3883 for idx in range(export_disks):
3884 option = 'disk%d_dump' % idx
3885 if export_info.has_option(constants.INISECT_INS, option):
3886 # FIXME: are the old os-es, disk sizes, etc. useful?
3887 export_name = export_info.get(constants.INISECT_INS, option)
3888 image = os.path.join(src_path, export_name)
3889 disk_images.append(image)
3891 disk_images.append(False)
3893 self.src_images = disk_images
3895 old_name = export_info.get(constants.INISECT_INS, 'name')
3896 # FIXME: int() here could throw a ValueError on broken exports
3897 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3898 if self.op.instance_name == old_name:
3899 for idx, nic in enumerate(self.nics):
3900 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3901 nic_mac_ini = 'nic%d_mac' % idx
3902 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3904 # ip ping checks (we use the same ip that was resolved in ExpandNames)
3905 if self.op.start and not self.op.ip_check:
3906 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3907 " adding an instance in start mode")
3909 if self.op.ip_check:
3910 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3911 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3912 (self.check_ip, self.op.instance_name))
3916 if self.op.iallocator is not None:
3917 self._RunAllocator()
3919 #### node related checks
3921 # check primary node
3922 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3923 assert self.pnode is not None, \
3924 "Cannot retrieve locked node %s" % self.op.pnode
3925 self.secondaries = []
3927 # mirror node verification
3928 if self.op.disk_template in constants.DTS_NET_MIRROR:
3929 if self.op.snode is None:
3930 raise errors.OpPrereqError("The networked disk templates need"
3932 if self.op.snode == pnode.name:
3933 raise errors.OpPrereqError("The secondary node cannot be"
3934 " the primary node.")
3935 self.secondaries.append(self.op.snode)
3937 nodenames = [pnode.name] + self.secondaries
3939 req_size = _ComputeDiskSize(self.op.disk_template,
3942 # Check lv size requirements
3943 if req_size is not None:
3944 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3946 for node in nodenames:
3947 info = nodeinfo[node]
3951 raise errors.OpPrereqError("Cannot get current information"
3952 " from node '%s'" % node)
3953 vg_free = info.get('vg_free', None)
3954 if not isinstance(vg_free, int):
3955 raise errors.OpPrereqError("Can't compute free disk space on"
3957 if req_size > info['vg_free']:
3958 raise errors.OpPrereqError("Not enough disk space on target node %s."
3959 " %d MB available, %d MB required" %
3960 (node, info['vg_free'], req_size))
3962 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3965 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3967 if not isinstance(result.data, objects.OS):
3968 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3969 " primary node" % self.op.os_type)
3971 # bridge check on primary node
3972 bridges = [n.bridge for n in self.nics]
3973 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
3976 raise errors.OpPrereqError("One of the target bridges '%s' does not"
3977 " exist on destination node '%s'" %
3978 (",".join(bridges), pnode.name))
3980 # memory check on primary node
3982 _CheckNodeFreeMemory(self, self.pnode.name,
3983 "creating instance %s" % self.op.instance_name,
3984 self.be_full[constants.BE_MEMORY],
3988 self.instance_status = 'up'
3990 self.instance_status = 'down'
3992 def Exec(self, feedback_fn):
3993 """Create and add the instance to the cluster.
3996 instance = self.op.instance_name
3997 pnode_name = self.pnode.name
3999 for nic in self.nics:
4000 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4001 nic.mac = self.cfg.GenerateMAC()
4003 ht_kind = self.op.hypervisor
4004 if ht_kind in constants.HTS_REQ_PORT:
4005 network_port = self.cfg.AllocatePort()
4009 ##if self.op.vnc_bind_address is None:
4010 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4012 # this is needed because os.path.join does not accept None arguments
4013 if self.op.file_storage_dir is None:
4014 string_file_storage_dir = ""
4016 string_file_storage_dir = self.op.file_storage_dir
4018 # build the full file storage dir path
4019 file_storage_dir = os.path.normpath(os.path.join(
4020 self.cfg.GetFileStorageDir(),
4021 string_file_storage_dir, instance))
4024 disks = _GenerateDiskTemplate(self,
4025 self.op.disk_template,
4026 instance, pnode_name,
4030 self.op.file_driver,
4033 iobj = objects.Instance(name=instance, os=self.op.os_type,
4034 primary_node=pnode_name,
4035 nics=self.nics, disks=disks,
4036 disk_template=self.op.disk_template,
4037 status=self.instance_status,
4038 network_port=network_port,
4039 beparams=self.op.beparams,
4040 hvparams=self.op.hvparams,
4041 hypervisor=self.op.hypervisor,
4044 feedback_fn("* creating instance disks...")
4045 if not _CreateDisks(self, iobj):
4046 _RemoveDisks(self, iobj)
4047 self.cfg.ReleaseDRBDMinors(instance)
4048 raise errors.OpExecError("Device creation failed, reverting...")
4050 feedback_fn("adding instance %s to cluster config" % instance)
4052 self.cfg.AddInstance(iobj)
4053 # Declare that we don't want to remove the instance lock anymore, as we've
4054 # added the instance to the config
4055 del self.remove_locks[locking.LEVEL_INSTANCE]
4056 # Remove the temp. assignments for the instance's drbds
4057 self.cfg.ReleaseDRBDMinors(instance)
4058 # Unlock all the nodes
4059 if self.op.mode == constants.INSTANCE_IMPORT:
4060 nodes_keep = [self.op.src_node]
4061 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4062 if node != self.op.src_node]
4063 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4064 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4066 self.context.glm.release(locking.LEVEL_NODE)
4067 del self.acquired_locks[locking.LEVEL_NODE]
4069 if self.op.wait_for_sync:
4070 disk_abort = not _WaitForSync(self, iobj)
4071 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4072 # make sure the disks are not degraded (still sync-ing is ok)
4074 feedback_fn("* checking mirrors status")
4075 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4080 _RemoveDisks(self, iobj)
4081 self.cfg.RemoveInstance(iobj.name)
4082 # Make sure the instance lock gets removed
4083 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4084 raise errors.OpExecError("There are some degraded disks for"
4087 feedback_fn("creating os for instance %s on node %s" %
4088 (instance, pnode_name))
4090 if iobj.disk_template != constants.DT_DISKLESS:
4091 if self.op.mode == constants.INSTANCE_CREATE:
4092 feedback_fn("* running the instance OS create scripts...")
4093 result = self.rpc.call_instance_os_add(pnode_name, iobj)
4096 raise errors.OpExecError("Could not add os for instance %s"
4098 (instance, pnode_name))
4100 elif self.op.mode == constants.INSTANCE_IMPORT:
4101 feedback_fn("* running the instance OS import scripts...")
4102 src_node = self.op.src_node
4103 src_images = self.src_images
4104 cluster_name = self.cfg.GetClusterName()
4105 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4106 src_node, src_images,
4108 import_result.Raise()
4109 for idx, result in enumerate(import_result.data):
4111 self.LogWarning("Could not import the image %s for instance"
4112 " %s, disk %d, on node %s" %
4113 (src_images[idx], instance, idx, pnode_name))
4115 # also checked in the prereq part
4116 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4120 logging.info("Starting instance %s on node %s", instance, pnode_name)
4121 feedback_fn("* starting instance...")
4122 result = self.rpc.call_instance_start(pnode_name, iobj, None)
4125 raise errors.OpExecError("Could not start instance")
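# --- Illustrative sketch (not part of the original module) ---
# A hedged example of the kind of parameter set the instance-creation LU
# above consumes.  In the real system these arrive as attributes of the
# corresponding opcode; the instance name, node names and sizes below are
# made-up values for illustration only.
def _ExampleCreateInstanceParams():
  """Return a sample parameter dict for an instance creation (sketch)."""
  return {
    "instance_name": "instance1.example.com",    # made-up name
    "mode": constants.INSTANCE_CREATE,           # or constants.INSTANCE_IMPORT
    "disk_template": constants.DT_DRBD8,
    "disks": [{"size": 1024}],                   # sizes in MiB
    "nics": [{"mac": constants.VALUE_AUTO}],
    "beparams": {constants.BE_MEMORY: 512, constants.BE_VCPUS: 1},
    "hvparams": {},
    "wait_for_sync": True,
    "ip_check": True,
  }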
4128 class LUConnectConsole(NoHooksLU):
4129 """Connect to an instance's console.
4131 This is somewhat special in that it returns the command line that
4132 you need to run on the master node in order to connect to the
4136 _OP_REQP = ["instance_name"]
4139 def ExpandNames(self):
4140 self._ExpandAndLockInstance()
4142 def CheckPrereq(self):
4143 """Check prerequisites.
4145 This checks that the instance is in the cluster.
4148 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4149 assert self.instance is not None, \
4150 "Cannot retrieve locked instance %s" % self.op.instance_name
4152 def Exec(self, feedback_fn):
4153 """Connect to the console of an instance
4156 instance = self.instance
4157 node = instance.primary_node
4159 node_insts = self.rpc.call_instance_list([node],
4160 [instance.hypervisor])[node]
4163 if instance.name not in node_insts.data:
4164 raise errors.OpExecError("Instance %s is not running." % instance.name)
4166 logging.debug("Connecting to console of %s on %s", instance.name, node)
4168 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4169 console_cmd = hyper.GetShellCommandForConsole(instance)
4172 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
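# --- Illustrative sketch (not part of the original module) ---
# The LU above only computes the console command; actually connecting is up
# to the caller on the master node.  Assuming BuildCmd returns an argv-style
# list (the helper name below is made up), a client could exec it like this:
def _ExampleRunConsoleCommand(console_cmd):
  """Replace the current process with the console command (sketch)."""
  # os is already imported at module level; execvp never returns on success
  os.execvp(console_cmd[0], console_cmd)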
4175 class LUReplaceDisks(LogicalUnit):
4176 """Replace the disks of an instance.
4179 HPATH = "mirrors-replace"
4180 HTYPE = constants.HTYPE_INSTANCE
4181 _OP_REQP = ["instance_name", "mode", "disks"]
4184 def ExpandNames(self):
4185 self._ExpandAndLockInstance()
4187 if not hasattr(self.op, "remote_node"):
4188 self.op.remote_node = None
4190 ia_name = getattr(self.op, "iallocator", None)
4191 if ia_name is not None:
4192 if self.op.remote_node is not None:
4193 raise errors.OpPrereqError("Give either the iallocator or the new"
4194 " secondary, not both")
4195 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4196 elif self.op.remote_node is not None:
4197 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4198 if remote_node is None:
4199 raise errors.OpPrereqError("Node '%s' not known" %
4200 self.op.remote_node)
4201 self.op.remote_node = remote_node
4202 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4203 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4205 self.needed_locks[locking.LEVEL_NODE] = []
4206 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4208 def DeclareLocks(self, level):
4209 # If we're not already locking all nodes in the set we have to declare the
4210 # instance's primary/secondary nodes.
4211 if (level == locking.LEVEL_NODE and
4212 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4213 self._LockInstancesNodes()
4215 def _RunAllocator(self):
4216 """Compute a new secondary node using an IAllocator.
4219 ial = IAllocator(self,
4220 mode=constants.IALLOCATOR_MODE_RELOC,
4221 name=self.op.instance_name,
4222 relocate_from=[self.sec_node])
4224 ial.Run(self.op.iallocator)
4227 raise errors.OpPrereqError("Can't compute nodes using"
4228 " iallocator '%s': %s" % (self.op.iallocator,
4230 if len(ial.nodes) != ial.required_nodes:
4231 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4232 " of nodes (%s), required %s" %
4233 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
4234 self.op.remote_node = ial.nodes[0]
4235 self.LogInfo("Selected new secondary for the instance: %s",
4236 self.op.remote_node)
4238 def BuildHooksEnv(self):
4241 This runs on the master, the primary and all the secondaries.
4245 "MODE": self.op.mode,
4246 "NEW_SECONDARY": self.op.remote_node,
4247 "OLD_SECONDARY": self.instance.secondary_nodes[0],
4249 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4251 self.cfg.GetMasterNode(),
4252 self.instance.primary_node,
4254 if self.op.remote_node is not None:
4255 nl.append(self.op.remote_node)
4258 def CheckPrereq(self):
4259 """Check prerequisites.
4261 This checks that the instance is in the cluster.
4264 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4265 assert instance is not None, \
4266 "Cannot retrieve locked instance %s" % self.op.instance_name
4267 self.instance = instance
4269 if instance.disk_template not in constants.DTS_NET_MIRROR:
4270 raise errors.OpPrereqError("Instance's disk layout is not"
4271 " network mirrored.")
4273 if len(instance.secondary_nodes) != 1:
4274 raise errors.OpPrereqError("The instance has a strange layout,"
4275 " expected one secondary but found %d" %
4276 len(instance.secondary_nodes))
4278 self.sec_node = instance.secondary_nodes[0]
4280 ia_name = getattr(self.op, "iallocator", None)
4281 if ia_name is not None:
4282 self._RunAllocator()
4284 remote_node = self.op.remote_node
4285 if remote_node is not None:
4286 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4287 assert self.remote_node_info is not None, \
4288 "Cannot retrieve locked node %s" % remote_node
4290 self.remote_node_info = None
4291 if remote_node == instance.primary_node:
4292 raise errors.OpPrereqError("The specified node is the primary node of"
4294 elif remote_node == self.sec_node:
4295 if self.op.mode == constants.REPLACE_DISK_SEC:
4296 # this is for DRBD8, where we can't execute the same mode of
4297 # replacement as for drbd7 (no different port allocated)
4298 raise errors.OpPrereqError("Same secondary given, cannot execute"
4300 if instance.disk_template == constants.DT_DRBD8:
4301 if (self.op.mode == constants.REPLACE_DISK_ALL and
4302 remote_node is not None):
4303 # switch to replace secondary mode
4304 self.op.mode = constants.REPLACE_DISK_SEC
4306 if self.op.mode == constants.REPLACE_DISK_ALL:
4307 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4308 " secondary disk replacement, not"
4310 elif self.op.mode == constants.REPLACE_DISK_PRI:
4311 if remote_node is not None:
4312 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4313 " the secondary while doing a primary"
4314 " node disk replacement")
4315 self.tgt_node = instance.primary_node
4316 self.oth_node = instance.secondary_nodes[0]
4317 elif self.op.mode == constants.REPLACE_DISK_SEC:
4318 self.new_node = remote_node # this can be None, in which case
4319 # we don't change the secondary
4320 self.tgt_node = instance.secondary_nodes[0]
4321 self.oth_node = instance.primary_node
4323 raise errors.ProgrammerError("Unhandled disk replace mode")
4325 if not self.op.disks:
4326 self.op.disks = range(len(instance.disks))
4328 for disk_idx in self.op.disks:
4329 instance.FindDisk(disk_idx)
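# --- Illustrative sketch (not part of the original module) ---
# The prerequisite checks above accept the following drbd8 replacement
# setups; an empty "disks" list means all disks, and the node name is made
# up.  These are sketches of parameter sets, not a complete opcode.
def _ExampleReplaceDisksParams():
  """Return sample parameter sets for disk replacement modes (sketch)."""
  return [
    # replace the LVs on the primary node, keeping the current secondary
    {"mode": constants.REPLACE_DISK_PRI, "disks": [0], "remote_node": None},
    # replace the LVs on the current secondary, keeping the node
    {"mode": constants.REPLACE_DISK_SEC, "disks": [], "remote_node": None},
    # move the mirror to a new secondary (or let an iallocator choose one)
    {"mode": constants.REPLACE_DISK_SEC, "disks": [],
     "remote_node": "node3.example.com"},
  ]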
4331 def _ExecD8DiskOnly(self, feedback_fn):
4332 """Replace a disk on the primary or secondary for dbrd8.
4334 The algorithm for replace is quite complicated:
4336 1. for each disk to be replaced:
4338 1. create new LVs on the target node with unique names
4339 1. detach old LVs from the drbd device
4340 1. rename old LVs to name_replaced.<time_t>
4341 1. rename new LVs to old LVs
4342 1. attach the new LVs (with the old names now) to the drbd device
4344 1. wait for sync across all devices
4346 1. for each modified disk:
4348 1. remove old LVs (which have the name name_replaced.<time_t>)
4350 Failures are not very well handled.
4354 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4355 instance = self.instance
4357 vgname = self.cfg.GetVGName()
4360 tgt_node = self.tgt_node
4361 oth_node = self.oth_node
4363 # Step: check device activation
4364 self.proc.LogStep(1, steps_total, "check device existence")
4365 info("checking volume groups")
4366 my_vg = cfg.GetVGName()
4367 results = self.rpc.call_vg_list([oth_node, tgt_node])
4369 raise errors.OpExecError("Can't list volume groups on the nodes")
4370 for node in oth_node, tgt_node:
4372 if res.failed or not res.data or my_vg not in res.data:
4373 raise errors.OpExecError("Volume group '%s' not found on %s" %
4375 for idx, dev in enumerate(instance.disks):
4376 if idx not in self.op.disks:
4378 for node in tgt_node, oth_node:
4379 info("checking disk/%d on %s" % (idx, node))
4380 cfg.SetDiskID(dev, node)
4381 if not self.rpc.call_blockdev_find(node, dev):
4382 raise errors.OpExecError("Can't find disk/%d on node %s" %
4385 # Step: check other node consistency
4386 self.proc.LogStep(2, steps_total, "check peer consistency")
4387 for idx, dev in enumerate(instance.disks):
4388 if idx not in self.op.disks:
4390 info("checking disk/%d consistency on %s" % (idx, oth_node))
4391 if not _CheckDiskConsistency(self, dev, oth_node,
4392 oth_node==instance.primary_node):
4393 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4394 " to replace disks on this node (%s)" %
4395 (oth_node, tgt_node))
4397 # Step: create new storage
4398 self.proc.LogStep(3, steps_total, "allocate new storage")
4399 for idx, dev in enumerate(instance.disks):
4400 if idx not in self.op.disks:
4403 cfg.SetDiskID(dev, tgt_node)
4404 lv_names = [".disk%d_%s" % (idx, suf)
4405 for suf in ["data", "meta"]]
4406 names = _GenerateUniqueNames(self, lv_names)
4407 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4408 logical_id=(vgname, names[0]))
4409 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4410 logical_id=(vgname, names[1]))
4411 new_lvs = [lv_data, lv_meta]
4412 old_lvs = dev.children
4413 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4414 info("creating new local storage on %s for %s" %
4415 (tgt_node, dev.iv_name))
4416 # since we *always* want to create this LV, we use the
4417 # _Create...OnPrimary (which forces the creation), even if we
4418 # are talking about the secondary node
4419 for new_lv in new_lvs:
4420 if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4421 _GetInstanceInfoText(instance)):
4422 raise errors.OpExecError("Failed to create new LV named '%s' on"
4424 (new_lv.logical_id[1], tgt_node))
4426 # Step: for each lv, detach+rename*2+attach
4427 self.proc.LogStep(4, steps_total, "change drbd configuration")
4428 for dev, old_lvs, new_lvs in iv_names.itervalues():
4429 info("detaching %s drbd from local storage" % dev.iv_name)
4430 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
4433 raise errors.OpExecError("Can't detach drbd from local storage on node"
4434 " %s for device %s" % (tgt_node, dev.iv_name))
4436 #cfg.Update(instance)
4438 # ok, we created the new LVs, so now we know we have the needed
4439 # storage; as such, we proceed on the target node to rename
4440 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4441 # using the assumption that logical_id == physical_id (which in
4442 # turn is the unique_id on that node)
4444 # FIXME(iustin): use a better name for the replaced LVs
4445 temp_suffix = int(time.time())
4446 ren_fn = lambda d, suff: (d.physical_id[0],
4447 d.physical_id[1] + "_replaced-%s" % suff)
4448 # build the rename list based on what LVs exist on the node
4450 for to_ren in old_lvs:
4451 find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4452 if not find_res.failed and find_res.data is not None: # device exists
4453 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4455 info("renaming the old LVs on the target node")
4456 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4459 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4460 # now we rename the new LVs to the old LVs
4461 info("renaming the new LVs on the target node")
4462 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4463 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
4466 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4468 for old, new in zip(old_lvs, new_lvs):
4469 new.logical_id = old.logical_id
4470 cfg.SetDiskID(new, tgt_node)
4472 for disk in old_lvs:
4473 disk.logical_id = ren_fn(disk, temp_suffix)
4474 cfg.SetDiskID(disk, tgt_node)
4476 # now that the new lvs have the old name, we can add them to the device
4477 info("adding new mirror component on %s" % tgt_node)
4478 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
4479 if result.failed or not result.data:
4480 for new_lv in new_lvs:
4481 result = self.rpc.call_blockdev_remove(tgt_node, new_lv)
4482 if result.failed or not result.data:
4483 warning("Can't rollback device %s", hint="manually cleanup unused"
4485 raise errors.OpExecError("Can't add local storage to drbd")
4487 dev.children = new_lvs
4488 cfg.Update(instance)
4490 # Step: wait for sync
4492 # this can fail as the old devices are degraded and _WaitForSync
4493 # does a combined result over all disks, so we don't check its
4495 self.proc.LogStep(5, steps_total, "sync devices")
4496 _WaitForSync(self, instance, unlock=True)
4498 # so check manually all the devices
4499 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4500 cfg.SetDiskID(dev, instance.primary_node)
4501 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
4502 if result.failed or result.data[5]:
4503 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4505 # Step: remove old storage
4506 self.proc.LogStep(6, steps_total, "removing old storage")
4507 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4508 info("remove logical volumes for %s" % name)
4510 cfg.SetDiskID(lv, tgt_node)
4511 result = self.rpc.call_blockdev_remove(tgt_node, lv)
4512 if result.failed or not result.data:
4513 warning("Can't remove old LV", hint="manually remove unused LVs")
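# --- Illustrative sketch (not part of the original module) ---
# _ExecD8DiskOnly keeps its per-disk bookkeeping in the iv_names dict built
# in step 3 above and consumed by steps 4-6.  Shape (values made up):
#   iv_names = {
#     "disk/0": (dev,                    # the drbd8 Disk object being repaired
#                [old_data, old_meta],   # LVs currently attached to it
#                [new_data, new_meta]),  # freshly created replacement LVs
#   }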
4516 def _ExecD8Secondary(self, feedback_fn):
4517 """Replace the secondary node for drbd8.
4519 The algorithm for replace is quite complicated:
4520 - for all disks of the instance:
4521 - create new LVs on the new node with same names
4522 - shutdown the drbd device on the old secondary
4523 - disconnect the drbd network on the primary
4524 - create the drbd device on the new secondary
4525 - network attach the drbd on the primary, using an artifice:
4526 the drbd code for Attach() will connect to the network if it
4527 finds a device which is connected to the good local disks but
4529 - wait for sync across all devices
4530 - remove all disks from the old secondary
4532 Failures are not very well handled.
4536 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4537 instance = self.instance
4539 vgname = self.cfg.GetVGName()
4542 old_node = self.tgt_node
4543 new_node = self.new_node
4544 pri_node = instance.primary_node
4546 # Step: check device activation
4547 self.proc.LogStep(1, steps_total, "check device existence")
4548 info("checking volume groups")
4549 my_vg = cfg.GetVGName()
4550 results = self.rpc.call_vg_list([pri_node, new_node])
4551 for node in pri_node, new_node:
4553 if res.failed or not res.data or my_vg not in res.data:
4554 raise errors.OpExecError("Volume group '%s' not found on %s" %
4556 for idx, dev in enumerate(instance.disks):
4557 if idx not in self.op.disks:
4559 info("checking disk/%d on %s" % (idx, pri_node))
4560 cfg.SetDiskID(dev, pri_node)
4561 result = self.rpc.call_blockdev_find(pri_node, dev)
4564 raise errors.OpExecError("Can't find disk/%d on node %s" %
4567 # Step: check other node consistency
4568 self.proc.LogStep(2, steps_total, "check peer consistency")
4569 for idx, dev in enumerate(instance.disks):
4570 if idx not in self.op.disks:
4572 info("checking disk/%d consistency on %s" % (idx, pri_node))
4573 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4574 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4575 " unsafe to replace the secondary" %
4578 # Step: create new storage
4579 self.proc.LogStep(3, steps_total, "allocate new storage")
4580 for idx, dev in enumerate(instance.disks):
4582 info("adding new local storage on %s for disk/%d" %
4584 # since we *always* want to create this LV, we use the
4585 # _Create...OnPrimary (which forces the creation), even if we
4586 # are talking about the secondary node
4587 for new_lv in dev.children:
4588 if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4589 _GetInstanceInfoText(instance)):
4590 raise errors.OpExecError("Failed to create new LV named '%s' on"
4592 (new_lv.logical_id[1], new_node))
4594 # Step 4: drbd minors and drbd setup changes
4595 # after this, we must manually remove the drbd minors on both the
4596 # error and the success paths
4597 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4599 logging.debug("Allocated minors %s", minors)
4600 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4601 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4603 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4604 # create new devices on new_node
4605 if pri_node == dev.logical_id[0]:
4606 new_logical_id = (pri_node, new_node,
4607 dev.logical_id[2], dev.logical_id[3], new_minor,
4610 new_logical_id = (new_node, pri_node,
4611 dev.logical_id[2], new_minor, dev.logical_id[4],
4613 iv_names[idx] = (dev, dev.children, new_logical_id)
4614 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4616 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4617 logical_id=new_logical_id,
4618 children=dev.children)
4619 if not _CreateBlockDevOnSecondary(self, new_node, instance,
4621 _GetInstanceInfoText(instance)):
4622 self.cfg.ReleaseDRBDMinors(instance.name)
4623 raise errors.OpExecError("Failed to create new DRBD on"
4624 " node '%s'" % new_node)
4626 for idx, dev in enumerate(instance.disks):
4627 # we have new devices, shutdown the drbd on the old secondary
4628 info("shutting down drbd for disk/%d on old node" % idx)
4629 cfg.SetDiskID(dev, old_node)
4630 result = self.rpc.call_blockdev_shutdown(old_node, dev)
4631 if result.failed or not result.data:
4632 warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4633 hint="Please cleanup this device manually as soon as possible")
4635 info("detaching primary drbds from the network (=> standalone)")
4637 for idx, dev in enumerate(instance.disks):
4638 cfg.SetDiskID(dev, pri_node)
4639 # set the network part of the physical (unique in bdev terms) id
4640 # to None, meaning detach from network
4641 dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4642 # and 'find' the device, which will 'fix' it to match the
4644 result = self.rpc.call_blockdev_find(pri_node, dev)
4645 if not result.failed and result.data:
4648 warning("Failed to detach drbd disk/%d from network, unusual case" %
4652 # no detaches succeeded (very unlikely)
4653 self.cfg.ReleaseDRBDMinors(instance.name)
4654 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4656 # if we managed to detach at least one, we update all the disks of
4657 # the instance to point to the new secondary
4658 info("updating instance configuration")
4659 for dev, _, new_logical_id in iv_names.itervalues():
4660 dev.logical_id = new_logical_id
4661 cfg.SetDiskID(dev, pri_node)
4662 cfg.Update(instance)
4663 # we can now remove the temp minors, as the new values are
4664 # written to the config file (and therefore stable)
4665 self.cfg.ReleaseDRBDMinors(instance.name)
4667 # and now perform the drbd attach
4668 info("attaching primary drbds to new secondary (standalone => connected)")
4670 for idx, dev in enumerate(instance.disks):
4671 info("attaching primary drbd for disk/%d to new secondary node" % idx)
4672 # since the attach is smart, it's enough to 'find' the device,
4673 # it will automatically activate the network, if the physical_id
4675 cfg.SetDiskID(dev, pri_node)
4676 logging.debug("Disk to attach: %s", dev)
4677 result = self.rpc.call_blockdev_find(pri_node, dev)
4678 if result.failed or not result.data:
4679 warning("can't attach drbd disk/%d to new secondary!" % idx,
4680 "please do a gnt-instance info to see the status of disks")
4682 # this can fail as the old devices are degraded and _WaitForSync
4683 # does a combined result over all disks, so we don't check its
4685 self.proc.LogStep(5, steps_total, "sync devices")
4686 _WaitForSync(self, instance, unlock=True)
4688 # so check manually all the devices
4689 for idx, (dev, old_lvs, _) in iv_names.iteritems():
4690 cfg.SetDiskID(dev, pri_node)
4691 result = self.rpc.call_blockdev_find(pri_node, dev)
4694 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4696 self.proc.LogStep(6, steps_total, "removing old storage")
4697 for idx, (dev, old_lvs, _) in iv_names.iteritems():
4698 info("remove logical volumes for disk/%d" % idx)
4700 cfg.SetDiskID(lv, old_node)
4701 result = self.rpc.call_blockdev_remove(old_node, lv)
4702 if result.failed or not result.data:
4703 warning("Can't remove LV on old secondary",
4704 hint="Cleanup stale volumes by hand")
4706 def Exec(self, feedback_fn):
4707 """Execute disk replacement.
4709 This dispatches the disk replacement to the appropriate handler.
4712 instance = self.instance
4714 # Activate the instance disks if we're replacing them on a down instance
4715 if instance.status == "down":
4716 _StartInstanceDisks(self, instance, True)
4718 if instance.disk_template == constants.DT_DRBD8:
4719 if self.op.remote_node is None:
4720 fn = self._ExecD8DiskOnly
4722 fn = self._ExecD8Secondary
4724 raise errors.ProgrammerError("Unhandled disk replacement case")
4726 ret = fn(feedback_fn)
4728 # Deactivate the instance disks if we're replacing them on a down instance
4729 if instance.status == "down":
4730 _SafeShutdownInstanceDisks(self, instance)
4735 class LUGrowDisk(LogicalUnit):
4736 """Grow a disk of an instance.
4740 HTYPE = constants.HTYPE_INSTANCE
4741 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4744 def ExpandNames(self):
4745 self._ExpandAndLockInstance()
4746 self.needed_locks[locking.LEVEL_NODE] = []
4747 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4749 def DeclareLocks(self, level):
4750 if level == locking.LEVEL_NODE:
4751 self._LockInstancesNodes()
4753 def BuildHooksEnv(self):
4756 This runs on the master, the primary and all the secondaries.
4760 "DISK": self.op.disk,
4761 "AMOUNT": self.op.amount,
4763 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4765 self.cfg.GetMasterNode(),
4766 self.instance.primary_node,
4770 def CheckPrereq(self):
4771 """Check prerequisites.
4773 This checks that the instance is in the cluster.
4776 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4777 assert instance is not None, \
4778 "Cannot retrieve locked instance %s" % self.op.instance_name
4780 self.instance = instance
4782 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4783 raise errors.OpPrereqError("Instance's disk layout does not support"
4786 self.disk = instance.FindDisk(self.op.disk)
4788 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4789 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4790 instance.hypervisor)
4791 for node in nodenames:
4792 info = nodeinfo[node]
4793 if info.failed or not info.data:
4794 raise errors.OpPrereqError("Cannot get current information"
4795 " from node '%s'" % node)
4796 vg_free = info.data.get('vg_free', None)
4797 if not isinstance(vg_free, int):
4798 raise errors.OpPrereqError("Can't compute free disk space on"
4800 if self.op.amount > vg_free:
4801 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4802 " %d MiB available, %d MiB required" %
4803 (node, vg_free, self.op.amount))
4805 def Exec(self, feedback_fn):
4806 """Execute disk grow.
4809 instance = self.instance
4811 for node in (instance.secondary_nodes + (instance.primary_node,)):
4812 self.cfg.SetDiskID(disk, node)
4813 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4815 if (not result.data or not isinstance(result.data, (list, tuple)) or
4816 len(result.data) != 2):
4817 raise errors.OpExecError("Grow request failed to node %s" % node)
4818 elif not result.data[0]:
4819 raise errors.OpExecError("Grow request failed to node %s: %s" %
4820 (node, result.data[1]))
4821 disk.RecordGrow(self.op.amount)
4822 self.cfg.Update(instance)
4823 if self.op.wait_for_sync:
4824 disk_abort = not _WaitForSync(self, instance)
4826 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4827 " status.\nPlease check the instance.")
4830 class LUQueryInstanceData(NoHooksLU):
4831 """Query runtime instance data.
4834 _OP_REQP = ["instances", "static"]
4837 def ExpandNames(self):
4838 self.needed_locks = {}
4839 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4841 if not isinstance(self.op.instances, list):
4842 raise errors.OpPrereqError("Invalid argument type 'instances'")
4844 if self.op.instances:
4845 self.wanted_names = []
4846 for name in self.op.instances:
4847 full_name = self.cfg.ExpandInstanceName(name)
4848 if full_name is None:
4849 raise errors.OpPrereqError("Instance '%s' not known" %
4850 name)
4851 self.wanted_names.append(full_name)
4852 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4854 self.wanted_names = None
4855 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4857 self.needed_locks[locking.LEVEL_NODE] = []
4858 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4860 def DeclareLocks(self, level):
4861 if level == locking.LEVEL_NODE:
4862 self._LockInstancesNodes()
4864 def CheckPrereq(self):
4865 """Check prerequisites.
4867 This only checks the optional instance list against the existing names.
4870 if self.wanted_names is None:
4871 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4873 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4874 in self.wanted_names]
4877 def _ComputeDiskStatus(self, instance, snode, dev):
4878 """Compute block device status.
4881 static = self.op.static
4883 self.cfg.SetDiskID(dev, instance.primary_node)
4884 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4886 dev_pstatus = dev_pstatus.data
4890 if dev.dev_type in constants.LDS_DRBD:
4891 # we change the snode then (otherwise we use the one passed in)
4892 if dev.logical_id[0] == instance.primary_node:
4893 snode = dev.logical_id[1]
4895 snode = dev.logical_id[0]
4897 if snode and not static:
4898 self.cfg.SetDiskID(dev, snode)
4899 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4901 dev_sstatus = dev_sstatus.data
4906 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4907 for child in dev.children]
4912 "iv_name": dev.iv_name,
4913 "dev_type": dev.dev_type,
4914 "logical_id": dev.logical_id,
4915 "physical_id": dev.physical_id,
4916 "pstatus": dev_pstatus,
4917 "sstatus": dev_sstatus,
4918 "children": dev_children,
4924 def Exec(self, feedback_fn):
4925 """Gather and return data"""
4928 cluster = self.cfg.GetClusterInfo()
4930 for instance in self.wanted_instances:
4931 if not self.op.static:
4932 remote_info = self.rpc.call_instance_info(instance.primary_node,
4934 instance.hypervisor)
4936 remote_info = remote_info.data
4937 if remote_info and "state" in remote_info:
4940 remote_state = "down"
4943 if instance.status == "down":
4944 config_state = "down"
4948 disks = [self._ComputeDiskStatus(instance, None, device)
4949 for device in instance.disks]
4952 "name": instance.name,
4953 "config_state": config_state,
4954 "run_state": remote_state,
4955 "pnode": instance.primary_node,
4956 "snodes": instance.secondary_nodes,
4958 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4960 "hypervisor": instance.hypervisor,
4961 "network_port": instance.network_port,
4962 "hv_instance": instance.hvparams,
4963 "hv_actual": cluster.FillHV(instance),
4964 "be_instance": instance.beparams,
4965 "be_actual": cluster.FillBE(instance),
4968 result[instance.name] = idict
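# --- Illustrative sketch (not part of the original module) ---
# LUQueryInstanceData maps each instance name to the idict built above; the
# per-disk entries come from _ComputeDiskStatus.  A hedged sketch of the
# result shape (values made up, some keys omitted):
#   {
#     "instance1.example.com": {
#       "name": "instance1.example.com",
#       "config_state": "up",
#       "run_state": "up",
#       "pnode": "node1.example.com",
#       "snodes": ["node2.example.com"],
#       "disks": [{"iv_name": "disk/0", "dev_type": ...,
#                  "pstatus": ..., "sstatus": ..., "children": []}],
#     },
#   }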
4973 class LUSetInstanceParams(LogicalUnit):
4974 """Modifies an instances's parameters.
4977 HPATH = "instance-modify"
4978 HTYPE = constants.HTYPE_INSTANCE
4979 _OP_REQP = ["instance_name"]
4982 def CheckArguments(self):
4983 if not hasattr(self.op, 'nics'):
4985 if not hasattr(self.op, 'disks'):
4987 if not hasattr(self.op, 'beparams'):
4988 self.op.beparams = {}
4989 if not hasattr(self.op, 'hvparams'):
4990 self.op.hvparams = {}
4991 self.op.force = getattr(self.op, "force", False)
4992 if not (self.op.nics or self.op.disks or
4993 self.op.hvparams or self.op.beparams):
4994 raise errors.OpPrereqError("No changes submitted")
4996 utils.CheckBEParams(self.op.beparams)
5000 for disk_op, disk_dict in self.op.disks:
5001 if disk_op == constants.DDM_REMOVE:
5004 elif disk_op == constants.DDM_ADD:
5007 if not isinstance(disk_op, int):
5008 raise errors.OpPrereqError("Invalid disk index")
5009 if disk_op == constants.DDM_ADD:
5010 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5011 if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
5012 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5013 size = disk_dict.get('size', None)
5015 raise errors.OpPrereqError("Required disk parameter size missing")
5018 except ValueError, err:
5019 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5021 disk_dict['size'] = size
5023 # modification of disk
5024 if 'size' in disk_dict:
5025 raise errors.OpPrereqError("Disk size change not possible, use"
5028 if disk_addremove > 1:
5029 raise errors.OpPrereqError("Only one disk add or remove operation"
5030 " supported at a time")
5034 for nic_op, nic_dict in self.op.nics:
5035 if nic_op == constants.DDM_REMOVE:
5038 elif nic_op == constants.DDM_ADD:
5041 if not isinstance(nic_op, int):
5042 raise errors.OpPrereqError("Invalid nic index")
5044 # nic_dict should be a dict
5045 nic_ip = nic_dict.get('ip', None)
5046 if nic_ip is not None:
5047 if nic_ip.lower() == "none":
5048 nic_dict['ip'] = None
5050 if not utils.IsValidIP(nic_ip):
5051 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5052 # we can only check None bridges and assign the default one
5053 nic_bridge = nic_dict.get('bridge', None)
5054 if nic_bridge is None:
5055 nic_dict['bridge'] = self.cfg.GetDefBridge()
5056 # but we can validate MACs
5057 nic_mac = nic_dict.get('mac', None)
5058 if nic_mac is not None:
5059 if self.cfg.IsMacInUse(nic_mac):
5060 raise errors.OpPrereqError("MAC address %s already in use"
5061 " in cluster" % nic_mac)
5062 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5063 if not utils.IsValidMac(nic_mac):
5064 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5065 if nic_addremove > 1:
5066 raise errors.OpPrereqError("Only one NIC add or remove operation"
5067 " supported at a time")
5069 def ExpandNames(self):
5070 self._ExpandAndLockInstance()
5071 self.needed_locks[locking.LEVEL_NODE] = []
5072 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5074 def DeclareLocks(self, level):
5075 if level == locking.LEVEL_NODE:
5076 self._LockInstancesNodes()
5078 def BuildHooksEnv(self):
5081 This runs on the master, primary and secondaries.
5085 if constants.BE_MEMORY in self.be_new:
5086 args['memory'] = self.be_new[constants.BE_MEMORY]
5087 if constants.BE_VCPUS in self.be_new:
5088 args['vcpus'] = self.be_new[constants.BE_VCPUS]
5089 # FIXME: readd disk/nic changes
5090 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5091 nl = [self.cfg.GetMasterNode(),
5092 self.instance.primary_node] + list(self.instance.secondary_nodes)
5095 def CheckPrereq(self):
5096 """Check prerequisites.
5098 This only checks the instance list against the existing names.
5101 force = self.force = self.op.force
5103 # checking the new params on the primary/secondary nodes
5105 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5106 assert self.instance is not None, \
5107 "Cannot retrieve locked instance %s" % self.op.instance_name
5108 pnode = self.instance.primary_node
5110 nodelist.extend(instance.secondary_nodes)
5112 # hvparams processing
5113 if self.op.hvparams:
5114 i_hvdict = copy.deepcopy(instance.hvparams)
5115 for key, val in self.op.hvparams.iteritems():
5116 if val == constants.VALUE_DEFAULT:
5121 elif val == constants.VALUE_NONE:
5122 i_hvdict[key] = None
5125 cluster = self.cfg.GetClusterInfo()
5126 hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5129 hypervisor.GetHypervisor(
5130 instance.hypervisor).CheckParameterSyntax(hv_new)
5131 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5132 self.hv_new = hv_new # the new actual values
5133 self.hv_inst = i_hvdict # the new dict (without defaults)
5135 self.hv_new = self.hv_inst = {}
5137 # beparams processing
5138 if self.op.beparams:
5139 i_bedict = copy.deepcopy(instance.beparams)
5140 for key, val in self.op.beparams.iteritems():
5141 if val == constants.VALUE_DEFAULT:
5148 cluster = self.cfg.GetClusterInfo()
5149 be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5151 self.be_new = be_new # the new actual values
5152 self.be_inst = i_bedict # the new dict (without defaults)
5154 self.be_new = self.be_inst = {}
5158 if constants.BE_MEMORY in self.op.beparams and not self.force:
5159 mem_check_list = [pnode]
5160 if be_new[constants.BE_AUTO_BALANCE]:
5161 # either we changed auto_balance to yes or it was from before
5162 mem_check_list.extend(instance.secondary_nodes)
5163 instance_info = self.rpc.call_instance_info(pnode, instance.name,
5164 instance.hypervisor)
5165 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5166 instance.hypervisor)
5167 if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5168 # Assume the primary node is unreachable and go ahead
5169 self.warn.append("Can't get info from primary node %s" % pnode)
5171 if not instance_info.failed and instance_info.data:
5172 current_mem = instance_info.data['memory']
5174 # Assume instance not running
5175 # (there is a slight race condition here, but it's not very probable,
5176 # and we have no other way to check)
5178 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5179 nodeinfo[pnode].data['memory_free'])
5181 raise errors.OpPrereqError("This change will prevent the instance"
5182 " from starting, due to %d MB of memory"
5183 " missing on its primary node" % miss_mem)
5185 if be_new[constants.BE_AUTO_BALANCE]:
5186 for node, nres in ((n, nodeinfo[n]) for n in instance.secondary_nodes):
5187 if nres.failed or not isinstance(nres.data, dict):
5188 self.warn.append("Can't get info from secondary node %s" % node)
5189 elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5190 self.warn.append("Not enough memory to failover instance to"
5191 " secondary node %s" % node)
5194 for nic_op, nic_dict in self.op.nics:
5195 if nic_op == constants.DDM_REMOVE:
5196 if not instance.nics:
5197 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5199 if nic_op != constants.DDM_ADD:
5201 if nic_op < 0 or nic_op >= len(instance.nics):
5202 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5204 (nic_op, len(instance.nics)))
5205 nic_bridge = nic_dict.get('bridge', None)
5206 if nic_bridge is not None:
5207 if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5208 msg = ("Bridge '%s' doesn't exist on one of"
5209 " the instance nodes" % nic_bridge)
5211 self.warn.append(msg)
5213 raise errors.OpPrereqError(msg)
5216 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5217 raise errors.OpPrereqError("Disk operations not supported for"
5218 " diskless instances")
5219 for disk_op, disk_dict in self.op.disks:
5220 if disk_op == constants.DDM_REMOVE:
5221 if len(instance.disks) == 1:
5222 raise errors.OpPrereqError("Cannot remove the last disk of"
5224 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5225 ins_l = ins_l[pnode]
5226 if not isinstance(ins_l, list):
5227 raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5228 if instance.name in ins_l:
5229 raise errors.OpPrereqError("Instance is running, can't remove"
5232 if (disk_op == constants.DDM_ADD and
5233 len(instance.disks) >= constants.MAX_DISKS):
5234 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5235 " add more" % constants.MAX_DISKS)
5236 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5238 if disk_op < 0 or disk_op >= len(instance.disks):
5239 raise errors.OpPrereqError("Invalid disk index %s, valid values"
5241 (disk_op, len(instance.disks)))
5245 def Exec(self, feedback_fn):
5246 """Modifies an instance.
5248 All parameters take effect only at the next restart of the instance.
5251 # Process here the warnings from CheckPrereq, as we don't have a
5252 # feedback_fn there.
5253 for warn in self.warn:
5254 feedback_fn("WARNING: %s" % warn)
5257 instance = self.instance
5259 for disk_op, disk_dict in self.op.disks:
5260 if disk_op == constants.DDM_REMOVE:
5261 # remove the last disk
5262 device = instance.disks.pop()
5263 device_idx = len(instance.disks)
5264 for node, disk in device.ComputeNodeTree(instance.primary_node):
5265 self.cfg.SetDiskID(disk, node)
5266 rpc_result = self.rpc.call_blockdev_remove(node, disk)
5267 if rpc_result.failed or not rpc_result.data:
5268 self.proc.LogWarning("Could not remove disk/%d on node %s,"
5269 " continuing anyway", device_idx, node)
5270 result.append(("disk/%d" % device_idx, "remove"))
5271 elif disk_op == constants.DDM_ADD:
5273 if instance.disk_template == constants.DT_FILE:
5274 file_driver, file_path = instance.disks[0].logical_id
5275 file_path = os.path.dirname(file_path)
5277 file_driver = file_path = None
5278 disk_idx_base = len(instance.disks)
5279 new_disk = _GenerateDiskTemplate(self,
5280 instance.disk_template,
5281 instance, instance.primary_node,
5282 instance.secondary_nodes,
5287 new_disk.mode = disk_dict['mode']
5288 instance.disks.append(new_disk)
5289 info = _GetInstanceInfoText(instance)
5291 logging.info("Creating volume %s for instance %s",
5292 new_disk.iv_name, instance.name)
5293 # Note: this needs to be kept in sync with _CreateDisks
5295 for secondary_node in instance.secondary_nodes:
5296 if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
5297 new_disk, False, info):
5298 self.LogWarning("Failed to create volume %s (%s) on"
5299 " secondary node %s!",
5300 new_disk.iv_name, new_disk, secondary_node)
5302 if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5303 instance, new_disk, info):
5304 self.LogWarning("Failed to create volume %s on primary!",
5306 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5307 (new_disk.size, new_disk.mode)))
5309 # change a given disk
5310 instance.disks[disk_op].mode = disk_dict['mode']
5311 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5313 for nic_op, nic_dict in self.op.nics:
5314 if nic_op == constants.DDM_REMOVE:
5315 # remove the last nic
5316 del instance.nics[-1]
5317 result.append(("nic.%d" % len(instance.nics), "remove"))
5318 elif nic_op == constants.DDM_ADD:
5320 if 'mac' not in nic_dict:
5321 mac = constants.VALUE_GENERATE
5323 mac = nic_dict['mac']
5324 if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5325 mac = self.cfg.GenerateMAC()
5326 new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5327 bridge=nic_dict.get('bridge', None))
5328 instance.nics.append(new_nic)
5329 result.append(("nic.%d" % (len(instance.nics) - 1),
5330 "add:mac=%s,ip=%s,bridge=%s" %
5331 (new_nic.mac, new_nic.ip, new_nic.bridge)))
5333 # change a given nic
5334 for key in 'mac', 'ip', 'bridge':
5336 setattr(instance.nics[nic_op], key, nic_dict[key])
5337 result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5340 if self.op.hvparams:
5341 instance.hvparams = self.hv_new
5342 for key, val in self.op.hvparams.iteritems():
5343 result.append(("hv/%s" % key, val))
5346 if self.op.beparams:
5347 instance.beparams = self.be_inst
5348 for key, val in self.op.beparams.iteritems():
5349 result.append(("be/%s" % key, val))
5351 self.cfg.Update(instance)
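# --- Illustrative sketch (not part of the original module) ---
# The Exec method above accumulates and returns a list of (parameter, value)
# pairs describing what was changed, roughly of this shape (values made up):
#   [("disk/1", "add:size=1024,mode=rw"),
#    ("nic.bridge/0", "xen-br0"),
#    ("be/memory", 512)]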
5356 class LUQueryExports(NoHooksLU):
5357 """Query the exports list
5360 _OP_REQP = ['nodes']
5363 def ExpandNames(self):
5364 self.needed_locks = {}
5365 self.share_locks[locking.LEVEL_NODE] = 1
5366 if not self.op.nodes:
5367 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5369 self.needed_locks[locking.LEVEL_NODE] = \
5370 _GetWantedNodes(self, self.op.nodes)
5372 def CheckPrereq(self):
5373 """Check prerequisites.
5376 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5378 def Exec(self, feedback_fn):
5379 """Compute the list of all the exported system images.
5382 @return: a dictionary with the structure node->(export-list)
5383 where export-list is a list of the instances exported on
5387 rpcresult = self.rpc.call_export_list(self.nodes)
5389 for node in rpcresult:
5390 if rpcresult[node].failed:
5391 result[node] = False
5393 result[node] = rpcresult[node].data
5398 class LUExportInstance(LogicalUnit):
5399 """Export an instance to an image in the cluster.
5402 HPATH = "instance-export"
5403 HTYPE = constants.HTYPE_INSTANCE
5404 _OP_REQP = ["instance_name", "target_node", "shutdown"]
5407 def ExpandNames(self):
5408 self._ExpandAndLockInstance()
5409 # FIXME: lock only instance primary and destination node
5411 # Sad but true: for now we have to lock all nodes, as we don't know where
5412 # the previous export might be, and in this LU we search for it and
5413 # remove it from its current node. In the future we could fix this by:
5414 # - making a tasklet to search (share-lock all), then create the new one,
5415 # then one to remove, after
5416 # - removing the removal operation altogether
5417 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5419 def DeclareLocks(self, level):
5420 """Last minute lock declaration."""
5421 # All nodes are locked anyway, so nothing to do here.
5423 def BuildHooksEnv(self):
5426 This will run on the master, primary node and target node.
5430 "EXPORT_NODE": self.op.target_node,
5431 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5433 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5434 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5435 self.op.target_node]
5438 def CheckPrereq(self):
5439 """Check prerequisites.
5441 This checks that the instance and node names are valid.
5444 instance_name = self.op.instance_name
5445 self.instance = self.cfg.GetInstanceInfo(instance_name)
5446 assert self.instance is not None, \
5447 "Cannot retrieve locked instance %s" % self.op.instance_name
5449 self.dst_node = self.cfg.GetNodeInfo(
5450 self.cfg.ExpandNodeName(self.op.target_node))
5452 if self.dst_node is None:
5453 # This means the node name is wrong, not that the node isn't locked
5454 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
5456 # instance disk type verification
5457 for disk in self.instance.disks:
5458 if disk.dev_type == constants.LD_FILE:
5459 raise errors.OpPrereqError("Export not supported for instances with"
5460 " file-based disks")
5462 def Exec(self, feedback_fn):
5463 """Export an instance to an image in the cluster.
5466 instance = self.instance
5467 dst_node = self.dst_node
5468 src_node = instance.primary_node
5469 if self.op.shutdown:
5470 # shutdown the instance, but not the disks
5471 result = self.rpc.call_instance_shutdown(src_node, instance)
5474 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5475 (instance.name, src_node))
5477 vgname = self.cfg.GetVGName()
5482 for disk in instance.disks:
5483 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5484 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5485 if new_dev_name.failed or not new_dev_name.data:
5486 self.LogWarning("Could not snapshot block device %s on node %s",
5487 disk.logical_id[1], src_node)
5488 snap_disks.append(False)
5490 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5491 logical_id=(vgname, new_dev_name.data),
5492 physical_id=(vgname, new_dev_name.data),
5493 iv_name=disk.iv_name)
5494 snap_disks.append(new_dev)
5497 if self.op.shutdown and instance.status == "up":
5498 result = self.rpc.call_instance_start(src_node, instance, None)
5499 if result.failed or not result.data:
5500 _ShutdownInstanceDisks(self, instance)
5501 raise errors.OpExecError("Could not start instance")
5503 # TODO: check for size
5505 cluster_name = self.cfg.GetClusterName()
5506 for idx, dev in enumerate(snap_disks):
5508 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5509 instance, cluster_name, idx)
5510 if result.failed or not result.data:
5511 self.LogWarning("Could not export block device %s from node %s to"
5512 " node %s", dev.logical_id[1], src_node,
5514 result = self.rpc.call_blockdev_remove(src_node, dev)
5515 if result.failed or not result.data:
5516 self.LogWarning("Could not remove snapshot block device %s from node"
5517 " %s", dev.logical_id[1], src_node)
5519 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
5520 if result.failed or not result.data:
5521 self.LogWarning("Could not finalize export for instance %s on node %s",
5522 instance.name, dst_node.name)
5524 nodelist = self.cfg.GetNodeList()
5525 nodelist.remove(dst_node.name)
5527 # on one-node clusters nodelist will be empty after the removal;
5528 # if we proceeded, the backup would be removed because OpQueryExports
5529 # substitutes an empty list with the full cluster node list.
5531 exportlist = self.rpc.call_export_list(nodelist)
5532 for node in exportlist:
5533 if exportlist[node].failed:
5535 if instance.name in exportlist[node].data:
5536 if not self.rpc.call_export_remove(node, instance.name):
5537 self.LogWarning("Could not remove older export for instance %s"
5538 " on node %s", instance.name, node)
5541 class LURemoveExport(NoHooksLU):
5542 """Remove exports related to the named instance.
5545 _OP_REQP = ["instance_name"]
5548 def ExpandNames(self):
5549 self.needed_locks = {}
5550 # We need all nodes to be locked in order for RemoveExport to work, but we
5551 # don't need to lock the instance itself, as nothing will happen to it (and
5552 # we can remove exports also for a removed instance)
5553 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5555 def CheckPrereq(self):
5556 """Check prerequisites.
5560 def Exec(self, feedback_fn):
5561 """Remove any export.
5564 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5565 # If the instance was not found we'll try with the name that was passed in.
5566 # This will only work if it was an FQDN, though.
5568 if not instance_name:
5570 instance_name = self.op.instance_name
5572 exportlist = self.rpc.call_export_list(self.acquired_locks[
5573 locking.LEVEL_NODE])
5575 for node in exportlist:
5576 if exportlist[node].failed:
5577 self.LogWarning("Failed to query node %s, continuing" % node)
5579 if instance_name in exportlist[node].data:
5581 result = self.rpc.call_export_remove(node, instance_name)
5582 if result.failed or not result.data:
5583 logging.error("Could not remove export for instance %s"
5584 " on node %s", instance_name, node)
5586 if fqdn_warn and not found:
5587 feedback_fn("Export not found. If trying to remove an export belonging"
5588 " to a deleted instance please use its Fully Qualified"
5592 class TagsLU(NoHooksLU):
5595 This is an abstract class which is the parent of all the other tags LUs.
5599 def ExpandNames(self):
5600 self.needed_locks = {}
5601 if self.op.kind == constants.TAG_NODE:
5602 name = self.cfg.ExpandNodeName(self.op.name)
5604 raise errors.OpPrereqError("Invalid node name (%s)" %
5607 self.needed_locks[locking.LEVEL_NODE] = name
5608 elif self.op.kind == constants.TAG_INSTANCE:
5609 name = self.cfg.ExpandInstanceName(self.op.name)
5611 raise errors.OpPrereqError("Invalid instance name (%s)" %
5614 self.needed_locks[locking.LEVEL_INSTANCE] = name
5616 def CheckPrereq(self):
5617 """Check prerequisites.
5620 if self.op.kind == constants.TAG_CLUSTER:
5621 self.target = self.cfg.GetClusterInfo()
5622 elif self.op.kind == constants.TAG_NODE:
5623 self.target = self.cfg.GetNodeInfo(self.op.name)
5624 elif self.op.kind == constants.TAG_INSTANCE:
5625 self.target = self.cfg.GetInstanceInfo(self.op.name)
5627 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5631 class LUGetTags(TagsLU):
5632 """Returns the tags of a given object.
5635 _OP_REQP = ["kind", "name"]
5638 def Exec(self, feedback_fn):
5639 """Returns the tag list.
5642 return list(self.target.GetTags())
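# --- Illustrative sketch (not part of the original module) ---
# The tag LUs all take a (kind, name) pair as validated by TagsLU above; the
# add/delete variants additionally take a list of tags.  Example values are
# made up (the name is ignored for cluster tags).
def _ExampleTagsParams():
  """Return sample parameter dicts for the tags LUs (sketch)."""
  return [
    {"kind": constants.TAG_CLUSTER, "name": "", "tags": ["production"]},
    {"kind": constants.TAG_NODE, "name": "node1.example.com",
     "tags": ["rack:r12"]},
    {"kind": constants.TAG_INSTANCE, "name": "instance1.example.com",
     "tags": ["owner:webteam"]},
  ]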
5645 class LUSearchTags(NoHooksLU):
5646 """Searches the tags for a given pattern.
5649 _OP_REQP = ["pattern"]
5652 def ExpandNames(self):
5653 self.needed_locks = {}
5655 def CheckPrereq(self):
5656 """Check prerequisites.
5658 This checks the pattern passed for validity by compiling it.
5662 self.re = re.compile(self.op.pattern)
5663 except re.error, err:
5664 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5665 (self.op.pattern, err))
5667 def Exec(self, feedback_fn):
5668 """Returns the tag list.
5672 tgts = [("/cluster", cfg.GetClusterInfo())]
5673 ilist = cfg.GetAllInstancesInfo().values()
5674 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5675 nlist = cfg.GetAllNodesInfo().values()
5676 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5678 for path, target in tgts:
5679 for tag in target.GetTags():
5680 if self.re.search(tag):
5681 results.append((path, tag))
5685 class LUAddTags(TagsLU):
5686 """Sets a tag on a given object.
5689 _OP_REQP = ["kind", "name", "tags"]
5692 def CheckPrereq(self):
5693 """Check prerequisites.
5695 This checks the type and length of the tag name and value.
5698 TagsLU.CheckPrereq(self)
5699 for tag in self.op.tags:
5700 objects.TaggableObject.ValidateTag(tag)
5702 def Exec(self, feedback_fn):
5707 for tag in self.op.tags:
5708 self.target.AddTag(tag)
5709 except errors.TagError, err:
5710 raise errors.OpExecError("Error while setting tag: %s" % str(err))
5712 self.cfg.Update(self.target)
5713 except errors.ConfigurationError:
5714 raise errors.OpRetryError("There has been a modification to the"
5715 " config file and the operation has been"
5716 " aborted. Please retry.")
5719 class LUDelTags(TagsLU):
5720 """Delete a list of tags from a given object.
5723 _OP_REQP = ["kind", "name", "tags"]
5726 def CheckPrereq(self):
5727 """Check prerequisites.
5729 This checks that we have the given tag.
5732 TagsLU.CheckPrereq(self)
5733 for tag in self.op.tags:
5734 objects.TaggableObject.ValidateTag(tag)
5735 del_tags = frozenset(self.op.tags)
5736 cur_tags = self.target.GetTags()
5737 if not del_tags <= cur_tags:
5738 diff_tags = del_tags - cur_tags
5739 diff_names = ["'%s'" % tag for tag in diff_tags]
5740 diff_names.sort()
5741 raise errors.OpPrereqError("Tag(s) %s not found" %
5742 (",".join(diff_names)))
5744 def Exec(self, feedback_fn):
5745 """Remove the tag from the object.
5748 for tag in self.op.tags:
5749 self.target.RemoveTag(tag)
5750 try:
5751 self.cfg.Update(self.target)
5752 except errors.ConfigurationError:
5753 raise errors.OpRetryError("There has been a modification to the"
5754 " config file and the operation has been"
5755 " aborted. Please retry.")
5758 class LUTestDelay(NoHooksLU):
5759 """Sleep for a specified amount of time.
5761 This LU sleeps on the master and/or nodes for a specified amount of
5762 time.
5765 _OP_REQP = ["duration", "on_master", "on_nodes"]
5768 def ExpandNames(self):
5769 """Expand names and set required locks.
5771 This expands the node list, if any.
5774 self.needed_locks = {}
5775 if self.op.on_nodes:
5776 # _GetWantedNodes can be used here, but is not always appropriate to use
5777 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5778 # more information.
5779 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5780 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5782 def CheckPrereq(self):
5783 """Check prerequisites.
5787 def Exec(self, feedback_fn):
5788 """Do the actual sleep.
5791 if self.op.on_master:
5792 if not utils.TestDelay(self.op.duration):
5793 raise errors.OpExecError("Error during master delay test")
5794 if self.op.on_nodes:
5795 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5796 if not result:
5797 raise errors.OpExecError("Complete failure from rpc call")
5798 for node, node_result in result.items():
5799 node_result.Raise()
5800 if not node_result.data:
5801 raise errors.OpExecError("Failure during rpc call to node %s,"
5802 " result: %s" % (node, node_result.data))
5805 class IAllocator(object):
5806 """IAllocator framework.
5808 An IAllocator instance has four sets of attributes:
5809 - cfg that is needed to query the cluster
5810 - input data (all members of the _KEYS class attribute are required)
5811 - four buffer attributes (in|out_data|text), that represent the
5812 input (to the external script) in text and data structure format,
5813 and the output from it, again in two formats
5814 - the result variables from the script (success, info, nodes) for
5815 easy usage
5819 "mem_size", "disks", "disk_template",
5820 "os", "tags", "nics", "vcpus", "hypervisor",
5826 def __init__(self, lu, mode, name, **kwargs):
5827 self.lu = lu
5828 # init buffer variables
5829 self.in_text = self.out_text = self.in_data = self.out_data = None
5830 # init all input fields so that pylint is happy
5831 self.mode = mode
5832 self.name = name
5833 self.mem_size = self.disks = self.disk_template = None
5834 self.os = self.tags = self.nics = self.vcpus = None
5835 self.relocate_from = None
5837 self.required_nodes = None
5838 # init result fields
5839 self.success = self.info = self.nodes = None
5840 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5841 keyset = self._ALLO_KEYS
5842 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5843 keyset = self._RELO_KEYS
5844 else:
5845 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5846 " IAllocator" % self.mode)
5847 for key in kwargs:
5848 if key not in keyset:
5849 raise errors.ProgrammerError("Invalid input parameter '%s' to"
5850 " IAllocator" % key)
5851 setattr(self, key, kwargs[key])
5852 for key in keyset:
5853 if key not in kwargs:
5854 raise errors.ProgrammerError("Missing input parameter '%s' to"
5855 " IAllocator" % key)
5856 self._BuildInputData()
5858 def _ComputeClusterData(self):
5859 """Compute the generic allocator input data.
5861 This is the data that is independent of the actual operation.
5864 cfg = self.lu.cfg
5865 cluster_info = cfg.GetClusterInfo()
5869 "cluster_name": cfg.GetClusterName(),
5870 "cluster_tags": list(cluster_info.GetTags()),
5871 "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5872 # we don't have job IDs
5874 iinfo = cfg.GetAllInstancesInfo().values()
5875 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
5877 # node data
5878 node_results = {}
5879 node_list = cfg.GetNodeList()
5881 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5882 hypervisor = self.hypervisor
5883 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5884 hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
5886 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5887 hypervisor)
5888 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
5889 cluster_info.enabled_hypervisors)
5890 for nname in node_list:
5891 ninfo = cfg.GetNodeInfo(nname)
5892 node_data[nname].Raise()
5893 if not isinstance(node_data[nname].data, dict):
5894 raise errors.OpExecError("Can't get data for node %s" % nname)
5895 remote_info = node_data[nname].data
5896 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5897 'vg_size', 'vg_free', 'cpu_total']:
5898 if attr not in remote_info:
5899 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5900 (nname, attr))
5901 try:
5902 remote_info[attr] = int(remote_info[attr])
5903 except ValueError, err:
5904 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5905 " %s" % (nname, attr, str(err)))
5906 # compute memory used by primary instances
5907 i_p_mem = i_p_up_mem = 0
5908 for iinfo, beinfo in i_list:
5909 if iinfo.primary_node == nname:
5910 i_p_mem += beinfo[constants.BE_MEMORY]
5911 if iinfo.name not in node_iinfo[nname]:
5912 i_used_mem = 0
5913 else:
5914 i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
5915 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
5916 remote_info['memory_free'] -= max(0, i_mem_diff)
5918 if iinfo.status == "up":
5919 i_p_up_mem += beinfo[constants.BE_MEMORY]
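# The loop above lowers a node's reported free memory whenever one of its
# primary instances currently uses less than its configured BE_MEMORY, so
# the allocator sees a conservative estimate of the memory really available.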
5921 # compute memory used by instances
5923 "tags": list(ninfo.GetTags()),
5924 "total_memory": remote_info['memory_total'],
5925 "reserved_memory": remote_info['memory_dom0'],
5926 "free_memory": remote_info['memory_free'],
5927 "i_pri_memory": i_p_mem,
5928 "i_pri_up_memory": i_p_up_mem,
5929 "total_disk": remote_info['vg_size'],
5930 "free_disk": remote_info['vg_free'],
5931 "primary_ip": ninfo.primary_ip,
5932 "secondary_ip": ninfo.secondary_ip,
5933 "total_cpus": remote_info['cpu_total'],
5934 "offline": ninfo.offline,
5936 node_results[nname] = pnr
5937 data["nodes"] = node_results
5939 # instance data
5940 instance_data = {}
5941 for iinfo, beinfo in i_list:
5942 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5943 for n in iinfo.nics]
5945 "tags": list(iinfo.GetTags()),
5946 "should_run": iinfo.status == "up",
5947 "vcpus": beinfo[constants.BE_VCPUS],
5948 "memory": beinfo[constants.BE_MEMORY],
5950 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5952 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5953 "disk_template": iinfo.disk_template,
5954 "hypervisor": iinfo.hypervisor,
5955 }
5956 instance_data[iinfo.name] = pir
5958 data["instances"] = instance_data
5960 self.in_data = data
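# At this point self.in_data roughly contains the cluster-level keys plus
# "nodes" and "instances" dicts keyed by name; _AddNewInstance or
# _AddRelocateInstance later attaches the mode-specific "request" entry.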
5962 def _AddNewInstance(self):
5963 """Add new instance data to allocator structure.
5965 This, in combination with _ComputeClusterData, will create the
5966 correct structure needed as input for the allocator.
5968 The checks for the completeness of the opcode must have already been
5969 done.
5972 data = self.in_data
5973 if len(self.disks) != 2:
5974 raise errors.OpExecError("Only two-disk configurations supported")
5976 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
5978 if self.disk_template in constants.DTS_NET_MIRROR:
5979 self.required_nodes = 2
5980 else:
5981 self.required_nodes = 1
5985 "disk_template": self.disk_template,
5988 "vcpus": self.vcpus,
5989 "memory": self.mem_size,
5990 "disks": self.disks,
5991 "disk_space_total": disk_space,
5993 "required_nodes": self.required_nodes,
5995 data["request"] = request
5997 def _AddRelocateInstance(self):
5998 """Add relocate instance data to allocator structure.
6000 This, in combination with _ComputeClusterData, will create the
6001 correct structure needed as input for the allocator.
6003 The checks for the completeness of the opcode must have already been
6004 done.
6007 instance = self.lu.cfg.GetInstanceInfo(self.name)
6008 if instance is None:
6009 raise errors.ProgrammerError("Unknown instance '%s' passed to"
6010 " IAllocator" % self.name)
6012 if instance.disk_template not in constants.DTS_NET_MIRROR:
6013 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6015 if len(instance.secondary_nodes) != 1:
6016 raise errors.OpPrereqError("Instance has not exactly one secondary node")
6018 self.required_nodes = 1
6019 disk_sizes = [{'size': disk.size} for disk in instance.disks]
6020 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6025 "disk_space_total": disk_space,
6026 "required_nodes": self.required_nodes,
6027 "relocate_from": self.relocate_from,
6029 self.in_data["request"] = request
6031 def _BuildInputData(self):
6032 """Build input data structures.
6035 self._ComputeClusterData()
6037 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6038 self._AddNewInstance()
6039 else:
6040 self._AddRelocateInstance()
6042 self.in_text = serializer.Dump(self.in_data)
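# in_text is the serialized (JSON) form of in_data; Run() below ships this
# text to the master node, where the external allocator script is executed.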
6044 def Run(self, name, validate=True, call_fn=None):
6045 """Run an instance allocator and return the results.
6048 if call_fn is None:
6049 call_fn = self.lu.rpc.call_iallocator_runner
6052 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6053 result.Raise()
6055 if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6056 raise errors.OpExecError("Invalid result from master iallocator runner")
6058 rcode, stdout, stderr, fail = result.data
6060 if rcode == constants.IARUN_NOTFOUND:
6061 raise errors.OpExecError("Can't find allocator '%s'" % name)
6062 elif rcode == constants.IARUN_FAILURE:
6063 raise errors.OpExecError("Instance allocator call failed: %s,"
6064 " output: %s" % (fail, stdout+stderr))
6065 self.out_text = stdout
6066 if validate:
6067 self._ValidateResult()
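# The runner reply is the (status, stdout, stderr, fail_reason) 4-tuple
# unpacked above; stdout carries the allocator's answer, which
# _ValidateResult() parses when validation is requested.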
6069 def _ValidateResult(self):
6070 """Process the allocator results.
6072 This will process and, if successful, save the result in
6073 self.out_data and the other parameters.
6076 try:
6077 rdict = serializer.Load(self.out_text)
6078 except Exception, err:
6079 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6081 if not isinstance(rdict, dict):
6082 raise errors.OpExecError("Can't parse iallocator results: not a dict")
6084 for key in "success", "info", "nodes":
6085 if key not in rdict:
6086 raise errors.OpExecError("Can't parse iallocator results:"
6087 " missing key '%s'" % key)
6088 setattr(self, key, rdict[key])
6090 if not isinstance(rdict["nodes"], list):
6091 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6093 self.out_data = rdict
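# A minimal well-formed reply (illustrative values only) would thus be the
# JSON text:
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node1.example.com", "node2.example.com"]}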
6096 class LUTestAllocator(NoHooksLU):
6097 """Run allocator tests.
6099 This LU runs the allocator tests.
6102 _OP_REQP = ["direction", "mode", "name"]
6104 def CheckPrereq(self):
6105 """Check prerequisites.
6107 This checks the opcode parameters depending on the direction and mode
6108 of the test.
6110 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6111 for attr in ["name", "mem_size", "disks", "disk_template",
6112 "os", "tags", "nics", "vcpus"]:
6113 if not hasattr(self.op, attr):
6114 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6116 iname = self.cfg.ExpandInstanceName(self.op.name)
6117 if iname is not None:
6118 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6120 if not isinstance(self.op.nics, list):
6121 raise errors.OpPrereqError("Invalid parameter 'nics'")
6122 for row in self.op.nics:
6123 if (not isinstance(row, dict) or
6124 "mac" not in row or
6125 "ip" not in row or
6126 "bridge" not in row):
6127 raise errors.OpPrereqError("Invalid contents of the"
6128 " 'nics' parameter")
6129 if not isinstance(self.op.disks, list):
6130 raise errors.OpPrereqError("Invalid parameter 'disks'")
6131 if len(self.op.disks) != 2:
6132 raise errors.OpPrereqError("Only two-disk configurations supported")
6133 for row in self.op.disks:
6134 if (not isinstance(row, dict) or
6135 "size" not in row or
6136 not isinstance(row["size"], int) or
6137 "mode" not in row or
6138 row["mode"] not in ['r', 'w']):
6139 raise errors.OpPrereqError("Invalid contents of the"
6140 " 'disks' parameter")
6141 if self.op.hypervisor is None:
6142 self.op.hypervisor = self.cfg.GetHypervisorType()
6143 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6144 if not hasattr(self.op, "name"):
6145 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6146 fname = self.cfg.ExpandInstanceName(self.op.name)
6148 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6150 self.op.name = fname
6151 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6153 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6156 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6157 if not hasattr(self.op, "allocator") or self.op.allocator is None:
6158 raise errors.OpPrereqError("Missing allocator name")
6159 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6160 raise errors.OpPrereqError("Wrong allocator test '%s'" %
6163 def Exec(self, feedback_fn):
6164 """Run the allocator test.
6167 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6168 ial = IAllocator(self,
6169 mode=self.op.mode,
6170 name=self.op.name,
6171 mem_size=self.op.mem_size,
6172 disks=self.op.disks,
6173 disk_template=self.op.disk_template,
6174 os=self.op.os,
6175 tags=self.op.tags,
6176 nics=self.op.nics,
6177 vcpus=self.op.vcpus,
6178 hypervisor=self.op.hypervisor,
6179 )
6180 else:
6181 ial = IAllocator(self,
6182 mode=self.op.mode,
6183 name=self.op.name,
6184 relocate_from=list(self.relocate_from),
6185 )
6187 if self.op.direction == constants.IALLOCATOR_DIR_IN:
6188 result = ial.in_text
6189 else:
6190 ial.Run(self.op.allocator, validate=False)
6191 result = ial.out_text
6192 return result
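# With direction IALLOCATOR_DIR_IN only the generated input (ial.in_text) is
# returned; with IALLOCATOR_DIR_OUT the named allocator is actually executed
# and its raw, unvalidated output text is returned instead.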