# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import re
import time
import logging

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separate is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
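
  # Example (hypothetical opcode flag, for illustration only): an LU
  # whose opcode carries an optional "force" flag could normalize it
  # here, so that later methods can rely on the attribute existing:
  #
  #   def CheckArguments(self):
  #     self.op.force = getattr(self.op, "force", False)
  #     if not isinstance(self.op.force, bool):
  #       raise errors.OpPrereqError("Parameter 'force' must be a boolean")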

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """
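
  # Example (for illustration only): an LU which locks instances in
  # ExpandNames and can compute its node locks only once those are held
  # would typically declare them here, as also shown in the
  # _LockInstancesNodes docstring below:
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()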

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    An empty list of nodes (and not None) should be returned when no
    nodes are wanted.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hooks results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name
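
  # Example (for illustration only): an instance-level LU would usually
  # call this helper as its whole ExpandNames implementation:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()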

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
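

# Example (hypothetical LU, for illustration only): a minimal concurrent
# LU tying the above pieces together; it expands and locks a single
# instance, then reports its name:
#
#   class LUExampleQueryInstance(NoHooksLU):
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Found instance %s" % self.instance.name)
#       return self.instance.name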


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
                                 " non-empty list of nodes whose name is to be"
                                 " expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)
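

# Example (for illustration only): inside an LU's ExpandNames, expanding
# user-supplied short names before declaring locks might look like:
#
#   self.wanted = _GetWantedNodes(self, self.op.nodes)
#   self.needed_locks = {locking.LEVEL_NODE: self.wanted}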


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = static.Copy()
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))
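

# Example (field names are illustrative only): query LUs call this early
# in ExpandNames to reject unknown output fields; see LUQueryNodes below
# for a real caller:
#
#   _CheckOutputFields(static=utils.FieldSet("name"),
#                      dynamic=utils.FieldSet("mfree"),
#                      selected=self.op.output_fields)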


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)
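

# Example (hypothetical flag, for illustration only): normalizing an
# optional boolean opcode parameter so it always exists afterwards:
#
#   _CheckBooleanOpField(self.op, "ignore_secondaries")
#   # self.op.ignore_secondaries is now either None, True or False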


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  return env
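

# Example (values are illustrative only): for a one-NIC, one-disk,
# running instance a call such as:
#
#   env = _BuildInstanceHookEnv("inst1.example.com", "node1", ["node2"],
#                               "debian-etch", True, 512, 1,
#                               [("192.0.2.10", "aa:00:00:00:00:01",
#                                 constants.NIC_MODE_BRIDGED, "xen-br0")],
#                               "drbd", [(10240, "rw")])
#
# yields, among others, env["INSTANCE_STATUS"] == "up",
# env["INSTANCE_NIC0_BRIDGE"] == "xen-br0" and
# env["INSTANCE_DISK_COUNT"] == 1.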


def _PreBuildNICHooksList(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _PreBuildNICHooksList(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise()
    if not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                    (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)
            bad = True

    # checks ssh to other nodes
    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
          bad = True

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
           not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False

    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad
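
  # Worked example (numbers are illustrative only): if node B is
  # secondary for instances i1 (256MB) and i2 (512MB), both having node
  # A as primary and auto_balance set, B must keep 768MB free; with
  # 'mfree' at 512MB the loop above reports an error for the (B, A)
  # pair, since B could not host both instances if A failed.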

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hooks results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        if not lvs.offline:
          self.LogWarning("Connection to node %s failed: %s" %
                          (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.RemoteFailMsg()
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)
    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
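

# Example (a sketch, assuming objects.Disk accepts these keyword
# arguments; for illustration only): a DRBD disk with two LV children
# is reported as lvm-based through the recursion above:
#
#   lv_a = objects.Disk(dev_type=constants.LD_LV)
#   lv_b = objects.Disk(dev_type=constants.LD_LV)
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, children=[lv_a, lv_b])
#   assert _RecursiveCheckIfLVMBased(drbd)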


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = objects.FillDict(
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)

    # hypervisor list/parameters
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size

    self.cfg.Update(self.cluster)

    # we want to update nodes after the cluster so that if any errors
    # happen, we have recorded and saved the cluster info
    if self.op.candidate_pool_size is not None:
      _AdjustCandidatePool(self)


def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetNodeList()
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)
  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                   ])
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())
  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.RemoteFailMsg()
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)


class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo())
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.RemoteFailMsg()
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
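

# Example (for illustration only): LUs that create or attach disks
# typically call this helper from Exec and react to a degraded result:
#
#   if not _WaitForSync(self, instance):
#     raise errors.OpExecError("Disks of instance %s are degraded" %
#                              instance.name)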


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.RemoteFailMsg()
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      result = result and (not rstats.payload[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].failed]
    for node_name, nr in rlist.iteritems():
      if nr.failed or not nr.data:
        continue
      for os_obj in nr.data:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in good_nodes:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the node exists in the configuration
      - it does not have primary or secondary instances
      - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False

  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    "master_candidate",
    "master",
    "offline",
    "drained",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
1976 def Exec(self, feedback_fn):
1977 """Computes the list of nodes and their attributes.
1980 all_info = self.cfg.GetAllNodesInfo()
1982 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1983 elif self.wanted != locking.ALL_SET:
1984 nodenames = self.wanted
1985 missing = set(nodenames).difference(all_info.keys())
1987 raise errors.OpExecError(
1988 "Some nodes were removed before retrieving their data: %s" % missing)
1990 nodenames = all_info.keys()
1992 nodenames = utils.NiceSort(nodenames)
1993 nodelist = [all_info[name] for name in nodenames]
1995 # begin data gathering
1997 if self.do_node_query:
1999 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2000 self.cfg.GetHypervisorType())
2001 for name in nodenames:
2002 nodeinfo = node_data[name]
2003 if not nodeinfo.failed and nodeinfo.data:
2004 nodeinfo = nodeinfo.data
2005 fn = utils.TryConvert
2007 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2008 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2009 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2010 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2011 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2012 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2013 "bootid": nodeinfo.get('bootid', None),
2014 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2015 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2018 live_data[name] = {}
2020 live_data = dict.fromkeys(nodenames, {})
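# Editor's note: dict.fromkeys() maps every node name to the *same*
# empty dict object; this is safe here only because that fallback is
# never mutated, just read via live_data[node.name].get(field, None).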
2022 node_to_primary = dict([(name, set()) for name in nodenames])
2023 node_to_secondary = dict([(name, set()) for name in nodenames])
2025 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2026 "sinst_cnt", "sinst_list"))
2027 if inst_fields & frozenset(self.op.output_fields):
2028 instancelist = self.cfg.GetInstanceList()
2030 for instance_name in instancelist:
2031 inst = self.cfg.GetInstanceInfo(instance_name)
2032 if inst.primary_node in node_to_primary:
2033 node_to_primary[inst.primary_node].add(inst.name)
2034 for secnode in inst.secondary_nodes:
2035 if secnode in node_to_secondary:
2036 node_to_secondary[secnode].add(inst.name)
2038 master_node = self.cfg.GetMasterNode()
2040 # end data gathering
2043 for node in nodelist:
2045 for field in self.op.output_fields:
2048 elif field == "pinst_list":
2049 val = list(node_to_primary[node.name])
2050 elif field == "sinst_list":
2051 val = list(node_to_secondary[node.name])
2052 elif field == "pinst_cnt":
2053 val = len(node_to_primary[node.name])
2054 elif field == "sinst_cnt":
2055 val = len(node_to_secondary[node.name])
2056 elif field == "pip":
2057 val = node.primary_ip
2058 elif field == "sip":
2059 val = node.secondary_ip
2060 elif field == "tags":
2061 val = list(node.GetTags())
2062 elif field == "serial_no":
2063 val = node.serial_no
2064 elif field == "master_candidate":
2065 val = node.master_candidate
2066 elif field == "master":
2067 val = node.name == master_node
2068 elif field == "offline":
2070 elif field == "drained":
2072 elif self._FIELDS_DYNAMIC.Matches(field):
2073 val = live_data[node.name].get(field, None)
2075 raise errors.ParameterError(field)
2076 node_output.append(val)
2077 output.append(node_output)
2082 class LUQueryNodeVolumes(NoHooksLU):
2083 """Logical unit for getting volumes on node(s).
2086 _OP_REQP = ["nodes", "output_fields"]
2088 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2089 _FIELDS_STATIC = utils.FieldSet("node")
2091 def ExpandNames(self):
2092 _CheckOutputFields(static=self._FIELDS_STATIC,
2093 dynamic=self._FIELDS_DYNAMIC,
2094 selected=self.op.output_fields)
2096 self.needed_locks = {}
2097 self.share_locks[locking.LEVEL_NODE] = 1
2098 if not self.op.nodes:
2099 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2101 self.needed_locks[locking.LEVEL_NODE] = \
2102 _GetWantedNodes(self, self.op.nodes)
2104 def CheckPrereq(self):
2105 """Check prerequisites.
2107 This checks that the fields required are valid output fields.
2110 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2112 def Exec(self, feedback_fn):
2113 """Computes the list of nodes and their attributes.
2116 nodenames = self.nodes
2117 volumes = self.rpc.call_node_volumes(nodenames)
2119 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2120 in self.cfg.GetInstanceList()]
2122 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2125 for node in nodenames:
2126 if node not in volumes or volumes[node].failed or not volumes[node].data:
2129 node_vols = volumes[node].data[:]
2130 node_vols.sort(key=lambda vol: vol['dev'])
2132 for vol in node_vols:
2134 for field in self.op.output_fields:
2137 elif field == "phys":
2141 elif field == "name":
2143 elif field == "size":
2144 val = int(float(vol['size']))
2145 elif field == "instance":
2147 if node not in lv_by_node[inst]:
2149 if vol['name'] in lv_by_node[inst][node]:
2155 raise errors.ParameterError(field)
2156 node_output.append(str(val))
2158 output.append(node_output)
2163 class LUAddNode(LogicalUnit):
2164 """Logical unit for adding node to the cluster.
2168 HTYPE = constants.HTYPE_NODE
2169 _OP_REQP = ["node_name"]
2171 def BuildHooksEnv(self):
2174 This will run on all nodes before, and on all nodes + the new node after.
2178 "OP_TARGET": self.op.node_name,
2179 "NODE_NAME": self.op.node_name,
2180 "NODE_PIP": self.op.primary_ip,
2181 "NODE_SIP": self.op.secondary_ip,
2183 nodes_0 = self.cfg.GetNodeList()
2184 nodes_1 = nodes_0 + [self.op.node_name, ]
2185 return env, nodes_0, nodes_1
2187 def CheckPrereq(self):
2188 """Check prerequisites.
2191 - the new node is not already in the config
2193 - its parameters (single/dual homed) match the cluster
2195 Any errors are signalled by raising errors.OpPrereqError.
2198 node_name = self.op.node_name
2201 dns_data = utils.HostInfo(node_name)
2203 node = dns_data.name
2204 primary_ip = self.op.primary_ip = dns_data.ip
2205 secondary_ip = getattr(self.op, "secondary_ip", None)
2206 if secondary_ip is None:
2207 secondary_ip = primary_ip
2208 if not utils.IsValidIP(secondary_ip):
2209 raise errors.OpPrereqError("Invalid secondary IP given")
2210 self.op.secondary_ip = secondary_ip
2212 node_list = cfg.GetNodeList()
2213 if not self.op.readd and node in node_list:
2214 raise errors.OpPrereqError("Node %s is already in the configuration" %
2216 elif self.op.readd and node not in node_list:
2217 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2219 for existing_node_name in node_list:
2220 existing_node = cfg.GetNodeInfo(existing_node_name)
2222 if self.op.readd and node == existing_node_name:
2223 if (existing_node.primary_ip != primary_ip or
2224 existing_node.secondary_ip != secondary_ip):
2225 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2226 " address configuration as before")
2229 if (existing_node.primary_ip == primary_ip or
2230 existing_node.secondary_ip == primary_ip or
2231 existing_node.primary_ip == secondary_ip or
2232 existing_node.secondary_ip == secondary_ip):
2233 raise errors.OpPrereqError("New node ip address(es) conflict with"
2234 " existing node %s" % existing_node.name)
2236 # check that the type of the node (single versus dual homed) is the
2237 # same as for the master
2238 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2239 master_singlehomed = myself.secondary_ip == myself.primary_ip
2240 newbie_singlehomed = secondary_ip == primary_ip
2241 if master_singlehomed != newbie_singlehomed:
2242 if master_singlehomed:
2243 raise errors.OpPrereqError("The master has no private ip but the"
2244 " new node has one")
2246 raise errors.OpPrereqError("The master has a private ip but the"
2247 " new node doesn't have one")
2249 # check reachability
2250 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2251 raise errors.OpPrereqError("Node not reachable by ping")
2253 if not newbie_singlehomed:
2254 # check reachability from my secondary ip to newbie's secondary ip
2255 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2256 source=myself.secondary_ip):
2257 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2258 " based ping to noded port")
2260 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2261 mc_now, _ = self.cfg.GetMasterCandidateStats()
2262 master_candidate = mc_now < cp_size
2264 self.new_node = objects.Node(name=node,
2265 primary_ip=primary_ip,
2266 secondary_ip=secondary_ip,
2267 master_candidate=master_candidate,
2268 offline=False, drained=False)
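# Editor's example (made-up numbers): with candidate_pool_size = 10 and
# mc_now = 3, the comparison mc_now < cp_size holds and the node built
# above joins as a master candidate; once the pool is full, new nodes
# come in as plain nodes instead.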
2270 def Exec(self, feedback_fn):
2271 """Adds the new node to the cluster.
2274 new_node = self.new_node
2275 node = new_node.name
2277 # check connectivity
2278 result = self.rpc.call_version([node])[node]
2281 if constants.PROTOCOL_VERSION == result.data:
2282 logging.info("Communication to node %s fine, sw version %s match",
2285 raise errors.OpExecError("Version mismatch master version %s,"
2286 " node version %s" %
2287 (constants.PROTOCOL_VERSION, result.data))
2289 raise errors.OpExecError("Cannot get version from the new node")
2292 logging.info("Copy ssh key to node %s", node)
2293 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2295 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2296 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2302 keyarray.append(f.read())
2306 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2308 keyarray[3], keyarray[4], keyarray[5])
2310 msg = result.RemoteFailMsg()
2312 raise errors.OpExecError("Cannot transfer ssh keys to the"
2313 " new node: %s" % msg)
2315 # Add node to our /etc/hosts, and add key to known_hosts
2316 if self.cfg.GetClusterInfo().modify_etc_hosts:
2317 utils.AddHostToEtcHosts(new_node.name)
2319 if new_node.secondary_ip != new_node.primary_ip:
2320 result = self.rpc.call_node_has_ip_address(new_node.name,
2321 new_node.secondary_ip)
2322 if result.failed or not result.data:
2323 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2324 " you gave (%s). Please fix and re-run this"
2325 " command." % new_node.secondary_ip)
2327 node_verify_list = [self.cfg.GetMasterNode()]
2328 node_verify_param = {
2330 # TODO: do a node-net-test as well?
2333 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2334 self.cfg.GetClusterName())
2335 for verifier in node_verify_list:
2336 if result[verifier].failed or not result[verifier].data:
2337 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2338 " for remote verification" % verifier)
2339 if result[verifier].data['nodelist']:
2340 for failed in result[verifier].data['nodelist']:
2341 feedback_fn("ssh/hostname verification failed %s -> %s" %
2342 (verifier, result[verifier].data['nodelist'][failed]))
2343 raise errors.OpExecError("ssh/hostname verification failed.")
2346 _RedistributeAncillaryFiles(self)
2347 self.context.ReaddNode(new_node)
2349 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2350 self.context.AddNode(new_node)
2353 class LUSetNodeParams(LogicalUnit):
2354 """Modifies the parameters of a node.
2357 HPATH = "node-modify"
2358 HTYPE = constants.HTYPE_NODE
2359 _OP_REQP = ["node_name"]
2362 def CheckArguments(self):
2363 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2364 if node_name is None:
2365 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2366 self.op.node_name = node_name
2367 _CheckBooleanOpField(self.op, 'master_candidate')
2368 _CheckBooleanOpField(self.op, 'offline')
2369 _CheckBooleanOpField(self.op, 'drained')
2370 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2371 if all_mods.count(None) == 3:
2372 raise errors.OpPrereqError("Please pass at least one modification")
2373 if all_mods.count(True) > 1:
2374 raise errors.OpPrereqError("Can't set the node into more than one"
2375 " state at the same time")
2377 def ExpandNames(self):
2378 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2380 def BuildHooksEnv(self):
2383 This runs on the master node.
2387 "OP_TARGET": self.op.node_name,
2388 "MASTER_CANDIDATE": str(self.op.master_candidate),
2389 "OFFLINE": str(self.op.offline),
2390 "DRAINED": str(self.op.drained),
2392 nl = [self.cfg.GetMasterNode(),
2396 def CheckPrereq(self):
2397 """Check prerequisites.
2399 This checks that the node exists and validates the requested flag changes.
2402 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2404 if ((self.op.master_candidate == False or self.op.offline == True or
2405 self.op.drained == True) and node.master_candidate):
2406 # we will demote the node from master_candidate
2407 if self.op.node_name == self.cfg.GetMasterNode():
2408 raise errors.OpPrereqError("The master node has to be a"
2409 " master candidate, online and not drained")
2410 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2411 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2412 if num_candidates <= cp_size:
2413 msg = ("Not enough master candidates (desired"
2414 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2416 self.LogWarning(msg)
2418 raise errors.OpPrereqError(msg)
2420 if (self.op.master_candidate == True and
2421 ((node.offline and not self.op.offline == False) or
2422 (node.drained and not self.op.drained == False))):
2423 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2424 " to master_candidate" % node.name)
2428 def Exec(self, feedback_fn):
2437 if self.op.offline is not None:
2438 node.offline = self.op.offline
2439 result.append(("offline", str(self.op.offline)))
2440 if self.op.offline == True:
2441 if node.master_candidate:
2442 node.master_candidate = False
2444 result.append(("master_candidate", "auto-demotion due to offline"))
2446 node.drained = False
2447 result.append(("drained", "clear drained status due to offline"))
2449 if self.op.master_candidate is not None:
2450 node.master_candidate = self.op.master_candidate
2452 result.append(("master_candidate", str(self.op.master_candidate)))
2453 if self.op.master_candidate == False:
2454 rrc = self.rpc.call_node_demote_from_mc(node.name)
2455 msg = rrc.RemoteFailMsg()
2457 self.LogWarning("Node failed to demote itself: %s" % msg)
2459 if self.op.drained is not None:
2460 node.drained = self.op.drained
2461 result.append(("drained", str(self.op.drained)))
2462 if self.op.drained == True:
2463 if node.master_candidate:
2464 node.master_candidate = False
2466 result.append(("master_candidate", "auto-demotion due to drain"))
2468 node.offline = False
2469 result.append(("offline", "clear offline status due to drain"))
2471 # this will trigger configuration file update, if needed
2472 self.cfg.Update(node)
2473 # this will trigger job queue propagation or cleanup
2475 self.context.ReaddNode(node)
2480 class LUPowercycleNode(NoHooksLU):
2481 """Powercycles a node.
2484 _OP_REQP = ["node_name", "force"]
2487 def CheckArguments(self):
2488 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2489 if node_name is None:
2490 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2491 self.op.node_name = node_name
2492 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2493 raise errors.OpPrereqError("The node is the master and the force"
2494 " parameter was not set")
2496 def ExpandNames(self):
2497 """Locking for PowercycleNode.
2499 This is a last-resort option and shouldn't block on other
2500 jobs. Therefore, we grab no locks.
2503 self.needed_locks = {}
2505 def CheckPrereq(self):
2506 """Check prerequisites.
2508 This LU has no prereqs.
2513 def Exec(self, feedback_fn):
2517 result = self.rpc.call_node_powercycle(self.op.node_name,
2518 self.cfg.GetHypervisorType())
2519 msg = result.RemoteFailMsg()
2521 raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2522 return result.payload
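# Editor's note: the RemoteFailMsg()/payload pair used above is the
# newer RPC result convention in this module: a non-empty message means
# the remote call failed, otherwise the useful value is read from
# result.payload rather than from result.failed/result.data.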
2525 class LUQueryClusterInfo(NoHooksLU):
2526 """Query cluster configuration.
2532 def ExpandNames(self):
2533 self.needed_locks = {}
2535 def CheckPrereq(self):
2536 """No prerequsites needed for this LU.
2541 def Exec(self, feedback_fn):
2542 """Return cluster config.
2545 cluster = self.cfg.GetClusterInfo()
2547 "software_version": constants.RELEASE_VERSION,
2548 "protocol_version": constants.PROTOCOL_VERSION,
2549 "config_version": constants.CONFIG_VERSION,
2550 "os_api_version": constants.OS_API_VERSION,
2551 "export_version": constants.EXPORT_VERSION,
2552 "architecture": (platform.architecture()[0], platform.machine()),
2553 "name": cluster.cluster_name,
2554 "master": cluster.master_node,
2555 "default_hypervisor": cluster.default_hypervisor,
2556 "enabled_hypervisors": cluster.enabled_hypervisors,
2557 "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2558 for hypervisor in cluster.enabled_hypervisors]),
2559 "beparams": cluster.beparams,
2560 "nicparams": cluster.nicparams,
2561 "candidate_pool_size": cluster.candidate_pool_size,
2562 "master_netdev": cluster.master_netdev,
2563 "volume_group_name": cluster.volume_group_name,
2564 "file_storage_dir": cluster.file_storage_dir,
2570 class LUQueryConfigValues(NoHooksLU):
2571 """Return configuration values.
2576 _FIELDS_DYNAMIC = utils.FieldSet()
2577 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2579 def ExpandNames(self):
2580 self.needed_locks = {}
2582 _CheckOutputFields(static=self._FIELDS_STATIC,
2583 dynamic=self._FIELDS_DYNAMIC,
2584 selected=self.op.output_fields)
2586 def CheckPrereq(self):
2587 """No prerequisites.
2592 def Exec(self, feedback_fn):
2593 """Dump a representation of the cluster config to the standard output.
2597 for field in self.op.output_fields:
2598 if field == "cluster_name":
2599 entry = self.cfg.GetClusterName()
2600 elif field == "master_node":
2601 entry = self.cfg.GetMasterNode()
2602 elif field == "drain_flag":
2603 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2605 raise errors.ParameterError(field)
2606 values.append(entry)
2610 class LUActivateInstanceDisks(NoHooksLU):
2611 """Bring up an instance's disks.
2614 _OP_REQP = ["instance_name"]
2617 def ExpandNames(self):
2618 self._ExpandAndLockInstance()
2619 self.needed_locks[locking.LEVEL_NODE] = []
2620 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2622 def DeclareLocks(self, level):
2623 if level == locking.LEVEL_NODE:
2624 self._LockInstancesNodes()
2626 def CheckPrereq(self):
2627 """Check prerequisites.
2629 This checks that the instance is in the cluster.
2632 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2633 assert self.instance is not None, \
2634 "Cannot retrieve locked instance %s" % self.op.instance_name
2635 _CheckNodeOnline(self, self.instance.primary_node)
2637 def Exec(self, feedback_fn):
2638 """Activate the disks.
2641 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2643 raise errors.OpExecError("Cannot activate block devices")
2648 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2649 """Prepare the block devices for an instance.
2651 This sets up the block devices on all nodes.
2653 @type lu: L{LogicalUnit}
2654 @param lu: the logical unit on whose behalf we execute
2655 @type instance: L{objects.Instance}
2656 @param instance: the instance for whose disks we assemble
2657 @type ignore_secondaries: boolean
2658 @param ignore_secondaries: if true, errors on secondary nodes
2659 won't result in an error return from the function
2660 @return: False if the operation failed, otherwise a list of
2661 (host, instance_visible_name, node_visible_name)
2662 with the mapping from node devices to instance devices
2667 iname = instance.name
2668 # With the two-pass mechanism we try to reduce the window of
2669 # opportunity for the race condition of switching DRBD to primary
2670 # before handshaking occurred, but we do not eliminate it
2672 # The proper fix would be to wait (with some limits) until the
2673 # connection has been made and drbd transitions from WFConnection
2674 # into any other network-connected state (Connected, SyncTarget,
2677 # 1st pass, assemble on all nodes in secondary mode
2678 for inst_disk in instance.disks:
2679 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2680 lu.cfg.SetDiskID(node_disk, node)
2681 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2682 msg = result.RemoteFailMsg()
2684 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2685 " (is_primary=False, pass=1): %s",
2686 inst_disk.iv_name, node, msg)
2687 if not ignore_secondaries:
2690 # FIXME: race condition on drbd migration to primary
2692 # 2nd pass, do only the primary node
2693 for inst_disk in instance.disks:
2694 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2695 if node != instance.primary_node:
2697 lu.cfg.SetDiskID(node_disk, node)
2698 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2699 msg = result.RemoteFailMsg()
2701 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2702 " (is_primary=True, pass=2): %s",
2703 inst_disk.iv_name, node, msg)
2705 device_info.append((instance.primary_node, inst_disk.iv_name,
2708 # leave the disks configured for the primary node
2709 # this is a workaround that would better be fixed by
2710 # improving the logical/physical id handling
2711 for disk in instance.disks:
2712 lu.cfg.SetDiskID(disk, instance.primary_node)
2714 return disks_ok, device_info
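# Editor's sketch (simplified, not original code) of the two-pass shape
# implemented above, for one DRBD disk on nodes A (primary) and B:
#
#   pass 1: call_blockdev_assemble(A, disk, iname, False)  # as secondary
#           call_blockdev_assemble(B, disk, iname, False)  # as secondary
#   pass 2: call_blockdev_assemble(A, disk, iname, True)   # promote A
#
# Bringing both sides up as secondaries first gives the DRBD handshake
# a chance to finish before the promotion; see the race note above.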
2717 def _StartInstanceDisks(lu, instance, force):
2718 """Start the disks of an instance.
2721 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2722 ignore_secondaries=force)
2724 _ShutdownInstanceDisks(lu, instance)
2725 if force is not None and not force:
2726 lu.proc.LogWarning("", hint="If the message above refers to a"
2728 " you can retry the operation using '--force'.")
2729 raise errors.OpExecError("Disk consistency error")
2732 class LUDeactivateInstanceDisks(NoHooksLU):
2733 """Shutdown an instance's disks.
2736 _OP_REQP = ["instance_name"]
2739 def ExpandNames(self):
2740 self._ExpandAndLockInstance()
2741 self.needed_locks[locking.LEVEL_NODE] = []
2742 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2744 def DeclareLocks(self, level):
2745 if level == locking.LEVEL_NODE:
2746 self._LockInstancesNodes()
2748 def CheckPrereq(self):
2749 """Check prerequisites.
2751 This checks that the instance is in the cluster.
2754 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2755 assert self.instance is not None, \
2756 "Cannot retrieve locked instance %s" % self.op.instance_name
2758 def Exec(self, feedback_fn):
2759 """Deactivate the disks
2762 instance = self.instance
2763 _SafeShutdownInstanceDisks(self, instance)
2766 def _SafeShutdownInstanceDisks(lu, instance):
2767 """Shutdown block devices of an instance.
2769 This function checks if an instance is running, before calling
2770 _ShutdownInstanceDisks.
2773 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2774 [instance.hypervisor])
2775 ins_l = ins_l[instance.primary_node]
2776 if ins_l.failed or not isinstance(ins_l.data, list):
2777 raise errors.OpExecError("Can't contact node '%s'" %
2778 instance.primary_node)
2780 if instance.name in ins_l.data:
2781 raise errors.OpExecError("Instance is running, can't shutdown"
2784 _ShutdownInstanceDisks(lu, instance)
2787 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2788 """Shutdown block devices of an instance.
2790 This does the shutdown on all nodes of the instance.
2792 If ignore_primary is false, errors on the primary node are fatal; if true, they are ignored.
2797 for disk in instance.disks:
2798 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2799 lu.cfg.SetDiskID(top_disk, node)
2800 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2801 msg = result.RemoteFailMsg()
2803 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2804 disk.iv_name, node, msg)
2805 if not ignore_primary or node != instance.primary_node:
2810 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2811 """Checks if a node has enough free memory.
2813 This function checks if a given node has the needed amount of free
2814 memory. If the node has less memory, or if we cannot get the
2815 information from the node, this function raises an OpPrereqError exception.
2818 @type lu: C{LogicalUnit}
2819 @param lu: a logical unit from which we get configuration data
2821 @param node: the node to check
2822 @type reason: C{str}
2823 @param reason: string to use in the error message
2824 @type requested: C{int}
2825 @param requested: the amount of memory in MiB to check for
2826 @type hypervisor_name: C{str}
2827 @param hypervisor_name: the hypervisor to ask for memory stats
2828 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2829 we cannot check the node
2832 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2833 nodeinfo[node].Raise()
2834 free_mem = nodeinfo[node].data.get('memory_free')
2835 if not isinstance(free_mem, int):
2836 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2837 " was '%s'" % (node, free_mem))
2838 if requested > free_mem:
2839 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2840 " needed %s MiB, available %s MiB" %
2841 (node, reason, requested, free_mem))
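# Editor's note: for a typical invocation of this check see
# LUStartupInstance.CheckPrereq below, which verifies
# bep[constants.BE_MEMORY] MiB on the instance's primary node before
# starting it.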
2844 class LUStartupInstance(LogicalUnit):
2845 """Starts an instance.
2848 HPATH = "instance-start"
2849 HTYPE = constants.HTYPE_INSTANCE
2850 _OP_REQP = ["instance_name", "force"]
2853 def ExpandNames(self):
2854 self._ExpandAndLockInstance()
2856 def BuildHooksEnv(self):
2859 This runs on master, primary and secondary nodes of the instance.
2863 "FORCE": self.op.force,
2865 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2866 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2869 def CheckPrereq(self):
2870 """Check prerequisites.
2872 This checks that the instance is in the cluster.
2875 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2876 assert self.instance is not None, \
2877 "Cannot retrieve locked instance %s" % self.op.instance_name
2880 self.beparams = getattr(self.op, "beparams", {})
2882 if not isinstance(self.beparams, dict):
2883 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2884 " dict" % (type(self.beparams), ))
2885 # fill the beparams dict
2886 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2887 self.op.beparams = self.beparams
2890 self.hvparams = getattr(self.op, "hvparams", {})
2892 if not isinstance(self.hvparams, dict):
2893 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2894 " dict" % (type(self.hvparams), ))
2896 # check hypervisor parameter syntax (locally)
2897 cluster = self.cfg.GetClusterInfo()
2898 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2899 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2901 filled_hvp.update(self.hvparams)
2902 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2903 hv_type.CheckParameterSyntax(filled_hvp)
2904 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2905 self.op.hvparams = self.hvparams
2907 _CheckNodeOnline(self, instance.primary_node)
2909 bep = self.cfg.GetClusterInfo().FillBE(instance)
2910 # check bridges existence
2911 _CheckInstanceBridgesExist(self, instance)
2913 remote_info = self.rpc.call_instance_info(instance.primary_node,
2915 instance.hypervisor)
2917 if not remote_info.data:
2918 _CheckNodeFreeMemory(self, instance.primary_node,
2919 "starting instance %s" % instance.name,
2920 bep[constants.BE_MEMORY], instance.hypervisor)
2922 def Exec(self, feedback_fn):
2923 """Start the instance.
2926 instance = self.instance
2927 force = self.op.force
2929 self.cfg.MarkInstanceUp(instance.name)
2931 node_current = instance.primary_node
2933 _StartInstanceDisks(self, instance, force)
2935 result = self.rpc.call_instance_start(node_current, instance,
2936 self.hvparams, self.beparams)
2937 msg = result.RemoteFailMsg()
2939 _ShutdownInstanceDisks(self, instance)
2940 raise errors.OpExecError("Could not start instance: %s" % msg)
2943 class LURebootInstance(LogicalUnit):
2944 """Reboot an instance.
2947 HPATH = "instance-reboot"
2948 HTYPE = constants.HTYPE_INSTANCE
2949 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2952 def ExpandNames(self):
2953 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2954 constants.INSTANCE_REBOOT_HARD,
2955 constants.INSTANCE_REBOOT_FULL]:
2956 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2957 (constants.INSTANCE_REBOOT_SOFT,
2958 constants.INSTANCE_REBOOT_HARD,
2959 constants.INSTANCE_REBOOT_FULL))
2960 self._ExpandAndLockInstance()
2962 def BuildHooksEnv(self):
2965 This runs on master, primary and secondary nodes of the instance.
2969 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2970 "REBOOT_TYPE": self.op.reboot_type,
2972 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2973 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2976 def CheckPrereq(self):
2977 """Check prerequisites.
2979 This checks that the instance is in the cluster.
2982 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2983 assert self.instance is not None, \
2984 "Cannot retrieve locked instance %s" % self.op.instance_name
2986 _CheckNodeOnline(self, instance.primary_node)
2988 # check bridges existence
2989 _CheckInstanceBridgesExist(self, instance)
2991 def Exec(self, feedback_fn):
2992 """Reboot the instance.
2995 instance = self.instance
2996 ignore_secondaries = self.op.ignore_secondaries
2997 reboot_type = self.op.reboot_type
2999 node_current = instance.primary_node
3001 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3002 constants.INSTANCE_REBOOT_HARD]:
3003 for disk in instance.disks:
3004 self.cfg.SetDiskID(disk, node_current)
3005 result = self.rpc.call_instance_reboot(node_current, instance,
3007 msg = result.RemoteFailMsg()
3009 raise errors.OpExecError("Could not reboot instance: %s" % msg)
3011 result = self.rpc.call_instance_shutdown(node_current, instance)
3012 msg = result.RemoteFailMsg()
3014 raise errors.OpExecError("Could not shutdown instance for"
3015 " full reboot: %s" % msg)
3016 _ShutdownInstanceDisks(self, instance)
3017 _StartInstanceDisks(self, instance, ignore_secondaries)
3018 result = self.rpc.call_instance_start(node_current, instance, None, None)
3019 msg = result.RemoteFailMsg()
3021 _ShutdownInstanceDisks(self, instance)
3022 raise errors.OpExecError("Could not start instance for"
3023 " full reboot: %s" % msg)
3025 self.cfg.MarkInstanceUp(instance.name)
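# Editor's summary of the branches above: INSTANCE_REBOOT_SOFT and
# INSTANCE_REBOOT_HARD are delegated to the hypervisor through
# call_instance_reboot, while INSTANCE_REBOOT_FULL is emulated from the
# master side as shutdown, disk restart and start.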
3028 class LUShutdownInstance(LogicalUnit):
3029 """Shutdown an instance.
3032 HPATH = "instance-stop"
3033 HTYPE = constants.HTYPE_INSTANCE
3034 _OP_REQP = ["instance_name"]
3037 def ExpandNames(self):
3038 self._ExpandAndLockInstance()
3040 def BuildHooksEnv(self):
3043 This runs on master, primary and secondary nodes of the instance.
3046 env = _BuildInstanceHookEnvByObject(self, self.instance)
3047 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3050 def CheckPrereq(self):
3051 """Check prerequisites.
3053 This checks that the instance is in the cluster.
3056 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3057 assert self.instance is not None, \
3058 "Cannot retrieve locked instance %s" % self.op.instance_name
3059 _CheckNodeOnline(self, self.instance.primary_node)
3061 def Exec(self, feedback_fn):
3062 """Shutdown the instance.
3065 instance = self.instance
3066 node_current = instance.primary_node
3067 self.cfg.MarkInstanceDown(instance.name)
3068 result = self.rpc.call_instance_shutdown(node_current, instance)
3069 msg = result.RemoteFailMsg()
3071 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3073 _ShutdownInstanceDisks(self, instance)
3076 class LUReinstallInstance(LogicalUnit):
3077 """Reinstall an instance.
3080 HPATH = "instance-reinstall"
3081 HTYPE = constants.HTYPE_INSTANCE
3082 _OP_REQP = ["instance_name"]
3085 def ExpandNames(self):
3086 self._ExpandAndLockInstance()
3088 def BuildHooksEnv(self):
3091 This runs on master, primary and secondary nodes of the instance.
3094 env = _BuildInstanceHookEnvByObject(self, self.instance)
3095 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3098 def CheckPrereq(self):
3099 """Check prerequisites.
3101 This checks that the instance is in the cluster and is not running.
3104 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3105 assert instance is not None, \
3106 "Cannot retrieve locked instance %s" % self.op.instance_name
3107 _CheckNodeOnline(self, instance.primary_node)
3109 if instance.disk_template == constants.DT_DISKLESS:
3110 raise errors.OpPrereqError("Instance '%s' has no disks" %
3111 self.op.instance_name)
3112 if instance.admin_up:
3113 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3114 self.op.instance_name)
3115 remote_info = self.rpc.call_instance_info(instance.primary_node,
3117 instance.hypervisor)
3119 if remote_info.data:
3120 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3121 (self.op.instance_name,
3122 instance.primary_node))
3124 self.op.os_type = getattr(self.op, "os_type", None)
3125 if self.op.os_type is not None:
3127 pnode = self.cfg.GetNodeInfo(
3128 self.cfg.ExpandNodeName(instance.primary_node))
3130 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3132 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3134 if not isinstance(result.data, objects.OS):
3135 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3136 " primary node" % self.op.os_type)
3138 self.instance = instance
3140 def Exec(self, feedback_fn):
3141 """Reinstall the instance.
3144 inst = self.instance
3146 if self.op.os_type is not None:
3147 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3148 inst.os = self.op.os_type
3149 self.cfg.Update(inst)
3151 _StartInstanceDisks(self, inst, None)
3153 feedback_fn("Running the instance OS create scripts...")
3154 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3155 msg = result.RemoteFailMsg()
3157 raise errors.OpExecError("Could not install OS for instance %s"
3159 (inst.name, inst.primary_node, msg))
3161 _ShutdownInstanceDisks(self, inst)
3164 class LURenameInstance(LogicalUnit):
3165 """Rename an instance.
3168 HPATH = "instance-rename"
3169 HTYPE = constants.HTYPE_INSTANCE
3170 _OP_REQP = ["instance_name", "new_name"]
3172 def BuildHooksEnv(self):
3175 This runs on master, primary and secondary nodes of the instance.
3178 env = _BuildInstanceHookEnvByObject(self, self.instance)
3179 env["INSTANCE_NEW_NAME"] = self.op.new_name
3180 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3183 def CheckPrereq(self):
3184 """Check prerequisites.
3186 This checks that the instance is in the cluster and is not running.
3189 instance = self.cfg.GetInstanceInfo(
3190 self.cfg.ExpandInstanceName(self.op.instance_name))
3191 if instance is None:
3192 raise errors.OpPrereqError("Instance '%s' not known" %
3193 self.op.instance_name)
3194 _CheckNodeOnline(self, instance.primary_node)
3196 if instance.admin_up:
3197 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3198 self.op.instance_name)
3199 remote_info = self.rpc.call_instance_info(instance.primary_node,
3201 instance.hypervisor)
3203 if remote_info.data:
3204 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3205 (self.op.instance_name,
3206 instance.primary_node))
3207 self.instance = instance
3209 # new name verification
3210 name_info = utils.HostInfo(self.op.new_name)
3212 self.op.new_name = new_name = name_info.name
3213 instance_list = self.cfg.GetInstanceList()
3214 if new_name in instance_list:
3215 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3218 if not getattr(self.op, "ignore_ip", False):
3219 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3220 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3221 (name_info.ip, new_name))
3224 def Exec(self, feedback_fn):
3225 """Reinstall the instance.
3228 inst = self.instance
3229 old_name = inst.name
3231 if inst.disk_template == constants.DT_FILE:
3232 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3234 self.cfg.RenameInstance(inst.name, self.op.new_name)
3235 # Change the instance lock. This is definitely safe while we hold the BGL
3236 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3237 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3239 # re-read the instance from the configuration after rename
3240 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3242 if inst.disk_template == constants.DT_FILE:
3243 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3244 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3245 old_file_storage_dir,
3246 new_file_storage_dir)
3249 raise errors.OpExecError("Could not connect to node '%s' to rename"
3250 " directory '%s' to '%s' (but the instance"
3251 " has been renamed in Ganeti)" % (
3252 inst.primary_node, old_file_storage_dir,
3253 new_file_storage_dir))
3255 if not result.data[0]:
3256 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3257 " (but the instance has been renamed in"
3258 " Ganeti)" % (old_file_storage_dir,
3259 new_file_storage_dir))
3261 _StartInstanceDisks(self, inst, None)
3263 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3265 msg = result.RemoteFailMsg()
3267 msg = ("Could not run OS rename script for instance %s on node %s"
3268 " (but the instance has been renamed in Ganeti): %s" %
3269 (inst.name, inst.primary_node, msg))
3270 self.proc.LogWarning(msg)
3272 _ShutdownInstanceDisks(self, inst)
3275 class LURemoveInstance(LogicalUnit):
3276 """Remove an instance.
3279 HPATH = "instance-remove"
3280 HTYPE = constants.HTYPE_INSTANCE
3281 _OP_REQP = ["instance_name", "ignore_failures"]
3284 def ExpandNames(self):
3285 self._ExpandAndLockInstance()
3286 self.needed_locks[locking.LEVEL_NODE] = []
3287 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3289 def DeclareLocks(self, level):
3290 if level == locking.LEVEL_NODE:
3291 self._LockInstancesNodes()
3293 def BuildHooksEnv(self):
3296 This runs on master, primary and secondary nodes of the instance.
3299 env = _BuildInstanceHookEnvByObject(self, self.instance)
3300 nl = [self.cfg.GetMasterNode()]
3303 def CheckPrereq(self):
3304 """Check prerequisites.
3306 This checks that the instance is in the cluster.
3309 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3310 assert self.instance is not None, \
3311 "Cannot retrieve locked instance %s" % self.op.instance_name
3313 def Exec(self, feedback_fn):
3314 """Remove the instance.
3317 instance = self.instance
3318 logging.info("Shutting down instance %s on node %s",
3319 instance.name, instance.primary_node)
3321 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3322 msg = result.RemoteFailMsg()
3324 if self.op.ignore_failures:
3325 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3327 raise errors.OpExecError("Could not shutdown instance %s on"
3329 (instance.name, instance.primary_node, msg))
3331 logging.info("Removing block devices for instance %s", instance.name)
3333 if not _RemoveDisks(self, instance):
3334 if self.op.ignore_failures:
3335 feedback_fn("Warning: can't remove instance's disks")
3337 raise errors.OpExecError("Can't remove instance's disks")
3339 logging.info("Removing instance %s out of cluster config", instance.name)
3341 self.cfg.RemoveInstance(instance.name)
3342 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3345 class LUQueryInstances(NoHooksLU):
3346 """Logical unit for querying instances.
3349 _OP_REQP = ["output_fields", "names", "use_locking"]
3351 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3353 "disk_template", "ip", "mac", "bridge",
3354 "sda_size", "sdb_size", "vcpus", "tags",
3355 "network_port", "beparams",
3356 r"(disk)\.(size)/([0-9]+)",
3357 r"(disk)\.(sizes)", "disk_usage",
3358 r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3359 r"(nic)\.(macs|ips|bridges)",
3360 r"(disk|nic)\.(count)",
3361 "serial_no", "hypervisor", "hvparams",] +
3363 for name in constants.HVS_PARAMETERS] +
3365 for name in constants.BES_PARAMETERS])
3366 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
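# Editor's example: the regexp entries in _FIELDS_STATIC above make
# parameterized fields such as "disk.size/0", "disk.sizes" or
# "nic.mac/1" valid; Matches() then yields a match object whose groups,
# e.g. ("disk", "size", "0"), drive the variable-field branch in Exec.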
3369 def ExpandNames(self):
3370 _CheckOutputFields(static=self._FIELDS_STATIC,
3371 dynamic=self._FIELDS_DYNAMIC,
3372 selected=self.op.output_fields)
3374 self.needed_locks = {}
3375 self.share_locks[locking.LEVEL_INSTANCE] = 1
3376 self.share_locks[locking.LEVEL_NODE] = 1
3379 self.wanted = _GetWantedInstances(self, self.op.names)
3381 self.wanted = locking.ALL_SET
3383 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3384 self.do_locking = self.do_node_query and self.op.use_locking
3386 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3387 self.needed_locks[locking.LEVEL_NODE] = []
3388 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3390 def DeclareLocks(self, level):
3391 if level == locking.LEVEL_NODE and self.do_locking:
3392 self._LockInstancesNodes()
3394 def CheckPrereq(self):
3395 """Check prerequisites.
3400 def Exec(self, feedback_fn):
3401 """Computes the list of nodes and their attributes.
3404 all_info = self.cfg.GetAllInstancesInfo()
3405 if self.wanted == locking.ALL_SET:
3406 # caller didn't specify instance names, so ordering is not important
3408 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3410 instance_names = all_info.keys()
3411 instance_names = utils.NiceSort(instance_names)
3413 # caller did specify names, so we must keep the ordering
3415 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3417 tgt_set = all_info.keys()
3418 missing = set(self.wanted).difference(tgt_set)
3420 raise errors.OpExecError("Some instances were removed before"
3421 " retrieving their data: %s" % missing)
3422 instance_names = self.wanted
3424 instance_list = [all_info[iname] for iname in instance_names]
3426 # begin data gathering
3428 nodes = frozenset([inst.primary_node for inst in instance_list])
3429 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3433 if self.do_node_query:
3435 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3437 result = node_data[name]
3439 # offline nodes will be in both lists
3440 off_nodes.append(name)
3442 bad_nodes.append(name)
3445 live_data.update(result.data)
3446 # else no instance is alive
3448 live_data = dict([(name, {}) for name in instance_names])
3450 # end data gathering
3455 for instance in instance_list:
3457 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3458 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3459 for field in self.op.output_fields:
3460 st_match = self._FIELDS_STATIC.Matches(field)
3465 elif field == "pnode":
3466 val = instance.primary_node
3467 elif field == "snodes":
3468 val = list(instance.secondary_nodes)
3469 elif field == "admin_state":
3470 val = instance.admin_up
3471 elif field == "oper_state":
3472 if instance.primary_node in bad_nodes:
3475 val = bool(live_data.get(instance.name))
3476 elif field == "status":
3477 if instance.primary_node in off_nodes:
3478 val = "ERROR_nodeoffline"
3479 elif instance.primary_node in bad_nodes:
3480 val = "ERROR_nodedown"
3482 running = bool(live_data.get(instance.name))
3484 if instance.admin_up:
3489 if instance.admin_up:
3493 elif field == "oper_ram":
3494 if instance.primary_node in bad_nodes:
3496 elif instance.name in live_data:
3497 val = live_data[instance.name].get("memory", "?")
3500 elif field == "disk_template":
3501 val = instance.disk_template
3503 val = instance.nics[0].ip
3504 elif field == "bridge":
3505 val = instance.nics[0].bridge
3506 elif field == "mac":
3507 val = instance.nics[0].mac
3508 elif field == "sda_size" or field == "sdb_size":
3509 idx = ord(field[2]) - ord('a')
3511 val = instance.FindDisk(idx).size
3512 except errors.OpPrereqError:
3514 elif field == "disk_usage": # total disk usage per node
3515 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3516 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3517 elif field == "tags":
3518 val = list(instance.GetTags())
3519 elif field == "serial_no":
3520 val = instance.serial_no
3521 elif field == "network_port":
3522 val = instance.network_port
3523 elif field == "hypervisor":
3524 val = instance.hypervisor
3525 elif field == "hvparams":
3527 elif (field.startswith(HVPREFIX) and
3528 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3529 val = i_hv.get(field[len(HVPREFIX):], None)
3530 elif field == "beparams":
3532 elif (field.startswith(BEPREFIX) and
3533 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3534 val = i_be.get(field[len(BEPREFIX):], None)
3535 elif st_match and st_match.groups():
3536 # matches a variable list
3537 st_groups = st_match.groups()
3538 if st_groups and st_groups[0] == "disk":
3539 if st_groups[1] == "count":
3540 val = len(instance.disks)
3541 elif st_groups[1] == "sizes":
3542 val = [disk.size for disk in instance.disks]
3543 elif st_groups[1] == "size":
3545 val = instance.FindDisk(st_groups[2]).size
3546 except errors.OpPrereqError:
3549 assert False, "Unhandled disk parameter"
3550 elif st_groups[0] == "nic":
3551 if st_groups[1] == "count":
3552 val = len(instance.nics)
3553 elif st_groups[1] == "macs":
3554 val = [nic.mac for nic in instance.nics]
3555 elif st_groups[1] == "ips":
3556 val = [nic.ip for nic in instance.nics]
3557 elif st_groups[1] == "bridges":
3558 val = [nic.bridge for nic in instance.nics]
3561 nic_idx = int(st_groups[2])
3562 if nic_idx >= len(instance.nics):
3565 if st_groups[1] == "mac":
3566 val = instance.nics[nic_idx].mac
3567 elif st_groups[1] == "ip":
3568 val = instance.nics[nic_idx].ip
3569 elif st_groups[1] == "bridge":
3570 val = instance.nics[nic_idx].bridge
3572 assert False, "Unhandled NIC parameter"
3574 assert False, "Unhandled variable parameter"
3576 raise errors.ParameterError(field)
3583 class LUFailoverInstance(LogicalUnit):
3584 """Failover an instance.
3587 HPATH = "instance-failover"
3588 HTYPE = constants.HTYPE_INSTANCE
3589 _OP_REQP = ["instance_name", "ignore_consistency"]
3592 def ExpandNames(self):
3593 self._ExpandAndLockInstance()
3594 self.needed_locks[locking.LEVEL_NODE] = []
3595 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3597 def DeclareLocks(self, level):
3598 if level == locking.LEVEL_NODE:
3599 self._LockInstancesNodes()
3601 def BuildHooksEnv(self):
3604 This runs on master, primary and secondary nodes of the instance.
3608 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3610 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3611 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3614 def CheckPrereq(self):
3615 """Check prerequisites.
3617 This checks that the instance is in the cluster.
3620 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3621 assert self.instance is not None, \
3622 "Cannot retrieve locked instance %s" % self.op.instance_name
3624 bep = self.cfg.GetClusterInfo().FillBE(instance)
3625 if instance.disk_template not in constants.DTS_NET_MIRROR:
3626 raise errors.OpPrereqError("Instance's disk layout is not"
3627 " network mirrored, cannot failover.")
3629 secondary_nodes = instance.secondary_nodes
3630 if not secondary_nodes:
3631 raise errors.ProgrammerError("no secondary node but using "
3632 "a mirrored disk template")
3634 target_node = secondary_nodes[0]
3635 _CheckNodeOnline(self, target_node)
3636 _CheckNodeNotDrained(self, target_node)
3637 # check memory requirements on the secondary node
3638 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3639 instance.name, bep[constants.BE_MEMORY],
3640 instance.hypervisor)
3641 # check bridge existence
3642 _CheckInstanceBridgesExist(self, instance, node=target_node)
3644 def Exec(self, feedback_fn):
3645 """Failover an instance.
3647 The failover is done by shutting it down on its present node and
3648 starting it on the secondary.
3651 instance = self.instance
3653 source_node = instance.primary_node
3654 target_node = instance.secondary_nodes[0]
3656 feedback_fn("* checking disk consistency between source and target")
3657 for dev in instance.disks:
3658 # for drbd, these are drbd over lvm
3659 if not _CheckDiskConsistency(self, dev, target_node, False):
3660 if instance.admin_up and not self.op.ignore_consistency:
3661 raise errors.OpExecError("Disk %s is degraded on target node,"
3662 " aborting failover." % dev.iv_name)
3664 feedback_fn("* shutting down instance on source node")
3665 logging.info("Shutting down instance %s on node %s",
3666 instance.name, source_node)
3668 result = self.rpc.call_instance_shutdown(source_node, instance)
3669 msg = result.RemoteFailMsg()
3671 if self.op.ignore_consistency:
3672 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3673 " Proceeding anyway. Please make sure node"
3674 " %s is down. Error details: %s",
3675 instance.name, source_node, source_node, msg)
3677 raise errors.OpExecError("Could not shutdown instance %s on"
3679 (instance.name, source_node, msg))
3681 feedback_fn("* deactivating the instance's disks on source node")
3682 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3683 raise errors.OpExecError("Can't shut down the instance's disks.")
3685 instance.primary_node = target_node
3686 # distribute new instance config to the other nodes
3687 self.cfg.Update(instance)
3689 # Only start the instance if it's marked as up
3690 if instance.admin_up:
3691 feedback_fn("* activating the instance's disks on target node")
3692 logging.info("Starting instance %s on node %s",
3693 instance.name, target_node)
3695 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3696 ignore_secondaries=True)
3698 _ShutdownInstanceDisks(self, instance)
3699 raise errors.OpExecError("Can't activate the instance's disks")
3701 feedback_fn("* starting the instance on the target node")
3702 result = self.rpc.call_instance_start(target_node, instance, None, None)
3703 msg = result.RemoteFailMsg()
3705 _ShutdownInstanceDisks(self, instance)
3706 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3707 (instance.name, target_node, msg))
3710 class LUMigrateInstance(LogicalUnit):
3711 """Migrate an instance.
3713 This is migration without shutting down the instance, as opposed to
3714 failover, which requires shutting it down.
3717 HPATH = "instance-migrate"
3718 HTYPE = constants.HTYPE_INSTANCE
3719 _OP_REQP = ["instance_name", "live", "cleanup"]
3723 def ExpandNames(self):
3724 self._ExpandAndLockInstance()
3725 self.needed_locks[locking.LEVEL_NODE] = []
3726 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3728 def DeclareLocks(self, level):
3729 if level == locking.LEVEL_NODE:
3730 self._LockInstancesNodes()
3732 def BuildHooksEnv(self):
3735 This runs on master, primary and secondary nodes of the instance.
3738 env = _BuildInstanceHookEnvByObject(self, self.instance)
3739 env["MIGRATE_LIVE"] = self.op.live
3740 env["MIGRATE_CLEANUP"] = self.op.cleanup
3741 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3744 def CheckPrereq(self):
3745 """Check prerequisites.
3747 This checks that the instance is in the cluster.
3750 instance = self.cfg.GetInstanceInfo(
3751 self.cfg.ExpandInstanceName(self.op.instance_name))
3752 if instance is None:
3753 raise errors.OpPrereqError("Instance '%s' not known" %
3754 self.op.instance_name)
3756 if instance.disk_template != constants.DT_DRBD8:
3757 raise errors.OpPrereqError("Instance's disk layout is not"
3758 " drbd8, cannot migrate.")
3760 secondary_nodes = instance.secondary_nodes
3761 if not secondary_nodes:
3762 raise errors.ConfigurationError("No secondary node but using"
3763 " drbd8 disk template")
3765 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3767 target_node = secondary_nodes[0]
3768 # check memory requirements on the secondary node
3769 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3770 instance.name, i_be[constants.BE_MEMORY],
3771 instance.hypervisor)
3773 # check bridge existence
3774 _CheckInstanceBridgesExist(self, instance, node=target_node)
3776 if not self.op.cleanup:
3777 _CheckNodeNotDrained(self, target_node)
3778 result = self.rpc.call_instance_migratable(instance.primary_node,
3780 msg = result.RemoteFailMsg()
3782 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3785 self.instance = instance
3787 def _WaitUntilSync(self):
3788 """Poll with custom rpc for disk sync.
3790 This uses our own step-based rpc call.
3793 self.feedback_fn("* wait until resync is done")
3797 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3799 self.instance.disks)
3801 for node, nres in result.items():
3802 msg = nres.RemoteFailMsg()
3804 raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3806 node_done, node_percent = nres.payload
3807 all_done = all_done and node_done
3808 if node_percent is not None:
3809 min_percent = min(min_percent, node_percent)
3811 if min_percent < 100:
3812 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3815 def _EnsureSecondary(self, node):
3816 """Demote a node to secondary.
3819 self.feedback_fn("* switching node %s to secondary mode" % node)
3821 for dev in self.instance.disks:
3822 self.cfg.SetDiskID(dev, node)
3824 result = self.rpc.call_blockdev_close(node, self.instance.name,
3825 self.instance.disks)
3826 msg = result.RemoteFailMsg()
3828 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3829 " error %s" % (node, msg))
3831 def _GoStandalone(self):
3832 """Disconnect from the network.
3835 self.feedback_fn("* changing into standalone mode")
3836 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3837 self.instance.disks)
3838 for node, nres in result.items():
3839 msg = nres.RemoteFailMsg()
3841 raise errors.OpExecError("Cannot disconnect disks node %s,"
3842 " error %s" % (node, msg))
3844 def _GoReconnect(self, multimaster):
3845 """Reconnect to the network.
3851 msg = "single-master"
3852 self.feedback_fn("* changing disks into %s mode" % msg)
3853 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3854 self.instance.disks,
3855 self.instance.name, multimaster)
3856 for node, nres in result.items():
3857 msg = nres.RemoteFailMsg()
3859 raise errors.OpExecError("Cannot change disks config on node %s,"
3860 " error: %s" % (node, msg))
3862 def _ExecCleanup(self):
3863 """Try to cleanup after a failed migration.
3865 The cleanup is done by:
3866 - check that the instance is running only on one node
3867 (and update the config if needed)
3868 - change disks on its secondary node to secondary
3869 - wait until disks are fully synchronized
3870 - disconnect from the network
3871 - change disks into single-master mode
3872 - wait again until disks are fully synchronized
3875 instance = self.instance
3876 target_node = self.target_node
3877 source_node = self.source_node
3879 # check running on only one node
3880 self.feedback_fn("* checking where the instance actually runs"
3881 " (if this hangs, the hypervisor might be in"
3883 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3884 for node, result in ins_l.items():
3886 if not isinstance(result.data, list):
3887 raise errors.OpExecError("Can't contact node '%s'" % node)
3889 runningon_source = instance.name in ins_l[source_node].data
3890 runningon_target = instance.name in ins_l[target_node].data
3892 if runningon_source and runningon_target:
3893 raise errors.OpExecError("Instance seems to be running on two nodes,"
3894 " or the hypervisor is confused. You will have"
3895 " to ensure manually that it runs only on one"
3896 " and restart this operation.")
3898 if not (runningon_source or runningon_target):
3899 raise errors.OpExecError("Instance does not seem to be running at all."
3900 " In this case, it's safer to repair by"
3901 " running 'gnt-instance stop' to ensure disk"
3902 " shutdown, and then restarting it.")
3904 if runningon_target:
3905 # the migration has actually succeeded, we need to update the config
3906 self.feedback_fn("* instance running on secondary node (%s),"
3907 " updating config" % target_node)
3908 instance.primary_node = target_node
3909 self.cfg.Update(instance)
3910 demoted_node = source_node
3912 self.feedback_fn("* instance confirmed to be running on its"
3913 " primary node (%s)" % source_node)
3914 demoted_node = target_node
3916 self._EnsureSecondary(demoted_node)
3918 self._WaitUntilSync()
3919 except errors.OpExecError:
3920 # we ignore errors here, since if the device is standalone, it
3921 # won't be able to sync
3923 self._GoStandalone()
3924 self._GoReconnect(False)
3925 self._WaitUntilSync()
3927 self.feedback_fn("* done")
3929 def _RevertDiskStatus(self):
3930 """Try to revert the disk status after a failed migration.
3933 target_node = self.target_node
3934 try:
3935 self._EnsureSecondary(target_node)
3936 self._GoStandalone()
3937 self._GoReconnect(False)
3938 self._WaitUntilSync()
3939 except errors.OpExecError, err:
3940 self.LogWarning("Migration failed and I can't reconnect the"
3941 " drives: error '%s'\n"
3942 "Please look and recover the instance status" %
3943 str(err))
3945 def _AbortMigration(self):
3946 """Call the hypervisor code to abort a started migration.
3949 instance = self.instance
3950 target_node = self.target_node
3951 migration_info = self.migration_info
3953 abort_result = self.rpc.call_finalize_migration(target_node,
3954 instance,
3955 migration_info,
3956 False)
3957 abort_msg = abort_result.RemoteFailMsg()
3958 if abort_msg:
3959 logging.error("Aborting migration failed on target node %s: %s" %
3960 (target_node, abort_msg))
3961 # Don't raise an exception here, as we still have to try to revert the
3962 # disk status, even if this step failed.
3964 def _ExecMigration(self):
3965 """Migrate an instance.
3967 The migration is done by:
3968 - change the disks into dual-master mode
3969 - wait until disks are fully synchronized again
3970 - migrate the instance
3971 - change disks on the new secondary node (the old primary) to secondary
3972 - wait until disks are fully synchronized
3973 - change disks into single-master mode
3976 instance = self.instance
3977 target_node = self.target_node
3978 source_node = self.source_node
3980 self.feedback_fn("* checking disk consistency between source and target")
3981 for dev in instance.disks:
3982 if not _CheckDiskConsistency(self, dev, target_node, False):
3983 raise errors.OpExecError("Disk %s is degraded or not fully"
3984 " synchronized on target node,"
3985 " aborting migration." % dev.iv_name)
3987 # First get the migration information from the remote node
3988 result = self.rpc.call_migration_info(source_node, instance)
3989 msg = result.RemoteFailMsg()
3990 if msg:
3991 log_err = ("Failed fetching source migration information from %s: %s" %
3992 (source_node, msg))
3993 logging.error(log_err)
3994 raise errors.OpExecError(log_err)
3996 self.migration_info = migration_info = result.payload
3998 # Then switch the disks to master/master mode
3999 self._EnsureSecondary(target_node)
4000 self._GoStandalone()
4001 self._GoReconnect(True)
4002 self._WaitUntilSync()
4004 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4005 result = self.rpc.call_accept_instance(target_node,
4006 instance,
4007 migration_info,
4008 self.nodes_ip[target_node])
4010 msg = result.RemoteFailMsg()
4011 if msg:
4012 logging.error("Instance pre-migration failed, trying to revert"
4013 " disk status: %s", msg)
4014 self._AbortMigration()
4015 self._RevertDiskStatus()
4016 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4017 (instance.name, msg))
4019 self.feedback_fn("* migrating instance to %s" % target_node)
4021 result = self.rpc.call_instance_migrate(source_node, instance,
4022 self.nodes_ip[target_node],
4023 self.op.live)
4024 msg = result.RemoteFailMsg()
4025 if msg:
4026 logging.error("Instance migration failed, trying to revert"
4027 " disk status: %s", msg)
4028 self._AbortMigration()
4029 self._RevertDiskStatus()
4030 raise errors.OpExecError("Could not migrate instance %s: %s" %
4031 (instance.name, msg))
4034 instance.primary_node = target_node
4035 # distribute new instance config to the other nodes
4036 self.cfg.Update(instance)
4038 result = self.rpc.call_finalize_migration(target_node,
4039 instance,
4040 migration_info,
4041 True)
4042 msg = result.RemoteFailMsg()
4043 if msg:
4044 logging.error("Instance migration succeeded, but finalization failed:"
4045 " %s" % msg)
4046 raise errors.OpExecError("Could not finalize instance migration: %s" %
4047 msg)
4049 self._EnsureSecondary(source_node)
4050 self._WaitUntilSync()
4051 self._GoStandalone()
4052 self._GoReconnect(False)
4053 self._WaitUntilSync()
4055 self.feedback_fn("* done")
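# Illustrative summary (comments only, not executed): a successful live
# migration drives the disks through the helper calls above; the node names
# here are made-up examples.
#
#   self._EnsureSecondary("node2")  # demote disks on the target node
#   self._GoStandalone()            # disconnect all disks from the network
#   self._GoReconnect(True)         # reconnect in dual-master mode
#   self._WaitUntilSync()           # wait until both sides are consistent
#   ...the hypervisor moves the instance...
#   self._EnsureSecondary("node1")  # demote the old primary
#   self._GoStandalone(); self._GoReconnect(False); self._WaitUntilSync()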
4057 def Exec(self, feedback_fn):
4058 """Perform the migration.
4061 self.feedback_fn = feedback_fn
4063 self.source_node = self.instance.primary_node
4064 self.target_node = self.instance.secondary_nodes[0]
4065 self.all_nodes = [self.source_node, self.target_node]
4066 self.nodes_ip = {
4067 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4068 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4069 }
4070 if self.op.cleanup:
4071 return self._ExecCleanup()
4072 else:
4073 return self._ExecMigration()
4076 def _CreateBlockDev(lu, node, instance, device, force_create,
4077 info, force_open):
4078 """Create a tree of block devices on a given node.
4080 If this device type has to be created on secondaries, create it and
4081 all its children.
4083 If not, just recurse to children keeping the same 'force' value.
4085 @param lu: the lu on whose behalf we execute
4086 @param node: the node on which to create the device
4087 @type instance: L{objects.Instance}
4088 @param instance: the instance which owns the device
4089 @type device: L{objects.Disk}
4090 @param device: the device to create
4091 @type force_create: boolean
4092 @param force_create: whether to force creation of this device; this
4093 will be changed to True whenever we find a device whose
4094 CreateOnSecondary() method returns True
4095 @param info: the extra 'metadata' we should attach to the device
4096 (this will be represented as a LVM tag)
4097 @type force_open: boolean
4098 @param force_open: this parameter will be passed to the
4099 L{backend.BlockdevCreate} function where it specifies
4100 whether we run on primary or not, and it affects both
4101 the child assembly and the device's own Open() execution
4104 if device.CreateOnSecondary():
4105 force_create = True
4107 if device.children:
4108 for child in device.children:
4109 _CreateBlockDev(lu, node, instance, child, force_create,
4110 info, force_open)
4112 if not force_create:
4113 return
4115 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
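# Example of the recursion above (illustrative, assuming a DRBD8 device for
# which CreateOnSecondary() returns True): on the secondary node force_create
# becomes True, so both LV children (data and metadata) are created before
# the DRBD8 device itself; when nothing forces creation, the function just
# recurses over the children and returns without creating this device.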
4118 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4119 """Create a single block device on a given node.
4121 This will not recurse over children of the device, so they must be
4122 created in advance.
4124 @param lu: the lu on whose behalf we execute
4125 @param node: the node on which to create the device
4126 @type instance: L{objects.Instance}
4127 @param instance: the instance which owns the device
4128 @type device: L{objects.Disk}
4129 @param device: the device to create
4130 @param info: the extra 'metadata' we should attach to the device
4131 (this will be represented as a LVM tag)
4132 @type force_open: boolean
4133 @param force_open: this parameter will be passed to the
4134 L{backend.BlockdevCreate} function where it specifies
4135 whether we run on primary or not, and it affects both
4136 the child assembly and the device's own Open() execution
4139 lu.cfg.SetDiskID(device, node)
4140 result = lu.rpc.call_blockdev_create(node, device, device.size,
4141 instance.name, force_open, info)
4142 msg = result.RemoteFailMsg()
4143 if msg:
4144 raise errors.OpExecError("Can't create block device %s on"
4145 " node %s for instance %s: %s" %
4146 (device, node, instance.name, msg))
4147 if device.physical_id is None:
4148 device.physical_id = result.payload
4151 def _GenerateUniqueNames(lu, exts):
4152 """Generate a suitable LV name.
4154 This will generate a logical volume name for the given instance.
4156 """
4157 results = []
4158 for val in exts:
4159 new_id = lu.cfg.GenerateUniqueID()
4160 results.append("%s%s" % (new_id, val))
4161 return results
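# Illustrative example (the UUIDs are made up); note that each suffix gets
# its own freshly generated unique ID:
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
#   => ["<uuid1>.disk0_data", "<uuid2>.disk0_meta"]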
4164 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4165 p_minor, s_minor):
4166 """Generate a drbd8 device complete with its children.
4169 port = lu.cfg.AllocatePort()
4170 vgname = lu.cfg.GetVGName()
4171 shared_secret = lu.cfg.GenerateDRBDSecret()
4172 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4173 logical_id=(vgname, names[0]))
4174 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4175 logical_id=(vgname, names[1]))
4176 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4177 logical_id=(primary, secondary, port,
4178 p_minor, s_minor,
4179 shared_secret),
4180 children=[dev_data, dev_meta],
4181 iv_name=iv_name)
4182 return drbd_dev
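# Sketch of the device tree this builds, for a hypothetical 1024 MB disk:
#
#   DRBD8, size=1024,
#     logical_id=(primary, secondary, port, p_minor, s_minor, shared_secret)
#   +- LV data, size=1024, logical_id=(vgname, names[0])
#   +- LV meta, size=128,  logical_id=(vgname, names[1])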
4185 def _GenerateDiskTemplate(lu, template_name,
4186 instance_name, primary_node,
4187 secondary_nodes, disk_info,
4188 file_storage_dir, file_driver,
4189 base_index):
4190 """Generate the entire disk layout for a given template type.
4193 #TODO: compute space requirements
4195 vgname = lu.cfg.GetVGName()
4196 disk_count = len(disk_info)
4197 disks = []
4198 if template_name == constants.DT_DISKLESS:
4199 pass
4200 elif template_name == constants.DT_PLAIN:
4201 if len(secondary_nodes) != 0:
4202 raise errors.ProgrammerError("Wrong template configuration")
4204 names = _GenerateUniqueNames(lu, [".disk%d" % i
4205 for i in range(disk_count)])
4206 for idx, disk in enumerate(disk_info):
4207 disk_index = idx + base_index
4208 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4209 logical_id=(vgname, names[idx]),
4210 iv_name="disk/%d" % disk_index,
4211 mode=disk["mode"])
4212 disks.append(disk_dev)
4213 elif template_name == constants.DT_DRBD8:
4214 if len(secondary_nodes) != 1:
4215 raise errors.ProgrammerError("Wrong template configuration")
4216 remote_node = secondary_nodes[0]
4217 minors = lu.cfg.AllocateDRBDMinor(
4218 [primary_node, remote_node] * len(disk_info), instance_name)
4220 names = []
4221 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4222 for i in range(disk_count)]):
4223 names.append(lv_prefix + "_data")
4224 names.append(lv_prefix + "_meta")
4225 for idx, disk in enumerate(disk_info):
4226 disk_index = idx + base_index
4227 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4228 disk["size"], names[idx*2:idx*2+2],
4229 "disk/%d" % disk_index,
4230 minors[idx*2], minors[idx*2+1])
4231 disk_dev.mode = disk["mode"]
4232 disks.append(disk_dev)
4233 elif template_name == constants.DT_FILE:
4234 if len(secondary_nodes) != 0:
4235 raise errors.ProgrammerError("Wrong template configuration")
4237 for idx, disk in enumerate(disk_info):
4238 disk_index = idx + base_index
4239 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4240 iv_name="disk/%d" % disk_index,
4241 logical_id=(file_driver,
4242 "%s/disk%d" % (file_storage_dir,
4243 disk_index)),
4244 mode=disk["mode"])
4245 disks.append(disk_dev)
4246 else:
4247 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4249 return disks
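# Worked example (illustrative): for template_name=constants.DT_DRBD8,
# base_index=0 and disk_info=[{"size": 1024, "mode": "rw"}], this returns a
# single DRBD8 disk named "disk/0", backed by a "<uuid>.disk0_data" LV of
# 1024 MB and a "<uuid>.disk0_meta" LV of 128 MB, with one DRBD minor
# allocated on the primary node and one on the secondary node.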
4251 def _GetInstanceInfoText(instance):
4252 """Compute the text that should be added to the disk's metadata.
4255 return "originstname+%s" % instance.name
4258 def _CreateDisks(lu, instance):
4259 """Create all disks for an instance.
4261 This abstracts away some work from AddInstance.
4263 @type lu: L{LogicalUnit}
4264 @param lu: the logical unit on whose behalf we execute
4265 @type instance: L{objects.Instance}
4266 @param instance: the instance whose disks we should create
4268 @return: the success of the creation
4271 info = _GetInstanceInfoText(instance)
4272 pnode = instance.primary_node
4274 if instance.disk_template == constants.DT_FILE:
4275 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4276 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4278 if result.failed or not result.data:
4279 raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4281 if not result.data[0]:
4282 raise errors.OpExecError("Failed to create directory '%s'" %
4283 file_storage_dir)
4285 # Note: this needs to be kept in sync with adding of disks in
4286 # LUSetInstanceParams
4287 for device in instance.disks:
4288 logging.info("Creating volume %s for instance %s",
4289 device.iv_name, instance.name)
4291 for node in instance.all_nodes:
4292 f_create = node == pnode
4293 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4296 def _RemoveDisks(lu, instance):
4297 """Remove all disks for an instance.
4299 This abstracts away some work from `AddInstance()` and
4300 `RemoveInstance()`. Note that in case some of the devices couldn't
4301 be removed, the removal will continue with the other ones (compare
4302 with `_CreateDisks()`).
4304 @type lu: L{LogicalUnit}
4305 @param lu: the logical unit on whose behalf we execute
4306 @type instance: L{objects.Instance}
4307 @param instance: the instance whose disks we should remove
4309 @return: the success of the removal
4312 logging.info("Removing block devices for instance %s", instance.name)
4315 for device in instance.disks:
4316 for node, disk in device.ComputeNodeTree(instance.primary_node):
4317 lu.cfg.SetDiskID(disk, node)
4318 msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4319 if msg:
4320 lu.LogWarning("Could not remove block device %s on node %s,"
4321 " continuing anyway: %s", device.iv_name, node, msg)
4324 if instance.disk_template == constants.DT_FILE:
4325 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4326 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4327 file_storage_dir)
4328 if result.failed or not result.data:
4329 logging.error("Could not remove directory '%s'", file_storage_dir)
4335 def _ComputeDiskSize(disk_template, disks):
4336 """Compute disk size requirements in the volume group
4339 # Required free disk space as a function of the disk template and sizes
4340 req_size_dict = {
4341 constants.DT_DISKLESS: None,
4342 constants.DT_PLAIN: sum(d["size"] for d in disks),
4343 # 128 MB are added for drbd metadata for each disk
4344 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4345 constants.DT_FILE: None,
4346 }
4348 if disk_template not in req_size_dict:
4349 raise errors.ProgrammerError("Disk template '%s' size requirement"
4350 " is unknown" % disk_template)
4352 return req_size_dict[disk_template]
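# Worked example: for disks=[{"size": 1024}, {"size": 2048}],
#   DT_PLAIN needs 1024 + 2048 = 3072 MB of free VG space, while
#   DT_DRBD8 needs (1024 + 128) + (2048 + 128) = 3328 MB because of the
#   per-disk metadata; DT_DISKLESS and DT_FILE need no VG space (None).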
4355 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4356 """Hypervisor parameter validation.
4358 This function abstracts the hypervisor parameter validation to be
4359 used in both instance create and instance modify.
4361 @type lu: L{LogicalUnit}
4362 @param lu: the logical unit for which we check
4363 @type nodenames: list
4364 @param nodenames: the list of nodes on which we should check
4365 @type hvname: string
4366 @param hvname: the name of the hypervisor we should use
4367 @type hvparams: dict
4368 @param hvparams: the parameters which we need to check
4369 @raise errors.OpPrereqError: if the parameters are not valid
4372 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4373 hvname,
4374 hvparams)
4375 for node in nodenames:
4376 info = hvinfo[node]
4377 if info.offline:
4378 continue
4379 msg = info.RemoteFailMsg()
4380 if msg:
4381 raise errors.OpPrereqError("Hypervisor parameter validation"
4382 " failed on node %s: %s" % (node, msg))
4385 class LUCreateInstance(LogicalUnit):
4386 """Create an instance.
4389 HPATH = "instance-add"
4390 HTYPE = constants.HTYPE_INSTANCE
4391 _OP_REQP = ["instance_name", "disks", "disk_template",
4392 "mode", "start",
4393 "wait_for_sync", "ip_check", "nics",
4394 "hvparams", "beparams"]
4395 REQ_BGL = False
4397 def _ExpandNode(self, node):
4398 """Expands and checks one node name.
4401 node_full = self.cfg.ExpandNodeName(node)
4402 if node_full is None:
4403 raise errors.OpPrereqError("Unknown node %s" % node)
4404 return node_full
4406 def ExpandNames(self):
4407 """ExpandNames for CreateInstance.
4409 Figure out the right locks for instance creation.
4412 self.needed_locks = {}
4414 # set optional parameters to none if they don't exist
4415 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4416 if not hasattr(self.op, attr):
4417 setattr(self.op, attr, None)
4419 # cheap checks, mostly that valid constants were given
4421 # verify creation mode
4422 if self.op.mode not in (constants.INSTANCE_CREATE,
4423 constants.INSTANCE_IMPORT):
4424 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4425 self.op.mode)
4427 # disk template and mirror node verification
4428 if self.op.disk_template not in constants.DISK_TEMPLATES:
4429 raise errors.OpPrereqError("Invalid disk template name")
4431 if self.op.hypervisor is None:
4432 self.op.hypervisor = self.cfg.GetHypervisorType()
4434 cluster = self.cfg.GetClusterInfo()
4435 enabled_hvs = cluster.enabled_hypervisors
4436 if self.op.hypervisor not in enabled_hvs:
4437 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4438 " cluster (%s)" % (self.op.hypervisor,
4439 ",".join(enabled_hvs)))
4441 # check hypervisor parameter syntax (locally)
4442 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4443 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4444 self.op.hvparams)
4445 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4446 hv_type.CheckParameterSyntax(filled_hvp)
4448 # fill and remember the beparams dict
4449 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4450 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4451 self.op.beparams)
4453 #### instance parameters check
4455 # instance name verification
4456 hostname1 = utils.HostInfo(self.op.instance_name)
4457 self.op.instance_name = instance_name = hostname1.name
4459 # this is just a preventive check, but someone might still add this
4460 # instance in the meantime, and creation will fail at lock-add time
4461 if instance_name in self.cfg.GetInstanceList():
4462 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4463 instance_name)
4465 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4467 # NIC buildup
4468 self.nics = []
4469 for idx, nic in enumerate(self.op.nics):
4470 nic_mode_req = nic.get("mode", None)
4471 nic_mode = nic_mode_req
4472 if nic_mode is None:
4473 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4475 # in routed mode, for the first nic, the default ip is 'auto'
4476 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4477 default_ip_mode = constants.VALUE_AUTO
4478 else:
4479 default_ip_mode = constants.VALUE_NONE
4481 # ip validity checks
4482 ip = nic.get("ip", default_ip_mode)
4483 if ip is None or ip.lower() == constants.VALUE_NONE:
4484 nic_ip = None
4485 elif ip.lower() == constants.VALUE_AUTO:
4486 nic_ip = hostname1.ip
4487 else:
4488 if not utils.IsValidIP(ip):
4489 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4490 " like a valid IP" % ip)
4491 nic_ip = ip
4493 # TODO: check the ip for uniqueness !!
4494 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4495 raise errors.OpPrereqError("Routed nic mode requires an ip address")
4497 # MAC address verification
4498 mac = nic.get("mac", constants.VALUE_AUTO)
4499 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4500 if not utils.IsValidMac(mac.lower()):
4501 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4502 mac)
4503 # bridge verification
4504 bridge = nic.get("bridge", None)
4505 link = nic.get("link", None)
4506 if bridge and link:
4507 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4508 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4509 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4510 elif bridge:
4511 link = bridge
4513 nicparams = {}
4514 if nic_mode_req:
4515 nicparams[constants.NIC_MODE] = nic_mode_req
4516 if link:
4517 nicparams[constants.NIC_LINK] = link
4519 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4520 nicparams)
4521 objects.NIC.CheckParameterSyntax(check_params)
4522 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4524 # disk checks/pre-build
4525 self.disks = []
4526 for disk in self.op.disks:
4527 mode = disk.get("mode", constants.DISK_RDWR)
4528 if mode not in constants.DISK_ACCESS_SET:
4529 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4530 mode)
4531 size = disk.get("size", None)
4532 if size is None:
4533 raise errors.OpPrereqError("Missing disk size")
4534 try:
4535 size = int(size)
4536 except ValueError:
4537 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4538 self.disks.append({"size": size, "mode": mode})
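# Example of the normalization above (illustrative): an opcode passing
#   disks=[{"size": "10240"}]
# results in
#   self.disks == [{"size": 10240, "mode": constants.DISK_RDWR}]
# i.e. the size is coerced to an integer (in MB) and the access mode
# defaults to read-write.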
4540 # used in CheckPrereq for ip ping check
4541 self.check_ip = hostname1.ip
4543 # file storage checks
4544 if (self.op.file_driver and
4545 not self.op.file_driver in constants.FILE_DRIVER):
4546 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4547 self.op.file_driver)
4549 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4550 raise errors.OpPrereqError("File storage directory path must not be absolute")
4552 ### Node/iallocator related checks
4553 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4554 raise errors.OpPrereqError("One and only one of iallocator and primary"
4555 " node must be given")
4557 if self.op.iallocator:
4558 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4560 self.op.pnode = self._ExpandNode(self.op.pnode)
4561 nodelist = [self.op.pnode]
4562 if self.op.snode is not None:
4563 self.op.snode = self._ExpandNode(self.op.snode)
4564 nodelist.append(self.op.snode)
4565 self.needed_locks[locking.LEVEL_NODE] = nodelist
4567 # in case of import lock the source node too
4568 if self.op.mode == constants.INSTANCE_IMPORT:
4569 src_node = getattr(self.op, "src_node", None)
4570 src_path = getattr(self.op, "src_path", None)
4572 if src_path is None:
4573 self.op.src_path = src_path = self.op.instance_name
4575 if src_node is None:
4576 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4577 self.op.src_node = None
4578 if os.path.isabs(src_path):
4579 raise errors.OpPrereqError("Importing an instance from an absolute"
4580 " path requires a source node option.")
4581 else:
4582 self.op.src_node = src_node = self._ExpandNode(src_node)
4583 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4584 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4585 if not os.path.isabs(src_path):
4586 self.op.src_path = src_path = \
4587 os.path.join(constants.EXPORT_DIR, src_path)
4589 else: # INSTANCE_CREATE
4590 if getattr(self.op, "os_type", None) is None:
4591 raise errors.OpPrereqError("No guest OS specified")
4593 def _RunAllocator(self):
4594 """Run the allocator based on input opcode.
4597 nics = [n.ToDict() for n in self.nics]
4598 ial = IAllocator(self,
4599 mode=constants.IALLOCATOR_MODE_ALLOC,
4600 name=self.op.instance_name,
4601 disk_template=self.op.disk_template,
4602 tags=[],
4603 os=self.op.os_type,
4604 vcpus=self.be_full[constants.BE_VCPUS],
4605 mem_size=self.be_full[constants.BE_MEMORY],
4606 disks=self.disks,
4607 nics=nics,
4608 hypervisor=self.op.hypervisor,
4609 )
4611 ial.Run(self.op.iallocator)
4613 if not ial.success:
4614 raise errors.OpPrereqError("Can't compute nodes using"
4615 " iallocator '%s': %s" % (self.op.iallocator,
4616 ial.info))
4617 if len(ial.nodes) != ial.required_nodes:
4618 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4619 " of nodes (%s), required %s" %
4620 (self.op.iallocator, len(ial.nodes),
4621 ial.required_nodes))
4622 self.op.pnode = ial.nodes[0]
4623 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4624 self.op.instance_name, self.op.iallocator,
4625 ", ".join(ial.nodes))
4626 if ial.required_nodes == 2:
4627 self.op.snode = ial.nodes[1]
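# Usage note: for net-mirrored templates the allocator must return two
# nodes, in which case ial.nodes[0] becomes the primary and ial.nodes[1]
# the secondary; for non-mirrored templates required_nodes is 1 and only
# the primary node is set.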
4629 def BuildHooksEnv(self):
4630 """Build hooks env.
4632 This runs on master, primary and secondary nodes of the instance.
4634 """
4635 env = {
4636 "ADD_MODE": self.op.mode,
4637 }
4638 if self.op.mode == constants.INSTANCE_IMPORT:
4639 env["SRC_NODE"] = self.op.src_node
4640 env["SRC_PATH"] = self.op.src_path
4641 env["SRC_IMAGES"] = self.src_images
4643 env.update(_BuildInstanceHookEnv(
4644 name=self.op.instance_name,
4645 primary_node=self.op.pnode,
4646 secondary_nodes=self.secondaries,
4647 status=self.op.start,
4648 os_type=self.op.os_type,
4649 memory=self.be_full[constants.BE_MEMORY],
4650 vcpus=self.be_full[constants.BE_VCPUS],
4651 nics=_PreBuildNICHooksList(self, self.nics),
4652 disk_template=self.op.disk_template,
4653 disks=[(d["size"], d["mode"]) for d in self.disks],
4654 ))
4656 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4657 self.secondaries)
4658 return env, nl, nl
4661 def CheckPrereq(self):
4662 """Check prerequisites.
4665 if (not self.cfg.GetVGName() and
4666 self.op.disk_template not in constants.DTS_NOT_LVM):
4667 raise errors.OpPrereqError("Cluster does not support lvm-based"
4668 " instances")
4670 if self.op.mode == constants.INSTANCE_IMPORT:
4671 src_node = self.op.src_node
4672 src_path = self.op.src_path
4674 if src_node is None:
4675 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4676 exp_list = self.rpc.call_export_list(locked_nodes)
4677 found = False
4678 for node in exp_list:
4679 if exp_list[node].RemoteFailMsg():
4680 continue
4681 if src_path in exp_list[node].payload:
4682 found = True
4683 self.op.src_node = src_node = node
4684 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4685 src_path)
4686 break
4687 if not found:
4688 raise errors.OpPrereqError("No export found for relative path %s" %
4689 src_path)
4691 _CheckNodeOnline(self, src_node)
4692 result = self.rpc.call_export_info(src_node, src_path)
4693 msg = result.RemoteFailMsg()
4694 if msg:
4695 raise errors.OpPrereqError("No export or invalid export found in"
4696 " dir %s: %s" % (src_path, msg))
4698 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4699 if not export_info.has_section(constants.INISECT_EXP):
4700 raise errors.ProgrammerError("Corrupted export config")
4702 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4703 if (int(ei_version) != constants.EXPORT_VERSION):
4704 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4705 (ei_version, constants.EXPORT_VERSION))
4707 # Check that the new instance doesn't have less disks than the export
4708 instance_disks = len(self.disks)
4709 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4710 if instance_disks < export_disks:
4711 raise errors.OpPrereqError("Not enough disks to import."
4712 " (instance: %d, export: %d)" %
4713 (instance_disks, export_disks))
4715 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4716 disk_images = []
4717 for idx in range(export_disks):
4718 option = 'disk%d_dump' % idx
4719 if export_info.has_option(constants.INISECT_INS, option):
4720 # FIXME: are the old os-es, disk sizes, etc. useful?
4721 export_name = export_info.get(constants.INISECT_INS, option)
4722 image = os.path.join(src_path, export_name)
4723 disk_images.append(image)
4724 else:
4725 disk_images.append(False)
4727 self.src_images = disk_images
4729 old_name = export_info.get(constants.INISECT_INS, 'name')
4730 # FIXME: int() here could throw a ValueError on broken exports
4731 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4732 if self.op.instance_name == old_name:
4733 for idx, nic in enumerate(self.nics):
4734 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4735 nic_mac_ini = 'nic%d_mac' % idx
4736 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4738 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4739 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4740 if self.op.start and not self.op.ip_check:
4741 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4742 " adding an instance in start mode")
4744 if self.op.ip_check:
4745 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4746 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4747 (self.check_ip, self.op.instance_name))
4749 #### mac address generation
4750 # By generating here the mac address both the allocator and the hooks get
4751 # the real final mac address rather than the 'auto' or 'generate' value.
4752 # There is a race condition between the generation and the instance object
4753 # creation, which means that we know the mac is valid now, but we're not
4754 # sure it will be when we actually add the instance. If things go bad
4755 # adding the instance will abort because of a duplicate mac, and the
4756 # creation job will fail.
4757 for nic in self.nics:
4758 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4759 nic.mac = self.cfg.GenerateMAC()
4763 if self.op.iallocator is not None:
4764 self._RunAllocator()
4766 #### node related checks
4768 # check primary node
4769 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4770 assert self.pnode is not None, \
4771 "Cannot retrieve locked node %s" % self.op.pnode
4772 if pnode.offline:
4773 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4774 pnode.name)
4775 if pnode.drained:
4776 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4777 pnode.name)
4779 self.secondaries = []
4781 # mirror node verification
4782 if self.op.disk_template in constants.DTS_NET_MIRROR:
4783 if self.op.snode is None:
4784 raise errors.OpPrereqError("The networked disk templates need"
4785 " a mirror node")
4786 if self.op.snode == pnode.name:
4787 raise errors.OpPrereqError("The secondary node cannot be"
4788 " the primary node.")
4789 _CheckNodeOnline(self, self.op.snode)
4790 _CheckNodeNotDrained(self, self.op.snode)
4791 self.secondaries.append(self.op.snode)
4793 nodenames = [pnode.name] + self.secondaries
4795 req_size = _ComputeDiskSize(self.op.disk_template,
4796 self.disks)
4798 # Check lv size requirements
4799 if req_size is not None:
4800 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4801 self.op.hypervisor)
4802 for node in nodenames:
4803 info = nodeinfo[node]
4804 info.Raise()
4805 info = info.data
4806 if not info:
4807 raise errors.OpPrereqError("Cannot get current information"
4808 " from node '%s'" % node)
4809 vg_free = info.get('vg_free', None)
4810 if not isinstance(vg_free, int):
4811 raise errors.OpPrereqError("Can't compute free disk space on"
4812 " node %s" % node)
4813 if req_size > vg_free:
4814 raise errors.OpPrereqError("Not enough disk space on target node %s."
4815 " %d MB available, %d MB required" %
4816 (node, vg_free, req_size))
4818 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4821 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4822 result.Raise()
4823 if not isinstance(result.data, objects.OS):
4824 raise errors.OpPrereqError("OS '%s' not in supported os list for"
4825 " primary node" % self.op.os_type)
4827 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4829 # memory check on primary node
4830 if self.op.start:
4831 _CheckNodeFreeMemory(self, self.pnode.name,
4832 "creating instance %s" % self.op.instance_name,
4833 self.be_full[constants.BE_MEMORY],
4834 self.op.hypervisor)
4836 def Exec(self, feedback_fn):
4837 """Create and add the instance to the cluster.
4840 instance = self.op.instance_name
4841 pnode_name = self.pnode.name
4843 ht_kind = self.op.hypervisor
4844 if ht_kind in constants.HTS_REQ_PORT:
4845 network_port = self.cfg.AllocatePort()
4846 else:
4847 network_port = None
4849 ##if self.op.vnc_bind_address is None:
4850 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4852 # this is needed because os.path.join does not accept None arguments
4853 if self.op.file_storage_dir is None:
4854 string_file_storage_dir = ""
4855 else:
4856 string_file_storage_dir = self.op.file_storage_dir
4858 # build the full file storage dir path
4859 file_storage_dir = os.path.normpath(os.path.join(
4860 self.cfg.GetFileStorageDir(),
4861 string_file_storage_dir, instance))
4864 disks = _GenerateDiskTemplate(self,
4865 self.op.disk_template,
4866 instance, pnode_name,
4867 self.secondaries,
4868 self.disks,
4869 file_storage_dir,
4870 self.op.file_driver,
4871 0)
4873 iobj = objects.Instance(name=instance, os=self.op.os_type,
4874 primary_node=pnode_name,
4875 nics=self.nics, disks=disks,
4876 disk_template=self.op.disk_template,
4877 admin_up=False,
4878 network_port=network_port,
4879 beparams=self.op.beparams,
4880 hvparams=self.op.hvparams,
4881 hypervisor=self.op.hypervisor,
4882 )
4884 feedback_fn("* creating instance disks...")
4885 try:
4886 _CreateDisks(self, iobj)
4887 except errors.OpExecError:
4888 self.LogWarning("Device creation failed, reverting...")
4889 try:
4890 _RemoveDisks(self, iobj)
4891 finally:
4892 self.cfg.ReleaseDRBDMinors(instance)
4893 raise
4895 feedback_fn("adding instance %s to cluster config" % instance)
4897 self.cfg.AddInstance(iobj)
4898 # Declare that we don't want to remove the instance lock anymore, as we've
4899 # added the instance to the config
4900 del self.remove_locks[locking.LEVEL_INSTANCE]
4901 # Unlock all the nodes
4902 if self.op.mode == constants.INSTANCE_IMPORT:
4903 nodes_keep = [self.op.src_node]
4904 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4905 if node != self.op.src_node]
4906 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4907 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4909 self.context.glm.release(locking.LEVEL_NODE)
4910 del self.acquired_locks[locking.LEVEL_NODE]
4912 if self.op.wait_for_sync:
4913 disk_abort = not _WaitForSync(self, iobj)
4914 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4915 # make sure the disks are not degraded (still sync-ing is ok)
4916 time.sleep(15)
4917 feedback_fn("* checking mirrors status")
4918 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4919 else:
4920 disk_abort = False
4922 if disk_abort:
4923 _RemoveDisks(self, iobj)
4924 self.cfg.RemoveInstance(iobj.name)
4925 # Make sure the instance lock gets removed
4926 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4927 raise errors.OpExecError("There are some degraded disks for"
4928 " this instance")
4930 feedback_fn("creating os for instance %s on node %s" %
4931 (instance, pnode_name))
4933 if iobj.disk_template != constants.DT_DISKLESS:
4934 if self.op.mode == constants.INSTANCE_CREATE:
4935 feedback_fn("* running the instance OS create scripts...")
4936 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4937 msg = result.RemoteFailMsg()
4938 if msg:
4939 raise errors.OpExecError("Could not add os for instance %s"
4940 " on node %s: %s" %
4941 (instance, pnode_name, msg))
4943 elif self.op.mode == constants.INSTANCE_IMPORT:
4944 feedback_fn("* running the instance OS import scripts...")
4945 src_node = self.op.src_node
4946 src_images = self.src_images
4947 cluster_name = self.cfg.GetClusterName()
4948 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4949 src_node, src_images,
4950 cluster_name)
4951 import_result.Raise()
4952 for idx, result in enumerate(import_result.data):
4953 if not result:
4954 self.LogWarning("Could not import the image %s for instance"
4955 " %s, disk %d, on node %s" %
4956 (src_images[idx], instance, idx, pnode_name))
4957 else:
4958 # also checked in the prereq part
4959 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4960 % self.op.mode)
4962 if self.op.start:
4963 iobj.admin_up = True
4964 self.cfg.Update(iobj)
4965 logging.info("Starting instance %s on node %s", instance, pnode_name)
4966 feedback_fn("* starting instance...")
4967 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4968 msg = result.RemoteFailMsg()
4969 if msg:
4970 raise errors.OpExecError("Could not start instance: %s" % msg)
4973 class LUConnectConsole(NoHooksLU):
4974 """Connect to an instance's console.
4976 This is somewhat special in that it returns the command line that
4977 you need to run on the master node in order to connect to the
4978 console.
4980 """
4981 _OP_REQP = ["instance_name"]
4984 def ExpandNames(self):
4985 self._ExpandAndLockInstance()
4987 def CheckPrereq(self):
4988 """Check prerequisites.
4990 This checks that the instance is in the cluster.
4993 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4994 assert self.instance is not None, \
4995 "Cannot retrieve locked instance %s" % self.op.instance_name
4996 _CheckNodeOnline(self, self.instance.primary_node)
4998 def Exec(self, feedback_fn):
4999 """Connect to the console of an instance
5002 instance = self.instance
5003 node = instance.primary_node
5005 node_insts = self.rpc.call_instance_list([node],
5006 [instance.hypervisor])[node]
5007 node_insts.Raise()
5009 if instance.name not in node_insts.data:
5010 raise errors.OpExecError("Instance %s is not running." % instance.name)
5012 logging.debug("Connecting to console of %s on %s", instance.name, node)
5014 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5015 cluster = self.cfg.GetClusterInfo()
5016 # beparams and hvparams are passed separately, to avoid editing the
5017 # instance and then saving the defaults in the instance itself.
5018 hvparams = cluster.FillHV(instance)
5019 beparams = cluster.FillBE(instance)
5020 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5023 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
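# Sketch of the returned value (the exact BuildCmd output format is an
# assumption, shown only for illustration):
#   ssh -t ... root@node1.example.com '<hypervisor console command>'
# i.e. a command line that the client is expected to exec on the master.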
5026 class LUReplaceDisks(LogicalUnit):
5027 """Replace the disks of an instance.
5030 HPATH = "mirrors-replace"
5031 HTYPE = constants.HTYPE_INSTANCE
5032 _OP_REQP = ["instance_name", "mode", "disks"]
5035 def CheckArguments(self):
5036 if not hasattr(self.op, "remote_node"):
5037 self.op.remote_node = None
5038 if not hasattr(self.op, "iallocator"):
5039 self.op.iallocator = None
5041 # check for valid parameter combination
5042 cnt = [self.op.remote_node, self.op.iallocator].count(None)
5043 if self.op.mode == constants.REPLACE_DISK_CHG:
5044 if cnt == 2:
5045 raise errors.OpPrereqError("When changing the secondary either an"
5046 " iallocator script must be used or the"
5047 " new node given")
5048 elif cnt == 0:
5049 raise errors.OpPrereqError("Give either the iallocator or the new"
5050 " secondary, not both")
5051 else: # not replacing the secondary
5052 if cnt != 2:
5053 raise errors.OpPrereqError("The iallocator and new node options can"
5054 " be used only when changing the"
5055 " secondary")
5057 def ExpandNames(self):
5058 self._ExpandAndLockInstance()
5060 if self.op.iallocator is not None:
5061 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5062 elif self.op.remote_node is not None:
5063 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5064 if remote_node is None:
5065 raise errors.OpPrereqError("Node '%s' not known" %
5066 self.op.remote_node)
5067 self.op.remote_node = remote_node
5068 # Warning: do not remove the locking of the new secondary here
5069 # unless DRBD8.AddChildren is changed to work in parallel;
5070 # currently it doesn't since parallel invocations of
5071 # FindUnusedMinor will conflict
5072 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5073 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5074 else:
5075 self.needed_locks[locking.LEVEL_NODE] = []
5076 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5078 def DeclareLocks(self, level):
5079 # If we're not already locking all nodes in the set we have to declare the
5080 # instance's primary/secondary nodes.
5081 if (level == locking.LEVEL_NODE and
5082 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5083 self._LockInstancesNodes()
5085 def _RunAllocator(self):
5086 """Compute a new secondary node using an IAllocator.
5089 ial = IAllocator(self,
5090 mode=constants.IALLOCATOR_MODE_RELOC,
5091 name=self.op.instance_name,
5092 relocate_from=[self.sec_node])
5094 ial.Run(self.op.iallocator)
5096 if not ial.success:
5097 raise errors.OpPrereqError("Can't compute nodes using"
5098 " iallocator '%s': %s" % (self.op.iallocator,
5099 ial.info))
5100 if len(ial.nodes) != ial.required_nodes:
5101 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5102 " of nodes (%s), required %s" %
5103 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
5104 self.op.remote_node = ial.nodes[0]
5105 self.LogInfo("Selected new secondary for the instance: %s",
5106 self.op.remote_node)
5108 def BuildHooksEnv(self):
5109 """Build hooks env.
5111 This runs on the master, the primary and all the secondaries.
5113 """
5114 env = {
5115 "MODE": self.op.mode,
5116 "NEW_SECONDARY": self.op.remote_node,
5117 "OLD_SECONDARY": self.instance.secondary_nodes[0],
5118 }
5119 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5120 nl = [
5121 self.cfg.GetMasterNode(),
5122 self.instance.primary_node,
5123 ]
5124 if self.op.remote_node is not None:
5125 nl.append(self.op.remote_node)
5126 return env, nl, nl
5128 def CheckPrereq(self):
5129 """Check prerequisites.
5131 This checks that the instance is in the cluster.
5134 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5135 assert instance is not None, \
5136 "Cannot retrieve locked instance %s" % self.op.instance_name
5137 self.instance = instance
5139 if instance.disk_template != constants.DT_DRBD8:
5140 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5141 " instances")
5143 if len(instance.secondary_nodes) != 1:
5144 raise errors.OpPrereqError("The instance has a strange layout,"
5145 " expected one secondary but found %d" %
5146 len(instance.secondary_nodes))
5148 self.sec_node = instance.secondary_nodes[0]
5150 if self.op.iallocator is not None:
5151 self._RunAllocator()
5153 remote_node = self.op.remote_node
5154 if remote_node is not None:
5155 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5156 assert self.remote_node_info is not None, \
5157 "Cannot retrieve locked node %s" % remote_node
5159 self.remote_node_info = None
5160 if remote_node == instance.primary_node:
5161 raise errors.OpPrereqError("The specified node is the primary node of"
5162 " the instance.")
5163 elif remote_node == self.sec_node:
5164 raise errors.OpPrereqError("The specified node is already the"
5165 " secondary node of the instance.")
5167 if self.op.mode == constants.REPLACE_DISK_PRI:
5168 n1 = self.tgt_node = instance.primary_node
5169 n2 = self.oth_node = self.sec_node
5170 elif self.op.mode == constants.REPLACE_DISK_SEC:
5171 n1 = self.tgt_node = self.sec_node
5172 n2 = self.oth_node = instance.primary_node
5173 elif self.op.mode == constants.REPLACE_DISK_CHG:
5174 n1 = self.new_node = remote_node
5175 n2 = self.oth_node = instance.primary_node
5176 self.tgt_node = self.sec_node
5177 _CheckNodeNotDrained(self, remote_node)
5178 else:
5179 raise errors.ProgrammerError("Unhandled disk replace mode")
5181 _CheckNodeOnline(self, n1)
5182 _CheckNodeOnline(self, n2)
5184 if not self.op.disks:
5185 self.op.disks = range(len(instance.disks))
5187 for disk_idx in self.op.disks:
5188 instance.FindDisk(disk_idx)
5190 def _ExecD8DiskOnly(self, feedback_fn):
5191 """Replace a disk on the primary or secondary for drbd8.
5193 The algorithm for replace is quite complicated:
5195 1. for each disk to be replaced:
5197 1. create new LVs on the target node with unique names
5198 1. detach old LVs from the drbd device
5199 1. rename old LVs to name_replaced.<time_t>
5200 1. rename new LVs to old LVs
5201 1. attach the new LVs (with the old names now) to the drbd device
5203 1. wait for sync across all devices
5205 1. for each modified disk:
5207 1. remove old LVs (which have the name name_replaced.<time_t>)
5209 Failures are not very well handled.
5211 """
5212 steps_total = 6
5213 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5214 instance = self.instance
5215 iv_names = {}
5216 vgname = self.cfg.GetVGName()
5217 cfg = self.cfg
5219 tgt_node = self.tgt_node
5220 oth_node = self.oth_node
5222 # Step: check device activation
5223 self.proc.LogStep(1, steps_total, "check device existence")
5224 info("checking volume groups")
5225 my_vg = cfg.GetVGName()
5226 results = self.rpc.call_vg_list([oth_node, tgt_node])
5227 if not results:
5228 raise errors.OpExecError("Can't list volume groups on the nodes")
5229 for node in oth_node, tgt_node:
5230 res = results[node]
5231 if res.failed or not res.data or my_vg not in res.data:
5232 raise errors.OpExecError("Volume group '%s' not found on %s" %
5233 (my_vg, node))
5234 for idx, dev in enumerate(instance.disks):
5235 if idx not in self.op.disks:
5236 continue
5237 for node in tgt_node, oth_node:
5238 info("checking disk/%d on %s" % (idx, node))
5239 cfg.SetDiskID(dev, node)
5240 result = self.rpc.call_blockdev_find(node, dev)
5241 msg = result.RemoteFailMsg()
5242 if not msg and not result.payload:
5243 msg = "disk not found"
5244 if msg:
5245 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5246 (idx, node, msg))
5248 # Step: check other node consistency
5249 self.proc.LogStep(2, steps_total, "check peer consistency")
5250 for idx, dev in enumerate(instance.disks):
5251 if idx not in self.op.disks:
5252 continue
5253 info("checking disk/%d consistency on %s" % (idx, oth_node))
5254 if not _CheckDiskConsistency(self, dev, oth_node,
5255 oth_node == instance.primary_node):
5256 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5257 " to replace disks on this node (%s)" %
5258 (oth_node, tgt_node))
5260 # Step: create new storage
5261 self.proc.LogStep(3, steps_total, "allocate new storage")
5262 for idx, dev in enumerate(instance.disks):
5263 if idx not in self.op.disks:
5264 continue
5265 size = dev.size
5266 cfg.SetDiskID(dev, tgt_node)
5267 lv_names = [".disk%d_%s" % (idx, suf)
5268 for suf in ["data", "meta"]]
5269 names = _GenerateUniqueNames(self, lv_names)
5270 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5271 logical_id=(vgname, names[0]))
5272 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5273 logical_id=(vgname, names[1]))
5274 new_lvs = [lv_data, lv_meta]
5275 old_lvs = dev.children
5276 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5277 info("creating new local storage on %s for %s" %
5278 (tgt_node, dev.iv_name))
5279 # we pass force_create=True to force the LVM creation
5280 for new_lv in new_lvs:
5281 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5282 _GetInstanceInfoText(instance), False)
5284 # Step: for each lv, detach+rename*2+attach
5285 self.proc.LogStep(4, steps_total, "change drbd configuration")
5286 for dev, old_lvs, new_lvs in iv_names.itervalues():
5287 info("detaching %s drbd from local storage" % dev.iv_name)
5288 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5289 msg = result.RemoteFailMsg()
5290 if msg:
5291 raise errors.OpExecError("Can't detach drbd from local storage on node"
5292 " %s for device %s: %s" %
5293 (tgt_node, dev.iv_name, msg))
5295 #cfg.Update(instance)
5297 # ok, we created the new LVs, so now we know we have the needed
5298 # storage; as such, we proceed on the target node to rename
5299 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5300 # using the assumption that logical_id == physical_id (which in
5301 # turn is the unique_id on that node)
5303 # FIXME(iustin): use a better name for the replaced LVs
5304 temp_suffix = int(time.time())
5305 ren_fn = lambda d, suff: (d.physical_id[0],
5306 d.physical_id[1] + "_replaced-%s" % suff)
5307 # build the rename list based on what LVs exist on the node
5308 rlist = []
5309 for to_ren in old_lvs:
5310 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5311 if not result.RemoteFailMsg() and result.payload:
5313 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5315 info("renaming the old LVs on the target node")
5316 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5317 msg = result.RemoteFailMsg()
5318 if msg:
5319 raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5320 (tgt_node, msg))
5321 # now we rename the new LVs to the old LVs
5322 info("renaming the new LVs on the target node")
5323 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5324 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5325 msg = result.RemoteFailMsg()
5326 if msg:
5327 raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5328 (tgt_node, msg))
5330 for old, new in zip(old_lvs, new_lvs):
5331 new.logical_id = old.logical_id
5332 cfg.SetDiskID(new, tgt_node)
5334 for disk in old_lvs:
5335 disk.logical_id = ren_fn(disk, temp_suffix)
5336 cfg.SetDiskID(disk, tgt_node)
5338 # now that the new lvs have the old name, we can add them to the device
5339 info("adding new mirror component on %s" % tgt_node)
5340 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5341 msg = result.RemoteFailMsg()
5342 if msg:
5343 for new_lv in new_lvs:
5344 msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5345 if msg:
5346 warning("Can't rollback device %s: %s", dev, msg,
5347 hint="cleanup manually the unused logical volumes")
5348 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5350 dev.children = new_lvs
5351 cfg.Update(instance)
5353 # Step: wait for sync
5355 # this can fail as the old devices are degraded and _WaitForSync
5356 # does a combined result over all disks, so we don't check its
5357 # return value
5358 self.proc.LogStep(5, steps_total, "sync devices")
5359 _WaitForSync(self, instance, unlock=True)
5361 # so check manually all the devices
5362 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5363 cfg.SetDiskID(dev, instance.primary_node)
5364 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5365 msg = result.RemoteFailMsg()
5366 if not msg and not result.payload:
5367 msg = "disk not found"
5368 if msg:
5369 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5370 (name, msg))
5371 if result.payload[5]:
5372 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5374 # Step: remove old storage
5375 self.proc.LogStep(6, steps_total, "removing old storage")
5376 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5377 info("remove logical volumes for %s" % name)
5378 for lv in old_lvs:
5379 cfg.SetDiskID(lv, tgt_node)
5380 msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5381 if msg:
5382 warning("Can't remove old LV: %s" % msg,
5383 hint="manually remove unused LVs")
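# Illustrative rename dance for one disk (LV names shortened, the suffix is
# a made-up time_t value):
#   old LV: <uuid>.disk0_data  -> <uuid>.disk0_data_replaced-1234567890
#   new LV: <uuid2>.disk0_data -> <uuid>.disk0_data
# so the freshly created LVs take over the old names before being attached
# back to the DRBD device, and the "_replaced-*" LVs are removed in step 6.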
5386 def _ExecD8Secondary(self, feedback_fn):
5387 """Replace the secondary node for drbd8.
5389 The algorithm for replace is quite complicated:
5390 - for all disks of the instance:
5391 - create new LVs on the new node with same names
5392 - shutdown the drbd device on the old secondary
5393 - disconnect the drbd network on the primary
5394 - create the drbd device on the new secondary
5395 - network attach the drbd on the primary, using an artifice:
5396 the drbd code for Attach() will connect to the network if it
5397 finds a device which is connected to the good local disks but
5398 not network enabled
5399 - wait for sync across all devices
5400 - remove all disks from the old secondary
5402 Failures are not very well handled.
5404 """
5405 steps_total = 6
5406 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5407 instance = self.instance
5408 iv_names = {}
5409 cfg = self.cfg
5411 old_node = self.tgt_node
5412 new_node = self.new_node
5413 pri_node = instance.primary_node
5414 nodes_ip = {
5415 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5416 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5417 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5418 }
5420 # Step: check device activation
5421 self.proc.LogStep(1, steps_total, "check device existence")
5422 info("checking volume groups")
5423 my_vg = cfg.GetVGName()
5424 results = self.rpc.call_vg_list([pri_node, new_node])
5425 for node in pri_node, new_node:
5426 res = results[node]
5427 if res.failed or not res.data or my_vg not in res.data:
5428 raise errors.OpExecError("Volume group '%s' not found on %s" %
5429 (my_vg, node))
5430 for idx, dev in enumerate(instance.disks):
5431 if idx not in self.op.disks:
5432 continue
5433 info("checking disk/%d on %s" % (idx, pri_node))
5434 cfg.SetDiskID(dev, pri_node)
5435 result = self.rpc.call_blockdev_find(pri_node, dev)
5436 msg = result.RemoteFailMsg()
5437 if not msg and not result.payload:
5438 msg = "disk not found"
5439 if msg:
5440 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5441 (idx, pri_node, msg))
5443 # Step: check other node consistency
5444 self.proc.LogStep(2, steps_total, "check peer consistency")
5445 for idx, dev in enumerate(instance.disks):
5446 if idx not in self.op.disks:
5447 continue
5448 info("checking disk/%d consistency on %s" % (idx, pri_node))
5449 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5450 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5451 " unsafe to replace the secondary" %
5452 pri_node)
5454 # Step: create new storage
5455 self.proc.LogStep(3, steps_total, "allocate new storage")
5456 for idx, dev in enumerate(instance.disks):
5457 info("adding new local storage on %s for disk/%d" %
5458 (new_node, idx))
5459 # we pass force_create=True to force LVM creation
5460 for new_lv in dev.children:
5461 _CreateBlockDev(self, new_node, instance, new_lv, True,
5462 _GetInstanceInfoText(instance), False)
5464 # Step 4: drbd minors and drbd setup changes
5465 # after this, we must manually remove the drbd minors on both the
5466 # error and the success paths
5467 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5468 instance.name)
5469 logging.debug("Allocated minors %s", minors)
5470 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5471 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5473 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5474 # create new devices on new_node; note that we create two IDs:
5475 # one without port, so the drbd will be activated without
5476 # networking information on the new node at this stage, and one
5477 # with network, for the latter activation in step 4
5478 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5479 if pri_node == o_node1:
5480 p_minor = o_minor1
5481 else:
5482 p_minor = o_minor2
5484 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5485 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5487 iv_names[idx] = (dev, dev.children, new_net_id)
5488 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5489 new_net_id)
5490 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5491 logical_id=new_alone_id,
5492 children=dev.children)
5493 try:
5494 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5495 _GetInstanceInfoText(instance), False)
5496 except errors.GenericError:
5497 self.cfg.ReleaseDRBDMinors(instance.name)
5498 raise
5500 for idx, dev in enumerate(instance.disks):
5501 # we have new devices, shutdown the drbd on the old secondary
5502 info("shutting down drbd for disk/%d on old node" % idx)
5503 cfg.SetDiskID(dev, old_node)
5504 msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5505 if msg:
5506 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5507 (idx, msg),
5508 hint="Please cleanup this device manually as soon as possible")
5510 info("detaching primary drbds from the network (=> standalone)")
5511 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5512 instance.disks)[pri_node]
5514 msg = result.RemoteFailMsg()
5515 if msg:
5516 # detaches didn't succeed (unlikely)
5517 self.cfg.ReleaseDRBDMinors(instance.name)
5518 raise errors.OpExecError("Can't detach the disks from the network on"
5519 " old node: %s" % (msg,))
5521 # if we managed to detach at least one, we update all the disks of
5522 # the instance to point to the new secondary
5523 info("updating instance configuration")
5524 for dev, _, new_logical_id in iv_names.itervalues():
5525 dev.logical_id = new_logical_id
5526 cfg.SetDiskID(dev, pri_node)
5527 cfg.Update(instance)
5529 # and now perform the drbd attach
5530 info("attaching primary drbds to new secondary (standalone => connected)")
5531 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5532 instance.disks, instance.name,
5533 False)
5534 for to_node, to_result in result.items():
5535 msg = to_result.RemoteFailMsg()
5537 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5538 hint="please do a gnt-instance info to see the"
5539 " status of disks")
5541 # this can fail as the old devices are degraded and _WaitForSync
5542 # does a combined result over all disks, so we don't check its
5543 # return value
5544 self.proc.LogStep(5, steps_total, "sync devices")
5545 _WaitForSync(self, instance, unlock=True)
5547 # so check manually all the devices
5548 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5549 cfg.SetDiskID(dev, pri_node)
5550 result = self.rpc.call_blockdev_find(pri_node, dev)
5551 msg = result.RemoteFailMsg()
5552 if not msg and not result.payload:
5553 msg = "disk not found"
5554 if msg:
5555 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5556 (idx, msg))
5557 if result.payload[5]:
5558 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5560 self.proc.LogStep(6, steps_total, "removing old storage")
5561 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5562 info("remove logical volumes for disk/%d" % idx)
5563 for lv in old_lvs:
5564 cfg.SetDiskID(lv, old_node)
5565 msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5566 if msg:
5567 warning("Can't remove LV on old secondary: %s", msg,
5568 hint="Cleanup stale volumes by hand")
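# Illustrative logical_id transition for one disk when the secondary is
# replaced (node names and minors are made-up examples):
#   before: ("node1", "node2", 11000, 1, 1, secret)
#   after:  ("node1", "node3", 11000, 1, new_minor, secret)
# i.e. only the secondary node and its DRBD minor change; the port and the
# shared secret are kept so the primary can simply reconnect.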
5570 def Exec(self, feedback_fn):
5571 """Execute disk replacement.
5573 This dispatches the disk replacement to the appropriate handler.
5576 instance = self.instance
5578 # Activate the instance disks if we're replacing them on a down instance
5579 if not instance.admin_up:
5580 _StartInstanceDisks(self, instance, True)
5582 if self.op.mode == constants.REPLACE_DISK_CHG:
5583 fn = self._ExecD8Secondary
5584 else:
5585 fn = self._ExecD8DiskOnly
5587 ret = fn(feedback_fn)
5589 # Deactivate the instance disks if we're replacing them on a down instance
5590 if not instance.admin_up:
5591 _SafeShutdownInstanceDisks(self, instance)
5593 return ret
5596 class LUGrowDisk(LogicalUnit):
5597 """Grow a disk of an instance.
5599 """
5600 HPATH = "disk-grow"
5601 HTYPE = constants.HTYPE_INSTANCE
5602 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5605 def ExpandNames(self):
5606 self._ExpandAndLockInstance()
5607 self.needed_locks[locking.LEVEL_NODE] = []
5608 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5610 def DeclareLocks(self, level):
5611 if level == locking.LEVEL_NODE:
5612 self._LockInstancesNodes()
5614 def BuildHooksEnv(self):
5615 """Build hooks env.
5617 This runs on the master, the primary and all the secondaries.
5619 """
5620 env = {
5621 "DISK": self.op.disk,
5622 "AMOUNT": self.op.amount,
5623 }
5624 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5625 nl = [
5626 self.cfg.GetMasterNode(),
5627 self.instance.primary_node,
5628 ]
5629 return env, nl, nl
5631 def CheckPrereq(self):
5632 """Check prerequisites.
5634 This checks that the instance is in the cluster.
5637 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5638 assert instance is not None, \
5639 "Cannot retrieve locked instance %s" % self.op.instance_name
5640 nodenames = list(instance.all_nodes)
5641 for node in nodenames:
5642 _CheckNodeOnline(self, node)
5645 self.instance = instance
5647 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5648 raise errors.OpPrereqError("Instance's disk layout does not support"
5649 " growing.")
5651 self.disk = instance.FindDisk(self.op.disk)
5653 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5654 instance.hypervisor)
5655 for node in nodenames:
5656 info = nodeinfo[node]
5657 if info.failed or not info.data:
5658 raise errors.OpPrereqError("Cannot get current information"
5659 " from node '%s'" % node)
5660 vg_free = info.data.get('vg_free', None)
5661 if not isinstance(vg_free, int):
5662 raise errors.OpPrereqError("Can't compute free disk space on"
5663 " node %s" % node)
5664 if self.op.amount > vg_free:
5665 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5666 " %d MiB available, %d MiB required" %
5667 (node, vg_free, self.op.amount))
5669 def Exec(self, feedback_fn):
5670 """Execute disk grow.
5672 """
5673 instance = self.instance
5674 disk = self.disk
5675 for node in instance.all_nodes:
5676 self.cfg.SetDiskID(disk, node)
5677 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5678 msg = result.RemoteFailMsg()
5679 if msg:
5680 raise errors.OpExecError("Grow request failed on node %s: %s" %
5681 (node, msg))
5682 disk.RecordGrow(self.op.amount)
5683 self.cfg.Update(instance)
5684 if self.op.wait_for_sync:
5685 disk_abort = not _WaitForSync(self, instance)
5686 if disk_abort:
5687 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5688 " status.\nPlease check the instance.")
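# Minimal usage sketch (hypothetical values, assuming the standard
# OpGrowDisk opcode whose required fields are listed in _OP_REQP above):
#   op = opcodes.OpGrowDisk(instance_name="inst1.example.com", disk=0,
#                           amount=1024, wait_for_sync=True)
# which grows disk 0 by 1024 MB on all of the instance's nodes and then
# waits for the mirror to resync.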
5691 class LUQueryInstanceData(NoHooksLU):
5692 """Query runtime instance data.
5695 _OP_REQP = ["instances", "static"]
5698 def ExpandNames(self):
5699 self.needed_locks = {}
5700 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5702 if not isinstance(self.op.instances, list):
5703 raise errors.OpPrereqError("Invalid argument type 'instances'")
5705 if self.op.instances:
5706 self.wanted_names = []
5707 for name in self.op.instances:
5708 full_name = self.cfg.ExpandInstanceName(name)
5709 if full_name is None:
5710 raise errors.OpPrereqError("Instance '%s' not known" % name)
5711 self.wanted_names.append(full_name)
5712 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5714 self.wanted_names = None
5715 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5717 self.needed_locks[locking.LEVEL_NODE] = []
5718 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5720 def DeclareLocks(self, level):
5721 if level == locking.LEVEL_NODE:
5722 self._LockInstancesNodes()
5724 def CheckPrereq(self):
5725 """Check prerequisites.
5727 This only checks the optional instance list against the existing names.
5730 if self.wanted_names is None:
5731 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5733 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5734 in self.wanted_names]
5737 def _ComputeDiskStatus(self, instance, snode, dev):
5738 """Compute block device status.
5741 static = self.op.static
5743 self.cfg.SetDiskID(dev, instance.primary_node)
5744 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5745 if dev_pstatus.offline:
5748 msg = dev_pstatus.RemoteFailMsg()
5750 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5751 (instance.name, msg))
5752 dev_pstatus = dev_pstatus.payload
5756 if dev.dev_type in constants.LDS_DRBD:
5757 # we change the snode then (otherwise we use the one passed in)
5758 if dev.logical_id[0] == instance.primary_node:
5759 snode = dev.logical_id[1]
5761 snode = dev.logical_id[0]
5763 if snode and not static:
5764 self.cfg.SetDiskID(dev, snode)
5765 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5766 if dev_sstatus.offline:
5769 msg = dev_sstatus.RemoteFailMsg()
5771 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5772 (instance.name, msg))
5773 dev_sstatus = dev_sstatus.payload
5778 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5779 for child in dev.children]
5784 "iv_name": dev.iv_name,
5785 "dev_type": dev.dev_type,
5786 "logical_id": dev.logical_id,
5787 "physical_id": dev.physical_id,
5788 "pstatus": dev_pstatus,
5789 "sstatus": dev_sstatus,
5790 "children": dev_children,
5796 def Exec(self, feedback_fn):
5797 """Gather and return data"""
5800 cluster = self.cfg.GetClusterInfo()
5802 for instance in self.wanted_instances:
5803 if not self.op.static:
5804 remote_info = self.rpc.call_instance_info(instance.primary_node,
5806 instance.hypervisor)
5808 remote_info = remote_info.data
5809 if remote_info and "state" in remote_info:
5812 remote_state = "down"
5815 if instance.admin_up:
5818 config_state = "down"
5820 disks = [self._ComputeDiskStatus(instance, None, device)
5821 for device in instance.disks]
5824 "name": instance.name,
5825 "config_state": config_state,
5826 "run_state": remote_state,
5827 "pnode": instance.primary_node,
5828 "snodes": instance.secondary_nodes,
5830 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5832 "hypervisor": instance.hypervisor,
5833 "network_port": instance.network_port,
5834 "hv_instance": instance.hvparams,
5835 "hv_actual": cluster.FillHV(instance),
5836 "be_instance": instance.beparams,
5837 "be_actual": cluster.FillBE(instance),
5840 result[instance.name] = idict
5845 class LUSetInstanceParams(LogicalUnit):
5846 """Modifies an instances's parameters.
5849 HPATH = "instance-modify"
5850 HTYPE = constants.HTYPE_INSTANCE
5851 _OP_REQP = ["instance_name"]
5854 def CheckArguments(self):
5855 if not hasattr(self.op, 'nics'):
5857 if not hasattr(self.op, 'disks'):
5859 if not hasattr(self.op, 'beparams'):
5860 self.op.beparams = {}
5861 if not hasattr(self.op, 'hvparams'):
5862 self.op.hvparams = {}
5863 self.op.force = getattr(self.op, "force", False)
5864 if not (self.op.nics or self.op.disks or
5865 self.op.hvparams or self.op.beparams):
5866 raise errors.OpPrereqError("No changes submitted")
5870 for disk_op, disk_dict in self.op.disks:
5871 if disk_op == constants.DDM_REMOVE:
5874 elif disk_op == constants.DDM_ADD:
5877 if not isinstance(disk_op, int):
5878 raise errors.OpPrereqError("Invalid disk index")
5879 if disk_op == constants.DDM_ADD:
5880 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5881 if mode not in constants.DISK_ACCESS_SET:
5882 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5883 size = disk_dict.get('size', None)
5885 raise errors.OpPrereqError("Required disk parameter size missing")
5888 except ValueError, err:
5889 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5891 disk_dict['size'] = size
5893 # modification of disk
5894 if 'size' in disk_dict:
5895 raise errors.OpPrereqError("Disk size change not possible, use"
5898 if disk_addremove > 1:
5899 raise errors.OpPrereqError("Only one disk add or remove operation"
5900 " supported at a time")
5904 for nic_op, nic_dict in self.op.nics:
5905 if nic_op == constants.DDM_REMOVE:
5908 elif nic_op == constants.DDM_ADD:
5911 if not isinstance(nic_op, int):
5912 raise errors.OpPrereqError("Invalid nic index")
5914 # nic_dict should be a dict
5915 nic_ip = nic_dict.get('ip', None)
5916 if nic_ip is not None:
5917 if nic_ip.lower() == constants.VALUE_NONE:
5918 nic_dict['ip'] = None
5920 if not utils.IsValidIP(nic_ip):
5921 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5923 nic_bridge = nic_dict.get('bridge', None)
5924 nic_link = nic_dict.get('link', None)
5925 if nic_bridge and nic_link:
5926 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5927 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5928 nic_dict['bridge'] = None
5929 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5930 nic_dict['link'] = None
5932 if nic_op == constants.DDM_ADD:
5933 nic_mac = nic_dict.get('mac', None)
5935 nic_dict['mac'] = constants.VALUE_AUTO
5937 if 'mac' in nic_dict:
5938 nic_mac = nic_dict['mac']
5939 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5940 if not utils.IsValidMac(nic_mac):
5941 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5942 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5943 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5944 " modifying an existing nic")
5946 if nic_addremove > 1:
5947 raise errors.OpPrereqError("Only one NIC add or remove operation"
5948 " supported at a time")
5950 def ExpandNames(self):
5951 self._ExpandAndLockInstance()
5952 self.needed_locks[locking.LEVEL_NODE] = []
5953 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5955 def DeclareLocks(self, level):
5956 if level == locking.LEVEL_NODE:
5957 self._LockInstancesNodes()
5959 def BuildHooksEnv(self):
5962 This runs on the master, primary and secondaries.
5966 if constants.BE_MEMORY in self.be_new:
5967 args['memory'] = self.be_new[constants.BE_MEMORY]
5968 if constants.BE_VCPUS in self.be_new:
5969 args['vcpus'] = self.be_new[constants.BE_VCPUS]
5970 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5971 # information at all.
5974 nic_override = dict(self.op.nics)
5975 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
5976 for idx, nic in enumerate(self.instance.nics):
5977 if idx in nic_override:
5978 this_nic_override = nic_override[idx]
5980 this_nic_override = {}
5981 if 'ip' in this_nic_override:
5982 ip = this_nic_override['ip']
5985 if 'mac' in this_nic_override:
5986 mac = this_nic_override['mac']
5989 if idx in self.nic_pnew:
5990 nicparams = self.nic_pnew[idx]
5992 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
5993 mode = nicparams[constants.NIC_MODE]
5994 link = nicparams[constants.NIC_LINK]
5995 args['nics'].append((ip, mac, mode, link))
5996 if constants.DDM_ADD in nic_override:
5997 ip = nic_override[constants.DDM_ADD].get('ip', None)
5998 mac = nic_override[constants.DDM_ADD]['mac']
5999 nicparams = self.nic_pnew[constants.DDM_ADD]
6000 mode = nicparams[constants.NIC_MODE]
6001 link = nicparams[constants.NIC_LINK]
6002 args['nics'].append((ip, mac, mode, link))
6003 elif constants.DDM_REMOVE in nic_override:
6004 del args['nics'][-1]
6006 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6007 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6010 def _GetUpdatedParams(self, old_params, update_dict,
6011 default_values, parameter_types):
6012 """Return the new params dict for the given params.
6014 @type old_params: dict
6015 @param old_params: old parameters
6016 @type update_dict: dict
6017 @param update_dict: dict containing new parameter values,
6018 or constants.VALUE_DEFAULT to reset the
6019 parameter to its default value
6020 @type default_values: dict
6021 @param default_values: default values for the filled parameters
6022 @type parameter_types: dict
6023 @param parameter_types: dict mapping target dict keys to types
6024 in constants.ENFORCEABLE_TYPES
6025 @rtype: (dict, dict)
6026 @return: (new_parameters, filled_parameters)
6029 params_copy = copy.deepcopy(old_params)
6030 for key, val in update_dict.iteritems():
6031 if val == constants.VALUE_DEFAULT:
6033 del params_copy[key]
6037 params_copy[key] = val
6038 utils.ForceDictType(params_copy, parameter_types)
6039 params_filled = objects.FillDict(default_values, params_copy)
6040 return (params_copy, params_filled)
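# Illustrative sketch of the semantics (made-up keys and values):
#   old_params     = {"acpi": True, "pae": True}
#   update_dict    = {"acpi": constants.VALUE_DEFAULT, "pae": False}
#   default_values = {"acpi": False, "pae": True, "kernel_path": ""}
# would return:
#   new_parameters    == {"pae": False}
#   filled_parameters == {"acpi": False, "pae": False, "kernel_path": ""}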
6042 def CheckPrereq(self):
6043 """Check prerequisites.
6045 This only checks the instance list against the existing names.
6048 force = self.force = self.op.force
6050 # checking the new params on the primary/secondary nodes
6052 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6053 cluster = self.cluster = self.cfg.GetClusterInfo()
6054 assert self.instance is not None, \
6055 "Cannot retrieve locked instance %s" % self.op.instance_name
6056 pnode = instance.primary_node
6057 nodelist = list(instance.all_nodes)
6059 # hvparams processing
6060 if self.op.hvparams:
6061 i_hvdict, hv_new = self._GetUpdatedParams(
6062 instance.hvparams, self.op.hvparams,
6063 cluster.hvparams[instance.hypervisor],
6064 constants.HVS_PARAMETER_TYPES)
6066 hypervisor.GetHypervisor(
6067 instance.hypervisor).CheckParameterSyntax(hv_new)
6068 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6069 self.hv_new = hv_new # the new actual values
6070 self.hv_inst = i_hvdict # the new dict (without defaults)
6072 self.hv_new = self.hv_inst = {}
6074 # beparams processing
6075 if self.op.beparams:
6076 i_bedict, be_new = self._GetUpdatedParams(
6077 instance.beparams, self.op.beparams,
6078 cluster.beparams[constants.PP_DEFAULT],
6079 constants.BES_PARAMETER_TYPES)
6080 self.be_new = be_new # the new actual values
6081 self.be_inst = i_bedict # the new dict (without defaults)
6083 self.be_new = self.be_inst = {}
6087 if constants.BE_MEMORY in self.op.beparams and not self.force:
6088 mem_check_list = [pnode]
6089 if be_new[constants.BE_AUTO_BALANCE]:
6090 # either we changed auto_balance to yes or it was from before
6091 mem_check_list.extend(instance.secondary_nodes)
6092 instance_info = self.rpc.call_instance_info(pnode, instance.name,
6093 instance.hypervisor)
6094 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6095 instance.hypervisor)
6096 if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6097 # Assume the primary node is unreachable and go ahead
6098 self.warn.append("Can't get info from primary node %s" % pnode)
6100 if not instance_info.failed and instance_info.data:
6101 current_mem = int(instance_info.data['memory'])
6103 # Assume instance not running
6104 # (there is a slight race condition here, but it's not very probable,
6105 # and we have no other way to check)
6107 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6108 nodeinfo[pnode].data['memory_free'])
6110 raise errors.OpPrereqError("This change will prevent the instance"
6111 " from starting, due to %d MB of memory"
6112 " missing on its primary node" % miss_mem)
6114 if be_new[constants.BE_AUTO_BALANCE]:
6115 for node, nres in nodeinfo.iteritems():
6116 if node not in instance.secondary_nodes:
6118 if nres.failed or not isinstance(nres.data, dict):
6119 self.warn.append("Can't get info from secondary node %s" % node)
6120 elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6121 self.warn.append("Not enough memory to failover instance to"
6122 " secondary node %s" % node)
6127 for nic_op, nic_dict in self.op.nics:
6128 if nic_op == constants.DDM_REMOVE:
6129 if not instance.nics:
6130 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6132 if nic_op != constants.DDM_ADD:
6134 if nic_op < 0 or nic_op >= len(instance.nics):
6135 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6137 (nic_op, len(instance.nics)))
6138 old_nic_params = instance.nics[nic_op].nicparams
6139 old_nic_ip = instance.nics[nic_op].ip
6144 update_params_dict = dict([(key, nic_dict[key])
6145 for key in constants.NICS_PARAMETERS
6146 if key in nic_dict])
6148 if 'bridge' in nic_dict:
6149 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6151 new_nic_params, new_filled_nic_params = \
6152 self._GetUpdatedParams(old_nic_params, update_params_dict,
6153 cluster.nicparams[constants.PP_DEFAULT],
6154 constants.NICS_PARAMETER_TYPES)
6155 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6156 self.nic_pinst[nic_op] = new_nic_params
6157 self.nic_pnew[nic_op] = new_filled_nic_params
6158 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6160 if new_nic_mode == constants.NIC_MODE_BRIDGED:
6161 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6162 result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
6165 msg = ("Bridge '%s' doesn't exist on one of"
6166 " the instance nodes" % nic_bridge)
6168 self.warn.append(msg)
6170 raise errors.OpPrereqError(msg)
6171 if new_nic_mode == constants.NIC_MODE_ROUTED:
6172 if 'ip' in nic_dict:
6173 nic_ip = nic_dict['ip']
6177 raise errors.OpPrereqError('Cannot set the nic ip to None'
6179 if 'mac' in nic_dict:
6180 nic_mac = nic_dict['mac']
6182 raise errors.OpPrereqError('Cannot set the nic mac to None')
6183 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6184 # otherwise generate the mac
6185 nic_dict['mac'] = self.cfg.GenerateMAC()
6187 # or validate/reserve the current one
6188 if self.cfg.IsMacInUse(nic_mac):
6189 raise errors.OpPrereqError("MAC address %s already in use"
6190 " in cluster" % nic_mac)
6193 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6194 raise errors.OpPrereqError("Disk operations not supported for"
6195 " diskless instances")
6196 for disk_op, disk_dict in self.op.disks:
6197 if disk_op == constants.DDM_REMOVE:
6198 if len(instance.disks) == 1:
6199 raise errors.OpPrereqError("Cannot remove the last disk of"
6201 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6202 ins_l = ins_l[pnode]
6203 if ins_l.failed or not isinstance(ins_l.data, list):
6204 raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6205 if instance.name in ins_l.data:
6206 raise errors.OpPrereqError("Instance is running, can't remove"
6209 if (disk_op == constants.DDM_ADD and
6210 len(instance.disks) >= constants.MAX_DISKS):
6211 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6212 " add more" % constants.MAX_DISKS)
6213 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6215 if disk_op < 0 or disk_op >= len(instance.disks):
6216 raise errors.OpPrereqError("Invalid disk index %s, valid values"
6218 (disk_op, len(instance.disks)))
6222 def Exec(self, feedback_fn):
6223 """Modifies an instance.
6225 All parameters take effect only at the next restart of the instance.
6228 # Process here the warnings from CheckPrereq, as we don't have a
6229 # feedback_fn there.
6230 for warn in self.warn:
6231 feedback_fn("WARNING: %s" % warn)
6234 instance = self.instance
6235 cluster = self.cluster
6237 for disk_op, disk_dict in self.op.disks:
6238 if disk_op == constants.DDM_REMOVE:
6239 # remove the last disk
6240 device = instance.disks.pop()
6241 device_idx = len(instance.disks)
6242 for node, disk in device.ComputeNodeTree(instance.primary_node):
6243 self.cfg.SetDiskID(disk, node)
6244 msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6246 self.LogWarning("Could not remove disk/%d on node %s: %s,"
6247 " continuing anyway", device_idx, node, msg)
6248 result.append(("disk/%d" % device_idx, "remove"))
6249 elif disk_op == constants.DDM_ADD:
6251 if instance.disk_template == constants.DT_FILE:
6252 file_driver, file_path = instance.disks[0].logical_id
6253 file_path = os.path.dirname(file_path)
6255 file_driver = file_path = None
6256 disk_idx_base = len(instance.disks)
6257 new_disk = _GenerateDiskTemplate(self,
6258 instance.disk_template,
6259 instance.name, instance.primary_node,
6260 instance.secondary_nodes,
6265 instance.disks.append(new_disk)
6266 info = _GetInstanceInfoText(instance)
6268 logging.info("Creating volume %s for instance %s",
6269 new_disk.iv_name, instance.name)
6270 # Note: this needs to be kept in sync with _CreateDisks
6272 for node in instance.all_nodes:
6273 f_create = node == instance.primary_node
6275 _CreateBlockDev(self, node, instance, new_disk,
6276 f_create, info, f_create)
6277 except errors.OpExecError, err:
6278 self.LogWarning("Failed to create volume %s (%s) on"
6280 new_disk.iv_name, new_disk, node, err)
6281 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6282 (new_disk.size, new_disk.mode)))
6284 # change a given disk
6285 instance.disks[disk_op].mode = disk_dict['mode']
6286 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6288 for nic_op, nic_dict in self.op.nics:
6289 if nic_op == constants.DDM_REMOVE:
6290 # remove the last nic
6291 del instance.nics[-1]
6292 result.append(("nic.%d" % len(instance.nics), "remove"))
6293 elif nic_op == constants.DDM_ADD:
6294 # mac and bridge should be set by now
6295 mac = nic_dict['mac']
6296 ip = nic_dict.get('ip', None)
6297 nicparams = self.nic_pinst[constants.DDM_ADD]
6298 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6299 instance.nics.append(new_nic)
6300 result.append(("nic.%d" % (len(instance.nics) - 1),
6301 "add:mac=%s,ip=%s,mode=%s,link=%s" %
6302 (new_nic.mac, new_nic.ip,
6303 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6304 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6307 for key in 'mac', 'ip':
6309 setattr(instance.nics[nic_op], key, nic_dict[key])
6310 if nic_op in self.nic_pnew:
6311 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6312 for key, val in nic_dict.iteritems():
6313 result.append(("nic.%s/%d" % (key, nic_op), val))
6316 if self.op.hvparams:
6317 instance.hvparams = self.hv_inst
6318 for key, val in self.op.hvparams.iteritems():
6319 result.append(("hv/%s" % key, val))
6322 if self.op.beparams:
6323 instance.beparams = self.be_inst
6324 for key, val in self.op.beparams.iteritems():
6325 result.append(("be/%s" % key, val))
6327 self.cfg.Update(instance)
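# The accumulated result is a list of (parameter, new value) pairs meant
# for display, e.g. (sketch, made-up values):
#   [("disk/1", "add:size=1024,mode=rw"), ("nic.ip/0", "192.0.2.10"),
#    ("be/memory", 512)]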
6332 class LUQueryExports(NoHooksLU):
6333 """Query the exports list
6336 _OP_REQP = ['nodes']
6339 def ExpandNames(self):
6340 self.needed_locks = {}
6341 self.share_locks[locking.LEVEL_NODE] = 1
6342 if not self.op.nodes:
6343 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6345 self.needed_locks[locking.LEVEL_NODE] = \
6346 _GetWantedNodes(self, self.op.nodes)
6348 def CheckPrereq(self):
6349 """Check prerequisites.
6352 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6354 def Exec(self, feedback_fn):
6355 """Compute the list of all the exported system images.
6358 @return: a dictionary with the structure node->(export-list)
6359 where export-list is a list of the instances exported on
6363 rpcresult = self.rpc.call_export_list(self.nodes)
6365 for node in rpcresult:
6366 if rpcresult[node].RemoteFailMsg():
6367 result[node] = False
6369 result[node] = rpcresult[node].payload
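# The returned structure maps each node to its export list, with False
# marking nodes that could not be queried, e.g. (sketch):
#   {"node1.example.com": ["inst1.example.com"],
#    "node2.example.com": False}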
6374 class LUExportInstance(LogicalUnit):
6375 """Export an instance to an image in the cluster.
6378 HPATH = "instance-export"
6379 HTYPE = constants.HTYPE_INSTANCE
6380 _OP_REQP = ["instance_name", "target_node", "shutdown"]
6383 def ExpandNames(self):
6384 self._ExpandAndLockInstance()
6385 # FIXME: lock only instance primary and destination node
6387 # Sad but true, for now we have to lock all nodes, as we don't know where
6388 # the previous export might be, and in this LU we search for it and
6389 # remove it from its current node. In the future we could fix this by:
6390 # - making a tasklet to search (share-lock all), then create the new one,
6391 # then one to remove it afterwards
6392 # - removing the removal operation altogether
6393 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6395 def DeclareLocks(self, level):
6396 """Last minute lock declaration."""
6397 # All nodes are locked anyway, so nothing to do here.
6399 def BuildHooksEnv(self):
6402 This will run on the master, primary node and target node.
6406 "EXPORT_NODE": self.op.target_node,
6407 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6409 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6410 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6411 self.op.target_node]
6414 def CheckPrereq(self):
6415 """Check prerequisites.
6417 This checks that the instance and node names are valid.
6420 instance_name = self.op.instance_name
6421 self.instance = self.cfg.GetInstanceInfo(instance_name)
6422 assert self.instance is not None, \
6423 "Cannot retrieve locked instance %s" % self.op.instance_name
6424 _CheckNodeOnline(self, self.instance.primary_node)
6426 self.dst_node = self.cfg.GetNodeInfo(
6427 self.cfg.ExpandNodeName(self.op.target_node))
6429 if self.dst_node is None:
6430 # This is a wrong node name, not a non-locked node
6431 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6432 _CheckNodeOnline(self, self.dst_node.name)
6433 _CheckNodeNotDrained(self, self.dst_node.name)
6435 # instance disk type verification
6436 for disk in self.instance.disks:
6437 if disk.dev_type == constants.LD_FILE:
6438 raise errors.OpPrereqError("Export not supported for instances with"
6439 " file-based disks")
6441 def Exec(self, feedback_fn):
6442 """Export an instance to an image in the cluster.
6445 instance = self.instance
6446 dst_node = self.dst_node
6447 src_node = instance.primary_node
6448 if self.op.shutdown:
6449 # shutdown the instance, but not the disks
6450 result = self.rpc.call_instance_shutdown(src_node, instance)
6451 msg = result.RemoteFailMsg()
6453 raise errors.OpExecError("Could not shutdown instance %s on"
6455 (instance.name, src_node, msg))
6457 vgname = self.cfg.GetVGName()
6461 # set the disks ID correctly since call_instance_start needs the
6462 # correct drbd minor to create the symlinks
6463 for disk in instance.disks:
6464 self.cfg.SetDiskID(disk, src_node)
6467 for disk in instance.disks:
6468 # result.payload will be a snapshot of an lvm leaf of the one we passed
6469 result = self.rpc.call_blockdev_snapshot(src_node, disk)
6470 msg = result.RemoteFailMsg()
6472 self.LogWarning("Could not snapshot block device %s on node %s: %s",
6473 disk.logical_id[1], src_node, msg)
6474 snap_disks.append(False)
6476 disk_id = (vgname, result.payload)
6477 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6478 logical_id=disk_id, physical_id=disk_id,
6479 iv_name=disk.iv_name)
6480 snap_disks.append(new_dev)
6483 if self.op.shutdown and instance.admin_up:
6484 result = self.rpc.call_instance_start(src_node, instance, None, None)
6485 msg = result.RemoteFailMsg()
6487 _ShutdownInstanceDisks(self, instance)
6488 raise errors.OpExecError("Could not start instance: %s" % msg)
6490 # TODO: check for size
6492 cluster_name = self.cfg.GetClusterName()
6493 for idx, dev in enumerate(snap_disks):
6495 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6496 instance, cluster_name, idx)
6497 msg = result.RemoteFailMsg()
6499 self.LogWarning("Could not export block device %s from node %s to"
6500 " node %s: %s", dev.logical_id[1], src_node,
6502 msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6504 self.LogWarning("Could not remove snapshot block device %s from node"
6505 " %s: %s", dev.logical_id[1], src_node, msg)
6507 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6508 msg = result.RemoteFailMsg()
6510 self.LogWarning("Could not finalize export for instance %s"
6511 " on node %s: %s", instance.name, dst_node.name, msg)
6513 nodelist = self.cfg.GetNodeList()
6514 nodelist.remove(dst_node.name)
6516 # on one-node clusters nodelist will be empty after the removal
6517 # if we proceed the backup would be removed because OpQueryExports
6518 # substitutes an empty list with the full cluster node list.
6520 exportlist = self.rpc.call_export_list(nodelist)
6521 for node in exportlist:
6522 if exportlist[node].RemoteFailMsg():
6524 if instance.name in exportlist[node].payload:
6525 if not self.rpc.call_export_remove(node, instance.name):
6526 self.LogWarning("Could not remove older export for instance %s"
6527 " on node %s", instance.name, node)
6530 class LURemoveExport(NoHooksLU):
6531 """Remove exports related to the named instance.
6534 _OP_REQP = ["instance_name"]
6537 def ExpandNames(self):
6538 self.needed_locks = {}
6539 # We need all nodes to be locked in order for RemoveExport to work, but we
6540 # don't need to lock the instance itself, as nothing will happen to it (and
6541 # we can remove exports also for a removed instance)
6542 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6544 def CheckPrereq(self):
6545 """Check prerequisites.
6549 def Exec(self, feedback_fn):
6550 """Remove any export.
6553 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6554 # If the instance was not found we'll try with the name that was passed in.
6555 # This will only work if it was an FQDN, though.
6557 if not instance_name:
6559 instance_name = self.op.instance_name
6561 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6562 exportlist = self.rpc.call_export_list(locked_nodes)
6564 for node in exportlist:
6565 msg = exportlist[node].RemoteFailMsg()
6567 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6569 if instance_name in exportlist[node].payload:
6571 result = self.rpc.call_export_remove(node, instance_name)
6572 if result.failed or not result.data:
6573 logging.error("Could not remove export for instance %s"
6574 " on node %s", instance_name, node)
6576 if fqdn_warn and not found:
6577 feedback_fn("Export not found. If trying to remove an export belonging"
6578 " to a deleted instance please use its Fully Qualified"
6582 class TagsLU(NoHooksLU):
6585 This is an abstract class which is the parent of all the other tag-related LUs.
6589 def ExpandNames(self):
6590 self.needed_locks = {}
6591 if self.op.kind == constants.TAG_NODE:
6592 name = self.cfg.ExpandNodeName(self.op.name)
6594 raise errors.OpPrereqError("Invalid node name (%s)" %
6597 self.needed_locks[locking.LEVEL_NODE] = name
6598 elif self.op.kind == constants.TAG_INSTANCE:
6599 name = self.cfg.ExpandInstanceName(self.op.name)
6601 raise errors.OpPrereqError("Invalid instance name (%s)" %
6604 self.needed_locks[locking.LEVEL_INSTANCE] = name
6606 def CheckPrereq(self):
6607 """Check prerequisites.
6610 if self.op.kind == constants.TAG_CLUSTER:
6611 self.target = self.cfg.GetClusterInfo()
6612 elif self.op.kind == constants.TAG_NODE:
6613 self.target = self.cfg.GetNodeInfo(self.op.name)
6614 elif self.op.kind == constants.TAG_INSTANCE:
6615 self.target = self.cfg.GetInstanceInfo(self.op.name)
6617 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6621 class LUGetTags(TagsLU):
6622 """Returns the tags of a given object.
6625 _OP_REQP = ["kind", "name"]
6628 def Exec(self, feedback_fn):
6629 """Returns the tag list.
6632 return list(self.target.GetTags())
6635 class LUSearchTags(NoHooksLU):
6636 """Searches the tags for a given pattern.
6639 _OP_REQP = ["pattern"]
6642 def ExpandNames(self):
6643 self.needed_locks = {}
6645 def CheckPrereq(self):
6646 """Check prerequisites.
6648 This checks the pattern passed for validity by compiling it.
6652 self.re = re.compile(self.op.pattern)
6653 except re.error, err:
6654 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6655 (self.op.pattern, err))
6657 def Exec(self, feedback_fn):
6658 """Returns the tag list.
6662 tgts = [("/cluster", cfg.GetClusterInfo())]
6663 ilist = cfg.GetAllInstancesInfo().values()
6664 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6665 nlist = cfg.GetAllNodesInfo().values()
6666 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6668 for path, target in tgts:
6669 for tag in target.GetTags():
6670 if self.re.search(tag):
6671 results.append((path, tag))
6675 class LUAddTags(TagsLU):
6676 """Sets a tag on a given object.
6679 _OP_REQP = ["kind", "name", "tags"]
6682 def CheckPrereq(self):
6683 """Check prerequisites.
6685 This checks the type and length of the tag name and value.
6688 TagsLU.CheckPrereq(self)
6689 for tag in self.op.tags:
6690 objects.TaggableObject.ValidateTag(tag)
6692 def Exec(self, feedback_fn):
6697 for tag in self.op.tags:
6698 self.target.AddTag(tag)
6699 except errors.TagError, err:
6700 raise errors.OpExecError("Error while setting tag: %s" % str(err))
6702 self.cfg.Update(self.target)
6703 except errors.ConfigurationError:
6704 raise errors.OpRetryError("There has been a modification to the"
6705 " config file and the operation has been"
6706 " aborted. Please retry.")
6709 class LUDelTags(TagsLU):
6710 """Delete a list of tags from a given object.
6713 _OP_REQP = ["kind", "name", "tags"]
6716 def CheckPrereq(self):
6717 """Check prerequisites.
6719 This checks that we have the given tag.
6722 TagsLU.CheckPrereq(self)
6723 for tag in self.op.tags:
6724 objects.TaggableObject.ValidateTag(tag)
6725 del_tags = frozenset(self.op.tags)
6726 cur_tags = self.target.GetTags()
6727 if not del_tags <= cur_tags:
6728 diff_tags = del_tags - cur_tags
6729 diff_names = ["'%s'" % tag for tag in diff_tags]
6731 raise errors.OpPrereqError("Tag(s) %s not found" %
6732 (",".join(diff_names)))
6734 def Exec(self, feedback_fn):
6735 """Remove the tag from the object.
6738 for tag in self.op.tags:
6739 self.target.RemoveTag(tag)
6741 self.cfg.Update(self.target)
6742 except errors.ConfigurationError:
6743 raise errors.OpRetryError("There has been a modification to the"
6744 " config file and the operation has been"
6745 " aborted. Please retry.")
6748 class LUTestDelay(NoHooksLU):
6749 """Sleep for a specified amount of time.
6751 This LU sleeps on the master and/or nodes for a specified amount of
6755 _OP_REQP = ["duration", "on_master", "on_nodes"]
6758 def ExpandNames(self):
6759 """Expand names and set required locks.
6761 This expands the node list, if any.
6764 self.needed_locks = {}
6765 if self.op.on_nodes:
6766 # _GetWantedNodes can be used here, but is not always appropriate to use
6767 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6769 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6770 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6772 def CheckPrereq(self):
6773 """Check prerequisites.
6777 def Exec(self, feedback_fn):
6778 """Do the actual sleep.
6781 if self.op.on_master:
6782 if not utils.TestDelay(self.op.duration):
6783 raise errors.OpExecError("Error during master delay test")
6784 if self.op.on_nodes:
6785 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6787 raise errors.OpExecError("Complete failure from rpc call")
6788 for node, node_result in result.items():
6790 if not node_result.data:
6791 raise errors.OpExecError("Failure during rpc call to node %s,"
6792 " result: %s" % (node, node_result.data))
6795 class IAllocator(object):
6796 """IAllocator framework.
6798 An IAllocator instance has four sets of attributes:
6799 - cfg that is needed to query the cluster
6800 - input data (all members of the _KEYS class attribute are required)
6801 - four buffer attributes (in|out_data|text), that represent the
6802 input (to the external script) in text and data structure format,
6803 and the output from it, again in two formats
6804 - the result variables from the script (success, info, nodes) for
6809 "mem_size", "disks", "disk_template",
6810 "os", "tags", "nics", "vcpus", "hypervisor",
6816 def __init__(self, lu, mode, name, **kwargs):
6818 # init buffer variables
6819 self.in_text = self.out_text = self.in_data = self.out_data = None
6820 # init all input fields so that pylint is happy
6823 self.mem_size = self.disks = self.disk_template = None
6824 self.os = self.tags = self.nics = self.vcpus = None
6825 self.hypervisor = None
6826 self.relocate_from = None
6828 self.required_nodes = None
6829 # init result fields
6830 self.success = self.info = self.nodes = None
6831 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6832 keyset = self._ALLO_KEYS
6833 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6834 keyset = self._RELO_KEYS
6836 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6837 " IAllocator" % self.mode)
6839 if key not in keyset:
6840 raise errors.ProgrammerError("Invalid input parameter '%s' to"
6841 " IAllocator" % key)
6842 setattr(self, key, kwargs[key])
6844 if key not in kwargs:
6845 raise errors.ProgrammerError("Missing input parameter '%s' to"
6846 " IAllocator" % key)
6847 self._BuildInputData()
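# Illustrative construction sketch (made-up values; "hail" is just an
# example allocator name): for an allocation request all _ALLO_KEYS
# members must be passed as keyword arguments:
#   ial = IAllocator(self, constants.IALLOCATOR_MODE_ALLOC,
#                    "inst1.example.com", mem_size=512, disks=[...],
#                    disk_template=constants.DT_DRBD8, os="debian-etch",
#                    tags=[], nics=[...], vcpus=1, hypervisor="xen-pvm")
#   ial.Run("hail")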
6849 def _ComputeClusterData(self):
6850 """Compute the generic allocator input data.
6852 This is the data that is independent of the actual operation.
6856 cluster_info = cfg.GetClusterInfo()
6859 "version": constants.IALLOCATOR_VERSION,
6860 "cluster_name": cfg.GetClusterName(),
6861 "cluster_tags": list(cluster_info.GetTags()),
6862 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6863 # we don't have job IDs
6865 iinfo = cfg.GetAllInstancesInfo().values()
6866 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6870 node_list = cfg.GetNodeList()
6872 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6873 hypervisor_name = self.hypervisor
6874 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6875 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6877 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6879 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6880 cluster_info.enabled_hypervisors)
6881 for nname, nresult in node_data.items():
6882 # first fill in static (config-based) values
6883 ninfo = cfg.GetNodeInfo(nname)
6885 "tags": list(ninfo.GetTags()),
6886 "primary_ip": ninfo.primary_ip,
6887 "secondary_ip": ninfo.secondary_ip,
6888 "offline": ninfo.offline,
6889 "drained": ninfo.drained,
6890 "master_candidate": ninfo.master_candidate,
6893 if not ninfo.offline:
6895 if not isinstance(nresult.data, dict):
6896 raise errors.OpExecError("Can't get data for node %s" % nname)
6897 remote_info = nresult.data
6898 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6899 'vg_size', 'vg_free', 'cpu_total']:
6900 if attr not in remote_info:
6901 raise errors.OpExecError("Node '%s' didn't return attribute"
6902 " '%s'" % (nname, attr))
6904 remote_info[attr] = int(remote_info[attr])
6905 except ValueError, err:
6906 raise errors.OpExecError("Node '%s' returned invalid value"
6907 " for '%s': %s" % (nname, attr, err))
6908 # compute memory used by primary instances
6909 i_p_mem = i_p_up_mem = 0
6910 for iinfo, beinfo in i_list:
6911 if iinfo.primary_node == nname:
6912 i_p_mem += beinfo[constants.BE_MEMORY]
6913 if iinfo.name not in node_iinfo[nname].data:
6916 i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6917 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6918 remote_info['memory_free'] -= max(0, i_mem_diff)
6921 i_p_up_mem += beinfo[constants.BE_MEMORY]
6923 # compute memory used by instances
6925 "total_memory": remote_info['memory_total'],
6926 "reserved_memory": remote_info['memory_dom0'],
6927 "free_memory": remote_info['memory_free'],
6928 "total_disk": remote_info['vg_size'],
6929 "free_disk": remote_info['vg_free'],
6930 "total_cpus": remote_info['cpu_total'],
6931 "i_pri_memory": i_p_mem,
6932 "i_pri_up_memory": i_p_up_mem,
6936 node_results[nname] = pnr
6937 data["nodes"] = node_results
6941 for iinfo, beinfo in i_list:
6943 for nic in iinfo.nics:
6944 filled_params = objects.FillDict(
6945 cluster_info.nicparams[constants.PP_DEFAULT],
6947 nic_dict = {"mac": nic.mac,
6949 "mode": filled_params[constants.NIC_MODE],
6950 "link": filled_params[constants.NIC_LINK],
6952 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
6953 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
6954 nic_data.append(nic_dict)
6956 "tags": list(iinfo.GetTags()),
6957 "admin_up": iinfo.admin_up,
6958 "vcpus": beinfo[constants.BE_VCPUS],
6959 "memory": beinfo[constants.BE_MEMORY],
6961 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6963 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6964 "disk_template": iinfo.disk_template,
6965 "hypervisor": iinfo.hypervisor,
6967 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6969 instance_data[iinfo.name] = pir
6971 data["instances"] = instance_data
6975 def _AddNewInstance(self):
6976 """Add new instance data to allocator structure.
6978 This, in combination with _ComputeClusterData, will create the
6979 correct structure needed as input for the allocator.
6981 The checks for the completeness of the opcode must have already been
6987 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6989 if self.disk_template in constants.DTS_NET_MIRROR:
6990 self.required_nodes = 2
6992 self.required_nodes = 1
6996 "disk_template": self.disk_template,
6999 "vcpus": self.vcpus,
7000 "memory": self.mem_size,
7001 "disks": self.disks,
7002 "disk_space_total": disk_space,
7004 "required_nodes": self.required_nodes,
7006 data["request"] = request
7008 def _AddRelocateInstance(self):
7009 """Add relocate instance data to allocator structure.
7011 This, in combination with _ComputeClusterData, will create the
7012 correct structure needed as input for the allocator.
7014 The checks for the completeness of the opcode must have already been
7018 instance = self.lu.cfg.GetInstanceInfo(self.name)
7019 if instance is None:
7020 raise errors.ProgrammerError("Unknown instance '%s' passed to"
7021 " IAllocator" % self.name)
7023 if instance.disk_template not in constants.DTS_NET_MIRROR:
7024 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7026 if len(instance.secondary_nodes) != 1:
7027 raise errors.OpPrereqError("Instance has not exactly one secondary node")
7029 self.required_nodes = 1
7030 disk_sizes = [{'size': disk.size} for disk in instance.disks]
7031 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7036 "disk_space_total": disk_space,
7037 "required_nodes": self.required_nodes,
7038 "relocate_from": self.relocate_from,
7040 self.in_data["request"] = request
7042 def _BuildInputData(self):
7043 """Build input data structures.
7046 self._ComputeClusterData()
7048 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7049 self._AddNewInstance()
7051 self._AddRelocateInstance()
7053 self.in_text = serializer.Dump(self.in_data)
7055 def Run(self, name, validate=True, call_fn=None):
7056 """Run an instance allocator and return the results.
7060 call_fn = self.lu.rpc.call_iallocator_runner
7063 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7066 if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7067 raise errors.OpExecError("Invalid result from master iallocator runner")
7069 rcode, stdout, stderr, fail = result.data
7071 if rcode == constants.IARUN_NOTFOUND:
7072 raise errors.OpExecError("Can't find allocator '%s'" % name)
7073 elif rcode == constants.IARUN_FAILURE:
7074 raise errors.OpExecError("Instance allocator call failed: %s,"
7075 " output: %s" % (fail, stdout+stderr))
7076 self.out_text = stdout
7078 self._ValidateResult()
7080 def _ValidateResult(self):
7081 """Process the allocator results.
7083 This will process and if successful save the result in
7084 self.out_data and the other parameters.
7088 rdict = serializer.Load(self.out_text)
7089 except Exception, err:
7090 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7092 if not isinstance(rdict, dict):
7093 raise errors.OpExecError("Can't parse iallocator results: not a dict")
7095 for key in "success", "info", "nodes":
7096 if key not in rdict:
7097 raise errors.OpExecError("Can't parse iallocator results:"
7098 " missing key '%s'" % key)
7099 setattr(self, key, rdict[key])
7101 if not isinstance(rdict["nodes"], list):
7102 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7104 self.out_data = rdict
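# A well-formed allocator reply therefore deserializes to something like
# (sketch): {"success": true, "info": "allocation successful",
#            "nodes": ["node1.example.com", "node2.example.com"]}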
7107 class LUTestAllocator(NoHooksLU):
7108 """Run allocator tests.
7110 This LU runs the allocator tests
7113 _OP_REQP = ["direction", "mode", "name"]
7115 def CheckPrereq(self):
7116 """Check prerequisites.
7118 This checks the opcode parameters depending on the direction and mode.
7121 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7122 for attr in ["name", "mem_size", "disks", "disk_template",
7123 "os", "tags", "nics", "vcpus"]:
7124 if not hasattr(self.op, attr):
7125 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7127 iname = self.cfg.ExpandInstanceName(self.op.name)
7128 if iname is not None:
7129 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7131 if not isinstance(self.op.nics, list):
7132 raise errors.OpPrereqError("Invalid parameter 'nics'")
7133 for row in self.op.nics:
7134 if (not isinstance(row, dict) or
7137 "bridge" not in row):
7138 raise errors.OpPrereqError("Invalid contents of the"
7139 " 'nics' parameter")
7140 if not isinstance(self.op.disks, list):
7141 raise errors.OpPrereqError("Invalid parameter 'disks'")
7142 for row in self.op.disks:
7143 if (not isinstance(row, dict) or
7144 "size" not in row or
7145 not isinstance(row["size"], int) or
7146 "mode" not in row or
7147 row["mode"] not in ['r', 'w']):
7148 raise errors.OpPrereqError("Invalid contents of the"
7149 " 'disks' parameter")
7150 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7151 self.op.hypervisor = self.cfg.GetHypervisorType()
7152 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7153 if not hasattr(self.op, "name"):
7154 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7155 fname = self.cfg.ExpandInstanceName(self.op.name)
7157 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7159 self.op.name = fname
7160 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7162 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7165 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7166 if not hasattr(self.op, "allocator") or self.op.allocator is None:
7167 raise errors.OpPrereqError("Missing allocator name")
7168 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7169 raise errors.OpPrereqError("Wrong allocator test '%s'" %
7172 def Exec(self, feedback_fn):
7173 """Run the allocator test.
7176 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7177 ial = IAllocator(self,
7180 mem_size=self.op.mem_size,
7181 disks=self.op.disks,
7182 disk_template=self.op.disk_template,
7186 vcpus=self.op.vcpus,
7187 hypervisor=self.op.hypervisor,
7190 ial = IAllocator(self,
7193 relocate_from=list(self.relocate_from),
7196 if self.op.direction == constants.IALLOCATOR_DIR_IN:
7197 result = ial.in_text
7199 ial.Run(self.op.allocator, validate=False)
7200 result = ial.out_text