4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement ExpandNames
53 - implement CheckPrereq
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements:
58 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60 Note that all commands require root permissions.
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overriden in derived classes in order to check op
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
93 for attr_name in self._OP_REQP:
94 attr_val = getattr(op, attr_name, None)
96 raise errors.OpPrereqError("Required parameter '%s' missing" %
101 """Returns the SshRunner object
105 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
108 ssh = property(fget=__GetSSH)
110 def CheckArguments(self):
111 """Check syntactic validity for the opcode arguments.
113 This method is for doing a simple syntactic check and ensure
114 validity of opcode parameters, without any cluster-related
115 checks. While the same can be accomplished in ExpandNames and/or
116 CheckPrereq, doing these separate is better because:
118 - ExpandNames is left as as purely a lock-related function
119 - CheckPrereq is run after we have aquired locks (and possible
122 The function is allowed to change the self.op attribute so that
123 later methods can no longer worry about missing parameters.
128 def ExpandNames(self):
129 """Expand names for this LU.
131 This method is called before starting to execute the opcode, and it should
132 update all the parameters of the opcode to their canonical form (e.g. a
133 short node name must be fully expanded after this method has successfully
134 completed). This way locking, hooks, logging, ecc. can work correctly.
136 LUs which implement this method must also populate the self.needed_locks
137 member, as a dict with lock levels as keys, and a list of needed lock names
140 - use an empty dict if you don't need any lock
141 - if you don't need any lock at a particular level omit that level
142 - don't put anything for the BGL level
143 - if you want all locks at a level use locking.ALL_SET as a value
145 If you need to share locks (rather than acquire them exclusively) at one
146 level you can modify self.share_locks, setting a true value (usually 1) for
147 that level. By default locks are not shared.
151 # Acquire all nodes and one instance
152 self.needed_locks = {
153 locking.LEVEL_NODE: locking.ALL_SET,
154 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156 # Acquire just two nodes
157 self.needed_locks = {
158 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
161 self.needed_locks = {} # No, you can't leave it to the default value None
164 # The implementation of this method is mandatory only if the new LU is
165 # concurrent, so that old LUs don't need to be changed all at the same
168 self.needed_locks = {} # Exclusive LUs don't need locks.
170 raise NotImplementedError
172 def DeclareLocks(self, level):
173 """Declare LU locking needs for a level
175 While most LUs can just declare their locking needs at ExpandNames time,
176 sometimes there's the need to calculate some locks after having acquired
177 the ones before. This function is called just before acquiring locks at a
178 particular level, but after acquiring the ones at lower levels, and permits
179 such calculations. It can be used to modify self.needed_locks, and by
180 default it does nothing.
182 This function is only called if you have something already set in
183 self.needed_locks for the level.
185 @param level: Locking level which is going to be locked
186 @type level: member of ganeti.locking.LEVELS
190 def CheckPrereq(self):
191 """Check prerequisites for this LU.
193 This method should check that the prerequisites for the execution
194 of this LU are fulfilled. It can do internode communication, but
195 it should be idempotent - no cluster or system changes are
198 The method should raise errors.OpPrereqError in case something is
199 not fulfilled. Its return value is ignored.
201 This method should also update all the parameters of the opcode to
202 their canonical form if it hasn't been done by ExpandNames before.
205 raise NotImplementedError
207 def Exec(self, feedback_fn):
210 This method should implement the actual work. It should raise
211 errors.OpExecError for failures that are somewhat dealt with in
215 raise NotImplementedError
217 def BuildHooksEnv(self):
218 """Build hooks environment for this LU.
220 This method should return a three-node tuple consisting of: a dict
221 containing the environment that will be used for running the
222 specific hook for this LU, a list of node names on which the hook
223 should run before the execution, and a list of node names on which
224 the hook should run after the execution.
226 The keys of the dict must not have 'GANETI_' prefixed as this will
227 be handled in the hooks runner. Also note additional keys will be
228 added by the hooks runner. If the LU doesn't define any
229 environment, an empty dict (and not None) should be returned.
231 No nodes should be returned as an empty list (and not None).
233 Note that if the HPATH for a LU class is None, this function will
237 raise NotImplementedError
239 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240 """Notify the LU about the results of its hooks.
242 This method is called every time a hooks phase is executed, and notifies
243 the Logical Unit about the hooks' result. The LU can then use it to alter
244 its result based on the hooks. By default the method does nothing and the
245 previous result is passed back unchanged but any LU can define it if it
246 wants to use the local cluster hook-scripts somehow.
248 @param phase: one of L{constants.HOOKS_PHASE_POST} or
249 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250 @param hook_results: the results of the multi-node hooks rpc call
251 @param feedback_fn: function used send feedback back to the caller
252 @param lu_result: the previous Exec result this LU had, or None
254 @return: the new Exec result, based on the previous result
260 def _ExpandAndLockInstance(self):
261 """Helper function to expand and lock an instance.
263 Many LUs that work on an instance take its name in self.op.instance_name
264 and need to expand it and then declare the expanded name for locking. This
265 function does it, and then updates self.op.instance_name to the expanded
266 name. It also initializes needed_locks as a dict, if this hasn't been done
270 if self.needed_locks is None:
271 self.needed_locks = {}
273 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274 "_ExpandAndLockInstance called with instance-level locks set"
275 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276 if expanded_name is None:
277 raise errors.OpPrereqError("Instance '%s' not known" %
278 self.op.instance_name)
279 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280 self.op.instance_name = expanded_name
282 def _LockInstancesNodes(self, primary_only=False):
283 """Helper function to declare instances' nodes for locking.
285 This function should be called after locking one or more instances to lock
286 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287 with all primary or secondary nodes for instances already locked and
288 present in self.needed_locks[locking.LEVEL_INSTANCE].
290 It should be called from DeclareLocks, and for safety only works if
291 self.recalculate_locks[locking.LEVEL_NODE] is set.
293 In the future it may grow parameters to just lock some instance's nodes, or
294 to just lock primaries or secondary nodes, if needed.
296 If should be called in DeclareLocks in a way similar to::
298 if level == locking.LEVEL_NODE:
299 self._LockInstancesNodes()
301 @type primary_only: boolean
302 @param primary_only: only lock primary nodes of locked instances
305 assert locking.LEVEL_NODE in self.recalculate_locks, \
306 "_LockInstancesNodes helper function called with no nodes to recalculate"
308 # TODO: check if we're really been called with the instance locks held
310 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311 # future we might want to have different behaviors depending on the value
312 # of self.recalculate_locks[locking.LEVEL_NODE]
314 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315 instance = self.context.cfg.GetInstanceInfo(instance_name)
316 wanted_nodes.append(instance.primary_node)
318 wanted_nodes.extend(instance.secondary_nodes)
320 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325 del self.recalculate_locks[locking.LEVEL_NODE]
328 class NoHooksLU(LogicalUnit):
329 """Simple LU which runs no hooks.
331 This LU is intended as a parent for other LogicalUnits which will
332 run no hooks, in order to reduce duplicate code.
339 def _GetWantedNodes(lu, nodes):
340 """Returns list of checked and expanded node names.
342 @type lu: L{LogicalUnit}
343 @param lu: the logical unit on whose behalf we execute
345 @param nodes: list of node names or None for all nodes
347 @return: the list of nodes, sorted
348 @raise errors.OpProgrammerError: if the nodes parameter is wrong type
351 if not isinstance(nodes, list):
352 raise errors.OpPrereqError("Invalid argument type 'nodes'")
355 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356 " non-empty list of nodes whose name is to be expanded.")
360 node = lu.cfg.ExpandNodeName(name)
362 raise errors.OpPrereqError("No such node name '%s'" % name)
365 return utils.NiceSort(wanted)
368 def _GetWantedInstances(lu, instances):
369 """Returns list of checked and expanded instance names.
371 @type lu: L{LogicalUnit}
372 @param lu: the logical unit on whose behalf we execute
373 @type instances: list
374 @param instances: list of instance names or None for all instances
376 @return: the list of instances, sorted
377 @raise errors.OpPrereqError: if the instances parameter is wrong type
378 @raise errors.OpPrereqError: if any of the passed instances is not found
381 if not isinstance(instances, list):
382 raise errors.OpPrereqError("Invalid argument type 'instances'")
387 for name in instances:
388 instance = lu.cfg.ExpandInstanceName(name)
390 raise errors.OpPrereqError("No such instance name '%s'" % name)
391 wanted.append(instance)
394 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
398 def _CheckOutputFields(static, dynamic, selected):
399 """Checks whether all selected fields are valid.
401 @type static: L{utils.FieldSet}
402 @param static: static fields set
403 @type dynamic: L{utils.FieldSet}
404 @param dynamic: dynamic fields set
411 delta = f.NonMatching(selected)
413 raise errors.OpPrereqError("Unknown output fields selected: %s"
417 def _CheckBooleanOpField(op, name):
418 """Validates boolean opcode parameters.
420 This will ensure that an opcode parameter is either a boolean value,
421 or None (but that it always exists).
424 val = getattr(op, name, None)
425 if not (val is None or isinstance(val, bool)):
426 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428 setattr(op, name, val)
431 def _CheckNodeOnline(lu, node):
432 """Ensure that a given node is online.
434 @param lu: the LU on behalf of which we make the check
435 @param node: the node to check
436 @raise errors.OpPrereqError: if the node is offline
439 if lu.cfg.GetNodeInfo(node).offline:
440 raise errors.OpPrereqError("Can't use offline node %s" % node)
443 def _CheckNodeNotDrained(lu, node):
444 """Ensure that a given node is not drained.
446 @param lu: the LU on behalf of which we make the check
447 @param node: the node to check
448 @raise errors.OpPrereqError: if the node is drained
451 if lu.cfg.GetNodeInfo(node).drained:
452 raise errors.OpPrereqError("Can't use drained node %s" % node)
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456 memory, vcpus, nics, disk_template, disks):
457 """Builds instance related env variables for hooks
459 This builds the hook environment from individual variables.
462 @param name: the name of the instance
463 @type primary_node: string
464 @param primary_node: the name of the instance's primary node
465 @type secondary_nodes: list
466 @param secondary_nodes: list of secondary nodes as strings
467 @type os_type: string
468 @param os_type: the name of the instance's OS
469 @type status: boolean
470 @param status: the should_run status of the instance
472 @param memory: the memory size of the instance
474 @param vcpus: the count of VCPUs the instance has
476 @param nics: list of tuples (ip, bridge, mac) representing
477 the NICs the instance has
478 @type disk_template: string
479 @param disk_template: the distk template of the instance
481 @param disks: the list of (size, mode) pairs
483 @return: the hook environment for this instance
492 "INSTANCE_NAME": name,
493 "INSTANCE_PRIMARY": primary_node,
494 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495 "INSTANCE_OS_TYPE": os_type,
496 "INSTANCE_STATUS": str_status,
497 "INSTANCE_MEMORY": memory,
498 "INSTANCE_VCPUS": vcpus,
499 "INSTANCE_DISK_TEMPLATE": disk_template,
503 nic_count = len(nics)
504 for idx, (ip, mac, mode, link) in enumerate(nics):
507 env["INSTANCE_NIC%d_IP" % idx] = ip
508 env["INSTANCE_NIC%d_MAC" % idx] = mac
509 env["INSTANCE_NIC%d_MODE" % idx] = mode
510 env["INSTANCE_NIC%d_LINK" % idx] = link
511 if mode == constants.NIC_MODE_BRIDGED:
512 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
516 env["INSTANCE_NIC_COUNT"] = nic_count
519 disk_count = len(disks)
520 for idx, (size, mode) in enumerate(disks):
521 env["INSTANCE_DISK%d_SIZE" % idx] = size
522 env["INSTANCE_DISK%d_MODE" % idx] = mode
526 env["INSTANCE_DISK_COUNT"] = disk_count
530 def _PreBuildNICHooksList(lu, nics):
531 """Build a list of nic information tuples.
533 This list is suitable to be passed to _BuildInstanceHookEnv.
535 @type lu: L{LogicalUnit}
536 @param lu: the logical unit on whose behalf we execute
537 @type nics: list of L{objects.NIC}
538 @param nics: list of nics to convert to hooks tuples
542 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
546 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
547 mode = filled_params[constants.NIC_MODE]
548 link = filled_params[constants.NIC_LINK]
549 hooks_nics.append((ip, mac, mode, link))
552 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
553 """Builds instance related env variables for hooks from an object.
555 @type lu: L{LogicalUnit}
556 @param lu: the logical unit on whose behalf we execute
557 @type instance: L{objects.Instance}
558 @param instance: the instance for which we should build the
561 @param override: dictionary with key/values that will override
564 @return: the hook environment dictionary
567 bep = lu.cfg.GetClusterInfo().FillBE(instance)
569 'name': instance.name,
570 'primary_node': instance.primary_node,
571 'secondary_nodes': instance.secondary_nodes,
572 'os_type': instance.os,
573 'status': instance.admin_up,
574 'memory': bep[constants.BE_MEMORY],
575 'vcpus': bep[constants.BE_VCPUS],
576 'nics': _PreBuildNICHooksList(lu, instance.nics),
577 'disk_template': instance.disk_template,
578 'disks': [(disk.size, disk.mode) for disk in instance.disks],
581 args.update(override)
582 return _BuildInstanceHookEnv(**args)
585 def _AdjustCandidatePool(lu):
586 """Adjust the candidate pool after node operations.
589 mod_list = lu.cfg.MaintainCandidatePool()
591 lu.LogInfo("Promoted nodes to master candidate role: %s",
592 ", ".join(node.name for node in mod_list))
593 for name in mod_list:
594 lu.context.ReaddNode(name)
595 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
597 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
601 def _CheckNicsBridgesExist(lu, target_nics, target_node,
602 profile=constants.PP_DEFAULT):
603 """Check that the brigdes needed by a list of nics exist.
606 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
607 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
608 for nic in target_nics]
609 brlist = [params[constants.NIC_LINK] for params in paramslist
610 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
612 result = lu.rpc.call_bridges_exist(target_node, brlist)
615 raise errors.OpPrereqError("One or more target bridges %s does not"
616 " exist on destination node '%s'" %
617 (brlist, target_node))
620 def _CheckInstanceBridgesExist(lu, instance, node=None):
621 """Check that the brigdes needed by an instance exist.
625 node=instance.primary_node
626 _CheckNicsBridgesExist(lu, instance.nics, node)
629 class LUDestroyCluster(NoHooksLU):
630 """Logical unit for destroying the cluster.
635 def CheckPrereq(self):
636 """Check prerequisites.
638 This checks whether the cluster is empty.
640 Any errors are signalled by raising errors.OpPrereqError.
643 master = self.cfg.GetMasterNode()
645 nodelist = self.cfg.GetNodeList()
646 if len(nodelist) != 1 or nodelist[0] != master:
647 raise errors.OpPrereqError("There are still %d node(s) in"
648 " this cluster." % (len(nodelist) - 1))
649 instancelist = self.cfg.GetInstanceList()
651 raise errors.OpPrereqError("There are still %d instance(s) in"
652 " this cluster." % len(instancelist))
654 def Exec(self, feedback_fn):
655 """Destroys the cluster.
658 master = self.cfg.GetMasterNode()
659 result = self.rpc.call_node_stop_master(master, False)
662 raise errors.OpExecError("Could not disable the master role")
663 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
664 utils.CreateBackup(priv_key)
665 utils.CreateBackup(pub_key)
669 class LUVerifyCluster(LogicalUnit):
670 """Verifies the cluster status.
673 HPATH = "cluster-verify"
674 HTYPE = constants.HTYPE_CLUSTER
675 _OP_REQP = ["skip_checks"]
678 def ExpandNames(self):
679 self.needed_locks = {
680 locking.LEVEL_NODE: locking.ALL_SET,
681 locking.LEVEL_INSTANCE: locking.ALL_SET,
683 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
685 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
686 node_result, feedback_fn, master_files,
688 """Run multiple tests against a node.
692 - compares ganeti version
693 - checks vg existance and size > 20G
694 - checks config file checksum
695 - checks ssh to other nodes
697 @type nodeinfo: L{objects.Node}
698 @param nodeinfo: the node to check
699 @param file_list: required list of files
700 @param local_cksum: dictionary of local files and their checksums
701 @param node_result: the results from the node
702 @param feedback_fn: function used to accumulate results
703 @param master_files: list of files that only masters should have
704 @param drbd_map: the useddrbd minors for this node, in
705 form of minor: (instance, must_exist) which correspond to instances
706 and their running status
707 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
712 # main result, node_result should be a non-empty dict
713 if not node_result or not isinstance(node_result, dict):
714 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
717 # compares ganeti version
718 local_version = constants.PROTOCOL_VERSION
719 remote_version = node_result.get('version', None)
720 if not (remote_version and isinstance(remote_version, (list, tuple)) and
721 len(remote_version) == 2):
722 feedback_fn(" - ERROR: connection to %s failed" % (node))
725 if local_version != remote_version[0]:
726 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
727 " node %s %s" % (local_version, node, remote_version[0]))
730 # node seems compatible, we can actually try to look into its results
734 # full package version
735 if constants.RELEASE_VERSION != remote_version[1]:
736 feedback_fn(" - WARNING: software version mismatch: master %s,"
738 (constants.RELEASE_VERSION, node, remote_version[1]))
740 # checks vg existence and size > 20G
741 if vg_name is not None:
742 vglist = node_result.get(constants.NV_VGLIST, None)
744 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
748 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
749 constants.MIN_VG_SIZE)
751 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
754 # checks config file checksum
756 remote_cksum = node_result.get(constants.NV_FILELIST, None)
757 if not isinstance(remote_cksum, dict):
759 feedback_fn(" - ERROR: node hasn't returned file checksum data")
761 for file_name in file_list:
762 node_is_mc = nodeinfo.master_candidate
763 must_have_file = file_name not in master_files
764 if file_name not in remote_cksum:
765 if node_is_mc or must_have_file:
767 feedback_fn(" - ERROR: file '%s' missing" % file_name)
768 elif remote_cksum[file_name] != local_cksum[file_name]:
769 if node_is_mc or must_have_file:
771 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
773 # not candidate and this is not a must-have file
775 feedback_fn(" - ERROR: non master-candidate has old/wrong file"
778 # all good, except non-master/non-must have combination
779 if not node_is_mc and not must_have_file:
780 feedback_fn(" - ERROR: file '%s' should not exist on non master"
781 " candidates" % file_name)
785 if constants.NV_NODELIST not in node_result:
787 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
789 if node_result[constants.NV_NODELIST]:
791 for node in node_result[constants.NV_NODELIST]:
792 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
793 (node, node_result[constants.NV_NODELIST][node]))
795 if constants.NV_NODENETTEST not in node_result:
797 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
799 if node_result[constants.NV_NODENETTEST]:
801 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
803 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
804 (node, node_result[constants.NV_NODENETTEST][node]))
806 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
807 if isinstance(hyp_result, dict):
808 for hv_name, hv_result in hyp_result.iteritems():
809 if hv_result is not None:
810 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
811 (hv_name, hv_result))
813 # check used drbd list
814 if vg_name is not None:
815 used_minors = node_result.get(constants.NV_DRBDLIST, [])
816 if not isinstance(used_minors, (tuple, list)):
817 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
820 for minor, (iname, must_exist) in drbd_map.items():
821 if minor not in used_minors and must_exist:
822 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
823 " not active" % (minor, iname))
825 for minor in used_minors:
826 if minor not in drbd_map:
827 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
833 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
834 node_instance, feedback_fn, n_offline):
835 """Verify an instance.
837 This function checks to see if the required block devices are
838 available on the instance's node.
843 node_current = instanceconfig.primary_node
846 instanceconfig.MapLVsByNode(node_vol_should)
848 for node in node_vol_should:
849 if node in n_offline:
850 # ignore missing volumes on offline nodes
852 for volume in node_vol_should[node]:
853 if node not in node_vol_is or volume not in node_vol_is[node]:
854 feedback_fn(" - ERROR: volume %s missing on node %s" %
858 if instanceconfig.admin_up:
859 if ((node_current not in node_instance or
860 not instance in node_instance[node_current]) and
861 node_current not in n_offline):
862 feedback_fn(" - ERROR: instance %s not running on node %s" %
863 (instance, node_current))
866 for node in node_instance:
867 if (not node == node_current):
868 if instance in node_instance[node]:
869 feedback_fn(" - ERROR: instance %s should not run on node %s" %
875 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
876 """Verify if there are any unknown volumes in the cluster.
878 The .os, .swap and backup volumes are ignored. All other volumes are
884 for node in node_vol_is:
885 for volume in node_vol_is[node]:
886 if node not in node_vol_should or volume not in node_vol_should[node]:
887 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
892 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
893 """Verify the list of running instances.
895 This checks what instances are running but unknown to the cluster.
899 for node in node_instance:
900 for runninginstance in node_instance[node]:
901 if runninginstance not in instancelist:
902 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
903 (runninginstance, node))
907 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
908 """Verify N+1 Memory Resilience.
910 Check that if one single node dies we can still start all the instances it
916 for node, nodeinfo in node_info.iteritems():
917 # This code checks that every node which is now listed as secondary has
918 # enough memory to host all instances it is supposed to should a single
919 # other node in the cluster fail.
920 # FIXME: not ready for failover to an arbitrary node
921 # FIXME: does not support file-backed instances
922 # WARNING: we currently take into account down instances as well as up
923 # ones, considering that even if they're down someone might want to start
924 # them even in the event of a node failure.
925 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
927 for instance in instances:
928 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
929 if bep[constants.BE_AUTO_BALANCE]:
930 needed_mem += bep[constants.BE_MEMORY]
931 if nodeinfo['mfree'] < needed_mem:
932 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
933 " failovers should node %s fail" % (node, prinode))
937 def CheckPrereq(self):
938 """Check prerequisites.
940 Transform the list of checks we're going to skip into a set and check that
941 all its members are valid.
944 self.skip_set = frozenset(self.op.skip_checks)
945 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
946 raise errors.OpPrereqError("Invalid checks to be skipped specified")
948 def BuildHooksEnv(self):
951 Cluster-Verify hooks just rone in the post phase and their failure makes
952 the output be logged in the verify output and the verification to fail.
955 all_nodes = self.cfg.GetNodeList()
957 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
959 for node in self.cfg.GetAllNodesInfo().values():
960 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
962 return env, [], all_nodes
964 def Exec(self, feedback_fn):
965 """Verify integrity of cluster, performing various test on nodes.
969 feedback_fn("* Verifying global settings")
970 for msg in self.cfg.VerifyConfig():
971 feedback_fn(" - ERROR: %s" % msg)
973 vg_name = self.cfg.GetVGName()
974 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
975 nodelist = utils.NiceSort(self.cfg.GetNodeList())
976 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
977 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
978 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
979 for iname in instancelist)
980 i_non_redundant = [] # Non redundant instances
981 i_non_a_balanced = [] # Non auto-balanced instances
982 n_offline = [] # List of offline nodes
983 n_drained = [] # List of nodes being drained
989 # FIXME: verify OS list
991 master_files = [constants.CLUSTER_CONF_FILE]
993 file_names = ssconf.SimpleStore().GetFileList()
994 file_names.append(constants.SSL_CERT_FILE)
995 file_names.append(constants.RAPI_CERT_FILE)
996 file_names.extend(master_files)
998 local_checksums = utils.FingerprintFiles(file_names)
1000 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1001 node_verify_param = {
1002 constants.NV_FILELIST: file_names,
1003 constants.NV_NODELIST: [node.name for node in nodeinfo
1004 if not node.offline],
1005 constants.NV_HYPERVISOR: hypervisors,
1006 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1007 node.secondary_ip) for node in nodeinfo
1008 if not node.offline],
1009 constants.NV_INSTANCELIST: hypervisors,
1010 constants.NV_VERSION: None,
1011 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1013 if vg_name is not None:
1014 node_verify_param[constants.NV_VGLIST] = None
1015 node_verify_param[constants.NV_LVLIST] = vg_name
1016 node_verify_param[constants.NV_DRBDLIST] = None
1017 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1018 self.cfg.GetClusterName())
1020 cluster = self.cfg.GetClusterInfo()
1021 master_node = self.cfg.GetMasterNode()
1022 all_drbd_map = self.cfg.ComputeDRBDMap()
1024 for node_i in nodeinfo:
1026 nresult = all_nvinfo[node].data
1029 feedback_fn("* Skipping offline node %s" % (node,))
1030 n_offline.append(node)
1033 if node == master_node:
1035 elif node_i.master_candidate:
1036 ntype = "master candidate"
1037 elif node_i.drained:
1039 n_drained.append(node)
1042 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1044 if all_nvinfo[node].failed or not isinstance(nresult, dict):
1045 feedback_fn(" - ERROR: connection to %s failed" % (node,))
1050 for minor, instance in all_drbd_map[node].items():
1051 if instance not in instanceinfo:
1052 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1054 # ghost instance should not be running, but otherwise we
1055 # don't give double warnings (both ghost instance and
1056 # unallocated minor in use)
1057 node_drbd[minor] = (instance, False)
1059 instance = instanceinfo[instance]
1060 node_drbd[minor] = (instance.name, instance.admin_up)
1061 result = self._VerifyNode(node_i, file_names, local_checksums,
1062 nresult, feedback_fn, master_files,
1066 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1068 node_volume[node] = {}
1069 elif isinstance(lvdata, basestring):
1070 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1071 (node, utils.SafeEncode(lvdata)))
1073 node_volume[node] = {}
1074 elif not isinstance(lvdata, dict):
1075 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1079 node_volume[node] = lvdata
1082 idata = nresult.get(constants.NV_INSTANCELIST, None)
1083 if not isinstance(idata, list):
1084 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1089 node_instance[node] = idata
1092 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1093 if not isinstance(nodeinfo, dict):
1094 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1100 "mfree": int(nodeinfo['memory_free']),
1103 # dictionary holding all instances this node is secondary for,
1104 # grouped by their primary node. Each key is a cluster node, and each
1105 # value is a list of instances which have the key as primary and the
1106 # current node as secondary. this is handy to calculate N+1 memory
1107 # availability if you can only failover from a primary to its
1109 "sinst-by-pnode": {},
1111 # FIXME: devise a free space model for file based instances as well
1112 if vg_name is not None:
1113 if (constants.NV_VGLIST not in nresult or
1114 vg_name not in nresult[constants.NV_VGLIST]):
1115 feedback_fn(" - ERROR: node %s didn't return data for the"
1116 " volume group '%s' - it is either missing or broken" %
1120 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1121 except (ValueError, KeyError):
1122 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1123 " from node %s" % (node,))
1127 node_vol_should = {}
1129 for instance in instancelist:
1130 feedback_fn("* Verifying instance %s" % instance)
1131 inst_config = instanceinfo[instance]
1132 result = self._VerifyInstance(instance, inst_config, node_volume,
1133 node_instance, feedback_fn, n_offline)
1135 inst_nodes_offline = []
1137 inst_config.MapLVsByNode(node_vol_should)
1139 instance_cfg[instance] = inst_config
1141 pnode = inst_config.primary_node
1142 if pnode in node_info:
1143 node_info[pnode]['pinst'].append(instance)
1144 elif pnode not in n_offline:
1145 feedback_fn(" - ERROR: instance %s, connection to primary node"
1146 " %s failed" % (instance, pnode))
1149 if pnode in n_offline:
1150 inst_nodes_offline.append(pnode)
1152 # If the instance is non-redundant we cannot survive losing its primary
1153 # node, so we are not N+1 compliant. On the other hand we have no disk
1154 # templates with more than one secondary so that situation is not well
1156 # FIXME: does not support file-backed instances
1157 if len(inst_config.secondary_nodes) == 0:
1158 i_non_redundant.append(instance)
1159 elif len(inst_config.secondary_nodes) > 1:
1160 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1163 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1164 i_non_a_balanced.append(instance)
1166 for snode in inst_config.secondary_nodes:
1167 if snode in node_info:
1168 node_info[snode]['sinst'].append(instance)
1169 if pnode not in node_info[snode]['sinst-by-pnode']:
1170 node_info[snode]['sinst-by-pnode'][pnode] = []
1171 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1172 elif snode not in n_offline:
1173 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1174 " %s failed" % (instance, snode))
1176 if snode in n_offline:
1177 inst_nodes_offline.append(snode)
1179 if inst_nodes_offline:
1180 # warn that the instance lives on offline nodes, and set bad=True
1181 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1182 ", ".join(inst_nodes_offline))
1185 feedback_fn("* Verifying orphan volumes")
1186 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1190 feedback_fn("* Verifying remaining instances")
1191 result = self._VerifyOrphanInstances(instancelist, node_instance,
1195 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1196 feedback_fn("* Verifying N+1 Memory redundancy")
1197 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1200 feedback_fn("* Other Notes")
1202 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1203 % len(i_non_redundant))
1205 if i_non_a_balanced:
1206 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1207 % len(i_non_a_balanced))
1210 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1213 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1217 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1218 """Analize the post-hooks' result
1220 This method analyses the hook result, handles it, and sends some
1221 nicely-formatted feedback back to the user.
1223 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1224 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1225 @param hooks_results: the results of the multi-node hooks rpc call
1226 @param feedback_fn: function used send feedback back to the caller
1227 @param lu_result: previous Exec result
1228 @return: the new Exec result, based on the previous result
1232 # We only really run POST phase hooks, and are only interested in
1234 if phase == constants.HOOKS_PHASE_POST:
1235 # Used to change hooks' output to proper indentation
1236 indent_re = re.compile('^', re.M)
1237 feedback_fn("* Hooks Results")
1238 if not hooks_results:
1239 feedback_fn(" - ERROR: general communication failure")
1242 for node_name in hooks_results:
1243 show_node_header = True
1244 res = hooks_results[node_name]
1245 if res.failed or res.data is False or not isinstance(res.data, list):
1247 # no need to warn or set fail return value
1249 feedback_fn(" Communication failure in hooks execution")
1252 for script, hkr, output in res.data:
1253 if hkr == constants.HKR_FAIL:
1254 # The node header is only shown once, if there are
1255 # failing hooks on that node
1256 if show_node_header:
1257 feedback_fn(" Node %s:" % node_name)
1258 show_node_header = False
1259 feedback_fn(" ERROR: Script %s failed, output:" % script)
1260 output = indent_re.sub(' ', output)
1261 feedback_fn("%s" % output)
1267 class LUVerifyDisks(NoHooksLU):
1268 """Verifies the cluster disks status.
1274 def ExpandNames(self):
1275 self.needed_locks = {
1276 locking.LEVEL_NODE: locking.ALL_SET,
1277 locking.LEVEL_INSTANCE: locking.ALL_SET,
1279 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1281 def CheckPrereq(self):
1282 """Check prerequisites.
1284 This has no prerequisites.
1289 def Exec(self, feedback_fn):
1290 """Verify integrity of cluster disks.
1293 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1295 vg_name = self.cfg.GetVGName()
1296 nodes = utils.NiceSort(self.cfg.GetNodeList())
1297 instances = [self.cfg.GetInstanceInfo(name)
1298 for name in self.cfg.GetInstanceList()]
1301 for inst in instances:
1303 if (not inst.admin_up or
1304 inst.disk_template not in constants.DTS_NET_MIRROR):
1306 inst.MapLVsByNode(inst_lvs)
1307 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1308 for node, vol_list in inst_lvs.iteritems():
1309 for vol in vol_list:
1310 nv_dict[(node, vol)] = inst
1315 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1320 lvs = node_lvs[node]
1323 self.LogWarning("Connection to node %s failed: %s" %
1327 if isinstance(lvs, basestring):
1328 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1329 res_nlvm[node] = lvs
1331 elif not isinstance(lvs, dict):
1332 logging.warning("Connection to node %s failed or invalid data"
1334 res_nodes.append(node)
1337 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1338 inst = nv_dict.pop((node, lv_name), None)
1339 if (not lv_online and inst is not None
1340 and inst.name not in res_instances):
1341 res_instances.append(inst.name)
1343 # any leftover items in nv_dict are missing LVs, let's arrange the
1345 for key, inst in nv_dict.iteritems():
1346 if inst.name not in res_missing:
1347 res_missing[inst.name] = []
1348 res_missing[inst.name].append(key)
1353 class LURenameCluster(LogicalUnit):
1354 """Rename the cluster.
1357 HPATH = "cluster-rename"
1358 HTYPE = constants.HTYPE_CLUSTER
1361 def BuildHooksEnv(self):
1366 "OP_TARGET": self.cfg.GetClusterName(),
1367 "NEW_NAME": self.op.name,
1369 mn = self.cfg.GetMasterNode()
1370 return env, [mn], [mn]
1372 def CheckPrereq(self):
1373 """Verify that the passed name is a valid one.
1376 hostname = utils.HostInfo(self.op.name)
1378 new_name = hostname.name
1379 self.ip = new_ip = hostname.ip
1380 old_name = self.cfg.GetClusterName()
1381 old_ip = self.cfg.GetMasterIP()
1382 if new_name == old_name and new_ip == old_ip:
1383 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1384 " cluster has changed")
1385 if new_ip != old_ip:
1386 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1387 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1388 " reachable on the network. Aborting." %
1391 self.op.name = new_name
1393 def Exec(self, feedback_fn):
1394 """Rename the cluster.
1397 clustername = self.op.name
1400 # shutdown the master IP
1401 master = self.cfg.GetMasterNode()
1402 result = self.rpc.call_node_stop_master(master, False)
1403 if result.failed or not result.data:
1404 raise errors.OpExecError("Could not disable the master role")
1407 cluster = self.cfg.GetClusterInfo()
1408 cluster.cluster_name = clustername
1409 cluster.master_ip = ip
1410 self.cfg.Update(cluster)
1412 # update the known hosts file
1413 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1414 node_list = self.cfg.GetNodeList()
1416 node_list.remove(master)
1419 result = self.rpc.call_upload_file(node_list,
1420 constants.SSH_KNOWN_HOSTS_FILE)
1421 for to_node, to_result in result.iteritems():
1422 msg = to_result.RemoteFailMsg()
1424 msg = ("Copy of file %s to node %s failed: %s" %
1425 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1426 self.proc.LogWarning(msg)
1429 result = self.rpc.call_node_start_master(master, False)
1430 if result.failed or not result.data:
1431 self.LogWarning("Could not re-enable the master role on"
1432 " the master, please restart manually.")
1435 def _RecursiveCheckIfLVMBased(disk):
1436 """Check if the given disk or its children are lvm-based.
1438 @type disk: L{objects.Disk}
1439 @param disk: the disk to check
1441 @return: boolean indicating whether a LD_LV dev_type was found or not
1445 for chdisk in disk.children:
1446 if _RecursiveCheckIfLVMBased(chdisk):
1448 return disk.dev_type == constants.LD_LV
1451 class LUSetClusterParams(LogicalUnit):
1452 """Change the parameters of the cluster.
1455 HPATH = "cluster-modify"
1456 HTYPE = constants.HTYPE_CLUSTER
1460 def CheckArguments(self):
1464 if not hasattr(self.op, "candidate_pool_size"):
1465 self.op.candidate_pool_size = None
1466 if self.op.candidate_pool_size is not None:
1468 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1469 except (ValueError, TypeError), err:
1470 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1472 if self.op.candidate_pool_size < 1:
1473 raise errors.OpPrereqError("At least one master candidate needed")
1475 def ExpandNames(self):
1476 # FIXME: in the future maybe other cluster params won't require checking on
1477 # all nodes to be modified.
1478 self.needed_locks = {
1479 locking.LEVEL_NODE: locking.ALL_SET,
1481 self.share_locks[locking.LEVEL_NODE] = 1
1483 def BuildHooksEnv(self):
1488 "OP_TARGET": self.cfg.GetClusterName(),
1489 "NEW_VG_NAME": self.op.vg_name,
1491 mn = self.cfg.GetMasterNode()
1492 return env, [mn], [mn]
1494 def CheckPrereq(self):
1495 """Check prerequisites.
1497 This checks whether the given params don't conflict and
1498 if the given volume group is valid.
1501 if self.op.vg_name is not None and not self.op.vg_name:
1502 instances = self.cfg.GetAllInstancesInfo().values()
1503 for inst in instances:
1504 for disk in inst.disks:
1505 if _RecursiveCheckIfLVMBased(disk):
1506 raise errors.OpPrereqError("Cannot disable lvm storage while"
1507 " lvm-based instances exist")
1509 node_list = self.acquired_locks[locking.LEVEL_NODE]
1511 # if vg_name not None, checks given volume group on all nodes
1513 vglist = self.rpc.call_vg_list(node_list)
1514 for node in node_list:
1515 if vglist[node].failed:
1516 # ignoring down node
1517 self.LogWarning("Node %s unreachable/error, ignoring" % node)
1519 vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1521 constants.MIN_VG_SIZE)
1523 raise errors.OpPrereqError("Error on node '%s': %s" %
1526 self.cluster = cluster = self.cfg.GetClusterInfo()
1527 # validate params changes
1528 if self.op.beparams:
1529 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1530 self.new_beparams = objects.FillDict(
1531 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1533 if self.op.nicparams:
1534 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1535 self.new_nicparams = objects.FillDict(
1536 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1537 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1539 # hypervisor list/parameters
1540 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1541 if self.op.hvparams:
1542 if not isinstance(self.op.hvparams, dict):
1543 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1544 for hv_name, hv_dict in self.op.hvparams.items():
1545 if hv_name not in self.new_hvparams:
1546 self.new_hvparams[hv_name] = hv_dict
1548 self.new_hvparams[hv_name].update(hv_dict)
1550 if self.op.enabled_hypervisors is not None:
1551 self.hv_list = self.op.enabled_hypervisors
1553 self.hv_list = cluster.enabled_hypervisors
1555 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1556 # either the enabled list has changed, or the parameters have, validate
1557 for hv_name, hv_params in self.new_hvparams.items():
1558 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1559 (self.op.enabled_hypervisors and
1560 hv_name in self.op.enabled_hypervisors)):
1561 # either this is a new hypervisor, or its parameters have changed
1562 hv_class = hypervisor.GetHypervisor(hv_name)
1563 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1564 hv_class.CheckParameterSyntax(hv_params)
1565 _CheckHVParams(self, node_list, hv_name, hv_params)
1567 def Exec(self, feedback_fn):
1568 """Change the parameters of the cluster.
1571 if self.op.vg_name is not None:
1572 new_volume = self.op.vg_name
1575 if new_volume != self.cfg.GetVGName():
1576 self.cfg.SetVGName(new_volume)
1578 feedback_fn("Cluster LVM configuration already in desired"
1579 " state, not changing")
1580 if self.op.hvparams:
1581 self.cluster.hvparams = self.new_hvparams
1582 if self.op.enabled_hypervisors is not None:
1583 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1584 if self.op.beparams:
1585 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1586 if self.op.nicparams:
1587 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1589 if self.op.candidate_pool_size is not None:
1590 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1592 self.cfg.Update(self.cluster)
1594 # we want to update nodes after the cluster so that if any errors
1595 # happen, we have recorded and saved the cluster info
1596 if self.op.candidate_pool_size is not None:
1597 _AdjustCandidatePool(self)
1600 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1601 """Distribute additional files which are part of the cluster configuration.
1603 ConfigWriter takes care of distributing the config and ssconf files, but
1604 there are more files which should be distributed to all nodes. This function
1605 makes sure those are copied.
1607 @param lu: calling logical unit
1608 @param additional_nodes: list of nodes not in the config to distribute to
1611 # 1. Gather target nodes
1612 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1613 dist_nodes = lu.cfg.GetNodeList()
1614 if additional_nodes is not None:
1615 dist_nodes.extend(additional_nodes)
1616 if myself.name in dist_nodes:
1617 dist_nodes.remove(myself.name)
1618 # 2. Gather files to distribute
1619 dist_files = set([constants.ETC_HOSTS,
1620 constants.SSH_KNOWN_HOSTS_FILE,
1621 constants.RAPI_CERT_FILE,
1622 constants.RAPI_USERS_FILE,
1625 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1626 for hv_name in enabled_hypervisors:
1627 hv_class = hypervisor.GetHypervisor(hv_name)
1628 dist_files.update(hv_class.GetAncillaryFiles())
1630 # 3. Perform the files upload
1631 for fname in dist_files:
1632 if os.path.exists(fname):
1633 result = lu.rpc.call_upload_file(dist_nodes, fname)
1634 for to_node, to_result in result.items():
1635 msg = to_result.RemoteFailMsg()
1637 msg = ("Copy of file %s to node %s failed: %s" %
1638 (fname, to_node, msg))
1639 lu.proc.LogWarning(msg)
1642 class LURedistributeConfig(NoHooksLU):
1643 """Force the redistribution of cluster configuration.
1645 This is a very simple LU.
1651 def ExpandNames(self):
1652 self.needed_locks = {
1653 locking.LEVEL_NODE: locking.ALL_SET,
1655 self.share_locks[locking.LEVEL_NODE] = 1
1657 def CheckPrereq(self):
1658 """Check prerequisites.
1662 def Exec(self, feedback_fn):
1663 """Redistribute the configuration.
1666 self.cfg.Update(self.cfg.GetClusterInfo())
1667 _RedistributeAncillaryFiles(self)
1670 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1671 """Sleep and poll for an instance's disk to sync.
1674 if not instance.disks:
1678 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1680 node = instance.primary_node
1682 for dev in instance.disks:
1683 lu.cfg.SetDiskID(dev, node)
1689 cumul_degraded = False
1690 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1691 msg = rstats.RemoteFailMsg()
1693 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1696 raise errors.RemoteError("Can't contact node %s for mirror data,"
1697 " aborting." % node)
1700 rstats = rstats.payload
1702 for i, mstat in enumerate(rstats):
1704 lu.LogWarning("Can't compute data for node %s/%s",
1705 node, instance.disks[i].iv_name)
1707 # we ignore the ldisk parameter
1708 perc_done, est_time, is_degraded, _ = mstat
1709 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1710 if perc_done is not None:
1712 if est_time is not None:
1713 rem_time = "%d estimated seconds remaining" % est_time
1716 rem_time = "no time estimate"
1717 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1718 (instance.disks[i].iv_name, perc_done, rem_time))
1722 time.sleep(min(60, max_time))
1725 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1726 return not cumul_degraded
1729 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1730 """Check that mirrors are not degraded.
1732 The ldisk parameter, if True, will change the test from the
1733 is_degraded attribute (which represents overall non-ok status for
1734 the device(s)) to the ldisk (representing the local storage status).
1737 lu.cfg.SetDiskID(dev, node)
1744 if on_primary or dev.AssembleOnSecondary():
1745 rstats = lu.rpc.call_blockdev_find(node, dev)
1746 msg = rstats.RemoteFailMsg()
1748 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1750 elif not rstats.payload:
1751 lu.LogWarning("Can't find disk on node %s", node)
1754 result = result and (not rstats.payload[idx])
1756 for child in dev.children:
1757 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1762 class LUDiagnoseOS(NoHooksLU):
1763 """Logical unit for OS diagnose/query.
1766 _OP_REQP = ["output_fields", "names"]
1768 _FIELDS_STATIC = utils.FieldSet()
1769 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1771 def ExpandNames(self):
1773 raise errors.OpPrereqError("Selective OS query not supported")
1775 _CheckOutputFields(static=self._FIELDS_STATIC,
1776 dynamic=self._FIELDS_DYNAMIC,
1777 selected=self.op.output_fields)
1779 # Lock all nodes, in shared mode
1780 # Temporary removal of locks, should be reverted later
1781 # TODO: reintroduce locks when they are lighter-weight
1782 self.needed_locks = {}
1783 #self.share_locks[locking.LEVEL_NODE] = 1
1784 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1786 def CheckPrereq(self):
1787 """Check prerequisites.
1792 def _DiagnoseByOS(node_list, rlist):
1793 """Remaps a per-node return list into an a per-os per-node dictionary
1795 @param node_list: a list with the names of all nodes
1796 @param rlist: a map with node names as keys and OS objects as values
1799 @return: a dictionary with osnames as keys and as value another map, with
1800 nodes as keys and list of OS objects as values, eg::
1802 {"debian-etch": {"node1": [<object>,...],
1803 "node2": [<object>,]}
1808 # we build here the list of nodes that didn't fail the RPC (at RPC
1809 # level), so that nodes with a non-responding node daemon don't
1810 # make all OSes invalid
1811 good_nodes = [node_name for node_name in rlist
1812 if not rlist[node_name].failed]
1813 for node_name, nr in rlist.iteritems():
1814 if nr.failed or not nr.data:
1816 for os_obj in nr.data:
1817 if os_obj.name not in all_os:
1818 # build a list of nodes for this os containing empty lists
1819 # for each node in node_list
1820 all_os[os_obj.name] = {}
1821 for nname in good_nodes:
1822 all_os[os_obj.name][nname] = []
1823 all_os[os_obj.name][node_name].append(os_obj)
1826 def Exec(self, feedback_fn):
1827 """Compute the list of OSes.
1830 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1831 node_data = self.rpc.call_os_diagnose(valid_nodes)
1832 if node_data == False:
1833 raise errors.OpExecError("Can't gather the list of OSes")
1834 pol = self._DiagnoseByOS(valid_nodes, node_data)
1836 for os_name, os_data in pol.iteritems():
1838 for field in self.op.output_fields:
1841 elif field == "valid":
1842 val = utils.all([osl and osl[0] for osl in os_data.values()])
1843 elif field == "node_status":
1845 for node_name, nos_list in os_data.iteritems():
1846 val[node_name] = [(v.status, v.path) for v in nos_list]
1848 raise errors.ParameterError(field)
1855 class LURemoveNode(LogicalUnit):
1856 """Logical unit for removing a node.
1859 HPATH = "node-remove"
1860 HTYPE = constants.HTYPE_NODE
1861 _OP_REQP = ["node_name"]
1863 def BuildHooksEnv(self):
1866 This doesn't run on the target node in the pre phase as a failed
1867 node would then be impossible to remove.
1871 "OP_TARGET": self.op.node_name,
1872 "NODE_NAME": self.op.node_name,
1874 all_nodes = self.cfg.GetNodeList()
1875 all_nodes.remove(self.op.node_name)
1876 return env, all_nodes, all_nodes
1878 def CheckPrereq(self):
1879 """Check prerequisites.
1882 - the node exists in the configuration
1883 - it does not have primary or secondary instances
1884 - it's not the master
1886 Any errors are signalled by raising errors.OpPrereqError.
1889 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1891 raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1893 instance_list = self.cfg.GetInstanceList()
1895 masternode = self.cfg.GetMasterNode()
1896 if node.name == masternode:
1897 raise errors.OpPrereqError("Node is the master node,"
1898 " you need to failover first.")
1900 for instance_name in instance_list:
1901 instance = self.cfg.GetInstanceInfo(instance_name)
1902 if node.name in instance.all_nodes:
1903 raise errors.OpPrereqError("Instance %s is still running on the node,"
1904 " please remove first." % instance_name)
1905 self.op.node_name = node.name
1908 def Exec(self, feedback_fn):
1909 """Removes the node from the cluster.
1913 logging.info("Stopping the node daemon and removing configs from node %s",
1916 self.context.RemoveNode(node.name)
1918 self.rpc.call_node_leave_cluster(node.name)
1920 # Promote nodes to master candidate as needed
1921 _AdjustCandidatePool(self)
1924 class LUQueryNodes(NoHooksLU):
1925 """Logical unit for querying nodes.
1928 _OP_REQP = ["output_fields", "names", "use_locking"]
1930 _FIELDS_DYNAMIC = utils.FieldSet(
1932 "mtotal", "mnode", "mfree",
1934 "ctotal", "cnodes", "csockets",
1937 _FIELDS_STATIC = utils.FieldSet(
1938 "name", "pinst_cnt", "sinst_cnt",
1939 "pinst_list", "sinst_list",
1940 "pip", "sip", "tags",
1948 def ExpandNames(self):
1949 _CheckOutputFields(static=self._FIELDS_STATIC,
1950 dynamic=self._FIELDS_DYNAMIC,
1951 selected=self.op.output_fields)
1953 self.needed_locks = {}
1954 self.share_locks[locking.LEVEL_NODE] = 1
1957 self.wanted = _GetWantedNodes(self, self.op.names)
1959 self.wanted = locking.ALL_SET
1961 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1962 self.do_locking = self.do_node_query and self.op.use_locking
1964 # if we don't request only static fields, we need to lock the nodes
1965 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1968 def CheckPrereq(self):
1969 """Check prerequisites.
1972 # The validation of the node list is done in the _GetWantedNodes,
1973 # if non empty, and if empty, there's no validation to do
1976 def Exec(self, feedback_fn):
1977 """Computes the list of nodes and their attributes.
1980 all_info = self.cfg.GetAllNodesInfo()
1982 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1983 elif self.wanted != locking.ALL_SET:
1984 nodenames = self.wanted
1985 missing = set(nodenames).difference(all_info.keys())
1987 raise errors.OpExecError(
1988 "Some nodes were removed before retrieving their data: %s" % missing)
1990 nodenames = all_info.keys()
1992 nodenames = utils.NiceSort(nodenames)
1993 nodelist = [all_info[name] for name in nodenames]
1995 # begin data gathering
1997 if self.do_node_query:
1999 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2000 self.cfg.GetHypervisorType())
2001 for name in nodenames:
2002 nodeinfo = node_data[name]
2003 if not nodeinfo.failed and nodeinfo.data:
2004 nodeinfo = nodeinfo.data
2005 fn = utils.TryConvert
2007 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2008 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2009 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2010 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2011 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2012 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2013 "bootid": nodeinfo.get('bootid', None),
2014 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2015 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2018 live_data[name] = {}
2020 live_data = dict.fromkeys(nodenames, {})
2022 node_to_primary = dict([(name, set()) for name in nodenames])
2023 node_to_secondary = dict([(name, set()) for name in nodenames])
2025 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2026 "sinst_cnt", "sinst_list"))
2027 if inst_fields & frozenset(self.op.output_fields):
2028 instancelist = self.cfg.GetInstanceList()
2030 for instance_name in instancelist:
2031 inst = self.cfg.GetInstanceInfo(instance_name)
2032 if inst.primary_node in node_to_primary:
2033 node_to_primary[inst.primary_node].add(inst.name)
2034 for secnode in inst.secondary_nodes:
2035 if secnode in node_to_secondary:
2036 node_to_secondary[secnode].add(inst.name)
2038 master_node = self.cfg.GetMasterNode()
2040 # end data gathering
2043 for node in nodelist:
2045 for field in self.op.output_fields:
2048 elif field == "pinst_list":
2049 val = list(node_to_primary[node.name])
2050 elif field == "sinst_list":
2051 val = list(node_to_secondary[node.name])
2052 elif field == "pinst_cnt":
2053 val = len(node_to_primary[node.name])
2054 elif field == "sinst_cnt":
2055 val = len(node_to_secondary[node.name])
2056 elif field == "pip":
2057 val = node.primary_ip
2058 elif field == "sip":
2059 val = node.secondary_ip
2060 elif field == "tags":
2061 val = list(node.GetTags())
2062 elif field == "serial_no":
2063 val = node.serial_no
2064 elif field == "master_candidate":
2065 val = node.master_candidate
2066 elif field == "master":
2067 val = node.name == master_node
2068 elif field == "offline":
2070 elif field == "drained":
2072 elif self._FIELDS_DYNAMIC.Matches(field):
2073 val = live_data[node.name].get(field, None)
2075 raise errors.ParameterError(field)
2076 node_output.append(val)
2077 output.append(node_output)
2082 class LUQueryNodeVolumes(NoHooksLU):
2083 """Logical unit for getting volumes on node(s).
2086 _OP_REQP = ["nodes", "output_fields"]
2088 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2089 _FIELDS_STATIC = utils.FieldSet("node")
2091 def ExpandNames(self):
2092 _CheckOutputFields(static=self._FIELDS_STATIC,
2093 dynamic=self._FIELDS_DYNAMIC,
2094 selected=self.op.output_fields)
2096 self.needed_locks = {}
2097 self.share_locks[locking.LEVEL_NODE] = 1
2098 if not self.op.nodes:
2099 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2101 self.needed_locks[locking.LEVEL_NODE] = \
2102 _GetWantedNodes(self, self.op.nodes)
2104 def CheckPrereq(self):
2105 """Check prerequisites.
2107 This checks that the fields required are valid output fields.
2110 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2112 def Exec(self, feedback_fn):
2113 """Computes the list of nodes and their attributes.
2116 nodenames = self.nodes
2117 volumes = self.rpc.call_node_volumes(nodenames)
2119 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2120 in self.cfg.GetInstanceList()]
2122 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2125 for node in nodenames:
2126 if node not in volumes or volumes[node].failed or not volumes[node].data:
2129 node_vols = volumes[node].data[:]
2130 node_vols.sort(key=lambda vol: vol['dev'])
2132 for vol in node_vols:
2134 for field in self.op.output_fields:
2137 elif field == "phys":
2141 elif field == "name":
2143 elif field == "size":
2144 val = int(float(vol['size']))
2145 elif field == "instance":
2147 if node not in lv_by_node[inst]:
2149 if vol['name'] in lv_by_node[inst][node]:
2155 raise errors.ParameterError(field)
2156 node_output.append(str(val))
2158 output.append(node_output)
2163 class LUAddNode(LogicalUnit):
2164 """Logical unit for adding node to the cluster.
2168 HTYPE = constants.HTYPE_NODE
2169 _OP_REQP = ["node_name"]
2171 def BuildHooksEnv(self):
2174 This will run on all nodes before, and on all nodes + the new node after.
2178 "OP_TARGET": self.op.node_name,
2179 "NODE_NAME": self.op.node_name,
2180 "NODE_PIP": self.op.primary_ip,
2181 "NODE_SIP": self.op.secondary_ip,
2183 nodes_0 = self.cfg.GetNodeList()
2184 nodes_1 = nodes_0 + [self.op.node_name, ]
2185 return env, nodes_0, nodes_1
2187 def CheckPrereq(self):
2188 """Check prerequisites.
2191 - the new node is not already in the config
2193 - its parameters (single/dual homed) matches the cluster
2195 Any errors are signalled by raising errors.OpPrereqError.
2198 node_name = self.op.node_name
2201 dns_data = utils.HostInfo(node_name)
2203 node = dns_data.name
2204 primary_ip = self.op.primary_ip = dns_data.ip
2205 secondary_ip = getattr(self.op, "secondary_ip", None)
2206 if secondary_ip is None:
2207 secondary_ip = primary_ip
2208 if not utils.IsValidIP(secondary_ip):
2209 raise errors.OpPrereqError("Invalid secondary IP given")
2210 self.op.secondary_ip = secondary_ip
2212 node_list = cfg.GetNodeList()
2213 if not self.op.readd and node in node_list:
2214 raise errors.OpPrereqError("Node %s is already in the configuration" %
2216 elif self.op.readd and node not in node_list:
2217 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2219 for existing_node_name in node_list:
2220 existing_node = cfg.GetNodeInfo(existing_node_name)
2222 if self.op.readd and node == existing_node_name:
2223 if (existing_node.primary_ip != primary_ip or
2224 existing_node.secondary_ip != secondary_ip):
2225 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2226 " address configuration as before")
2229 if (existing_node.primary_ip == primary_ip or
2230 existing_node.secondary_ip == primary_ip or
2231 existing_node.primary_ip == secondary_ip or
2232 existing_node.secondary_ip == secondary_ip):
2233 raise errors.OpPrereqError("New node ip address(es) conflict with"
2234 " existing node %s" % existing_node.name)
2236 # check that the type of the node (single versus dual homed) is the
2237 # same as for the master
2238 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2239 master_singlehomed = myself.secondary_ip == myself.primary_ip
2240 newbie_singlehomed = secondary_ip == primary_ip
2241 if master_singlehomed != newbie_singlehomed:
2242 if master_singlehomed:
2243 raise errors.OpPrereqError("The master has no private ip but the"
2244 " new node has one")
2246 raise errors.OpPrereqError("The master has a private ip but the"
2247 " new node doesn't have one")
2249 # checks reachablity
2250 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2251 raise errors.OpPrereqError("Node not reachable by ping")
2253 if not newbie_singlehomed:
2254 # check reachability from my secondary ip to newbie's secondary ip
2255 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2256 source=myself.secondary_ip):
2257 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2258 " based ping to noded port")
2260 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2261 mc_now, _ = self.cfg.GetMasterCandidateStats()
2262 master_candidate = mc_now < cp_size
2264 self.new_node = objects.Node(name=node,
2265 primary_ip=primary_ip,
2266 secondary_ip=secondary_ip,
2267 master_candidate=master_candidate,
2268 offline=False, drained=False)
2270 def Exec(self, feedback_fn):
2271 """Adds the new node to the cluster.
2274 new_node = self.new_node
2275 node = new_node.name
2277 # check connectivity
2278 result = self.rpc.call_version([node])[node]
2281 if constants.PROTOCOL_VERSION == result.data:
2282 logging.info("Communication to node %s fine, sw version %s match",
2285 raise errors.OpExecError("Version mismatch master version %s,"
2286 " node version %s" %
2287 (constants.PROTOCOL_VERSION, result.data))
2289 raise errors.OpExecError("Cannot get version from the new node")
2292 logging.info("Copy ssh key to node %s", node)
2293 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2295 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2296 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2302 keyarray.append(f.read())
2306 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2308 keyarray[3], keyarray[4], keyarray[5])
2310 msg = result.RemoteFailMsg()
2312 raise errors.OpExecError("Cannot transfer ssh keys to the"
2313 " new node: %s" % msg)
2315 # Add node to our /etc/hosts, and add key to known_hosts
2316 if self.cfg.GetClusterInfo().modify_etc_hosts:
2317 utils.AddHostToEtcHosts(new_node.name)
2319 if new_node.secondary_ip != new_node.primary_ip:
2320 result = self.rpc.call_node_has_ip_address(new_node.name,
2321 new_node.secondary_ip)
2322 if result.failed or not result.data:
2323 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2324 " you gave (%s). Please fix and re-run this"
2325 " command." % new_node.secondary_ip)
2327 node_verify_list = [self.cfg.GetMasterNode()]
2328 node_verify_param = {
2330 # TODO: do a node-net-test as well?
2333 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2334 self.cfg.GetClusterName())
2335 for verifier in node_verify_list:
2336 if result[verifier].failed or not result[verifier].data:
2337 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2338 " for remote verification" % verifier)
2339 if result[verifier].data['nodelist']:
2340 for failed in result[verifier].data['nodelist']:
2341 feedback_fn("ssh/hostname verification failed %s -> %s" %
2342 (verifier, result[verifier].data['nodelist'][failed]))
2343 raise errors.OpExecError("ssh/hostname verification failed.")
2346 _RedistributeAncillaryFiles(self)
2347 self.context.ReaddNode(new_node)
2349 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2350 self.context.AddNode(new_node)
2353 class LUSetNodeParams(LogicalUnit):
2354 """Modifies the parameters of a node.
2357 HPATH = "node-modify"
2358 HTYPE = constants.HTYPE_NODE
2359 _OP_REQP = ["node_name"]
2362 def CheckArguments(self):
2363 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2364 if node_name is None:
2365 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2366 self.op.node_name = node_name
2367 _CheckBooleanOpField(self.op, 'master_candidate')
2368 _CheckBooleanOpField(self.op, 'offline')
2369 _CheckBooleanOpField(self.op, 'drained')
2370 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2371 if all_mods.count(None) == 3:
2372 raise errors.OpPrereqError("Please pass at least one modification")
2373 if all_mods.count(True) > 1:
2374 raise errors.OpPrereqError("Can't set the node into more than one"
2375 " state at the same time")
2377 def ExpandNames(self):
2378 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2380 def BuildHooksEnv(self):
2383 This runs on the master node.
2387 "OP_TARGET": self.op.node_name,
2388 "MASTER_CANDIDATE": str(self.op.master_candidate),
2389 "OFFLINE": str(self.op.offline),
2390 "DRAINED": str(self.op.drained),
2392 nl = [self.cfg.GetMasterNode(),
2396 def CheckPrereq(self):
2397 """Check prerequisites.
2399 This only checks the instance list against the existing names.
2402 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2404 if ((self.op.master_candidate == False or self.op.offline == True or
2405 self.op.drained == True) and node.master_candidate):
2406 # we will demote the node from master_candidate
2407 if self.op.node_name == self.cfg.GetMasterNode():
2408 raise errors.OpPrereqError("The master node has to be a"
2409 " master candidate, online and not drained")
2410 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2411 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2412 if num_candidates <= cp_size:
2413 msg = ("Not enough master candidates (desired"
2414 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2416 self.LogWarning(msg)
2418 raise errors.OpPrereqError(msg)
2420 if (self.op.master_candidate == True and
2421 ((node.offline and not self.op.offline == False) or
2422 (node.drained and not self.op.drained == False))):
2423 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2424 " to master_candidate" % node.name)
2428 def Exec(self, feedback_fn):
2437 if self.op.offline is not None:
2438 node.offline = self.op.offline
2439 result.append(("offline", str(self.op.offline)))
2440 if self.op.offline == True:
2441 if node.master_candidate:
2442 node.master_candidate = False
2444 result.append(("master_candidate", "auto-demotion due to offline"))
2446 node.drained = False
2447 result.append(("drained", "clear drained status due to offline"))
2449 if self.op.master_candidate is not None:
2450 node.master_candidate = self.op.master_candidate
2452 result.append(("master_candidate", str(self.op.master_candidate)))
2453 if self.op.master_candidate == False:
2454 rrc = self.rpc.call_node_demote_from_mc(node.name)
2455 msg = rrc.RemoteFailMsg()
2457 self.LogWarning("Node failed to demote itself: %s" % msg)
2459 if self.op.drained is not None:
2460 node.drained = self.op.drained
2461 result.append(("drained", str(self.op.drained)))
2462 if self.op.drained == True:
2463 if node.master_candidate:
2464 node.master_candidate = False
2466 result.append(("master_candidate", "auto-demotion due to drain"))
2468 node.offline = False
2469 result.append(("offline", "clear offline status due to drain"))
2471 # this will trigger configuration file update, if needed
2472 self.cfg.Update(node)
2473 # this will trigger job queue propagation or cleanup
2475 self.context.ReaddNode(node)
2480 class LUPowercycleNode(NoHooksLU):
2481 """Powercycles a node.
2484 _OP_REQP = ["node_name", "force"]
2487 def CheckArguments(self):
2488 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2489 if node_name is None:
2490 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2491 self.op.node_name = node_name
2492 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2493 raise errors.OpPrereqError("The node is the master and the force"
2494 " parameter was not set")
2496 def ExpandNames(self):
2497 """Locking for PowercycleNode.
2499 This is a last-resource option and shouldn't block on other
2500 jobs. Therefore, we grab no locks.
2503 self.needed_locks = {}
2505 def CheckPrereq(self):
2506 """Check prerequisites.
2508 This LU has no prereqs.
2513 def Exec(self, feedback_fn):
2517 result = self.rpc.call_node_powercycle(self.op.node_name,
2518 self.cfg.GetHypervisorType())
2519 msg = result.RemoteFailMsg()
2521 raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2522 return result.payload
2525 class LUQueryClusterInfo(NoHooksLU):
2526 """Query cluster configuration.
2532 def ExpandNames(self):
2533 self.needed_locks = {}
2535 def CheckPrereq(self):
2536 """No prerequsites needed for this LU.
2541 def Exec(self, feedback_fn):
2542 """Return cluster config.
2545 cluster = self.cfg.GetClusterInfo()
2547 "software_version": constants.RELEASE_VERSION,
2548 "protocol_version": constants.PROTOCOL_VERSION,
2549 "config_version": constants.CONFIG_VERSION,
2550 "os_api_version": constants.OS_API_VERSION,
2551 "export_version": constants.EXPORT_VERSION,
2552 "architecture": (platform.architecture()[0], platform.machine()),
2553 "name": cluster.cluster_name,
2554 "master": cluster.master_node,
2555 "default_hypervisor": cluster.default_hypervisor,
2556 "enabled_hypervisors": cluster.enabled_hypervisors,
2557 "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2558 for hypervisor in cluster.enabled_hypervisors]),
2559 "beparams": cluster.beparams,
2560 "nicparams": cluster.nicparams,
2561 "candidate_pool_size": cluster.candidate_pool_size,
2562 "master_netdev": cluster.master_netdev,
2563 "volume_group_name": cluster.volume_group_name,
2564 "file_storage_dir": cluster.file_storage_dir,
2570 class LUQueryConfigValues(NoHooksLU):
2571 """Return configuration values.
2576 _FIELDS_DYNAMIC = utils.FieldSet()
2577 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2579 def ExpandNames(self):
2580 self.needed_locks = {}
2582 _CheckOutputFields(static=self._FIELDS_STATIC,
2583 dynamic=self._FIELDS_DYNAMIC,
2584 selected=self.op.output_fields)
2586 def CheckPrereq(self):
2587 """No prerequisites.
2592 def Exec(self, feedback_fn):
2593 """Dump a representation of the cluster config to the standard output.
2597 for field in self.op.output_fields:
2598 if field == "cluster_name":
2599 entry = self.cfg.GetClusterName()
2600 elif field == "master_node":
2601 entry = self.cfg.GetMasterNode()
2602 elif field == "drain_flag":
2603 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2605 raise errors.ParameterError(field)
2606 values.append(entry)
2610 class LUActivateInstanceDisks(NoHooksLU):
2611 """Bring up an instance's disks.
2614 _OP_REQP = ["instance_name"]
2617 def ExpandNames(self):
2618 self._ExpandAndLockInstance()
2619 self.needed_locks[locking.LEVEL_NODE] = []
2620 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2622 def DeclareLocks(self, level):
2623 if level == locking.LEVEL_NODE:
2624 self._LockInstancesNodes()
2626 def CheckPrereq(self):
2627 """Check prerequisites.
2629 This checks that the instance is in the cluster.
2632 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2633 assert self.instance is not None, \
2634 "Cannot retrieve locked instance %s" % self.op.instance_name
2635 _CheckNodeOnline(self, self.instance.primary_node)
2637 def Exec(self, feedback_fn):
2638 """Activate the disks.
2641 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2643 raise errors.OpExecError("Cannot activate block devices")
2648 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2649 """Prepare the block devices for an instance.
2651 This sets up the block devices on all nodes.
2653 @type lu: L{LogicalUnit}
2654 @param lu: the logical unit on whose behalf we execute
2655 @type instance: L{objects.Instance}
2656 @param instance: the instance for whose disks we assemble
2657 @type ignore_secondaries: boolean
2658 @param ignore_secondaries: if true, errors on secondary nodes
2659 won't result in an error return from the function
2660 @return: False if the operation failed, otherwise a list of
2661 (host, instance_visible_name, node_visible_name)
2662 with the mapping from node devices to instance devices
2667 iname = instance.name
2668 # With the two passes mechanism we try to reduce the window of
2669 # opportunity for the race condition of switching DRBD to primary
2670 # before handshaking occured, but we do not eliminate it
2672 # The proper fix would be to wait (with some limits) until the
2673 # connection has been made and drbd transitions from WFConnection
2674 # into any other network-connected state (Connected, SyncTarget,
2677 # 1st pass, assemble on all nodes in secondary mode
2678 for inst_disk in instance.disks:
2679 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2680 lu.cfg.SetDiskID(node_disk, node)
2681 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2682 msg = result.RemoteFailMsg()
2684 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2685 " (is_primary=False, pass=1): %s",
2686 inst_disk.iv_name, node, msg)
2687 if not ignore_secondaries:
2690 # FIXME: race condition on drbd migration to primary
2692 # 2nd pass, do only the primary node
2693 for inst_disk in instance.disks:
2694 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2695 if node != instance.primary_node:
2697 lu.cfg.SetDiskID(node_disk, node)
2698 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2699 msg = result.RemoteFailMsg()
2701 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2702 " (is_primary=True, pass=2): %s",
2703 inst_disk.iv_name, node, msg)
2705 device_info.append((instance.primary_node, inst_disk.iv_name,
2708 # leave the disks configured for the primary node
2709 # this is a workaround that would be fixed better by
2710 # improving the logical/physical id handling
2711 for disk in instance.disks:
2712 lu.cfg.SetDiskID(disk, instance.primary_node)
2714 return disks_ok, device_info
2717 def _StartInstanceDisks(lu, instance, force):
2718 """Start the disks of an instance.
2721 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2722 ignore_secondaries=force)
2724 _ShutdownInstanceDisks(lu, instance)
2725 if force is not None and not force:
2726 lu.proc.LogWarning("", hint="If the message above refers to a"
2728 " you can retry the operation using '--force'.")
2729 raise errors.OpExecError("Disk consistency error")
2732 class LUDeactivateInstanceDisks(NoHooksLU):
2733 """Shutdown an instance's disks.
2736 _OP_REQP = ["instance_name"]
2739 def ExpandNames(self):
2740 self._ExpandAndLockInstance()
2741 self.needed_locks[locking.LEVEL_NODE] = []
2742 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2744 def DeclareLocks(self, level):
2745 if level == locking.LEVEL_NODE:
2746 self._LockInstancesNodes()
2748 def CheckPrereq(self):
2749 """Check prerequisites.
2751 This checks that the instance is in the cluster.
2754 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2755 assert self.instance is not None, \
2756 "Cannot retrieve locked instance %s" % self.op.instance_name
2758 def Exec(self, feedback_fn):
2759 """Deactivate the disks
2762 instance = self.instance
2763 _SafeShutdownInstanceDisks(self, instance)
2766 def _SafeShutdownInstanceDisks(lu, instance):
2767 """Shutdown block devices of an instance.
2769 This function checks if an instance is running, before calling
2770 _ShutdownInstanceDisks.
2773 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2774 [instance.hypervisor])
2775 ins_l = ins_l[instance.primary_node]
2776 if ins_l.failed or not isinstance(ins_l.data, list):
2777 raise errors.OpExecError("Can't contact node '%s'" %
2778 instance.primary_node)
2780 if instance.name in ins_l.data:
2781 raise errors.OpExecError("Instance is running, can't shutdown"
2784 _ShutdownInstanceDisks(lu, instance)
2787 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2788 """Shutdown block devices of an instance.
2790 This does the shutdown on all nodes of the instance.
2792 If the ignore_primary is false, errors on the primary node are
2797 for disk in instance.disks:
2798 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2799 lu.cfg.SetDiskID(top_disk, node)
2800 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2801 msg = result.RemoteFailMsg()
2803 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2804 disk.iv_name, node, msg)
2805 if not ignore_primary or node != instance.primary_node:
2810 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2811 """Checks if a node has enough free memory.
2813 This function check if a given node has the needed amount of free
2814 memory. In case the node has less memory or we cannot get the
2815 information from the node, this function raise an OpPrereqError
2818 @type lu: C{LogicalUnit}
2819 @param lu: a logical unit from which we get configuration data
2821 @param node: the node to check
2822 @type reason: C{str}
2823 @param reason: string to use in the error message
2824 @type requested: C{int}
2825 @param requested: the amount of memory in MiB to check for
2826 @type hypervisor_name: C{str}
2827 @param hypervisor_name: the hypervisor to ask for memory stats
2828 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2829 we cannot check the node
2832 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2833 nodeinfo[node].Raise()
2834 free_mem = nodeinfo[node].data.get('memory_free')
2835 if not isinstance(free_mem, int):
2836 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2837 " was '%s'" % (node, free_mem))
2838 if requested > free_mem:
2839 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2840 " needed %s MiB, available %s MiB" %
2841 (node, reason, requested, free_mem))
2844 class LUStartupInstance(LogicalUnit):
2845 """Starts an instance.
2848 HPATH = "instance-start"
2849 HTYPE = constants.HTYPE_INSTANCE
2850 _OP_REQP = ["instance_name", "force"]
2853 def ExpandNames(self):
2854 self._ExpandAndLockInstance()
2856 def BuildHooksEnv(self):
2859 This runs on master, primary and secondary nodes of the instance.
2863 "FORCE": self.op.force,
2865 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2866 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2869 def CheckPrereq(self):
2870 """Check prerequisites.
2872 This checks that the instance is in the cluster.
2875 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2876 assert self.instance is not None, \
2877 "Cannot retrieve locked instance %s" % self.op.instance_name
2880 self.beparams = getattr(self.op, "beparams", {})
2882 if not isinstance(self.beparams, dict):
2883 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2884 " dict" % (type(self.beparams), ))
2885 # fill the beparams dict
2886 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2887 self.op.beparams = self.beparams
2890 self.hvparams = getattr(self.op, "hvparams", {})
2892 if not isinstance(self.hvparams, dict):
2893 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2894 " dict" % (type(self.hvparams), ))
2896 # check hypervisor parameter syntax (locally)
2897 cluster = self.cfg.GetClusterInfo()
2898 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2899 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2901 filled_hvp.update(self.hvparams)
2902 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2903 hv_type.CheckParameterSyntax(filled_hvp)
2904 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2905 self.op.hvparams = self.hvparams
2907 _CheckNodeOnline(self, instance.primary_node)
2909 bep = self.cfg.GetClusterInfo().FillBE(instance)
2910 # check bridges existance
2911 _CheckInstanceBridgesExist(self, instance)
2913 remote_info = self.rpc.call_instance_info(instance.primary_node,
2915 instance.hypervisor)
2917 if not remote_info.data:
2918 _CheckNodeFreeMemory(self, instance.primary_node,
2919 "starting instance %s" % instance.name,
2920 bep[constants.BE_MEMORY], instance.hypervisor)
2922 def Exec(self, feedback_fn):
2923 """Start the instance.
2926 instance = self.instance
2927 force = self.op.force
2929 self.cfg.MarkInstanceUp(instance.name)
2931 node_current = instance.primary_node
2933 _StartInstanceDisks(self, instance, force)
2935 result = self.rpc.call_instance_start(node_current, instance,
2936 self.hvparams, self.beparams)
2937 msg = result.RemoteFailMsg()
2939 _ShutdownInstanceDisks(self, instance)
2940 raise errors.OpExecError("Could not start instance: %s" % msg)
2943 class LURebootInstance(LogicalUnit):
2944 """Reboot an instance.
2947 HPATH = "instance-reboot"
2948 HTYPE = constants.HTYPE_INSTANCE
2949 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2952 def ExpandNames(self):
2953 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2954 constants.INSTANCE_REBOOT_HARD,
2955 constants.INSTANCE_REBOOT_FULL]:
2956 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2957 (constants.INSTANCE_REBOOT_SOFT,
2958 constants.INSTANCE_REBOOT_HARD,
2959 constants.INSTANCE_REBOOT_FULL))
2960 self._ExpandAndLockInstance()
2962 def BuildHooksEnv(self):
2965 This runs on master, primary and secondary nodes of the instance.
2969 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2970 "REBOOT_TYPE": self.op.reboot_type,
2972 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2973 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2976 def CheckPrereq(self):
2977 """Check prerequisites.
2979 This checks that the instance is in the cluster.
2982 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2983 assert self.instance is not None, \
2984 "Cannot retrieve locked instance %s" % self.op.instance_name
2986 _CheckNodeOnline(self, instance.primary_node)
2988 # check bridges existance
2989 _CheckInstanceBridgesExist(self, instance)
2991 def Exec(self, feedback_fn):
2992 """Reboot the instance.
2995 instance = self.instance
2996 ignore_secondaries = self.op.ignore_secondaries
2997 reboot_type = self.op.reboot_type
2999 node_current = instance.primary_node
3001 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3002 constants.INSTANCE_REBOOT_HARD]:
3003 for disk in instance.disks:
3004 self.cfg.SetDiskID(disk, node_current)
3005 result = self.rpc.call_instance_reboot(node_current, instance,
3007 msg = result.RemoteFailMsg()
3009 raise errors.OpExecError("Could not reboot instance: %s" % msg)
3011 result = self.rpc.call_instance_shutdown(node_current, instance)
3012 msg = result.RemoteFailMsg()
3014 raise errors.OpExecError("Could not shutdown instance for"
3015 " full reboot: %s" % msg)
3016 _ShutdownInstanceDisks(self, instance)
3017 _StartInstanceDisks(self, instance, ignore_secondaries)
3018 result = self.rpc.call_instance_start(node_current, instance, None, None)
3019 msg = result.RemoteFailMsg()
3021 _ShutdownInstanceDisks(self, instance)
3022 raise errors.OpExecError("Could not start instance for"
3023 " full reboot: %s" % msg)
3025 self.cfg.MarkInstanceUp(instance.name)
3028 class LUShutdownInstance(LogicalUnit):
3029 """Shutdown an instance.
3032 HPATH = "instance-stop"
3033 HTYPE = constants.HTYPE_INSTANCE
3034 _OP_REQP = ["instance_name"]
3037 def ExpandNames(self):
3038 self._ExpandAndLockInstance()
3040 def BuildHooksEnv(self):
3043 This runs on master, primary and secondary nodes of the instance.
3046 env = _BuildInstanceHookEnvByObject(self, self.instance)
3047 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3050 def CheckPrereq(self):
3051 """Check prerequisites.
3053 This checks that the instance is in the cluster.
3056 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3057 assert self.instance is not None, \
3058 "Cannot retrieve locked instance %s" % self.op.instance_name
3059 _CheckNodeOnline(self, self.instance.primary_node)
3061 def Exec(self, feedback_fn):
3062 """Shutdown the instance.
3065 instance = self.instance
3066 node_current = instance.primary_node
3067 self.cfg.MarkInstanceDown(instance.name)
3068 result = self.rpc.call_instance_shutdown(node_current, instance)
3069 msg = result.RemoteFailMsg()
3071 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3073 _ShutdownInstanceDisks(self, instance)
3076 class LUReinstallInstance(LogicalUnit):
3077 """Reinstall an instance.
3080 HPATH = "instance-reinstall"
3081 HTYPE = constants.HTYPE_INSTANCE
3082 _OP_REQP = ["instance_name"]
3085 def ExpandNames(self):
3086 self._ExpandAndLockInstance()
3088 def BuildHooksEnv(self):
3091 This runs on master, primary and secondary nodes of the instance.
3094 env = _BuildInstanceHookEnvByObject(self, self.instance)
3095 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3098 def CheckPrereq(self):
3099 """Check prerequisites.
3101 This checks that the instance is in the cluster and is not running.
3104 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3105 assert instance is not None, \
3106 "Cannot retrieve locked instance %s" % self.op.instance_name
3107 _CheckNodeOnline(self, instance.primary_node)
3109 if instance.disk_template == constants.DT_DISKLESS:
3110 raise errors.OpPrereqError("Instance '%s' has no disks" %
3111 self.op.instance_name)
3112 if instance.admin_up:
3113 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3114 self.op.instance_name)
3115 remote_info = self.rpc.call_instance_info(instance.primary_node,
3117 instance.hypervisor)
3119 if remote_info.data:
3120 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3121 (self.op.instance_name,
3122 instance.primary_node))
3124 self.op.os_type = getattr(self.op, "os_type", None)
3125 if self.op.os_type is not None:
3127 pnode = self.cfg.GetNodeInfo(
3128 self.cfg.ExpandNodeName(instance.primary_node))
3130 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3132 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3134 if not isinstance(result.data, objects.OS):
3135 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3136 " primary node" % self.op.os_type)
3138 self.instance = instance
3140 def Exec(self, feedback_fn):
3141 """Reinstall the instance.
3144 inst = self.instance
3146 if self.op.os_type is not None:
3147 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3148 inst.os = self.op.os_type
3149 self.cfg.Update(inst)
3151 _StartInstanceDisks(self, inst, None)
3153 feedback_fn("Running the instance OS create scripts...")
3154 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3155 msg = result.RemoteFailMsg()
3157 raise errors.OpExecError("Could not install OS for instance %s"
3159 (inst.name, inst.primary_node, msg))
3161 _ShutdownInstanceDisks(self, inst)
3164 class LURenameInstance(LogicalUnit):
3165 """Rename an instance.
3168 HPATH = "instance-rename"
3169 HTYPE = constants.HTYPE_INSTANCE
3170 _OP_REQP = ["instance_name", "new_name"]
3172 def BuildHooksEnv(self):
3175 This runs on master, primary and secondary nodes of the instance.
3178 env = _BuildInstanceHookEnvByObject(self, self.instance)
3179 env["INSTANCE_NEW_NAME"] = self.op.new_name
3180 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3183 def CheckPrereq(self):
3184 """Check prerequisites.
3186 This checks that the instance is in the cluster and is not running.
3189 instance = self.cfg.GetInstanceInfo(
3190 self.cfg.ExpandInstanceName(self.op.instance_name))
3191 if instance is None:
3192 raise errors.OpPrereqError("Instance '%s' not known" %
3193 self.op.instance_name)
3194 _CheckNodeOnline(self, instance.primary_node)
3196 if instance.admin_up:
3197 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3198 self.op.instance_name)
3199 remote_info = self.rpc.call_instance_info(instance.primary_node,
3201 instance.hypervisor)
3203 if remote_info.data:
3204 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3205 (self.op.instance_name,
3206 instance.primary_node))
3207 self.instance = instance
3209 # new name verification
3210 name_info = utils.HostInfo(self.op.new_name)
3212 self.op.new_name = new_name = name_info.name
3213 instance_list = self.cfg.GetInstanceList()
3214 if new_name in instance_list:
3215 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3218 if not getattr(self.op, "ignore_ip", False):
3219 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3220 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3221 (name_info.ip, new_name))
3224 def Exec(self, feedback_fn):
3225 """Reinstall the instance.
3228 inst = self.instance
3229 old_name = inst.name
3231 if inst.disk_template == constants.DT_FILE:
3232 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3234 self.cfg.RenameInstance(inst.name, self.op.new_name)
3235 # Change the instance lock. This is definitely safe while we hold the BGL
3236 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3237 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3239 # re-read the instance from the configuration after rename
3240 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3242 if inst.disk_template == constants.DT_FILE:
3243 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3244 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3245 old_file_storage_dir,
3246 new_file_storage_dir)
3249 raise errors.OpExecError("Could not connect to node '%s' to rename"
3250 " directory '%s' to '%s' (but the instance"
3251 " has been renamed in Ganeti)" % (
3252 inst.primary_node, old_file_storage_dir,
3253 new_file_storage_dir))
3255 if not result.data[0]:
3256 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3257 " (but the instance has been renamed in"
3258 " Ganeti)" % (old_file_storage_dir,
3259 new_file_storage_dir))
3261 _StartInstanceDisks(self, inst, None)
3263 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3265 msg = result.RemoteFailMsg()
3267 msg = ("Could not run OS rename script for instance %s on node %s"
3268 " (but the instance has been renamed in Ganeti): %s" %
3269 (inst.name, inst.primary_node, msg))
3270 self.proc.LogWarning(msg)
3272 _ShutdownInstanceDisks(self, inst)
3275 class LURemoveInstance(LogicalUnit):
3276 """Remove an instance.
3279 HPATH = "instance-remove"
3280 HTYPE = constants.HTYPE_INSTANCE
3281 _OP_REQP = ["instance_name", "ignore_failures"]
3284 def ExpandNames(self):
3285 self._ExpandAndLockInstance()
3286 self.needed_locks[locking.LEVEL_NODE] = []
3287 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3289 def DeclareLocks(self, level):
3290 if level == locking.LEVEL_NODE:
3291 self._LockInstancesNodes()
3293 def BuildHooksEnv(self):
3296 This runs on master, primary and secondary nodes of the instance.
3299 env = _BuildInstanceHookEnvByObject(self, self.instance)
3300 nl = [self.cfg.GetMasterNode()]
3303 def CheckPrereq(self):
3304 """Check prerequisites.
3306 This checks that the instance is in the cluster.
3309 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3310 assert self.instance is not None, \
3311 "Cannot retrieve locked instance %s" % self.op.instance_name
3313 def Exec(self, feedback_fn):
3314 """Remove the instance.
3317 instance = self.instance
3318 logging.info("Shutting down instance %s on node %s",
3319 instance.name, instance.primary_node)
3321 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3322 msg = result.RemoteFailMsg()
3324 if self.op.ignore_failures:
3325 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3327 raise errors.OpExecError("Could not shutdown instance %s on"
3329 (instance.name, instance.primary_node, msg))
3331 logging.info("Removing block devices for instance %s", instance.name)
3333 if not _RemoveDisks(self, instance):
3334 if self.op.ignore_failures:
3335 feedback_fn("Warning: can't remove instance's disks")
3337 raise errors.OpExecError("Can't remove instance's disks")
3339 logging.info("Removing instance %s out of cluster config", instance.name)
3341 self.cfg.RemoveInstance(instance.name)
3342 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3345 class LUQueryInstances(NoHooksLU):
3346 """Logical unit for querying instances.
3349 _OP_REQP = ["output_fields", "names", "use_locking"]
3351 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3353 "disk_template", "ip", "mac", "bridge",
3354 "sda_size", "sdb_size", "vcpus", "tags",
3355 "network_port", "beparams",
3356 r"(disk)\.(size)/([0-9]+)",
3357 r"(disk)\.(sizes)", "disk_usage",
3358 r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3359 r"(nic)\.(macs|ips|bridges)",
3360 r"(disk|nic)\.(count)",
3361 "serial_no", "hypervisor", "hvparams",] +
3363 for name in constants.HVS_PARAMETERS] +
3365 for name in constants.BES_PARAMETERS])
3366 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3369 def ExpandNames(self):
3370 _CheckOutputFields(static=self._FIELDS_STATIC,
3371 dynamic=self._FIELDS_DYNAMIC,
3372 selected=self.op.output_fields)
3374 self.needed_locks = {}
3375 self.share_locks[locking.LEVEL_INSTANCE] = 1
3376 self.share_locks[locking.LEVEL_NODE] = 1
3379 self.wanted = _GetWantedInstances(self, self.op.names)
3381 self.wanted = locking.ALL_SET
3383 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3384 self.do_locking = self.do_node_query and self.op.use_locking
3386 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3387 self.needed_locks[locking.LEVEL_NODE] = []
3388 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3390 def DeclareLocks(self, level):
3391 if level == locking.LEVEL_NODE and self.do_locking:
3392 self._LockInstancesNodes()
3394 def CheckPrereq(self):
3395 """Check prerequisites.
3400 def Exec(self, feedback_fn):
3401 """Computes the list of nodes and their attributes.
3404 all_info = self.cfg.GetAllInstancesInfo()
3405 if self.wanted == locking.ALL_SET:
3406 # caller didn't specify instance names, so ordering is not important
3408 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3410 instance_names = all_info.keys()
3411 instance_names = utils.NiceSort(instance_names)
3413 # caller did specify names, so we must keep the ordering
3415 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3417 tgt_set = all_info.keys()
3418 missing = set(self.wanted).difference(tgt_set)
3420 raise errors.OpExecError("Some instances were removed before"
3421 " retrieving their data: %s" % missing)
3422 instance_names = self.wanted
3424 instance_list = [all_info[iname] for iname in instance_names]
3426 # begin data gathering
3428 nodes = frozenset([inst.primary_node for inst in instance_list])
3429 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3433 if self.do_node_query:
3435 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3437 result = node_data[name]
3439 # offline nodes will be in both lists
3440 off_nodes.append(name)
3442 bad_nodes.append(name)
3445 live_data.update(result.data)
3446 # else no instance is alive
3448 live_data = dict([(name, {}) for name in instance_names])
3450 # end data gathering
3455 for instance in instance_list:
3457 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3458 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3459 for field in self.op.output_fields:
3460 st_match = self._FIELDS_STATIC.Matches(field)
3465 elif field == "pnode":
3466 val = instance.primary_node
3467 elif field == "snodes":
3468 val = list(instance.secondary_nodes)
3469 elif field == "admin_state":
3470 val = instance.admin_up
3471 elif field == "oper_state":
3472 if instance.primary_node in bad_nodes:
3475 val = bool(live_data.get(instance.name))
3476 elif field == "status":
3477 if instance.primary_node in off_nodes:
3478 val = "ERROR_nodeoffline"
3479 elif instance.primary_node in bad_nodes:
3480 val = "ERROR_nodedown"
3482 running = bool(live_data.get(instance.name))
3484 if instance.admin_up:
3489 if instance.admin_up:
3493 elif field == "oper_ram":
3494 if instance.primary_node in bad_nodes:
3496 elif instance.name in live_data:
3497 val = live_data[instance.name].get("memory", "?")
3500 elif field == "disk_template":
3501 val = instance.disk_template
3503 val = instance.nics[0].ip
3504 elif field == "bridge":
3505 val = instance.nics[0].bridge
3506 elif field == "mac":
3507 val = instance.nics[0].mac
3508 elif field == "sda_size" or field == "sdb_size":
3509 idx = ord(field[2]) - ord('a')
3511 val = instance.FindDisk(idx).size
3512 except errors.OpPrereqError:
3514 elif field == "disk_usage": # total disk usage per node
3515 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3516 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3517 elif field == "tags":
3518 val = list(instance.GetTags())
3519 elif field == "serial_no":
3520 val = instance.serial_no
3521 elif field == "network_port":
3522 val = instance.network_port
3523 elif field == "hypervisor":
3524 val = instance.hypervisor
3525 elif field == "hvparams":
3527 elif (field.startswith(HVPREFIX) and
3528 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3529 val = i_hv.get(field[len(HVPREFIX):], None)
3530 elif field == "beparams":
3532 elif (field.startswith(BEPREFIX) and
3533 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3534 val = i_be.get(field[len(BEPREFIX):], None)
3535 elif st_match and st_match.groups():
3536 # matches a variable list
3537 st_groups = st_match.groups()
3538 if st_groups and st_groups[0] == "disk":
3539 if st_groups[1] == "count":
3540 val = len(instance.disks)
3541 elif st_groups[1] == "sizes":
3542 val = [disk.size for disk in instance.disks]
3543 elif st_groups[1] == "size":
3545 val = instance.FindDisk(st_groups[2]).size
3546 except errors.OpPrereqError:
3549 assert False, "Unhandled disk parameter"
3550 elif st_groups[0] == "nic":
3551 if st_groups[1] == "count":
3552 val = len(instance.nics)
3553 elif st_groups[1] == "macs":
3554 val = [nic.mac for nic in instance.nics]
3555 elif st_groups[1] == "ips":
3556 val = [nic.ip for nic in instance.nics]
3557 elif st_groups[1] == "bridges":
3558 val = [nic.bridge for nic in instance.nics]
3561 nic_idx = int(st_groups[2])
3562 if nic_idx >= len(instance.nics):
3565 if st_groups[1] == "mac":
3566 val = instance.nics[nic_idx].mac
3567 elif st_groups[1] == "ip":
3568 val = instance.nics[nic_idx].ip
3569 elif st_groups[1] == "bridge":
3570 val = instance.nics[nic_idx].bridge
3572 assert False, "Unhandled NIC parameter"
3574 assert False, "Unhandled variable parameter"
3576 raise errors.ParameterError(field)
3583 class LUFailoverInstance(LogicalUnit):
3584 """Failover an instance.
3587 HPATH = "instance-failover"
3588 HTYPE = constants.HTYPE_INSTANCE
3589 _OP_REQP = ["instance_name", "ignore_consistency"]
3592 def ExpandNames(self):
3593 self._ExpandAndLockInstance()
3594 self.needed_locks[locking.LEVEL_NODE] = []
3595 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3597 def DeclareLocks(self, level):
3598 if level == locking.LEVEL_NODE:
3599 self._LockInstancesNodes()
3601 def BuildHooksEnv(self):
3604 This runs on master, primary and secondary nodes of the instance.
3608 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3610 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3611 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3614 def CheckPrereq(self):
3615 """Check prerequisites.
3617 This checks that the instance is in the cluster.
3620 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3621 assert self.instance is not None, \
3622 "Cannot retrieve locked instance %s" % self.op.instance_name
3624 bep = self.cfg.GetClusterInfo().FillBE(instance)
3625 if instance.disk_template not in constants.DTS_NET_MIRROR:
3626 raise errors.OpPrereqError("Instance's disk layout is not"
3627 " network mirrored, cannot failover.")
3629 secondary_nodes = instance.secondary_nodes
3630 if not secondary_nodes:
3631 raise errors.ProgrammerError("no secondary node but using "
3632 "a mirrored disk template")
3634 target_node = secondary_nodes[0]
3635 _CheckNodeOnline(self, target_node)
3636 _CheckNodeNotDrained(self, target_node)
3637 # check memory requirements on the secondary node
3638 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3639 instance.name, bep[constants.BE_MEMORY],
3640 instance.hypervisor)
3641 # check bridge existance
3642 _CheckInstanceBridgesExist(self, instance, node=target_node)
3644 def Exec(self, feedback_fn):
3645 """Failover an instance.
3647 The failover is done by shutting it down on its present node and
3648 starting it on the secondary.
3651 instance = self.instance
3653 source_node = instance.primary_node
3654 target_node = instance.secondary_nodes[0]
3656 feedback_fn("* checking disk consistency between source and target")
3657 for dev in instance.disks:
3658 # for drbd, these are drbd over lvm
3659 if not _CheckDiskConsistency(self, dev, target_node, False):
3660 if instance.admin_up and not self.op.ignore_consistency:
3661 raise errors.OpExecError("Disk %s is degraded on target node,"
3662 " aborting failover." % dev.iv_name)
3664 feedback_fn("* shutting down instance on source node")
3665 logging.info("Shutting down instance %s on node %s",
3666 instance.name, source_node)
3668 result = self.rpc.call_instance_shutdown(source_node, instance)
3669 msg = result.RemoteFailMsg()
3671 if self.op.ignore_consistency:
3672 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3673 " Proceeding anyway. Please make sure node"
3674 " %s is down. Error details: %s",
3675 instance.name, source_node, source_node, msg)
3677 raise errors.OpExecError("Could not shutdown instance %s on"
3679 (instance.name, source_node, msg))
3681 feedback_fn("* deactivating the instance's disks on source node")
3682 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3683 raise errors.OpExecError("Can't shut down the instance's disks.")
3685 instance.primary_node = target_node
3686 # distribute new instance config to the other nodes
3687 self.cfg.Update(instance)
3689 # Only start the instance if it's marked as up
3690 if instance.admin_up:
3691 feedback_fn("* activating the instance's disks on target node")
3692 logging.info("Starting instance %s on node %s",
3693 instance.name, target_node)
3695 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3696 ignore_secondaries=True)
3698 _ShutdownInstanceDisks(self, instance)
3699 raise errors.OpExecError("Can't activate the instance's disks")
3701 feedback_fn("* starting the instance on the target node")
3702 result = self.rpc.call_instance_start(target_node, instance, None, None)
3703 msg = result.RemoteFailMsg()
3705 _ShutdownInstanceDisks(self, instance)
3706 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3707 (instance.name, target_node, msg))
3710 class LUMigrateInstance(LogicalUnit):
3711 """Migrate an instance.
3713 This is migration without shutting down, compared to the failover,
3714 which is done with shutdown.
3717 HPATH = "instance-migrate"
3718 HTYPE = constants.HTYPE_INSTANCE
3719 _OP_REQP = ["instance_name", "live", "cleanup"]
3723 def ExpandNames(self):
3724 self._ExpandAndLockInstance()
3725 self.needed_locks[locking.LEVEL_NODE] = []
3726 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3728 def DeclareLocks(self, level):
3729 if level == locking.LEVEL_NODE:
3730 self._LockInstancesNodes()
3732 def BuildHooksEnv(self):
3735 This runs on master, primary and secondary nodes of the instance.
3738 env = _BuildInstanceHookEnvByObject(self, self.instance)
3739 env["MIGRATE_LIVE"] = self.op.live
3740 env["MIGRATE_CLEANUP"] = self.op.cleanup
3741 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3744 def CheckPrereq(self):
3745 """Check prerequisites.
3747 This checks that the instance is in the cluster.
3750 instance = self.cfg.GetInstanceInfo(
3751 self.cfg.ExpandInstanceName(self.op.instance_name))
3752 if instance is None:
3753 raise errors.OpPrereqError("Instance '%s' not known" %
3754 self.op.instance_name)
3756 if instance.disk_template != constants.DT_DRBD8:
3757 raise errors.OpPrereqError("Instance's disk layout is not"
3758 " drbd8, cannot migrate.")
3760 secondary_nodes = instance.secondary_nodes
3761 if not secondary_nodes:
3762 raise errors.ConfigurationError("No secondary node but using"
3763 " drbd8 disk template")
3765 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3767 target_node = secondary_nodes[0]
3768 # check memory requirements on the secondary node
3769 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3770 instance.name, i_be[constants.BE_MEMORY],
3771 instance.hypervisor)
3773 # check bridge existance
3774 _CheckInstanceBridgesExist(self, instance, node=target_node)
3776 if not self.op.cleanup:
3777 _CheckNodeNotDrained(self, target_node)
3778 result = self.rpc.call_instance_migratable(instance.primary_node,
3780 msg = result.RemoteFailMsg()
3782 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3785 self.instance = instance
3787 def _WaitUntilSync(self):
3788 """Poll with custom rpc for disk sync.
3790 This uses our own step-based rpc call.
3793 self.feedback_fn("* wait until resync is done")
3797 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3799 self.instance.disks)
3801 for node, nres in result.items():
3802 msg = nres.RemoteFailMsg()
3804 raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3806 node_done, node_percent = nres.payload
3807 all_done = all_done and node_done
3808 if node_percent is not None:
3809 min_percent = min(min_percent, node_percent)
3811 if min_percent < 100:
3812 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3815 def _EnsureSecondary(self, node):
3816 """Demote a node to secondary.
3819 self.feedback_fn("* switching node %s to secondary mode" % node)
3821 for dev in self.instance.disks:
3822 self.cfg.SetDiskID(dev, node)
3824 result = self.rpc.call_blockdev_close(node, self.instance.name,
3825 self.instance.disks)
3826 msg = result.RemoteFailMsg()
3828 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3829 " error %s" % (node, msg))
3831 def _GoStandalone(self):
3832 """Disconnect from the network.
3835 self.feedback_fn("* changing into standalone mode")
3836 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3837 self.instance.disks)
3838 for node, nres in result.items():
3839 msg = nres.RemoteFailMsg()
3841 raise errors.OpExecError("Cannot disconnect disks node %s,"
3842 " error %s" % (node, msg))
3844 def _GoReconnect(self, multimaster):
3845 """Reconnect to the network.
3851 msg = "single-master"
3852 self.feedback_fn("* changing disks into %s mode" % msg)
3853 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3854 self.instance.disks,
3855 self.instance.name, multimaster)
3856 for node, nres in result.items():
3857 msg = nres.RemoteFailMsg()
3859 raise errors.OpExecError("Cannot change disks config on node %s,"
3860 " error: %s" % (node, msg))
3862 def _ExecCleanup(self):
3863 """Try to cleanup after a failed migration.
3865 The cleanup is done by:
3866 - check that the instance is running only on one node
3867 (and update the config if needed)
3868 - change disks on its secondary node to secondary
3869 - wait until disks are fully synchronized
3870 - disconnect from the network
3871 - change disks into single-master mode
3872 - wait again until disks are fully synchronized
3875 instance = self.instance
3876 target_node = self.target_node
3877 source_node = self.source_node
3879 # check running on only one node
3880 self.feedback_fn("* checking where the instance actually runs"
3881 " (if this hangs, the hypervisor might be in"
3883 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3884 for node, result in ins_l.items():
3886 if not isinstance(result.data, list):
3887 raise errors.OpExecError("Can't contact node '%s'" % node)
3889 runningon_source = instance.name in ins_l[source_node].data
3890 runningon_target = instance.name in ins_l[target_node].data
3892 if runningon_source and runningon_target:
3893 raise errors.OpExecError("Instance seems to be running on two nodes,"
3894 " or the hypervisor is confused. You will have"
3895 " to ensure manually that it runs only on one"
3896 " and restart this operation.")
3898 if not (runningon_source or runningon_target):
3899 raise errors.OpExecError("Instance does not seem to be running at all."
3900 " In this case, it's safer to repair by"
3901 " running 'gnt-instance stop' to ensure disk"
3902 " shutdown, and then restarting it.")
3904 if runningon_target:
3905 # the migration has actually succeeded, we need to update the config
3906 self.feedback_fn("* instance running on secondary node (%s),"
3907 " updating config" % target_node)
3908 instance.primary_node = target_node
3909 self.cfg.Update(instance)
3910 demoted_node = source_node
3912 self.feedback_fn("* instance confirmed to be running on its"
3913 " primary node (%s)" % source_node)
3914 demoted_node = target_node
3916 self._EnsureSecondary(demoted_node)
3918 self._WaitUntilSync()
3919 except errors.OpExecError:
3920 # we ignore here errors, since if the device is standalone, it
3921 # won't be able to sync
3923 self._GoStandalone()
3924 self._GoReconnect(False)
3925 self._WaitUntilSync()
3927 self.feedback_fn("* done")
3929 def _RevertDiskStatus(self):
3930 """Try to revert the disk status after a failed migration.
3933 target_node = self.target_node
3935 self._EnsureSecondary(target_node)
3936 self._GoStandalone()
3937 self._GoReconnect(False)
3938 self._WaitUntilSync()
3939 except errors.OpExecError, err:
3940 self.LogWarning("Migration failed and I can't reconnect the"
3941 " drives: error '%s'\n"
3942 "Please look and recover the instance status" %
3945 def _AbortMigration(self):
3946 """Call the hypervisor code to abort a started migration.
3949 instance = self.instance
3950 target_node = self.target_node
3951 migration_info = self.migration_info
3953 abort_result = self.rpc.call_finalize_migration(target_node,
3957 abort_msg = abort_result.RemoteFailMsg()
3959 logging.error("Aborting migration failed on target node %s: %s" %
3960 (target_node, abort_msg))
3961 # Don't raise an exception here, as we stil have to try to revert the
3962 # disk status, even if this step failed.
3964 def _ExecMigration(self):
3965 """Migrate an instance.
3967 The migrate is done by:
3968 - change the disks into dual-master mode
3969 - wait until disks are fully synchronized again
3970 - migrate the instance
3971 - change disks on the new secondary node (the old primary) to secondary
3972 - wait until disks are fully synchronized
3973 - change disks into single-master mode
3976 instance = self.instance
3977 target_node = self.target_node
3978 source_node = self.source_node
3980 self.feedback_fn("* checking disk consistency between source and target")
3981 for dev in instance.disks:
3982 if not _CheckDiskConsistency(self, dev, target_node, False):
3983 raise errors.OpExecError("Disk %s is degraded or not fully"
3984 " synchronized on target node,"
3985 " aborting migrate." % dev.iv_name)
3987 # First get the migration information from the remote node
3988 result = self.rpc.call_migration_info(source_node, instance)
3989 msg = result.RemoteFailMsg()
3991 log_err = ("Failed fetching source migration information from %s: %s" %
3993 logging.error(log_err)
3994 raise errors.OpExecError(log_err)
3996 self.migration_info = migration_info = result.payload
3998 # Then switch the disks to master/master mode
3999 self._EnsureSecondary(target_node)
4000 self._GoStandalone()
4001 self._GoReconnect(True)
4002 self._WaitUntilSync()
4004 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4005 result = self.rpc.call_accept_instance(target_node,
4008 self.nodes_ip[target_node])
4010 msg = result.RemoteFailMsg()
4012 logging.error("Instance pre-migration failed, trying to revert"
4013 " disk status: %s", msg)
4014 self._AbortMigration()
4015 self._RevertDiskStatus()
4016 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4017 (instance.name, msg))
4019 self.feedback_fn("* migrating instance to %s" % target_node)
4021 result = self.rpc.call_instance_migrate(source_node, instance,
4022 self.nodes_ip[target_node],
4024 msg = result.RemoteFailMsg()
4026 logging.error("Instance migration failed, trying to revert"
4027 " disk status: %s", msg)
4028 self._AbortMigration()
4029 self._RevertDiskStatus()
4030 raise errors.OpExecError("Could not migrate instance %s: %s" %
4031 (instance.name, msg))
4034 instance.primary_node = target_node
4035 # distribute new instance config to the other nodes
4036 self.cfg.Update(instance)
4038 result = self.rpc.call_finalize_migration(target_node,
4042 msg = result.RemoteFailMsg()
4044 logging.error("Instance migration succeeded, but finalization failed:"
4046 raise errors.OpExecError("Could not finalize instance migration: %s" %
4049 self._EnsureSecondary(source_node)
4050 self._WaitUntilSync()
4051 self._GoStandalone()
4052 self._GoReconnect(False)
4053 self._WaitUntilSync()
4055 self.feedback_fn("* done")
4057 def Exec(self, feedback_fn):
4058 """Perform the migration.
4061 self.feedback_fn = feedback_fn
4063 self.source_node = self.instance.primary_node
4064 self.target_node = self.instance.secondary_nodes[0]
4065 self.all_nodes = [self.source_node, self.target_node]
4067 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4068 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4071 return self._ExecCleanup()
4073 return self._ExecMigration()
4076 def _CreateBlockDev(lu, node, instance, device, force_create,
4078 """Create a tree of block devices on a given node.
4080 If this device type has to be created on secondaries, create it and
4083 If not, just recurse to children keeping the same 'force' value.
4085 @param lu: the lu on whose behalf we execute
4086 @param node: the node on which to create the device
4087 @type instance: L{objects.Instance}
4088 @param instance: the instance which owns the device
4089 @type device: L{objects.Disk}
4090 @param device: the device to create
4091 @type force_create: boolean
4092 @param force_create: whether to force creation of this device; this
4093 will be change to True whenever we find a device which has
4094 CreateOnSecondary() attribute
4095 @param info: the extra 'metadata' we should attach to the device
4096 (this will be represented as a LVM tag)
4097 @type force_open: boolean
4098 @param force_open: this parameter will be passes to the
4099 L{backend.BlockdevCreate} function where it specifies
4100 whether we run on primary or not, and it affects both
4101 the child assembly and the device own Open() execution
4104 if device.CreateOnSecondary():
4108 for child in device.children:
4109 _CreateBlockDev(lu, node, instance, child, force_create,
4112 if not force_create:
4115 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4118 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4119 """Create a single block device on a given node.
4121 This will not recurse over children of the device, so they must be
4124 @param lu: the lu on whose behalf we execute
4125 @param node: the node on which to create the device
4126 @type instance: L{objects.Instance}
4127 @param instance: the instance which owns the device
4128 @type device: L{objects.Disk}
4129 @param device: the device to create
4130 @param info: the extra 'metadata' we should attach to the device
4131 (this will be represented as a LVM tag)
4132 @type force_open: boolean
4133 @param force_open: this parameter will be passes to the
4134 L{backend.BlockdevCreate} function where it specifies
4135 whether we run on primary or not, and it affects both
4136 the child assembly and the device own Open() execution
4139 lu.cfg.SetDiskID(device, node)
4140 result = lu.rpc.call_blockdev_create(node, device, device.size,
4141 instance.name, force_open, info)
4142 msg = result.RemoteFailMsg()
4144 raise errors.OpExecError("Can't create block device %s on"
4145 " node %s for instance %s: %s" %
4146 (device, node, instance.name, msg))
4147 if device.physical_id is None:
4148 device.physical_id = result.payload
4151 def _GenerateUniqueNames(lu, exts):
4152 """Generate a suitable LV name.
4154 This will generate a logical volume name for the given instance.
4159 new_id = lu.cfg.GenerateUniqueID()
4160 results.append("%s%s" % (new_id, val))
4164 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4166 """Generate a drbd8 device complete with its children.
4169 port = lu.cfg.AllocatePort()
4170 vgname = lu.cfg.GetVGName()
4171 shared_secret = lu.cfg.GenerateDRBDSecret()
4172 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4173 logical_id=(vgname, names[0]))
4174 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4175 logical_id=(vgname, names[1]))
4176 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4177 logical_id=(primary, secondary, port,
4180 children=[dev_data, dev_meta],
4185 def _GenerateDiskTemplate(lu, template_name,
4186 instance_name, primary_node,
4187 secondary_nodes, disk_info,
4188 file_storage_dir, file_driver,
4190 """Generate the entire disk layout for a given template type.
4193 #TODO: compute space requirements
4195 vgname = lu.cfg.GetVGName()
4196 disk_count = len(disk_info)
4198 if template_name == constants.DT_DISKLESS:
4200 elif template_name == constants.DT_PLAIN:
4201 if len(secondary_nodes) != 0:
4202 raise errors.ProgrammerError("Wrong template configuration")
4204 names = _GenerateUniqueNames(lu, [".disk%d" % i
4205 for i in range(disk_count)])
4206 for idx, disk in enumerate(disk_info):
4207 disk_index = idx + base_index
4208 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4209 logical_id=(vgname, names[idx]),
4210 iv_name="disk/%d" % disk_index,
4212 disks.append(disk_dev)
4213 elif template_name == constants.DT_DRBD8:
4214 if len(secondary_nodes) != 1:
4215 raise errors.ProgrammerError("Wrong template configuration")
4216 remote_node = secondary_nodes[0]
4217 minors = lu.cfg.AllocateDRBDMinor(
4218 [primary_node, remote_node] * len(disk_info), instance_name)
4221 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4222 for i in range(disk_count)]):
4223 names.append(lv_prefix + "_data")
4224 names.append(lv_prefix + "_meta")
4225 for idx, disk in enumerate(disk_info):
4226 disk_index = idx + base_index
4227 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4228 disk["size"], names[idx*2:idx*2+2],
4229 "disk/%d" % disk_index,
4230 minors[idx*2], minors[idx*2+1])
4231 disk_dev.mode = disk["mode"]
4232 disks.append(disk_dev)
4233 elif template_name == constants.DT_FILE:
4234 if len(secondary_nodes) != 0:
4235 raise errors.ProgrammerError("Wrong template configuration")
4237 for idx, disk in enumerate(disk_info):
4238 disk_index = idx + base_index
4239 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4240 iv_name="disk/%d" % disk_index,
4241 logical_id=(file_driver,
4242 "%s/disk%d" % (file_storage_dir,
4245 disks.append(disk_dev)
4247 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4251 def _GetInstanceInfoText(instance):
4252 """Compute that text that should be added to the disk's metadata.
4255 return "originstname+%s" % instance.name
4258 def _CreateDisks(lu, instance):
4259 """Create all disks for an instance.
4261 This abstracts away some work from AddInstance.
4263 @type lu: L{LogicalUnit}
4264 @param lu: the logical unit on whose behalf we execute
4265 @type instance: L{objects.Instance}
4266 @param instance: the instance whose disks we should create
4268 @return: the success of the creation
4271 info = _GetInstanceInfoText(instance)
4272 pnode = instance.primary_node
4274 if instance.disk_template == constants.DT_FILE:
4275 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4276 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4278 if result.failed or not result.data:
4279 raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4281 if not result.data[0]:
4282 raise errors.OpExecError("Failed to create directory '%s'" %
4285 # Note: this needs to be kept in sync with adding of disks in
4286 # LUSetInstanceParams
4287 for device in instance.disks:
4288 logging.info("Creating volume %s for instance %s",
4289 device.iv_name, instance.name)
4291 for node in instance.all_nodes:
4292 f_create = node == pnode
4293 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4296 def _RemoveDisks(lu, instance):
4297 """Remove all disks for an instance.
4299 This abstracts away some work from `AddInstance()` and
4300 `RemoveInstance()`. Note that in case some of the devices couldn't
4301 be removed, the removal will continue with the other ones (compare
4302 with `_CreateDisks()`).
4304 @type lu: L{LogicalUnit}
4305 @param lu: the logical unit on whose behalf we execute
4306 @type instance: L{objects.Instance}
4307 @param instance: the instance whose disks we should remove
4309 @return: the success of the removal
4312 logging.info("Removing block devices for instance %s", instance.name)
4315 for device in instance.disks:
4316 for node, disk in device.ComputeNodeTree(instance.primary_node):
4317 lu.cfg.SetDiskID(disk, node)
4318 msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4320 lu.LogWarning("Could not remove block device %s on node %s,"
4321 " continuing anyway: %s", device.iv_name, node, msg)
4324 if instance.disk_template == constants.DT_FILE:
4325 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4326 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4328 if result.failed or not result.data:
4329 logging.error("Could not remove directory '%s'", file_storage_dir)
4335 def _ComputeDiskSize(disk_template, disks):
4336 """Compute disk size requirements in the volume group
4339 # Required free disk space as a function of disk and swap space
4341 constants.DT_DISKLESS: None,
4342 constants.DT_PLAIN: sum(d["size"] for d in disks),
4343 # 128 MB are added for drbd metadata for each disk
4344 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4345 constants.DT_FILE: None,
4348 if disk_template not in req_size_dict:
4349 raise errors.ProgrammerError("Disk template '%s' size requirement"
4350 " is unknown" % disk_template)
4352 return req_size_dict[disk_template]
4355 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4356 """Hypervisor parameter validation.
4358 This function abstract the hypervisor parameter validation to be
4359 used in both instance create and instance modify.
4361 @type lu: L{LogicalUnit}
4362 @param lu: the logical unit for which we check
4363 @type nodenames: list
4364 @param nodenames: the list of nodes on which we should check
4365 @type hvname: string
4366 @param hvname: the name of the hypervisor we should use
4367 @type hvparams: dict
4368 @param hvparams: the parameters which we need to check
4369 @raise errors.OpPrereqError: if the parameters are not valid
4372 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4375 for node in nodenames:
4379 msg = info.RemoteFailMsg()
4381 raise errors.OpPrereqError("Hypervisor parameter validation"
4382 " failed on node %s: %s" % (node, msg))
4385 class LUCreateInstance(LogicalUnit):
4386 """Create an instance.
4389 HPATH = "instance-add"
4390 HTYPE = constants.HTYPE_INSTANCE
4391 _OP_REQP = ["instance_name", "disks", "disk_template",
4393 "wait_for_sync", "ip_check", "nics",
4394 "hvparams", "beparams"]
4397 def _ExpandNode(self, node):
4398 """Expands and checks one node name.
4401 node_full = self.cfg.ExpandNodeName(node)
4402 if node_full is None:
4403 raise errors.OpPrereqError("Unknown node %s" % node)
4406 def ExpandNames(self):
4407 """ExpandNames for CreateInstance.
4409 Figure out the right locks for instance creation.
4412 self.needed_locks = {}
4414 # set optional parameters to none if they don't exist
4415 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4416 if not hasattr(self.op, attr):
4417 setattr(self.op, attr, None)
4419 # cheap checks, mostly valid constants given
4421 # verify creation mode
4422 if self.op.mode not in (constants.INSTANCE_CREATE,
4423 constants.INSTANCE_IMPORT):
4424 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4427 # disk template and mirror node verification
4428 if self.op.disk_template not in constants.DISK_TEMPLATES:
4429 raise errors.OpPrereqError("Invalid disk template name")
4431 if self.op.hypervisor is None:
4432 self.op.hypervisor = self.cfg.GetHypervisorType()
4434 cluster = self.cfg.GetClusterInfo()
4435 enabled_hvs = cluster.enabled_hypervisors
4436 if self.op.hypervisor not in enabled_hvs:
4437 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4438 " cluster (%s)" % (self.op.hypervisor,
4439 ",".join(enabled_hvs)))
4441 # check hypervisor parameter syntax (locally)
4442 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4443 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4445 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4446 hv_type.CheckParameterSyntax(filled_hvp)
4448 # fill and remember the beparams dict
4449 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4450 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4453 #### instance parameters check
4455 # instance name verification
4456 hostname1 = utils.HostInfo(self.op.instance_name)
4457 self.op.instance_name = instance_name = hostname1.name
4459 # this is just a preventive check, but someone might still add this
4460 # instance in the meantime, and creation will fail at lock-add time
4461 if instance_name in self.cfg.GetInstanceList():
4462 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4465 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4469 for idx, nic in enumerate(self.op.nics):
4470 nic_mode_req = nic.get("mode", None)
4471 nic_mode = nic_mode_req
4472 if nic_mode is None:
4473 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4475 # in routed mode, for the first nic, the default ip is 'auto'
4476 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4477 default_ip_mode = constants.VALUE_AUTO
4479 default_ip_mode = constants.VALUE_NONE
4481 # ip validity checks
4482 ip = nic.get("ip", default_ip_mode)
4483 if ip is None or ip.lower() == constants.VALUE_NONE:
4485 elif ip.lower() == constants.VALUE_AUTO:
4486 nic_ip = hostname1.ip
4488 if not utils.IsValidIP(ip):
4489 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4490 " like a valid IP" % ip)
4493 # TODO: check the ip for uniqueness !!
4494 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4495 raise errors.OpPrereqError("Routed nic mode requires an ip address")
4497 # MAC address verification
4498 mac = nic.get("mac", constants.VALUE_AUTO)
4499 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4500 if not utils.IsValidMac(mac.lower()):
4501 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4503 # bridge verification
4504 bridge = nic.get("bridge", None)
4505 link = nic.get("link", None)
4507 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4508 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4509 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4515 nicparams[constants.NIC_MODE] = nic_mode_req
4517 nicparams[constants.NIC_LINK] = link
4519 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4521 objects.NIC.CheckParameterSyntax(check_params)
4522 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4524 # disk checks/pre-build
4526 for disk in self.op.disks:
4527 mode = disk.get("mode", constants.DISK_RDWR)
4528 if mode not in constants.DISK_ACCESS_SET:
4529 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4531 size = disk.get("size", None)
4533 raise errors.OpPrereqError("Missing disk size")
4537 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4538 self.disks.append({"size": size, "mode": mode})
4540 # used in CheckPrereq for ip ping check
4541 self.check_ip = hostname1.ip
4543 # file storage checks
4544 if (self.op.file_driver and
4545 not self.op.file_driver in constants.FILE_DRIVER):
4546 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4547 self.op.file_driver)
4549 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4550 raise errors.OpPrereqError("File storage directory path not absolute")
4552 ### Node/iallocator related checks
4553 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4554 raise errors.OpPrereqError("One and only one of iallocator and primary"
4555 " node must be given")
4557 if self.op.iallocator:
4558 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4560 self.op.pnode = self._ExpandNode(self.op.pnode)
4561 nodelist = [self.op.pnode]
4562 if self.op.snode is not None:
4563 self.op.snode = self._ExpandNode(self.op.snode)
4564 nodelist.append(self.op.snode)
4565 self.needed_locks[locking.LEVEL_NODE] = nodelist
4567 # in case of import lock the source node too
4568 if self.op.mode == constants.INSTANCE_IMPORT:
4569 src_node = getattr(self.op, "src_node", None)
4570 src_path = getattr(self.op, "src_path", None)
4572 if src_path is None:
4573 self.op.src_path = src_path = self.op.instance_name
4575 if src_node is None:
4576 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4577 self.op.src_node = None
4578 if os.path.isabs(src_path):
4579 raise errors.OpPrereqError("Importing an instance from an absolute"
4580 " path requires a source node option.")
4582 self.op.src_node = src_node = self._ExpandNode(src_node)
4583 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4584 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4585 if not os.path.isabs(src_path):
4586 self.op.src_path = src_path = \
4587 os.path.join(constants.EXPORT_DIR, src_path)
4589 else: # INSTANCE_CREATE
4590 if getattr(self.op, "os_type", None) is None:
4591 raise errors.OpPrereqError("No guest OS specified")
4593 def _RunAllocator(self):
4594 """Run the allocator based on input opcode.
4597 nics = [n.ToDict() for n in self.nics]
4598 ial = IAllocator(self,
4599 mode=constants.IALLOCATOR_MODE_ALLOC,
4600 name=self.op.instance_name,
4601 disk_template=self.op.disk_template,
4604 vcpus=self.be_full[constants.BE_VCPUS],
4605 mem_size=self.be_full[constants.BE_MEMORY],
4608 hypervisor=self.op.hypervisor,
4611 ial.Run(self.op.iallocator)
4614 raise errors.OpPrereqError("Can't compute nodes using"
4615 " iallocator '%s': %s" % (self.op.iallocator,
4617 if len(ial.nodes) != ial.required_nodes:
4618 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4619 " of nodes (%s), required %s" %
4620 (self.op.iallocator, len(ial.nodes),
4621 ial.required_nodes))
4622 self.op.pnode = ial.nodes[0]
4623 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4624 self.op.instance_name, self.op.iallocator,
4625 ", ".join(ial.nodes))
4626 if ial.required_nodes == 2:
4627 self.op.snode = ial.nodes[1]
4629 def BuildHooksEnv(self):
4632 This runs on master, primary and secondary nodes of the instance.
4636 "ADD_MODE": self.op.mode,
4638 if self.op.mode == constants.INSTANCE_IMPORT:
4639 env["SRC_NODE"] = self.op.src_node
4640 env["SRC_PATH"] = self.op.src_path
4641 env["SRC_IMAGES"] = self.src_images
4643 env.update(_BuildInstanceHookEnv(
4644 name=self.op.instance_name,
4645 primary_node=self.op.pnode,
4646 secondary_nodes=self.secondaries,
4647 status=self.op.start,
4648 os_type=self.op.os_type,
4649 memory=self.be_full[constants.BE_MEMORY],
4650 vcpus=self.be_full[constants.BE_VCPUS],
4651 nics=_PreBuildNICHooksList(self, self.nics),
4652 disk_template=self.op.disk_template,
4653 disks=[(d["size"], d["mode"]) for d in self.disks],
4656 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4661 def CheckPrereq(self):
4662 """Check prerequisites.
4665 if (not self.cfg.GetVGName() and
4666 self.op.disk_template not in constants.DTS_NOT_LVM):
4667 raise errors.OpPrereqError("Cluster does not support lvm-based"
4670 if self.op.mode == constants.INSTANCE_IMPORT:
4671 src_node = self.op.src_node
4672 src_path = self.op.src_path
4674 if src_node is None:
4675 exp_list = self.rpc.call_export_list(
4676 self.acquired_locks[locking.LEVEL_NODE])
4678 for node in exp_list:
4679 if not exp_list[node].failed and src_path in exp_list[node].data:
4681 self.op.src_node = src_node = node
4682 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4686 raise errors.OpPrereqError("No export found for relative path %s" %
4689 _CheckNodeOnline(self, src_node)
4690 result = self.rpc.call_export_info(src_node, src_path)
4693 raise errors.OpPrereqError("No export found in dir %s" % src_path)
4695 export_info = result.data
4696 if not export_info.has_section(constants.INISECT_EXP):
4697 raise errors.ProgrammerError("Corrupted export config")
4699 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4700 if (int(ei_version) != constants.EXPORT_VERSION):
4701 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4702 (ei_version, constants.EXPORT_VERSION))
4704 # Check that the new instance doesn't have less disks than the export
4705 instance_disks = len(self.disks)
4706 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4707 if instance_disks < export_disks:
4708 raise errors.OpPrereqError("Not enough disks to import."
4709 " (instance: %d, export: %d)" %
4710 (instance_disks, export_disks))
4712 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4714 for idx in range(export_disks):
4715 option = 'disk%d_dump' % idx
4716 if export_info.has_option(constants.INISECT_INS, option):
4717 # FIXME: are the old os-es, disk sizes, etc. useful?
4718 export_name = export_info.get(constants.INISECT_INS, option)
4719 image = os.path.join(src_path, export_name)
4720 disk_images.append(image)
4722 disk_images.append(False)
4724 self.src_images = disk_images
4726 old_name = export_info.get(constants.INISECT_INS, 'name')
4727 # FIXME: int() here could throw a ValueError on broken exports
4728 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4729 if self.op.instance_name == old_name:
4730 for idx, nic in enumerate(self.nics):
4731 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4732 nic_mac_ini = 'nic%d_mac' % idx
4733 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4735 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4736 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4737 if self.op.start and not self.op.ip_check:
4738 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4739 " adding an instance in start mode")
4741 if self.op.ip_check:
4742 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4743 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4744 (self.check_ip, self.op.instance_name))
4746 #### mac address generation
4747 # By generating here the mac address both the allocator and the hooks get
4748 # the real final mac address rather than the 'auto' or 'generate' value.
4749 # There is a race condition between the generation and the instance object
4750 # creation, which means that we know the mac is valid now, but we're not
4751 # sure it will be when we actually add the instance. If things go bad
4752 # adding the instance will abort because of a duplicate mac, and the
4753 # creation job will fail.
4754 for nic in self.nics:
4755 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4756 nic.mac = self.cfg.GenerateMAC()
4760 if self.op.iallocator is not None:
4761 self._RunAllocator()
4763 #### node related checks
4765 # check primary node
4766 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4767 assert self.pnode is not None, \
4768 "Cannot retrieve locked node %s" % self.op.pnode
4770 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4773 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4776 self.secondaries = []
4778 # mirror node verification
4779 if self.op.disk_template in constants.DTS_NET_MIRROR:
4780 if self.op.snode is None:
4781 raise errors.OpPrereqError("The networked disk templates need"
4783 if self.op.snode == pnode.name:
4784 raise errors.OpPrereqError("The secondary node cannot be"
4785 " the primary node.")
4786 _CheckNodeOnline(self, self.op.snode)
4787 _CheckNodeNotDrained(self, self.op.snode)
4788 self.secondaries.append(self.op.snode)
4790 nodenames = [pnode.name] + self.secondaries
4792 req_size = _ComputeDiskSize(self.op.disk_template,
4795 # Check lv size requirements
4796 if req_size is not None:
4797 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4799 for node in nodenames:
4800 info = nodeinfo[node]
4804 raise errors.OpPrereqError("Cannot get current information"
4805 " from node '%s'" % node)
4806 vg_free = info.get('vg_free', None)
4807 if not isinstance(vg_free, int):
4808 raise errors.OpPrereqError("Can't compute free disk space on"
4810 if req_size > info['vg_free']:
4811 raise errors.OpPrereqError("Not enough disk space on target node %s."
4812 " %d MB available, %d MB required" %
4813 (node, info['vg_free'], req_size))
4815 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4818 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4820 if not isinstance(result.data, objects.OS):
4821 raise errors.OpPrereqError("OS '%s' not in supported os list for"
4822 " primary node" % self.op.os_type)
4824 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4826 # memory check on primary node
4828 _CheckNodeFreeMemory(self, self.pnode.name,
4829 "creating instance %s" % self.op.instance_name,
4830 self.be_full[constants.BE_MEMORY],
4833 def Exec(self, feedback_fn):
4834 """Create and add the instance to the cluster.
4837 instance = self.op.instance_name
4838 pnode_name = self.pnode.name
4840 ht_kind = self.op.hypervisor
4841 if ht_kind in constants.HTS_REQ_PORT:
4842 network_port = self.cfg.AllocatePort()
4846 ##if self.op.vnc_bind_address is None:
4847 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4849 # this is needed because os.path.join does not accept None arguments
4850 if self.op.file_storage_dir is None:
4851 string_file_storage_dir = ""
4853 string_file_storage_dir = self.op.file_storage_dir
4855 # build the full file storage dir path
4856 file_storage_dir = os.path.normpath(os.path.join(
4857 self.cfg.GetFileStorageDir(),
4858 string_file_storage_dir, instance))
4861 disks = _GenerateDiskTemplate(self,
4862 self.op.disk_template,
4863 instance, pnode_name,
4867 self.op.file_driver,
4870 iobj = objects.Instance(name=instance, os=self.op.os_type,
4871 primary_node=pnode_name,
4872 nics=self.nics, disks=disks,
4873 disk_template=self.op.disk_template,
4875 network_port=network_port,
4876 beparams=self.op.beparams,
4877 hvparams=self.op.hvparams,
4878 hypervisor=self.op.hypervisor,
4881 feedback_fn("* creating instance disks...")
4883 _CreateDisks(self, iobj)
4884 except errors.OpExecError:
4885 self.LogWarning("Device creation failed, reverting...")
4887 _RemoveDisks(self, iobj)
4889 self.cfg.ReleaseDRBDMinors(instance)
4892 feedback_fn("adding instance %s to cluster config" % instance)
4894 self.cfg.AddInstance(iobj)
4895 # Declare that we don't want to remove the instance lock anymore, as we've
4896 # added the instance to the config
4897 del self.remove_locks[locking.LEVEL_INSTANCE]
4898 # Unlock all the nodes
4899 if self.op.mode == constants.INSTANCE_IMPORT:
4900 nodes_keep = [self.op.src_node]
4901 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4902 if node != self.op.src_node]
4903 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4904 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4906 self.context.glm.release(locking.LEVEL_NODE)
4907 del self.acquired_locks[locking.LEVEL_NODE]
4909 if self.op.wait_for_sync:
4910 disk_abort = not _WaitForSync(self, iobj)
4911 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4912 # make sure the disks are not degraded (still sync-ing is ok)
4914 feedback_fn("* checking mirrors status")
4915 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4920 _RemoveDisks(self, iobj)
4921 self.cfg.RemoveInstance(iobj.name)
4922 # Make sure the instance lock gets removed
4923 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4924 raise errors.OpExecError("There are some degraded disks for"
4927 feedback_fn("creating os for instance %s on node %s" %
4928 (instance, pnode_name))
4930 if iobj.disk_template != constants.DT_DISKLESS:
4931 if self.op.mode == constants.INSTANCE_CREATE:
4932 feedback_fn("* running the instance OS create scripts...")
4933 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4934 msg = result.RemoteFailMsg()
4936 raise errors.OpExecError("Could not add os for instance %s"
4938 (instance, pnode_name, msg))
4940 elif self.op.mode == constants.INSTANCE_IMPORT:
4941 feedback_fn("* running the instance OS import scripts...")
4942 src_node = self.op.src_node
4943 src_images = self.src_images
4944 cluster_name = self.cfg.GetClusterName()
4945 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4946 src_node, src_images,
4948 import_result.Raise()
4949 for idx, result in enumerate(import_result.data):
4951 self.LogWarning("Could not import the image %s for instance"
4952 " %s, disk %d, on node %s" %
4953 (src_images[idx], instance, idx, pnode_name))
4955 # also checked in the prereq part
4956 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4960 iobj.admin_up = True
4961 self.cfg.Update(iobj)
4962 logging.info("Starting instance %s on node %s", instance, pnode_name)
4963 feedback_fn("* starting instance...")
4964 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4965 msg = result.RemoteFailMsg()
4967 raise errors.OpExecError("Could not start instance: %s" % msg)
4970 class LUConnectConsole(NoHooksLU):
4971 """Connect to an instance's console.
4973 This is somewhat special in that it returns the command line that
4974 you need to run on the master node in order to connect to the
4978 _OP_REQP = ["instance_name"]
4981 def ExpandNames(self):
4982 self._ExpandAndLockInstance()
4984 def CheckPrereq(self):
4985 """Check prerequisites.
4987 This checks that the instance is in the cluster.
4990 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4991 assert self.instance is not None, \
4992 "Cannot retrieve locked instance %s" % self.op.instance_name
4993 _CheckNodeOnline(self, self.instance.primary_node)
4995 def Exec(self, feedback_fn):
4996 """Connect to the console of an instance
4999 instance = self.instance
5000 node = instance.primary_node
5002 node_insts = self.rpc.call_instance_list([node],
5003 [instance.hypervisor])[node]
5006 if instance.name not in node_insts.data:
5007 raise errors.OpExecError("Instance %s is not running." % instance.name)
5009 logging.debug("Connecting to console of %s on %s", instance.name, node)
5011 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5012 cluster = self.cfg.GetClusterInfo()
5013 # beparams and hvparams are passed separately, to avoid editing the
5014 # instance and then saving the defaults in the instance itself.
5015 hvparams = cluster.FillHV(instance)
5016 beparams = cluster.FillBE(instance)
5017 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5020 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5023 class LUReplaceDisks(LogicalUnit):
5024 """Replace the disks of an instance.
5027 HPATH = "mirrors-replace"
5028 HTYPE = constants.HTYPE_INSTANCE
5029 _OP_REQP = ["instance_name", "mode", "disks"]
5032 def CheckArguments(self):
5033 if not hasattr(self.op, "remote_node"):
5034 self.op.remote_node = None
5035 if not hasattr(self.op, "iallocator"):
5036 self.op.iallocator = None
5038 # check for valid parameter combination
5039 cnt = [self.op.remote_node, self.op.iallocator].count(None)
5040 if self.op.mode == constants.REPLACE_DISK_CHG:
5042 raise errors.OpPrereqError("When changing the secondary either an"
5043 " iallocator script must be used or the"
5046 raise errors.OpPrereqError("Give either the iallocator or the new"
5047 " secondary, not both")
5048 else: # not replacing the secondary
5050 raise errors.OpPrereqError("The iallocator and new node options can"
5051 " be used only when changing the"
5054 def ExpandNames(self):
5055 self._ExpandAndLockInstance()
5057 if self.op.iallocator is not None:
5058 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5059 elif self.op.remote_node is not None:
5060 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5061 if remote_node is None:
5062 raise errors.OpPrereqError("Node '%s' not known" %
5063 self.op.remote_node)
5064 self.op.remote_node = remote_node
5065 # Warning: do not remove the locking of the new secondary here
5066 # unless DRBD8.AddChildren is changed to work in parallel;
5067 # currently it doesn't since parallel invocations of
5068 # FindUnusedMinor will conflict
5069 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5070 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5072 self.needed_locks[locking.LEVEL_NODE] = []
5073 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5075 def DeclareLocks(self, level):
5076 # If we're not already locking all nodes in the set we have to declare the
5077 # instance's primary/secondary nodes.
5078 if (level == locking.LEVEL_NODE and
5079 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5080 self._LockInstancesNodes()
5082 def _RunAllocator(self):
5083 """Compute a new secondary node using an IAllocator.
5086 ial = IAllocator(self,
5087 mode=constants.IALLOCATOR_MODE_RELOC,
5088 name=self.op.instance_name,
5089 relocate_from=[self.sec_node])
5091 ial.Run(self.op.iallocator)
5094 raise errors.OpPrereqError("Can't compute nodes using"
5095 " iallocator '%s': %s" % (self.op.iallocator,
5097 if len(ial.nodes) != ial.required_nodes:
5098 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5099 " of nodes (%s), required %s" %
5100 (len(ial.nodes), ial.required_nodes))
5101 self.op.remote_node = ial.nodes[0]
5102 self.LogInfo("Selected new secondary for the instance: %s",
5103 self.op.remote_node)
5105 def BuildHooksEnv(self):
5108 This runs on the master, the primary and all the secondaries.
5112 "MODE": self.op.mode,
5113 "NEW_SECONDARY": self.op.remote_node,
5114 "OLD_SECONDARY": self.instance.secondary_nodes[0],
5116 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5118 self.cfg.GetMasterNode(),
5119 self.instance.primary_node,
5121 if self.op.remote_node is not None:
5122 nl.append(self.op.remote_node)
5125 def CheckPrereq(self):
5126 """Check prerequisites.
5128 This checks that the instance is in the cluster.
5131 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5132 assert instance is not None, \
5133 "Cannot retrieve locked instance %s" % self.op.instance_name
5134 self.instance = instance
5136 if instance.disk_template != constants.DT_DRBD8:
5137 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5140 if len(instance.secondary_nodes) != 1:
5141 raise errors.OpPrereqError("The instance has a strange layout,"
5142 " expected one secondary but found %d" %
5143 len(instance.secondary_nodes))
5145 self.sec_node = instance.secondary_nodes[0]
5147 if self.op.iallocator is not None:
5148 self._RunAllocator()
5150 remote_node = self.op.remote_node
5151 if remote_node is not None:
5152 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5153 assert self.remote_node_info is not None, \
5154 "Cannot retrieve locked node %s" % remote_node
5156 self.remote_node_info = None
5157 if remote_node == instance.primary_node:
5158 raise errors.OpPrereqError("The specified node is the primary node of"
5160 elif remote_node == self.sec_node:
5161 raise errors.OpPrereqError("The specified node is already the"
5162 " secondary node of the instance.")
5164 if self.op.mode == constants.REPLACE_DISK_PRI:
5165 n1 = self.tgt_node = instance.primary_node
5166 n2 = self.oth_node = self.sec_node
5167 elif self.op.mode == constants.REPLACE_DISK_SEC:
5168 n1 = self.tgt_node = self.sec_node
5169 n2 = self.oth_node = instance.primary_node
5170 elif self.op.mode == constants.REPLACE_DISK_CHG:
5171 n1 = self.new_node = remote_node
5172 n2 = self.oth_node = instance.primary_node
5173 self.tgt_node = self.sec_node
5174 _CheckNodeNotDrained(self, remote_node)
5176 raise errors.ProgrammerError("Unhandled disk replace mode")
5178 _CheckNodeOnline(self, n1)
5179 _CheckNodeOnline(self, n2)
5181 if not self.op.disks:
5182 self.op.disks = range(len(instance.disks))
5184 for disk_idx in self.op.disks:
5185 instance.FindDisk(disk_idx)
5187 def _ExecD8DiskOnly(self, feedback_fn):
5188 """Replace a disk on the primary or secondary for dbrd8.
5190 The algorithm for replace is quite complicated:
5192 1. for each disk to be replaced:
5194 1. create new LVs on the target node with unique names
5195 1. detach old LVs from the drbd device
5196 1. rename old LVs to name_replaced.<time_t>
5197 1. rename new LVs to old LVs
5198 1. attach the new LVs (with the old names now) to the drbd device
5200 1. wait for sync across all devices
5202 1. for each modified disk:
5204 1. remove old LVs (which have the name name_replaces.<time_t>)
5206 Failures are not very well handled.
5210 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5211 instance = self.instance
5213 vgname = self.cfg.GetVGName()
5216 tgt_node = self.tgt_node
5217 oth_node = self.oth_node
5219 # Step: check device activation
5220 self.proc.LogStep(1, steps_total, "check device existence")
5221 info("checking volume groups")
5222 my_vg = cfg.GetVGName()
5223 results = self.rpc.call_vg_list([oth_node, tgt_node])
5225 raise errors.OpExecError("Can't list volume groups on the nodes")
5226 for node in oth_node, tgt_node:
5228 if res.failed or not res.data or my_vg not in res.data:
5229 raise errors.OpExecError("Volume group '%s' not found on %s" %
5231 for idx, dev in enumerate(instance.disks):
5232 if idx not in self.op.disks:
5234 for node in tgt_node, oth_node:
5235 info("checking disk/%d on %s" % (idx, node))
5236 cfg.SetDiskID(dev, node)
5237 result = self.rpc.call_blockdev_find(node, dev)
5238 msg = result.RemoteFailMsg()
5239 if not msg and not result.payload:
5240 msg = "disk not found"
5242 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5245 # Step: check other node consistency
5246 self.proc.LogStep(2, steps_total, "check peer consistency")
5247 for idx, dev in enumerate(instance.disks):
5248 if idx not in self.op.disks:
5250 info("checking disk/%d consistency on %s" % (idx, oth_node))
5251 if not _CheckDiskConsistency(self, dev, oth_node,
5252 oth_node==instance.primary_node):
5253 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5254 " to replace disks on this node (%s)" %
5255 (oth_node, tgt_node))
5257 # Step: create new storage
5258 self.proc.LogStep(3, steps_total, "allocate new storage")
5259 for idx, dev in enumerate(instance.disks):
5260 if idx not in self.op.disks:
5263 cfg.SetDiskID(dev, tgt_node)
5264 lv_names = [".disk%d_%s" % (idx, suf)
5265 for suf in ["data", "meta"]]
5266 names = _GenerateUniqueNames(self, lv_names)
5267 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5268 logical_id=(vgname, names[0]))
5269 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5270 logical_id=(vgname, names[1]))
5271 new_lvs = [lv_data, lv_meta]
5272 old_lvs = dev.children
5273 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5274 info("creating new local storage on %s for %s" %
5275 (tgt_node, dev.iv_name))
5276 # we pass force_create=True to force the LVM creation
5277 for new_lv in new_lvs:
5278 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5279 _GetInstanceInfoText(instance), False)
5281 # Step: for each lv, detach+rename*2+attach
5282 self.proc.LogStep(4, steps_total, "change drbd configuration")
5283 for dev, old_lvs, new_lvs in iv_names.itervalues():
5284 info("detaching %s drbd from local storage" % dev.iv_name)
5285 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5286 msg = result.RemoteFailMsg()
5288 raise errors.OpExecError("Can't detach drbd from local storage on node"
5289 " %s for device %s: %s" %
5290 (tgt_node, dev.iv_name, msg))
5292 #cfg.Update(instance)
5294 # ok, we created the new LVs, so now we know we have the needed
5295 # storage; as such, we proceed on the target node to rename
5296 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5297 # using the assumption that logical_id == physical_id (which in
5298 # turn is the unique_id on that node)
5300 # FIXME(iustin): use a better name for the replaced LVs
5301 temp_suffix = int(time.time())
5302 ren_fn = lambda d, suff: (d.physical_id[0],
5303 d.physical_id[1] + "_replaced-%s" % suff)
5304 # build the rename list based on what LVs exist on the node
5306 for to_ren in old_lvs:
5307 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5308 if not result.RemoteFailMsg() and result.payload:
5310 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5312 info("renaming the old LVs on the target node")
5313 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5314 msg = result.RemoteFailMsg()
5316 raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5318 # now we rename the new LVs to the old LVs
5319 info("renaming the new LVs on the target node")
5320 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5321 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5322 msg = result.RemoteFailMsg()
5324 raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5327 for old, new in zip(old_lvs, new_lvs):
5328 new.logical_id = old.logical_id
5329 cfg.SetDiskID(new, tgt_node)
5331 for disk in old_lvs:
5332 disk.logical_id = ren_fn(disk, temp_suffix)
5333 cfg.SetDiskID(disk, tgt_node)
5335 # now that the new lvs have the old name, we can add them to the device
5336 info("adding new mirror component on %s" % tgt_node)
5337 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5338 msg = result.RemoteFailMsg()
5340 for new_lv in new_lvs:
5341 msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5343 warning("Can't rollback device %s: %s", dev, msg,
5344 hint="cleanup manually the unused logical volumes")
5345 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5347 dev.children = new_lvs
5348 cfg.Update(instance)
5350 # Step: wait for sync
5352 # this can fail as the old devices are degraded and _WaitForSync
5353 # does a combined result over all disks, so we don't check its
5355 self.proc.LogStep(5, steps_total, "sync devices")
5356 _WaitForSync(self, instance, unlock=True)
5358 # so check manually all the devices
5359 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5360 cfg.SetDiskID(dev, instance.primary_node)
5361 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5362 msg = result.RemoteFailMsg()
5363 if not msg and not result.payload:
5364 msg = "disk not found"
5366 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5368 if result.payload[5]:
5369 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5371 # Step: remove old storage
5372 self.proc.LogStep(6, steps_total, "removing old storage")
5373 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5374 info("remove logical volumes for %s" % name)
5376 cfg.SetDiskID(lv, tgt_node)
5377 msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5379 warning("Can't remove old LV: %s" % msg,
5380 hint="manually remove unused LVs")
5383 def _ExecD8Secondary(self, feedback_fn):
5384 """Replace the secondary node for drbd8.
5386 The algorithm for replace is quite complicated:
5387 - for all disks of the instance:
5388 - create new LVs on the new node with same names
5389 - shutdown the drbd device on the old secondary
5390 - disconnect the drbd network on the primary
5391 - create the drbd device on the new secondary
5392 - network attach the drbd on the primary, using an artifice:
5393 the drbd code for Attach() will connect to the network if it
5394 finds a device which is connected to the good local disks but
5396 - wait for sync across all devices
5397 - remove all disks from the old secondary
5399 Failures are not very well handled.
5403 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5404 instance = self.instance
5408 old_node = self.tgt_node
5409 new_node = self.new_node
5410 pri_node = instance.primary_node
5412 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5413 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5414 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5417 # Step: check device activation
5418 self.proc.LogStep(1, steps_total, "check device existence")
5419 info("checking volume groups")
5420 my_vg = cfg.GetVGName()
5421 results = self.rpc.call_vg_list([pri_node, new_node])
5422 for node in pri_node, new_node:
5424 if res.failed or not res.data or my_vg not in res.data:
5425 raise errors.OpExecError("Volume group '%s' not found on %s" %
5427 for idx, dev in enumerate(instance.disks):
5428 if idx not in self.op.disks:
5430 info("checking disk/%d on %s" % (idx, pri_node))
5431 cfg.SetDiskID(dev, pri_node)
5432 result = self.rpc.call_blockdev_find(pri_node, dev)
5433 msg = result.RemoteFailMsg()
5434 if not msg and not result.payload:
5435 msg = "disk not found"
5437 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5438 (idx, pri_node, msg))
5440 # Step: check other node consistency
5441 self.proc.LogStep(2, steps_total, "check peer consistency")
5442 for idx, dev in enumerate(instance.disks):
5443 if idx not in self.op.disks:
5445 info("checking disk/%d consistency on %s" % (idx, pri_node))
5446 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5447 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5448 " unsafe to replace the secondary" %
5451 # Step: create new storage
5452 self.proc.LogStep(3, steps_total, "allocate new storage")
5453 for idx, dev in enumerate(instance.disks):
5454 info("adding new local storage on %s for disk/%d" %
5456 # we pass force_create=True to force LVM creation
5457 for new_lv in dev.children:
5458 _CreateBlockDev(self, new_node, instance, new_lv, True,
5459 _GetInstanceInfoText(instance), False)
5461 # Step 4: dbrd minors and drbd setups changes
5462 # after this, we must manually remove the drbd minors on both the
5463 # error and the success paths
5464 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5466 logging.debug("Allocated minors %s" % (minors,))
5467 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5468 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5470 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5471 # create new devices on new_node; note that we create two IDs:
5472 # one without port, so the drbd will be activated without
5473 # networking information on the new node at this stage, and one
5474 # with network, for the latter activation in step 4
5475 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5476 if pri_node == o_node1:
5481 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5482 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5484 iv_names[idx] = (dev, dev.children, new_net_id)
5485 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5487 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5488 logical_id=new_alone_id,
5489 children=dev.children)
5491 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5492 _GetInstanceInfoText(instance), False)
5493 except errors.GenericError:
5494 self.cfg.ReleaseDRBDMinors(instance.name)
5497 for idx, dev in enumerate(instance.disks):
5498 # we have new devices, shutdown the drbd on the old secondary
5499 info("shutting down drbd for disk/%d on old node" % idx)
5500 cfg.SetDiskID(dev, old_node)
5501 msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5503 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5505 hint="Please cleanup this device manually as soon as possible")
5507 info("detaching primary drbds from the network (=> standalone)")
5508 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5509 instance.disks)[pri_node]
5511 msg = result.RemoteFailMsg()
5513 # detaches didn't succeed (unlikely)
5514 self.cfg.ReleaseDRBDMinors(instance.name)
5515 raise errors.OpExecError("Can't detach the disks from the network on"
5516 " old node: %s" % (msg,))
5518 # if we managed to detach at least one, we update all the disks of
5519 # the instance to point to the new secondary
5520 info("updating instance configuration")
5521 for dev, _, new_logical_id in iv_names.itervalues():
5522 dev.logical_id = new_logical_id
5523 cfg.SetDiskID(dev, pri_node)
5524 cfg.Update(instance)
5526 # and now perform the drbd attach
5527 info("attaching primary drbds to new secondary (standalone => connected)")
5528 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5529 instance.disks, instance.name,
5531 for to_node, to_result in result.items():
5532 msg = to_result.RemoteFailMsg()
5534 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5535 hint="please do a gnt-instance info to see the"
5538 # this can fail as the old devices are degraded and _WaitForSync
5539 # does a combined result over all disks, so we don't check its
5541 self.proc.LogStep(5, steps_total, "sync devices")
5542 _WaitForSync(self, instance, unlock=True)
5544 # so check manually all the devices
5545 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5546 cfg.SetDiskID(dev, pri_node)
5547 result = self.rpc.call_blockdev_find(pri_node, dev)
5548 msg = result.RemoteFailMsg()
5549 if not msg and not result.payload:
5550 msg = "disk not found"
5552 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5554 if result.payload[5]:
5555 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5557 self.proc.LogStep(6, steps_total, "removing old storage")
5558 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5559 info("remove logical volumes for disk/%d" % idx)
5561 cfg.SetDiskID(lv, old_node)
5562 msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5564 warning("Can't remove LV on old secondary: %s", msg,
5565 hint="Cleanup stale volumes by hand")
5567 def Exec(self, feedback_fn):
5568 """Execute disk replacement.
5570 This dispatches the disk replacement to the appropriate handler.
5573 instance = self.instance
5575 # Activate the instance disks if we're replacing them on a down instance
5576 if not instance.admin_up:
5577 _StartInstanceDisks(self, instance, True)
5579 if self.op.mode == constants.REPLACE_DISK_CHG:
5580 fn = self._ExecD8Secondary
5582 fn = self._ExecD8DiskOnly
5584 ret = fn(feedback_fn)
5586 # Deactivate the instance disks if we're replacing them on a down instance
5587 if not instance.admin_up:
5588 _SafeShutdownInstanceDisks(self, instance)
5593 class LUGrowDisk(LogicalUnit):
5594 """Grow a disk of an instance.
5598 HTYPE = constants.HTYPE_INSTANCE
5599 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5602 def ExpandNames(self):
5603 self._ExpandAndLockInstance()
5604 self.needed_locks[locking.LEVEL_NODE] = []
5605 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5607 def DeclareLocks(self, level):
5608 if level == locking.LEVEL_NODE:
5609 self._LockInstancesNodes()
5611 def BuildHooksEnv(self):
5614 This runs on the master, the primary and all the secondaries.
5618 "DISK": self.op.disk,
5619 "AMOUNT": self.op.amount,
5621 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5623 self.cfg.GetMasterNode(),
5624 self.instance.primary_node,
5628 def CheckPrereq(self):
5629 """Check prerequisites.
5631 This checks that the instance is in the cluster.
5634 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5635 assert instance is not None, \
5636 "Cannot retrieve locked instance %s" % self.op.instance_name
5637 nodenames = list(instance.all_nodes)
5638 for node in nodenames:
5639 _CheckNodeOnline(self, node)
5642 self.instance = instance
5644 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5645 raise errors.OpPrereqError("Instance's disk layout does not support"
5648 self.disk = instance.FindDisk(self.op.disk)
5650 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5651 instance.hypervisor)
5652 for node in nodenames:
5653 info = nodeinfo[node]
5654 if info.failed or not info.data:
5655 raise errors.OpPrereqError("Cannot get current information"
5656 " from node '%s'" % node)
5657 vg_free = info.data.get('vg_free', None)
5658 if not isinstance(vg_free, int):
5659 raise errors.OpPrereqError("Can't compute free disk space on"
5661 if self.op.amount > vg_free:
5662 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5663 " %d MiB available, %d MiB required" %
5664 (node, vg_free, self.op.amount))
5666 def Exec(self, feedback_fn):
5667 """Execute disk grow.
5670 instance = self.instance
5672 for node in instance.all_nodes:
5673 self.cfg.SetDiskID(disk, node)
5674 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5675 msg = result.RemoteFailMsg()
5677 raise errors.OpExecError("Grow request failed to node %s: %s" %
5679 disk.RecordGrow(self.op.amount)
5680 self.cfg.Update(instance)
5681 if self.op.wait_for_sync:
5682 disk_abort = not _WaitForSync(self, instance)
5684 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5685 " status.\nPlease check the instance.")
5688 class LUQueryInstanceData(NoHooksLU):
5689 """Query runtime instance data.
5692 _OP_REQP = ["instances", "static"]
5695 def ExpandNames(self):
5696 self.needed_locks = {}
5697 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5699 if not isinstance(self.op.instances, list):
5700 raise errors.OpPrereqError("Invalid argument type 'instances'")
5702 if self.op.instances:
5703 self.wanted_names = []
5704 for name in self.op.instances:
5705 full_name = self.cfg.ExpandInstanceName(name)
5706 if full_name is None:
5707 raise errors.OpPrereqError("Instance '%s' not known" % name)
5708 self.wanted_names.append(full_name)
5709 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5711 self.wanted_names = None
5712 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5714 self.needed_locks[locking.LEVEL_NODE] = []
5715 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5717 def DeclareLocks(self, level):
5718 if level == locking.LEVEL_NODE:
5719 self._LockInstancesNodes()
5721 def CheckPrereq(self):
5722 """Check prerequisites.
5724 This only checks the optional instance list against the existing names.
5727 if self.wanted_names is None:
5728 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5730 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5731 in self.wanted_names]
5734 def _ComputeDiskStatus(self, instance, snode, dev):
5735 """Compute block device status.
5738 static = self.op.static
5740 self.cfg.SetDiskID(dev, instance.primary_node)
5741 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5742 if dev_pstatus.offline:
5745 msg = dev_pstatus.RemoteFailMsg()
5747 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5748 (instance.name, msg))
5749 dev_pstatus = dev_pstatus.payload
5753 if dev.dev_type in constants.LDS_DRBD:
5754 # we change the snode then (otherwise we use the one passed in)
5755 if dev.logical_id[0] == instance.primary_node:
5756 snode = dev.logical_id[1]
5758 snode = dev.logical_id[0]
5760 if snode and not static:
5761 self.cfg.SetDiskID(dev, snode)
5762 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5763 if dev_sstatus.offline:
5766 msg = dev_sstatus.RemoteFailMsg()
5768 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5769 (instance.name, msg))
5770 dev_sstatus = dev_sstatus.payload
5775 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5776 for child in dev.children]
5781 "iv_name": dev.iv_name,
5782 "dev_type": dev.dev_type,
5783 "logical_id": dev.logical_id,
5784 "physical_id": dev.physical_id,
5785 "pstatus": dev_pstatus,
5786 "sstatus": dev_sstatus,
5787 "children": dev_children,
5793 def Exec(self, feedback_fn):
5794 """Gather and return data"""
5797 cluster = self.cfg.GetClusterInfo()
5799 for instance in self.wanted_instances:
5800 if not self.op.static:
5801 remote_info = self.rpc.call_instance_info(instance.primary_node,
5803 instance.hypervisor)
5805 remote_info = remote_info.data
5806 if remote_info and "state" in remote_info:
5809 remote_state = "down"
5812 if instance.admin_up:
5815 config_state = "down"
5817 disks = [self._ComputeDiskStatus(instance, None, device)
5818 for device in instance.disks]
5821 "name": instance.name,
5822 "config_state": config_state,
5823 "run_state": remote_state,
5824 "pnode": instance.primary_node,
5825 "snodes": instance.secondary_nodes,
5827 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5829 "hypervisor": instance.hypervisor,
5830 "network_port": instance.network_port,
5831 "hv_instance": instance.hvparams,
5832 "hv_actual": cluster.FillHV(instance),
5833 "be_instance": instance.beparams,
5834 "be_actual": cluster.FillBE(instance),
5837 result[instance.name] = idict
5842 class LUSetInstanceParams(LogicalUnit):
5843 """Modifies an instances's parameters.
5846 HPATH = "instance-modify"
5847 HTYPE = constants.HTYPE_INSTANCE
5848 _OP_REQP = ["instance_name"]
5851 def CheckArguments(self):
5852 if not hasattr(self.op, 'nics'):
5854 if not hasattr(self.op, 'disks'):
5856 if not hasattr(self.op, 'beparams'):
5857 self.op.beparams = {}
5858 if not hasattr(self.op, 'hvparams'):
5859 self.op.hvparams = {}
5860 self.op.force = getattr(self.op, "force", False)
5861 if not (self.op.nics or self.op.disks or
5862 self.op.hvparams or self.op.beparams):
5863 raise errors.OpPrereqError("No changes submitted")
5867 for disk_op, disk_dict in self.op.disks:
5868 if disk_op == constants.DDM_REMOVE:
5871 elif disk_op == constants.DDM_ADD:
5874 if not isinstance(disk_op, int):
5875 raise errors.OpPrereqError("Invalid disk index")
5876 if disk_op == constants.DDM_ADD:
5877 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5878 if mode not in constants.DISK_ACCESS_SET:
5879 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5880 size = disk_dict.get('size', None)
5882 raise errors.OpPrereqError("Required disk parameter size missing")
5885 except ValueError, err:
5886 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5888 disk_dict['size'] = size
5890 # modification of disk
5891 if 'size' in disk_dict:
5892 raise errors.OpPrereqError("Disk size change not possible, use"
5895 if disk_addremove > 1:
5896 raise errors.OpPrereqError("Only one disk add or remove operation"
5897 " supported at a time")
5901 for nic_op, nic_dict in self.op.nics:
5902 if nic_op == constants.DDM_REMOVE:
5905 elif nic_op == constants.DDM_ADD:
5908 if not isinstance(nic_op, int):
5909 raise errors.OpPrereqError("Invalid nic index")
5911 # nic_dict should be a dict
5912 nic_ip = nic_dict.get('ip', None)
5913 if nic_ip is not None:
5914 if nic_ip.lower() == constants.VALUE_NONE:
5915 nic_dict['ip'] = None
5917 if not utils.IsValidIP(nic_ip):
5918 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5920 nic_bridge = nic_dict.get('bridge', None)
5921 nic_link = nic_dict.get('link', None)
5922 if nic_bridge and nic_link:
5923 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5924 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5925 nic_dict['bridge'] = None
5926 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5927 nic_dict['link'] = None
5929 if nic_op == constants.DDM_ADD:
5930 nic_mac = nic_dict.get('mac', None)
5932 nic_dict['mac'] = constants.VALUE_AUTO
5934 if 'mac' in nic_dict:
5935 nic_mac = nic_dict['mac']
5936 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5937 if not utils.IsValidMac(nic_mac):
5938 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5939 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5940 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5941 " modifying an existing nic")
5943 if nic_addremove > 1:
5944 raise errors.OpPrereqError("Only one NIC add or remove operation"
5945 " supported at a time")
5947 def ExpandNames(self):
5948 self._ExpandAndLockInstance()
5949 self.needed_locks[locking.LEVEL_NODE] = []
5950 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5952 def DeclareLocks(self, level):
5953 if level == locking.LEVEL_NODE:
5954 self._LockInstancesNodes()
5956 def BuildHooksEnv(self):
5959 This runs on the master, primary and secondaries.
5963 if constants.BE_MEMORY in self.be_new:
5964 args['memory'] = self.be_new[constants.BE_MEMORY]
5965 if constants.BE_VCPUS in self.be_new:
5966 args['vcpus'] = self.be_new[constants.BE_VCPUS]
5967 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5968 # information at all.
5971 nic_override = dict(self.op.nics)
5972 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
5973 for idx, nic in enumerate(self.instance.nics):
5974 if idx in nic_override:
5975 this_nic_override = nic_override[idx]
5977 this_nic_override = {}
5978 if 'ip' in this_nic_override:
5979 ip = this_nic_override['ip']
5982 if 'mac' in this_nic_override:
5983 mac = this_nic_override['mac']
5986 if idx in self.nic_pnew:
5987 nicparams = self.nic_pnew[idx]
5989 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
5990 mode = nicparams[constants.NIC_MODE]
5991 link = nicparams[constants.NIC_LINK]
5992 args['nics'].append((ip, mac, mode, link))
5993 if constants.DDM_ADD in nic_override:
5994 ip = nic_override[constants.DDM_ADD].get('ip', None)
5995 mac = nic_override[constants.DDM_ADD]['mac']
5996 nicparams = self.nic_pnew[constants.DDM_ADD]
5997 mode = nicparams[constants.NIC_MODE]
5998 link = nicparams[constants.NIC_LINK]
5999 args['nics'].append((ip, mac, mode, link))
6000 elif constants.DDM_REMOVE in nic_override:
6001 del args['nics'][-1]
6003 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6004 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6007 def _GetUpdatedParams(self, old_params, update_dict,
6008 default_values, parameter_types):
6009 """Return the new params dict for the given params.
6011 @type old_params: dict
6012 @type old_params: old parameters
6013 @type update_dict: dict
6014 @type update_dict: dict containing new parameter values,
6015 or constants.VALUE_DEFAULT to reset the
6016 parameter to its default value
6017 @type default_values: dict
6018 @param default_values: default values for the filled parameters
6019 @type parameter_types: dict
6020 @param parameter_types: dict mapping target dict keys to types
6021 in constants.ENFORCEABLE_TYPES
6022 @rtype: (dict, dict)
6023 @return: (new_parameters, filled_parameters)
6026 params_copy = copy.deepcopy(old_params)
6027 for key, val in update_dict.iteritems():
6028 if val == constants.VALUE_DEFAULT:
6030 del params_copy[key]
6034 params_copy[key] = val
6035 utils.ForceDictType(params_copy, parameter_types)
6036 params_filled = objects.FillDict(default_values, params_copy)
6037 return (params_copy, params_filled)
6039 def CheckPrereq(self):
6040 """Check prerequisites.
6042 This only checks the instance list against the existing names.
6045 force = self.force = self.op.force
6047 # checking the new params on the primary/secondary nodes
6049 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6050 cluster = self.cluster = self.cfg.GetClusterInfo()
6051 assert self.instance is not None, \
6052 "Cannot retrieve locked instance %s" % self.op.instance_name
6053 pnode = instance.primary_node
6054 nodelist = list(instance.all_nodes)
6056 # hvparams processing
6057 if self.op.hvparams:
6058 i_hvdict, hv_new = self._GetUpdatedParams(
6059 instance.hvparams, self.op.hvparams,
6060 cluster.hvparams[instance.hypervisor],
6061 constants.HVS_PARAMETER_TYPES)
6063 hypervisor.GetHypervisor(
6064 instance.hypervisor).CheckParameterSyntax(hv_new)
6065 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6066 self.hv_new = hv_new # the new actual values
6067 self.hv_inst = i_hvdict # the new dict (without defaults)
6069 self.hv_new = self.hv_inst = {}
6071 # beparams processing
6072 if self.op.beparams:
6073 i_bedict, be_new = self._GetUpdatedParams(
6074 instance.beparams, self.op.beparams,
6075 cluster.beparams[constants.PP_DEFAULT],
6076 constants.BES_PARAMETER_TYPES)
6077 self.be_new = be_new # the new actual values
6078 self.be_inst = i_bedict # the new dict (without defaults)
6080 self.be_new = self.be_inst = {}
6084 if constants.BE_MEMORY in self.op.beparams and not self.force:
6085 mem_check_list = [pnode]
6086 if be_new[constants.BE_AUTO_BALANCE]:
6087 # either we changed auto_balance to yes or it was from before
6088 mem_check_list.extend(instance.secondary_nodes)
6089 instance_info = self.rpc.call_instance_info(pnode, instance.name,
6090 instance.hypervisor)
6091 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6092 instance.hypervisor)
6093 if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6094 # Assume the primary node is unreachable and go ahead
6095 self.warn.append("Can't get info from primary node %s" % pnode)
6097 if not instance_info.failed and instance_info.data:
6098 current_mem = int(instance_info.data['memory'])
6100 # Assume instance not running
6101 # (there is a slight race condition here, but it's not very probable,
6102 # and we have no other way to check)
6104 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6105 nodeinfo[pnode].data['memory_free'])
6107 raise errors.OpPrereqError("This change will prevent the instance"
6108 " from starting, due to %d MB of memory"
6109 " missing on its primary node" % miss_mem)
6111 if be_new[constants.BE_AUTO_BALANCE]:
6112 for node, nres in nodeinfo.iteritems():
6113 if node not in instance.secondary_nodes:
6115 if nres.failed or not isinstance(nres.data, dict):
6116 self.warn.append("Can't get info from secondary node %s" % node)
6117 elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6118 self.warn.append("Not enough memory to failover instance to"
6119 " secondary node %s" % node)
6124 for nic_op, nic_dict in self.op.nics:
6125 if nic_op == constants.DDM_REMOVE:
6126 if not instance.nics:
6127 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6129 if nic_op != constants.DDM_ADD:
6131 if nic_op < 0 or nic_op >= len(instance.nics):
6132 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6134 (nic_op, len(instance.nics)))
6135 old_nic_params = instance.nics[nic_op].nicparams
6136 old_nic_ip = instance.nics[nic_op].ip
6141 update_params_dict = dict([(key, nic_dict[key])
6142 for key in constants.NICS_PARAMETERS
6143 if key in nic_dict])
6145 if 'bridge' in nic_dict:
6146 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6148 new_nic_params, new_filled_nic_params = \
6149 self._GetUpdatedParams(old_nic_params, update_params_dict,
6150 cluster.nicparams[constants.PP_DEFAULT],
6151 constants.NICS_PARAMETER_TYPES)
6152 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6153 self.nic_pinst[nic_op] = new_nic_params
6154 self.nic_pnew[nic_op] = new_filled_nic_params
6155 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6157 if new_nic_mode == constants.NIC_MODE_BRIDGED:
6158 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6159 result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
6162 msg = ("Bridge '%s' doesn't exist on one of"
6163 " the instance nodes" % nic_bridge)
6165 self.warn.append(msg)
6167 raise errors.OpPrereqError(msg)
6168 if new_nic_mode == constants.NIC_MODE_ROUTED:
6169 if 'ip' in nic_dict:
6170 nic_ip = nic_dict['ip']
6174 raise errors.OpPrereqError('Cannot set the nic ip to None'
6176 if 'mac' in nic_dict:
6177 nic_mac = nic_dict['mac']
6179 raise errors.OpPrereqError('Cannot set the nic mac to None')
6180 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6181 # otherwise generate the mac
6182 nic_dict['mac'] = self.cfg.GenerateMAC()
6184 # or validate/reserve the current one
6185 if self.cfg.IsMacInUse(nic_mac):
6186 raise errors.OpPrereqError("MAC address %s already in use"
6187 " in cluster" % nic_mac)
6190 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6191 raise errors.OpPrereqError("Disk operations not supported for"
6192 " diskless instances")
6193 for disk_op, disk_dict in self.op.disks:
6194 if disk_op == constants.DDM_REMOVE:
6195 if len(instance.disks) == 1:
6196 raise errors.OpPrereqError("Cannot remove the last disk of"
6198 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6199 ins_l = ins_l[pnode]
6200 if ins_l.failed or not isinstance(ins_l.data, list):
6201 raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6202 if instance.name in ins_l.data:
6203 raise errors.OpPrereqError("Instance is running, can't remove"
6206 if (disk_op == constants.DDM_ADD and
6207 len(instance.nics) >= constants.MAX_DISKS):
6208 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6209 " add more" % constants.MAX_DISKS)
6210 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6212 if disk_op < 0 or disk_op >= len(instance.disks):
6213 raise errors.OpPrereqError("Invalid disk index %s, valid values"
6215 (disk_op, len(instance.disks)))
6219 def Exec(self, feedback_fn):
6220 """Modifies an instance.
6222 All parameters take effect only at the next restart of the instance.
6225 # Process here the warnings from CheckPrereq, as we don't have a
6226 # feedback_fn there.
6227 for warn in self.warn:
6228 feedback_fn("WARNING: %s" % warn)
6231 instance = self.instance
6232 cluster = self.cluster
6234 for disk_op, disk_dict in self.op.disks:
6235 if disk_op == constants.DDM_REMOVE:
6236 # remove the last disk
6237 device = instance.disks.pop()
6238 device_idx = len(instance.disks)
6239 for node, disk in device.ComputeNodeTree(instance.primary_node):
6240 self.cfg.SetDiskID(disk, node)
6241 msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6243 self.LogWarning("Could not remove disk/%d on node %s: %s,"
6244 " continuing anyway", device_idx, node, msg)
6245 result.append(("disk/%d" % device_idx, "remove"))
6246 elif disk_op == constants.DDM_ADD:
6248 if instance.disk_template == constants.DT_FILE:
6249 file_driver, file_path = instance.disks[0].logical_id
6250 file_path = os.path.dirname(file_path)
6252 file_driver = file_path = None
6253 disk_idx_base = len(instance.disks)
6254 new_disk = _GenerateDiskTemplate(self,
6255 instance.disk_template,
6256 instance.name, instance.primary_node,
6257 instance.secondary_nodes,
6262 instance.disks.append(new_disk)
6263 info = _GetInstanceInfoText(instance)
6265 logging.info("Creating volume %s for instance %s",
6266 new_disk.iv_name, instance.name)
6267 # Note: this needs to be kept in sync with _CreateDisks
6269 for node in instance.all_nodes:
6270 f_create = node == instance.primary_node
6272 _CreateBlockDev(self, node, instance, new_disk,
6273 f_create, info, f_create)
6274 except errors.OpExecError, err:
6275 self.LogWarning("Failed to create volume %s (%s) on"
6277 new_disk.iv_name, new_disk, node, err)
6278 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6279 (new_disk.size, new_disk.mode)))
6281 # change a given disk
6282 instance.disks[disk_op].mode = disk_dict['mode']
6283 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6285 for nic_op, nic_dict in self.op.nics:
6286 if nic_op == constants.DDM_REMOVE:
6287 # remove the last nic
6288 del instance.nics[-1]
6289 result.append(("nic.%d" % len(instance.nics), "remove"))
6290 elif nic_op == constants.DDM_ADD:
6291 # mac and bridge should be set, by now
6292 mac = nic_dict['mac']
6293 ip = nic_dict.get('ip', None)
6294 nicparams = self.nic_pinst[constants.DDM_ADD]
6295 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6296 instance.nics.append(new_nic)
6297 result.append(("nic.%d" % (len(instance.nics) - 1),
6298 "add:mac=%s,ip=%s,mode=%s,link=%s" %
6299 (new_nic.mac, new_nic.ip,
6300 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6301 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6304 for key in 'mac', 'ip':
6306 setattr(instance.nics[nic_op], key, nic_dict[key])
6307 if nic_op in self.nic_pnew:
6308 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6309 for key, val in nic_dict.iteritems():
6310 result.append(("nic.%s/%d" % (key, nic_op), val))
6313 if self.op.hvparams:
6314 instance.hvparams = self.hv_inst
6315 for key, val in self.op.hvparams.iteritems():
6316 result.append(("hv/%s" % key, val))
6319 if self.op.beparams:
6320 instance.beparams = self.be_inst
6321 for key, val in self.op.beparams.iteritems():
6322 result.append(("be/%s" % key, val))
6324 self.cfg.Update(instance)
6329 class LUQueryExports(NoHooksLU):
6330 """Query the exports list
6333 _OP_REQP = ['nodes']
6336 def ExpandNames(self):
6337 self.needed_locks = {}
6338 self.share_locks[locking.LEVEL_NODE] = 1
6339 if not self.op.nodes:
6340 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6342 self.needed_locks[locking.LEVEL_NODE] = \
6343 _GetWantedNodes(self, self.op.nodes)
6345 def CheckPrereq(self):
6346 """Check prerequisites.
6349 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6351 def Exec(self, feedback_fn):
6352 """Compute the list of all the exported system images.
6355 @return: a dictionary with the structure node->(export-list)
6356 where export-list is a list of the instances exported on
6360 rpcresult = self.rpc.call_export_list(self.nodes)
6362 for node in rpcresult:
6363 if rpcresult[node].failed:
6364 result[node] = False
6366 result[node] = rpcresult[node].data
6371 class LUExportInstance(LogicalUnit):
6372 """Export an instance to an image in the cluster.
6375 HPATH = "instance-export"
6376 HTYPE = constants.HTYPE_INSTANCE
6377 _OP_REQP = ["instance_name", "target_node", "shutdown"]
6380 def ExpandNames(self):
6381 self._ExpandAndLockInstance()
6382 # FIXME: lock only instance primary and destination node
6384 # Sad but true, for now we have do lock all nodes, as we don't know where
6385 # the previous export might be, and and in this LU we search for it and
6386 # remove it from its current node. In the future we could fix this by:
6387 # - making a tasklet to search (share-lock all), then create the new one,
6388 # then one to remove, after
6389 # - removing the removal operation altoghether
6390 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6392 def DeclareLocks(self, level):
6393 """Last minute lock declaration."""
6394 # All nodes are locked anyway, so nothing to do here.
6396 def BuildHooksEnv(self):
6399 This will run on the master, primary node and target node.
6403 "EXPORT_NODE": self.op.target_node,
6404 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6406 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6407 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6408 self.op.target_node]
6411 def CheckPrereq(self):
6412 """Check prerequisites.
6414 This checks that the instance and node names are valid.
6417 instance_name = self.op.instance_name
6418 self.instance = self.cfg.GetInstanceInfo(instance_name)
6419 assert self.instance is not None, \
6420 "Cannot retrieve locked instance %s" % self.op.instance_name
6421 _CheckNodeOnline(self, self.instance.primary_node)
6423 self.dst_node = self.cfg.GetNodeInfo(
6424 self.cfg.ExpandNodeName(self.op.target_node))
6426 if self.dst_node is None:
6427 # This is wrong node name, not a non-locked node
6428 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6429 _CheckNodeOnline(self, self.dst_node.name)
6430 _CheckNodeNotDrained(self, self.dst_node.name)
6432 # instance disk type verification
6433 for disk in self.instance.disks:
6434 if disk.dev_type == constants.LD_FILE:
6435 raise errors.OpPrereqError("Export not supported for instances with"
6436 " file-based disks")
6438 def Exec(self, feedback_fn):
6439 """Export an instance to an image in the cluster.
6442 instance = self.instance
6443 dst_node = self.dst_node
6444 src_node = instance.primary_node
6445 if self.op.shutdown:
6446 # shutdown the instance, but not the disks
6447 result = self.rpc.call_instance_shutdown(src_node, instance)
6448 msg = result.RemoteFailMsg()
6450 raise errors.OpExecError("Could not shutdown instance %s on"
6452 (instance.name, src_node, msg))
6454 vgname = self.cfg.GetVGName()
6458 # set the disks ID correctly since call_instance_start needs the
6459 # correct drbd minor to create the symlinks
6460 for disk in instance.disks:
6461 self.cfg.SetDiskID(disk, src_node)
6464 for disk in instance.disks:
6465 # result.payload will be a snapshot of an lvm leaf of the one we passed
6466 result = self.rpc.call_blockdev_snapshot(src_node, disk)
6467 msg = result.RemoteFailMsg()
6469 self.LogWarning("Could not snapshot block device %s on node %s: %s",
6470 disk.logical_id[1], src_node, msg)
6471 snap_disks.append(False)
6473 disk_id = (vgname, result.payload)
6474 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6475 logical_id=disk_id, physical_id=disk_id,
6476 iv_name=disk.iv_name)
6477 snap_disks.append(new_dev)
6480 if self.op.shutdown and instance.admin_up:
6481 result = self.rpc.call_instance_start(src_node, instance, None, None)
6482 msg = result.RemoteFailMsg()
6484 _ShutdownInstanceDisks(self, instance)
6485 raise errors.OpExecError("Could not start instance: %s" % msg)
6487 # TODO: check for size
6489 cluster_name = self.cfg.GetClusterName()
6490 for idx, dev in enumerate(snap_disks):
6492 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6493 instance, cluster_name, idx)
6494 if result.failed or not result.data:
6495 self.LogWarning("Could not export block device %s from node %s to"
6496 " node %s", dev.logical_id[1], src_node,
6498 msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6500 self.LogWarning("Could not remove snapshot block device %s from node"
6501 " %s: %s", dev.logical_id[1], src_node, msg)
6503 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6504 if result.failed or not result.data:
6505 self.LogWarning("Could not finalize export for instance %s on node %s",
6506 instance.name, dst_node.name)
6508 nodelist = self.cfg.GetNodeList()
6509 nodelist.remove(dst_node.name)
6511 # on one-node clusters nodelist will be empty after the removal
6512 # if we proceed the backup would be removed because OpQueryExports
6513 # substitutes an empty list with the full cluster node list.
6515 exportlist = self.rpc.call_export_list(nodelist)
6516 for node in exportlist:
6517 if exportlist[node].failed:
6519 if instance.name in exportlist[node].data:
6520 if not self.rpc.call_export_remove(node, instance.name):
6521 self.LogWarning("Could not remove older export for instance %s"
6522 " on node %s", instance.name, node)
6525 class LURemoveExport(NoHooksLU):
6526 """Remove exports related to the named instance.
6529 _OP_REQP = ["instance_name"]
6532 def ExpandNames(self):
6533 self.needed_locks = {}
6534 # We need all nodes to be locked in order for RemoveExport to work, but we
6535 # don't need to lock the instance itself, as nothing will happen to it (and
6536 # we can remove exports also for a removed instance)
6537 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6539 def CheckPrereq(self):
6540 """Check prerequisites.
6544 def Exec(self, feedback_fn):
6545 """Remove any export.
6548 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6549 # If the instance was not found we'll try with the name that was passed in.
6550 # This will only work if it was an FQDN, though.
6552 if not instance_name:
6554 instance_name = self.op.instance_name
6556 exportlist = self.rpc.call_export_list(self.acquired_locks[
6557 locking.LEVEL_NODE])
6559 for node in exportlist:
6560 if exportlist[node].failed:
6561 self.LogWarning("Failed to query node %s, continuing" % node)
6563 if instance_name in exportlist[node].data:
6565 result = self.rpc.call_export_remove(node, instance_name)
6566 if result.failed or not result.data:
6567 logging.error("Could not remove export for instance %s"
6568 " on node %s", instance_name, node)
6570 if fqdn_warn and not found:
6571 feedback_fn("Export not found. If trying to remove an export belonging"
6572 " to a deleted instance please use its Fully Qualified"
6576 class TagsLU(NoHooksLU):
6579 This is an abstract class which is the parent of all the other tags LUs.
6583 def ExpandNames(self):
6584 self.needed_locks = {}
6585 if self.op.kind == constants.TAG_NODE:
6586 name = self.cfg.ExpandNodeName(self.op.name)
6588 raise errors.OpPrereqError("Invalid node name (%s)" %
6591 self.needed_locks[locking.LEVEL_NODE] = name
6592 elif self.op.kind == constants.TAG_INSTANCE:
6593 name = self.cfg.ExpandInstanceName(self.op.name)
6595 raise errors.OpPrereqError("Invalid instance name (%s)" %
6598 self.needed_locks[locking.LEVEL_INSTANCE] = name
6600 def CheckPrereq(self):
6601 """Check prerequisites.
6604 if self.op.kind == constants.TAG_CLUSTER:
6605 self.target = self.cfg.GetClusterInfo()
6606 elif self.op.kind == constants.TAG_NODE:
6607 self.target = self.cfg.GetNodeInfo(self.op.name)
6608 elif self.op.kind == constants.TAG_INSTANCE:
6609 self.target = self.cfg.GetInstanceInfo(self.op.name)
6611 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6615 class LUGetTags(TagsLU):
6616 """Returns the tags of a given object.
6619 _OP_REQP = ["kind", "name"]
6622 def Exec(self, feedback_fn):
6623 """Returns the tag list.
6626 return list(self.target.GetTags())
6629 class LUSearchTags(NoHooksLU):
6630 """Searches the tags for a given pattern.
6633 _OP_REQP = ["pattern"]
6636 def ExpandNames(self):
6637 self.needed_locks = {}
6639 def CheckPrereq(self):
6640 """Check prerequisites.
6642 This checks the pattern passed for validity by compiling it.
6646 self.re = re.compile(self.op.pattern)
6647 except re.error, err:
6648 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6649 (self.op.pattern, err))
6651 def Exec(self, feedback_fn):
6652 """Returns the tag list.
6656 tgts = [("/cluster", cfg.GetClusterInfo())]
6657 ilist = cfg.GetAllInstancesInfo().values()
6658 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6659 nlist = cfg.GetAllNodesInfo().values()
6660 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6662 for path, target in tgts:
6663 for tag in target.GetTags():
6664 if self.re.search(tag):
6665 results.append((path, tag))
6669 class LUAddTags(TagsLU):
6670 """Sets a tag on a given object.
6673 _OP_REQP = ["kind", "name", "tags"]
6676 def CheckPrereq(self):
6677 """Check prerequisites.
6679 This checks the type and length of the tag name and value.
6682 TagsLU.CheckPrereq(self)
6683 for tag in self.op.tags:
6684 objects.TaggableObject.ValidateTag(tag)
6686 def Exec(self, feedback_fn):
6691 for tag in self.op.tags:
6692 self.target.AddTag(tag)
6693 except errors.TagError, err:
6694 raise errors.OpExecError("Error while setting tag: %s" % str(err))
6696 self.cfg.Update(self.target)
6697 except errors.ConfigurationError:
6698 raise errors.OpRetryError("There has been a modification to the"
6699 " config file and the operation has been"
6700 " aborted. Please retry.")
6703 class LUDelTags(TagsLU):
6704 """Delete a list of tags from a given object.
6707 _OP_REQP = ["kind", "name", "tags"]
6710 def CheckPrereq(self):
6711 """Check prerequisites.
6713 This checks that we have the given tag.
6716 TagsLU.CheckPrereq(self)
6717 for tag in self.op.tags:
6718 objects.TaggableObject.ValidateTag(tag)
6719 del_tags = frozenset(self.op.tags)
6720 cur_tags = self.target.GetTags()
6721 if not del_tags <= cur_tags:
6722 diff_tags = del_tags - cur_tags
6723 diff_names = ["'%s'" % tag for tag in diff_tags]
6725 raise errors.OpPrereqError("Tag(s) %s not found" %
6726 (",".join(diff_names)))
6728 def Exec(self, feedback_fn):
6729 """Remove the tag from the object.
6732 for tag in self.op.tags:
6733 self.target.RemoveTag(tag)
6735 self.cfg.Update(self.target)
6736 except errors.ConfigurationError:
6737 raise errors.OpRetryError("There has been a modification to the"
6738 " config file and the operation has been"
6739 " aborted. Please retry.")
6742 class LUTestDelay(NoHooksLU):
6743 """Sleep for a specified amount of time.
6745 This LU sleeps on the master and/or nodes for a specified amount of
6749 _OP_REQP = ["duration", "on_master", "on_nodes"]
6752 def ExpandNames(self):
6753 """Expand names and set required locks.
6755 This expands the node list, if any.
6758 self.needed_locks = {}
6759 if self.op.on_nodes:
6760 # _GetWantedNodes can be used here, but is not always appropriate to use
6761 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6763 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6764 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6766 def CheckPrereq(self):
6767 """Check prerequisites.
6771 def Exec(self, feedback_fn):
6772 """Do the actual sleep.
6775 if self.op.on_master:
6776 if not utils.TestDelay(self.op.duration):
6777 raise errors.OpExecError("Error during master delay test")
6778 if self.op.on_nodes:
6779 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6781 raise errors.OpExecError("Complete failure from rpc call")
6782 for node, node_result in result.items():
6784 if not node_result.data:
6785 raise errors.OpExecError("Failure during rpc call to node %s,"
6786 " result: %s" % (node, node_result.data))
6789 class IAllocator(object):
6790 """IAllocator framework.
6792 An IAllocator instance has three sets of attributes:
6793 - cfg that is needed to query the cluster
6794 - input data (all members of the _KEYS class attribute are required)
6795 - four buffer attributes (in|out_data|text), that represent the
6796 input (to the external script) in text and data structure format,
6797 and the output from it, again in two formats
6798 - the result variables from the script (success, info, nodes) for
6803 "mem_size", "disks", "disk_template",
6804 "os", "tags", "nics", "vcpus", "hypervisor",
6810 def __init__(self, lu, mode, name, **kwargs):
6812 # init buffer variables
6813 self.in_text = self.out_text = self.in_data = self.out_data = None
6814 # init all input fields so that pylint is happy
6817 self.mem_size = self.disks = self.disk_template = None
6818 self.os = self.tags = self.nics = self.vcpus = None
6819 self.hypervisor = None
6820 self.relocate_from = None
6822 self.required_nodes = None
6823 # init result fields
6824 self.success = self.info = self.nodes = None
6825 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6826 keyset = self._ALLO_KEYS
6827 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6828 keyset = self._RELO_KEYS
6830 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6831 " IAllocator" % self.mode)
6833 if key not in keyset:
6834 raise errors.ProgrammerError("Invalid input parameter '%s' to"
6835 " IAllocator" % key)
6836 setattr(self, key, kwargs[key])
6838 if key not in kwargs:
6839 raise errors.ProgrammerError("Missing input parameter '%s' to"
6840 " IAllocator" % key)
6841 self._BuildInputData()
6843 def _ComputeClusterData(self):
6844 """Compute the generic allocator input data.
6846 This is the data that is independent of the actual operation.
6850 cluster_info = cfg.GetClusterInfo()
6853 "version": constants.IALLOCATOR_VERSION,
6854 "cluster_name": cfg.GetClusterName(),
6855 "cluster_tags": list(cluster_info.GetTags()),
6856 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6857 # we don't have job IDs
6859 iinfo = cfg.GetAllInstancesInfo().values()
6860 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6864 node_list = cfg.GetNodeList()
6866 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6867 hypervisor_name = self.hypervisor
6868 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6869 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6871 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6873 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6874 cluster_info.enabled_hypervisors)
6875 for nname, nresult in node_data.items():
6876 # first fill in static (config-based) values
6877 ninfo = cfg.GetNodeInfo(nname)
6879 "tags": list(ninfo.GetTags()),
6880 "primary_ip": ninfo.primary_ip,
6881 "secondary_ip": ninfo.secondary_ip,
6882 "offline": ninfo.offline,
6883 "drained": ninfo.drained,
6884 "master_candidate": ninfo.master_candidate,
6887 if not ninfo.offline:
6889 if not isinstance(nresult.data, dict):
6890 raise errors.OpExecError("Can't get data for node %s" % nname)
6891 remote_info = nresult.data
6892 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6893 'vg_size', 'vg_free', 'cpu_total']:
6894 if attr not in remote_info:
6895 raise errors.OpExecError("Node '%s' didn't return attribute"
6896 " '%s'" % (nname, attr))
6898 remote_info[attr] = int(remote_info[attr])
6899 except ValueError, err:
6900 raise errors.OpExecError("Node '%s' returned invalid value"
6901 " for '%s': %s" % (nname, attr, err))
6902 # compute memory used by primary instances
6903 i_p_mem = i_p_up_mem = 0
6904 for iinfo, beinfo in i_list:
6905 if iinfo.primary_node == nname:
6906 i_p_mem += beinfo[constants.BE_MEMORY]
6907 if iinfo.name not in node_iinfo[nname].data:
6910 i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6911 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6912 remote_info['memory_free'] -= max(0, i_mem_diff)
6915 i_p_up_mem += beinfo[constants.BE_MEMORY]
6917 # compute memory used by instances
6919 "total_memory": remote_info['memory_total'],
6920 "reserved_memory": remote_info['memory_dom0'],
6921 "free_memory": remote_info['memory_free'],
6922 "total_disk": remote_info['vg_size'],
6923 "free_disk": remote_info['vg_free'],
6924 "total_cpus": remote_info['cpu_total'],
6925 "i_pri_memory": i_p_mem,
6926 "i_pri_up_memory": i_p_up_mem,
6930 node_results[nname] = pnr
6931 data["nodes"] = node_results
6935 for iinfo, beinfo in i_list:
6937 for nic in iinfo.nics:
6938 filled_params = objects.FillDict(
6939 cluster_info.nicparams[constants.PP_DEFAULT],
6941 nic_dict = {"mac": nic.mac,
6943 "mode": filled_params[constants.NIC_MODE],
6944 "link": filled_params[constants.NIC_LINK],
6946 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
6947 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
6948 nic_data.append(nic_dict)
6950 "tags": list(iinfo.GetTags()),
6951 "admin_up": iinfo.admin_up,
6952 "vcpus": beinfo[constants.BE_VCPUS],
6953 "memory": beinfo[constants.BE_MEMORY],
6955 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6957 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6958 "disk_template": iinfo.disk_template,
6959 "hypervisor": iinfo.hypervisor,
6961 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6963 instance_data[iinfo.name] = pir
6965 data["instances"] = instance_data
6969 def _AddNewInstance(self):
6970 """Add new instance data to allocator structure.
6972 This in combination with _AllocatorGetClusterData will create the
6973 correct structure needed as input for the allocator.
6975 The checks for the completeness of the opcode must have already been
6981 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6983 if self.disk_template in constants.DTS_NET_MIRROR:
6984 self.required_nodes = 2
6986 self.required_nodes = 1
6990 "disk_template": self.disk_template,
6993 "vcpus": self.vcpus,
6994 "memory": self.mem_size,
6995 "disks": self.disks,
6996 "disk_space_total": disk_space,
6998 "required_nodes": self.required_nodes,
7000 data["request"] = request
7002 def _AddRelocateInstance(self):
7003 """Add relocate instance data to allocator structure.
7005 This in combination with _IAllocatorGetClusterData will create the
7006 correct structure needed as input for the allocator.
7008 The checks for the completeness of the opcode must have already been
7012 instance = self.lu.cfg.GetInstanceInfo(self.name)
7013 if instance is None:
7014 raise errors.ProgrammerError("Unknown instance '%s' passed to"
7015 " IAllocator" % self.name)
7017 if instance.disk_template not in constants.DTS_NET_MIRROR:
7018 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7020 if len(instance.secondary_nodes) != 1:
7021 raise errors.OpPrereqError("Instance has not exactly one secondary node")
7023 self.required_nodes = 1
7024 disk_sizes = [{'size': disk.size} for disk in instance.disks]
7025 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7030 "disk_space_total": disk_space,
7031 "required_nodes": self.required_nodes,
7032 "relocate_from": self.relocate_from,
7034 self.in_data["request"] = request
7036 def _BuildInputData(self):
7037 """Build input data structures.
7040 self._ComputeClusterData()
7042 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7043 self._AddNewInstance()
7045 self._AddRelocateInstance()
7047 self.in_text = serializer.Dump(self.in_data)
7049 def Run(self, name, validate=True, call_fn=None):
7050 """Run an instance allocator and return the results.
7054 call_fn = self.lu.rpc.call_iallocator_runner
7057 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7060 if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7061 raise errors.OpExecError("Invalid result from master iallocator runner")
7063 rcode, stdout, stderr, fail = result.data
7065 if rcode == constants.IARUN_NOTFOUND:
7066 raise errors.OpExecError("Can't find allocator '%s'" % name)
7067 elif rcode == constants.IARUN_FAILURE:
7068 raise errors.OpExecError("Instance allocator call failed: %s,"
7069 " output: %s" % (fail, stdout+stderr))
7070 self.out_text = stdout
7072 self._ValidateResult()
7074 def _ValidateResult(self):
7075 """Process the allocator results.
7077 This will process and if successful save the result in
7078 self.out_data and the other parameters.
7082 rdict = serializer.Load(self.out_text)
7083 except Exception, err:
7084 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7086 if not isinstance(rdict, dict):
7087 raise errors.OpExecError("Can't parse iallocator results: not a dict")
7089 for key in "success", "info", "nodes":
7090 if key not in rdict:
7091 raise errors.OpExecError("Can't parse iallocator results:"
7092 " missing key '%s'" % key)
7093 setattr(self, key, rdict[key])
7095 if not isinstance(rdict["nodes"], list):
7096 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7098 self.out_data = rdict
7101 class LUTestAllocator(NoHooksLU):
7102 """Run allocator tests.
7104 This LU runs the allocator tests
7107 _OP_REQP = ["direction", "mode", "name"]
7109 def CheckPrereq(self):
7110 """Check prerequisites.
7112 This checks the opcode parameters depending on the director and mode test.
7115 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7116 for attr in ["name", "mem_size", "disks", "disk_template",
7117 "os", "tags", "nics", "vcpus"]:
7118 if not hasattr(self.op, attr):
7119 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7121 iname = self.cfg.ExpandInstanceName(self.op.name)
7122 if iname is not None:
7123 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7125 if not isinstance(self.op.nics, list):
7126 raise errors.OpPrereqError("Invalid parameter 'nics'")
7127 for row in self.op.nics:
7128 if (not isinstance(row, dict) or
7131 "bridge" not in row):
7132 raise errors.OpPrereqError("Invalid contents of the"
7133 " 'nics' parameter")
7134 if not isinstance(self.op.disks, list):
7135 raise errors.OpPrereqError("Invalid parameter 'disks'")
7136 for row in self.op.disks:
7137 if (not isinstance(row, dict) or
7138 "size" not in row or
7139 not isinstance(row["size"], int) or
7140 "mode" not in row or
7141 row["mode"] not in ['r', 'w']):
7142 raise errors.OpPrereqError("Invalid contents of the"
7143 " 'disks' parameter")
7144 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7145 self.op.hypervisor = self.cfg.GetHypervisorType()
7146 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7147 if not hasattr(self.op, "name"):
7148 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7149 fname = self.cfg.ExpandInstanceName(self.op.name)
7151 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7153 self.op.name = fname
7154 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7156 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7159 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7160 if not hasattr(self.op, "allocator") or self.op.allocator is None:
7161 raise errors.OpPrereqError("Missing allocator name")
7162 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7163 raise errors.OpPrereqError("Wrong allocator test '%s'" %
7166 def Exec(self, feedback_fn):
7167 """Run the allocator test.
7170 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7171 ial = IAllocator(self,
7174 mem_size=self.op.mem_size,
7175 disks=self.op.disks,
7176 disk_template=self.op.disk_template,
7180 vcpus=self.op.vcpus,
7181 hypervisor=self.op.hypervisor,
7184 ial = IAllocator(self,
7187 relocate_from=list(self.relocate_from),
7190 if self.op.direction == constants.IALLOCATOR_DIR_IN:
7191 result = ial.in_text
7193 ial.Run(self.op.allocator, validate=False)
7194 result = ial.out_text