# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import re
import time
import logging

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
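
  # A minimal sketch (not part of the original file) of a subclass override;
  # the candidate_pool_size handling is borrowed from LUSetClusterParams
  # further down, purely as an illustration:
  #
  #   def CheckArguments(self):
  #     if not hasattr(self.op, "candidate_pool_size"):
  #       self.op.candidate_pool_size = None
  #     if self.op.candidate_pool_size is not None:
  #       self.op.candidate_pool_size = int(self.op.candidate_pool_size)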

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """
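
  # A minimal sketch (not part of the original file) of the usual override:
  # once the instance locks are held, derive the node locks from them using
  # the _LockInstancesNodes helper defined below.
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()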

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    Empty node lists should be returned as empty lists (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
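
  # A minimal sketch (not part of the original file) of a concrete
  # BuildHooksEnv, following the pattern of the cluster-level LUs further
  # down: run the hooks on the master node only, pre and post execution.
  #
  #   def BuildHooksEnv(self):
  #     env = {"OP_TARGET": self.cfg.GetClusterName()}
  #     mn = self.cfg.GetMasterNode()
  #     return env, [mn], [mn]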

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None

    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name
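
  # Illustrative usage (not part of the original file): an instance-level LU
  # typically calls this helper from ExpandNames and defers the node locks to
  # DeclareLocks via the recalculation mechanism handled by
  # _LockInstancesNodes below:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE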

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we really have been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
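
  # An annotation (not part of the original file) on the two recalculation
  # modes: constants.LOCKS_REPLACE discards whatever was declared at the node
  # level, while constants.LOCKS_APPEND keeps nodes already declared in
  # ExpandNames, e.g. a hypothetical migration target:
  #
  #   self.needed_locks[locking.LEVEL_NODE] = [target_node]
  #   self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND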


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes

  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
                                 " non-empty list of nodes whose name is to be"
                                 " expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances

  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))
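

# Illustrative usage (not part of the original file), mirroring how the query
# LUs below invoke the helper from their ExpandNames:
#
#   _CheckOutputFields(static=self._FIELDS_STATIC,
#                      dynamic=self._FIELDS_DYNAMIC,
#                      selected=self.op.output_fields)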


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)
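

# Illustrative usage (not part of the original file): an LU that moves an
# instance to another node would typically combine both checks in its
# CheckPrereq (target_node is hypothetical here):
#
#   _CheckNodeOnline(self, target_node)
#   _CheckNodeNotDrained(self, target_node)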


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  return env
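

# A worked example (not part of the original module; all values invented):
# for a one-NIC, one-disk DRBD instance the call below produces, among other
# keys, INSTANCE_NIC0_MAC, INSTANCE_NIC0_BRIDGE (bridged mode only) and
# INSTANCE_DISK0_SIZE.
def _ExampleInstanceHookEnv():
  """Sketch only: build a hooks env for a hypothetical instance."""
  return _BuildInstanceHookEnv("inst1.example.com", "node1.example.com",
                               ["node2.example.com"], "debian-etch", True,
                               128, 1,
                               [("198.51.100.10", "aa:00:00:11:22:33",
                                 constants.NIC_MODE_BRIDGED, "xen-br0")],
                               constants.DT_DRBD8, [(10240, "rw")])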


def _PreBuildNICHooksList(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _PreBuildNICHooksList(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
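

# Illustrative usage (not part of the original file): build the hooks env for
# an instance while forcing the status key, whatever the configuration says:
#
#   env = _BuildInstanceHookEnvByObject(self, instance,
#                                       override={'status': True})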


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpPrereqError("Error checking bridges on destination node"
                                 " '%s': %s" % (target_node, msg))


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Could not disable the master role: %s" % msg)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        the form of minor: (instance, must_exist), which correspond to
        instances and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results
    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                    (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)
            bad = True

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
           not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances
    it was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to, should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
    }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non-redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list

    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)
1000 node_verify_param = {
1001 constants.NV_FILELIST: file_names,
1002 constants.NV_NODELIST: [node.name for node in nodeinfo
1003 if not node.offline],
1004 constants.NV_HYPERVISOR: hypervisors,
1005 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1006 node.secondary_ip) for node in nodeinfo
1007 if not node.offline],
1008 constants.NV_INSTANCELIST: hypervisors,
1009 constants.NV_VERSION: None,
1010 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1012 if vg_name is not None:
1013 node_verify_param[constants.NV_VGLIST] = None
1014 node_verify_param[constants.NV_LVLIST] = vg_name
1015 node_verify_param[constants.NV_DRBDLIST] = None
1016 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1017 self.cfg.GetClusterName())
1019 cluster = self.cfg.GetClusterInfo()
1020 master_node = self.cfg.GetMasterNode()
1021 all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].RemoteFailMsg()
      if msg:
        feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
        bad = True
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyzes the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.RemoteFailMsg()
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Could not disable the master role: %s" % msg)

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.RemoteFailMsg()
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)
    finally:
      result = self.rpc.call_node_start_master(master, False)
      msg = result.RemoteFailMsg()
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
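

# A small illustration (not part of the original module; the disk objects are
# built ad-hoc): an LVM logical volume below a DRBD device is still detected,
# because the check recurses through disk.children.
def _ExampleLVMBasedCheck():
  """Sketch only: run the check on a DRBD disk with an LV child."""
  lv = objects.Disk(dev_type=constants.LD_LV, size=1024, children=[])
  drbd = objects.Disk(dev_type=constants.LD_DRBD8, size=1024, children=[lv])
  return _RecursiveCheckIfLVMBased(drbd) # True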


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].RemoteFailMsg()
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = objects.FillDict(
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)

    # hypervisor list/parameters
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size

    self.cfg.Update(self.cluster)

    # we want to update nodes after the cluster so that if any errors
    # happen, we have recorded and saved the cluster info
    if self.op.candidate_pool_size is not None:
      _AdjustCandidatePool(self)


def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetNodeList()
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)
  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                   ])
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())
  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.RemoteFailMsg()
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)


class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo())
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.RemoteFailMsg()
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
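

# Illustrative usage (not part of the original file): callers typically gate
# on the result and warn if the mirrors ended up degraded:
#
#   disk_abort = not _WaitForSync(self, instance)
#   if disk_abort:
#     feedback_fn("ERROR: degraded disks found for instance %s" %
#                 instance.name)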


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.RemoteFailMsg()
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      result = result and (not rstats.payload[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].RemoteFailMsg()]
    for node_name, nr in rlist.items():
      if nr.RemoteFailMsg() or not nr.payload:
        continue
      for os_serialized in nr.payload:
        os_obj = objects.OS.FromDict(os_serialized)
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in good_nodes:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    output = []
    for os_name, os_data in pol.items():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
    }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the node exists in the configuration
      - it does not have primary or secondary instances
      - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    result = self.rpc.call_node_leave_cluster(node.name)
    msg = result.RemoteFailMsg()
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    "master_candidate",
    "master",
    "offline",
    "drained",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in _GetWantedNodes,
    # if non-empty, and if empty, there's no validation to do

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]
2000 # begin data gathering
2002 if self.do_node_query:
2004 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2005 self.cfg.GetHypervisorType())
2006 for name in nodenames:
2007 nodeinfo = node_data[name]
2008 if not nodeinfo.RemoteFailMsg() and nodeinfo.payload:
2009 nodeinfo = nodeinfo.payload
2010 fn = utils.TryConvert
2012 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2013 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2014 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2015 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2016 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2017 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2018 "bootid": nodeinfo.get('bootid', None),
2019 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2020 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2023 live_data[name] = {}
2025 live_data = dict.fromkeys(nodenames, {})
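# Editorial note: dict.fromkeys(nodenames, {}) shares a single empty
# dict between all keys; that is harmless here only because this
# fallback live_data is read, never mutated. For a queried node the
# (illustrative) shape is, e.g.:
#   live_data["node1"] == {"mtotal": 4096, "mnode": 512, "mfree": 1024}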
2027 node_to_primary = dict([(name, set()) for name in nodenames])
2028 node_to_secondary = dict([(name, set()) for name in nodenames])
2030 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2031 "sinst_cnt", "sinst_list"))
2032 if inst_fields & frozenset(self.op.output_fields):
2033 instancelist = self.cfg.GetInstanceList()
2035 for instance_name in instancelist:
2036 inst = self.cfg.GetInstanceInfo(instance_name)
2037 if inst.primary_node in node_to_primary:
2038 node_to_primary[inst.primary_node].add(inst.name)
2039 for secnode in inst.secondary_nodes:
2040 if secnode in node_to_secondary:
2041 node_to_secondary[secnode].add(inst.name)
2043 master_node = self.cfg.GetMasterNode()
2045 # end data gathering
2048 for node in nodelist:
2050 for field in self.op.output_fields:
2053 elif field == "pinst_list":
2054 val = list(node_to_primary[node.name])
2055 elif field == "sinst_list":
2056 val = list(node_to_secondary[node.name])
2057 elif field == "pinst_cnt":
2058 val = len(node_to_primary[node.name])
2059 elif field == "sinst_cnt":
2060 val = len(node_to_secondary[node.name])
2061 elif field == "pip":
2062 val = node.primary_ip
2063 elif field == "sip":
2064 val = node.secondary_ip
2065 elif field == "tags":
2066 val = list(node.GetTags())
2067 elif field == "serial_no":
2068 val = node.serial_no
2069 elif field == "master_candidate":
2070 val = node.master_candidate
2071 elif field == "master":
2072 val = node.name == master_node
2073 elif field == "offline":
2075 elif field == "drained":
2077 elif self._FIELDS_DYNAMIC.Matches(field):
2078 val = live_data[node.name].get(field, None)
2080 raise errors.ParameterError(field)
2081 node_output.append(val)
2082 output.append(node_output)
2084 return output
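# Illustrative result (hypothetical values): one row per node, ordered
# as in self.op.output_fields, e.g. for ["name", "pip", "master"]:
#   [["node1", "192.0.2.1", True], ["node2", "192.0.2.2", False]]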
2087 class LUQueryNodeVolumes(NoHooksLU):
2088 """Logical unit for getting volumes on node(s).
2091 _OP_REQP = ["nodes", "output_fields"]
2093 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2094 _FIELDS_STATIC = utils.FieldSet("node")
2096 def ExpandNames(self):
2097 _CheckOutputFields(static=self._FIELDS_STATIC,
2098 dynamic=self._FIELDS_DYNAMIC,
2099 selected=self.op.output_fields)
2101 self.needed_locks = {}
2102 self.share_locks[locking.LEVEL_NODE] = 1
2103 if not self.op.nodes:
2104 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2106 self.needed_locks[locking.LEVEL_NODE] = \
2107 _GetWantedNodes(self, self.op.nodes)
2109 def CheckPrereq(self):
2110 """Check prerequisites.
2112 This checks that the fields required are valid output fields.
2115 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2117 def Exec(self, feedback_fn):
2118 """Computes the list of volumes on the nodes and their attributes.
2121 nodenames = self.nodes
2122 volumes = self.rpc.call_node_volumes(nodenames)
2124 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2125 in self.cfg.GetInstanceList()]
2127 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2130 for node in nodenames:
2131 nresult = volumes[node]
2134 msg = nresult.RemoteFailMsg()
2136 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2139 node_vols = nresult.payload[:]
2140 node_vols.sort(key=lambda vol: vol['dev'])
2142 for vol in node_vols:
2144 for field in self.op.output_fields:
2147 elif field == "phys":
2151 elif field == "name":
2153 elif field == "size":
2154 val = int(float(vol['size']))
2155 elif field == "instance":
2157 if node not in lv_by_node[inst]:
2159 if vol['name'] in lv_by_node[inst][node]:
2165 raise errors.ParameterError(field)
2166 node_output.append(str(val))
2168 output.append(node_output)
2173 class LUAddNode(LogicalUnit):
2174 """Logical unit for adding a node to the cluster.
2178 HTYPE = constants.HTYPE_NODE
2179 _OP_REQP = ["node_name"]
2181 def BuildHooksEnv(self):
2184 This will run on all nodes before, and on all nodes + the new node after.
2188 "OP_TARGET": self.op.node_name,
2189 "NODE_NAME": self.op.node_name,
2190 "NODE_PIP": self.op.primary_ip,
2191 "NODE_SIP": self.op.secondary_ip,
2193 nodes_0 = self.cfg.GetNodeList()
2194 nodes_1 = nodes_0 + [self.op.node_name, ]
2195 return env, nodes_0, nodes_1
2197 def CheckPrereq(self):
2198 """Check prerequisites.
2201 - the new node is not already in the config
2203 - its parameters (single/dual homed) match the cluster
2205 Any errors are signalled by raising errors.OpPrereqError.
2208 node_name = self.op.node_name
2211 dns_data = utils.HostInfo(node_name)
2213 node = dns_data.name
2214 primary_ip = self.op.primary_ip = dns_data.ip
2215 secondary_ip = getattr(self.op, "secondary_ip", None)
2216 if secondary_ip is None:
2217 secondary_ip = primary_ip
2218 if not utils.IsValidIP(secondary_ip):
2219 raise errors.OpPrereqError("Invalid secondary IP given")
2220 self.op.secondary_ip = secondary_ip
2222 node_list = cfg.GetNodeList()
2223 if not self.op.readd and node in node_list:
2224 raise errors.OpPrereqError("Node %s is already in the configuration" %
2226 elif self.op.readd and node not in node_list:
2227 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2229 for existing_node_name in node_list:
2230 existing_node = cfg.GetNodeInfo(existing_node_name)
2232 if self.op.readd and node == existing_node_name:
2233 if (existing_node.primary_ip != primary_ip or
2234 existing_node.secondary_ip != secondary_ip):
2235 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2236 " address configuration as before")
2239 if (existing_node.primary_ip == primary_ip or
2240 existing_node.secondary_ip == primary_ip or
2241 existing_node.primary_ip == secondary_ip or
2242 existing_node.secondary_ip == secondary_ip):
2243 raise errors.OpPrereqError("New node ip address(es) conflict with"
2244 " existing node %s" % existing_node.name)
2246 # check that the type of the node (single versus dual homed) is the
2247 # same as for the master
2248 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2249 master_singlehomed = myself.secondary_ip == myself.primary_ip
2250 newbie_singlehomed = secondary_ip == primary_ip
2251 if master_singlehomed != newbie_singlehomed:
2252 if master_singlehomed:
2253 raise errors.OpPrereqError("The master has no private ip but the"
2254 " new node has one")
2256 raise errors.OpPrereqError("The master has a private ip but the"
2257 " new node doesn't have one")
2259 # check reachability
2260 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2261 raise errors.OpPrereqError("Node not reachable by ping")
2263 if not newbie_singlehomed:
2264 # check reachability from my secondary ip to newbie's secondary ip
2265 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2266 source=myself.secondary_ip):
2267 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2268 " based ping to noded port")
2270 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2271 mc_now, _ = self.cfg.GetMasterCandidateStats()
2272 master_candidate = mc_now < cp_size
2274 self.new_node = objects.Node(name=node,
2275 primary_ip=primary_ip,
2276 secondary_ip=secondary_ip,
2277 master_candidate=master_candidate,
2278 offline=False, drained=False)
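# Illustration (assumed numbers): the new node is auto-promoted to
# master candidate only while the pool is short of candidate_pool_size,
# e.g. cp_size=10, mc_now=7 -> master_candidate=True; mc_now=10 -> False.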
2280 def Exec(self, feedback_fn):
2281 """Adds the new node to the cluster.
2284 new_node = self.new_node
2285 node = new_node.name
2287 # check connectivity
2288 result = self.rpc.call_version([node])[node]
2289 msg = result.RemoteFailMsg()
2291 raise errors.OpExecError("Can't get version information from"
2292 " node %s: %s" % (node, msg))
2293 if constants.PROTOCOL_VERSION == result.payload:
2294 logging.info("Communication to node %s fine, sw version %s match",
2295 node, result.payload)
2297 raise errors.OpExecError("Version mismatch master version %s,"
2298 " node version %s" %
2299 (constants.PROTOCOL_VERSION, result.payload))
2302 logging.info("Copy ssh key to node %s", node)
2303 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2305 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2306 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2312 keyarray.append(f.read())
2316 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2318 keyarray[3], keyarray[4], keyarray[5])
2320 msg = result.RemoteFailMsg()
2322 raise errors.OpExecError("Cannot transfer ssh keys to the"
2323 " new node: %s" % msg)
2325 # Add node to our /etc/hosts, and add key to known_hosts
2326 if self.cfg.GetClusterInfo().modify_etc_hosts:
2327 utils.AddHostToEtcHosts(new_node.name)
2329 if new_node.secondary_ip != new_node.primary_ip:
2330 result = self.rpc.call_node_has_ip_address(new_node.name,
2331 new_node.secondary_ip)
2332 msg = result.RemoteFailMsg()
2334 raise errors.OpPrereqError("Failure checking secondary ip"
2335 " on node %s: %s" % (new_node.name, msg))
2336 if not result.payload:
2337 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2338 " you gave (%s). Please fix and re-run this"
2339 " command." % new_node.secondary_ip)
2341 node_verify_list = [self.cfg.GetMasterNode()]
2342 node_verify_param = {
2344 # TODO: do a node-net-test as well?
2347 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2348 self.cfg.GetClusterName())
2349 for verifier in node_verify_list:
2350 msg = result[verifier].RemoteFailMsg()
2352 raise errors.OpExecError("Cannot communicate with node %s: %s" %
2354 nl_payload = result[verifier].payload['nodelist']
2356 for failed in nl_payload:
2357 feedback_fn("ssh/hostname verification failed %s -> %s" %
2358 (verifier, nl_payload[failed]))
2359 raise errors.OpExecError("ssh/hostname verification failed.")
2362 _RedistributeAncillaryFiles(self)
2363 self.context.ReaddNode(new_node)
2365 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2366 self.context.AddNode(new_node)
2369 class LUSetNodeParams(LogicalUnit):
2370 """Modifies the parameters of a node.
2373 HPATH = "node-modify"
2374 HTYPE = constants.HTYPE_NODE
2375 _OP_REQP = ["node_name"]
2378 def CheckArguments(self):
2379 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2380 if node_name is None:
2381 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2382 self.op.node_name = node_name
2383 _CheckBooleanOpField(self.op, 'master_candidate')
2384 _CheckBooleanOpField(self.op, 'offline')
2385 _CheckBooleanOpField(self.op, 'drained')
2386 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2387 if all_mods.count(None) == 3:
2388 raise errors.OpPrereqError("Please pass at least one modification")
2389 if all_mods.count(True) > 1:
2390 raise errors.OpPrereqError("Can't set the node into more than one"
2391 " state at the same time")
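# Examples of the argument rules above (hypothetical opcode values):
#   offline=None, drained=None, master_candidate=None -> rejected
#     (no modification requested)
#   offline=True, drained=True -> rejected (more than one True state)
#   offline=True, master_candidate=False -> accepted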
2393 def ExpandNames(self):
2394 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2396 def BuildHooksEnv(self):
2399 This runs on the master node.
2403 "OP_TARGET": self.op.node_name,
2404 "MASTER_CANDIDATE": str(self.op.master_candidate),
2405 "OFFLINE": str(self.op.offline),
2406 "DRAINED": str(self.op.drained),
2408 nl = [self.cfg.GetMasterNode(),
2412 def CheckPrereq(self):
2413 """Check prerequisites.
2415 This checks the requested modifications against the node's current state.
2418 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2420 if ((self.op.master_candidate == False or self.op.offline == True or
2421 self.op.drained == True) and node.master_candidate):
2422 # we will demote the node from master_candidate
2423 if self.op.node_name == self.cfg.GetMasterNode():
2424 raise errors.OpPrereqError("The master node has to be a"
2425 " master candidate, online and not drained")
2426 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2427 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2428 if num_candidates <= cp_size:
2429 msg = ("Not enough master candidates (desired"
2430 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2432 self.LogWarning(msg)
2434 raise errors.OpPrereqError(msg)
2436 if (self.op.master_candidate == True and
2437 ((node.offline and not self.op.offline == False) or
2438 (node.drained and not self.op.drained == False))):
2439 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2440 " to master_candidate" % node.name)
2444 def Exec(self, feedback_fn):
2453 if self.op.offline is not None:
2454 node.offline = self.op.offline
2455 result.append(("offline", str(self.op.offline)))
2456 if self.op.offline == True:
2457 if node.master_candidate:
2458 node.master_candidate = False
2460 result.append(("master_candidate", "auto-demotion due to offline"))
2462 node.drained = False
2463 result.append(("drained", "clear drained status due to offline"))
2465 if self.op.master_candidate is not None:
2466 node.master_candidate = self.op.master_candidate
2468 result.append(("master_candidate", str(self.op.master_candidate)))
2469 if self.op.master_candidate == False:
2470 rrc = self.rpc.call_node_demote_from_mc(node.name)
2471 msg = rrc.RemoteFailMsg()
2473 self.LogWarning("Node failed to demote itself: %s" % msg)
2475 if self.op.drained is not None:
2476 node.drained = self.op.drained
2477 result.append(("drained", str(self.op.drained)))
2478 if self.op.drained == True:
2479 if node.master_candidate:
2480 node.master_candidate = False
2482 result.append(("master_candidate", "auto-demotion due to drain"))
2484 node.offline = False
2485 result.append(("offline", "clear offline status due to drain"))
2487 # this will trigger configuration file update, if needed
2488 self.cfg.Update(node)
2489 # this will trigger job queue propagation or cleanup
2491 self.context.ReaddNode(node)
2493 return result
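# The change list returned above is a sequence of (parameter, new value)
# pairs; an assumed example for offlining a master candidate would be:
#   [("offline", "True"),
#    ("master_candidate", "auto-demotion due to offline")]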
2496 class LUPowercycleNode(NoHooksLU):
2497 """Powercycles a node.
2500 _OP_REQP = ["node_name", "force"]
2503 def CheckArguments(self):
2504 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2505 if node_name is None:
2506 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2507 self.op.node_name = node_name
2508 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2509 raise errors.OpPrereqError("The node is the master and the force"
2510 " parameter was not set")
2512 def ExpandNames(self):
2513 """Locking for PowercycleNode.
2515 This is a last-resort option and shouldn't block on other
2516 jobs. Therefore, we grab no locks.
2519 self.needed_locks = {}
2521 def CheckPrereq(self):
2522 """Check prerequisites.
2524 This LU has no prereqs.
2529 def Exec(self, feedback_fn):
2533 result = self.rpc.call_node_powercycle(self.op.node_name,
2534 self.cfg.GetHypervisorType())
2535 msg = result.RemoteFailMsg()
2537 raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2538 return result.payload
2541 class LUQueryClusterInfo(NoHooksLU):
2542 """Query cluster configuration.
2548 def ExpandNames(self):
2549 self.needed_locks = {}
2551 def CheckPrereq(self):
2552 """No prerequisites needed for this LU.
2557 def Exec(self, feedback_fn):
2558 """Return cluster config.
2561 cluster = self.cfg.GetClusterInfo()
2563 "software_version": constants.RELEASE_VERSION,
2564 "protocol_version": constants.PROTOCOL_VERSION,
2565 "config_version": constants.CONFIG_VERSION,
2566 "os_api_version": constants.OS_API_VERSION,
2567 "export_version": constants.EXPORT_VERSION,
2568 "architecture": (platform.architecture()[0], platform.machine()),
2569 "name": cluster.cluster_name,
2570 "master": cluster.master_node,
2571 "default_hypervisor": cluster.default_hypervisor,
2572 "enabled_hypervisors": cluster.enabled_hypervisors,
2573 "hvparams": dict([(hvname, cluster.hvparams[hvname])
2574 for hvname in cluster.enabled_hypervisors]),
2575 "beparams": cluster.beparams,
2576 "nicparams": cluster.nicparams,
2577 "candidate_pool_size": cluster.candidate_pool_size,
2578 "master_netdev": cluster.master_netdev,
2579 "volume_group_name": cluster.volume_group_name,
2580 "file_storage_dir": cluster.file_storage_dir,
2586 class LUQueryConfigValues(NoHooksLU):
2587 """Return configuration values.
2592 _FIELDS_DYNAMIC = utils.FieldSet()
2593 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2595 def ExpandNames(self):
2596 self.needed_locks = {}
2598 _CheckOutputFields(static=self._FIELDS_STATIC,
2599 dynamic=self._FIELDS_DYNAMIC,
2600 selected=self.op.output_fields)
2602 def CheckPrereq(self):
2603 """No prerequisites.
2608 def Exec(self, feedback_fn):
2609 """Return a representation of the requested cluster config values.
2613 for field in self.op.output_fields:
2614 if field == "cluster_name":
2615 entry = self.cfg.GetClusterName()
2616 elif field == "master_node":
2617 entry = self.cfg.GetMasterNode()
2618 elif field == "drain_flag":
2619 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2621 raise errors.ParameterError(field)
2622 values.append(entry)
2626 class LUActivateInstanceDisks(NoHooksLU):
2627 """Bring up an instance's disks.
2630 _OP_REQP = ["instance_name"]
2633 def ExpandNames(self):
2634 self._ExpandAndLockInstance()
2635 self.needed_locks[locking.LEVEL_NODE] = []
2636 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2638 def DeclareLocks(self, level):
2639 if level == locking.LEVEL_NODE:
2640 self._LockInstancesNodes()
2642 def CheckPrereq(self):
2643 """Check prerequisites.
2645 This checks that the instance is in the cluster.
2648 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2649 assert self.instance is not None, \
2650 "Cannot retrieve locked instance %s" % self.op.instance_name
2651 _CheckNodeOnline(self, self.instance.primary_node)
2653 def Exec(self, feedback_fn):
2654 """Activate the disks.
2657 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2659 raise errors.OpExecError("Cannot activate block devices")
2664 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2665 """Prepare the block devices for an instance.
2667 This sets up the block devices on all nodes.
2669 @type lu: L{LogicalUnit}
2670 @param lu: the logical unit on whose behalf we execute
2671 @type instance: L{objects.Instance}
2672 @param instance: the instance for whose disks we assemble
2673 @type ignore_secondaries: boolean
2674 @param ignore_secondaries: if true, errors on secondary nodes
2675 won't result in an error return from the function
2676 @return: False if the operation failed, otherwise a list of
2677 (host, instance_visible_name, node_visible_name)
2678 with the mapping from node devices to instance devices
2683 iname = instance.name
2684 # With the two-pass mechanism we try to reduce the window of
2685 # opportunity for the race condition of switching DRBD to primary
2686 # before the handshake occurred, but we do not eliminate it
2688 # The proper fix would be to wait (with some limits) until the
2689 # connection has been made and drbd transitions from WFConnection
2690 # into any other network-connected state (Connected, SyncTarget,
2693 # 1st pass, assemble on all nodes in secondary mode
2694 for inst_disk in instance.disks:
2695 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2696 lu.cfg.SetDiskID(node_disk, node)
2697 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2698 msg = result.RemoteFailMsg()
2700 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2701 " (is_primary=False, pass=1): %s",
2702 inst_disk.iv_name, node, msg)
2703 if not ignore_secondaries:
2706 # FIXME: race condition on drbd migration to primary
2708 # 2nd pass, do only the primary node
2709 for inst_disk in instance.disks:
2710 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2711 if node != instance.primary_node:
2713 lu.cfg.SetDiskID(node_disk, node)
2714 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2715 msg = result.RemoteFailMsg()
2717 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2718 " (is_primary=True, pass=2): %s",
2719 inst_disk.iv_name, node, msg)
2721 device_info.append((instance.primary_node, inst_disk.iv_name,
2724 # leave the disks configured for the primary node
2725 # this is a workaround that would be fixed better by
2726 # improving the logical/physical id handling
2727 for disk in instance.disks:
2728 lu.cfg.SetDiskID(disk, instance.primary_node)
2730 return disks_ok, device_info
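# Usage sketch, mirroring the callers below (values illustrative only):
#   disks_ok, device_info = _AssembleInstanceDisks(self, instance)
#   # on success, device_info holds tuples such as
#   #   ("node1.example.com", "disk/0", "/dev/drbd0")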
2733 def _StartInstanceDisks(lu, instance, force):
2734 """Start the disks of an instance.
2737 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2738 ignore_secondaries=force)
2740 _ShutdownInstanceDisks(lu, instance)
2741 if force is not None and not force:
2742 lu.proc.LogWarning("", hint="If the message above refers to a"
2744 " you can retry the operation using '--force'.")
2745 raise errors.OpExecError("Disk consistency error")
2748 class LUDeactivateInstanceDisks(NoHooksLU):
2749 """Shutdown an instance's disks.
2752 _OP_REQP = ["instance_name"]
2755 def ExpandNames(self):
2756 self._ExpandAndLockInstance()
2757 self.needed_locks[locking.LEVEL_NODE] = []
2758 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2760 def DeclareLocks(self, level):
2761 if level == locking.LEVEL_NODE:
2762 self._LockInstancesNodes()
2764 def CheckPrereq(self):
2765 """Check prerequisites.
2767 This checks that the instance is in the cluster.
2770 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2771 assert self.instance is not None, \
2772 "Cannot retrieve locked instance %s" % self.op.instance_name
2774 def Exec(self, feedback_fn):
2775 """Deactivate the disks
2778 instance = self.instance
2779 _SafeShutdownInstanceDisks(self, instance)
2782 def _SafeShutdownInstanceDisks(lu, instance):
2783 """Shutdown block devices of an instance.
2785 This function checks if an instance is running, before calling
2786 _ShutdownInstanceDisks.
2789 pnode = instance.primary_node
2790 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])
2791 ins_l = ins_l[pnode]
2792 msg = ins_l.RemoteFailMsg()
2794 raise errors.OpExecError("Can't contact node %s: %s" % (pnode, msg))
2796 if instance.name in ins_l.payload:
2797 raise errors.OpExecError("Instance is running, can't shutdown"
2800 _ShutdownInstanceDisks(lu, instance)
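# Editorial note: the running-instance check and the disk shutdown above
# are not atomic; callers are assumed to hold the instance lock so the
# instance cannot be started in between.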
2803 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2804 """Shutdown block devices of an instance.
2806 This does the shutdown on all nodes of the instance.
2808 If ignore_primary is false, errors on the primary node are ignored.
2813 for disk in instance.disks:
2814 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2815 lu.cfg.SetDiskID(top_disk, node)
2816 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2817 msg = result.RemoteFailMsg()
2819 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2820 disk.iv_name, node, msg)
2821 if not ignore_primary or node != instance.primary_node:
2826 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2827 """Checks if a node has enough free memory.
2829 This function checks if a given node has the needed amount of free
2830 memory. In case the node has less memory or we cannot get the
2831 information from the node, this function raises an OpPrereqError
2834 @type lu: C{LogicalUnit}
2835 @param lu: a logical unit from which we get configuration data
2837 @param node: the node to check
2838 @type reason: C{str}
2839 @param reason: string to use in the error message
2840 @type requested: C{int}
2841 @param requested: the amount of memory in MiB to check for
2842 @type hypervisor_name: C{str}
2843 @param hypervisor_name: the hypervisor to ask for memory stats
2844 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2845 we cannot check the node
2848 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2849 msg = nodeinfo[node].RemoteFailMsg()
2851 raise errors.OpPrereqError("Can't get data from node %s: %s" % (node, msg))
2852 free_mem = nodeinfo[node].payload.get('memory_free', None)
2853 if not isinstance(free_mem, int):
2854 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2855 " was '%s'" % (node, free_mem))
2856 if requested > free_mem:
2857 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2858 " needed %s MiB, available %s MiB" %
2859 (node, reason, requested, free_mem))
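# Usage sketch, matching the call sites in this module (the values come
# from the instance's filled beparams):
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)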
2862 class LUStartupInstance(LogicalUnit):
2863 """Starts an instance.
2866 HPATH = "instance-start"
2867 HTYPE = constants.HTYPE_INSTANCE
2868 _OP_REQP = ["instance_name", "force"]
2871 def ExpandNames(self):
2872 self._ExpandAndLockInstance()
2874 def BuildHooksEnv(self):
2877 This runs on master, primary and secondary nodes of the instance.
2881 "FORCE": self.op.force,
2883 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2884 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2887 def CheckPrereq(self):
2888 """Check prerequisites.
2890 This checks that the instance is in the cluster.
2893 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2894 assert self.instance is not None, \
2895 "Cannot retrieve locked instance %s" % self.op.instance_name
2898 self.beparams = getattr(self.op, "beparams", {})
2900 if not isinstance(self.beparams, dict):
2901 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2902 " dict" % (type(self.beparams), ))
2903 # fill the beparams dict
2904 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2905 self.op.beparams = self.beparams
2908 self.hvparams = getattr(self.op, "hvparams", {})
2910 if not isinstance(self.hvparams, dict):
2911 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2912 " dict" % (type(self.hvparams), ))
2914 # check hypervisor parameter syntax (locally)
2915 cluster = self.cfg.GetClusterInfo()
2916 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2917 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2919 filled_hvp.update(self.hvparams)
2920 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2921 hv_type.CheckParameterSyntax(filled_hvp)
2922 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2923 self.op.hvparams = self.hvparams
2925 _CheckNodeOnline(self, instance.primary_node)
2927 bep = self.cfg.GetClusterInfo().FillBE(instance)
2928 # check bridge existence
2929 _CheckInstanceBridgesExist(self, instance)
2931 remote_info = self.rpc.call_instance_info(instance.primary_node,
2933 instance.hypervisor)
2934 msg = remote_info.RemoteFailMsg()
2936 raise errors.OpPrereqError("Error checking node %s: %s" %
2937 (instance.primary_node, msg))
2938 if not remote_info.payload: # not running already
2939 _CheckNodeFreeMemory(self, instance.primary_node,
2940 "starting instance %s" % instance.name,
2941 bep[constants.BE_MEMORY], instance.hypervisor)
2943 def Exec(self, feedback_fn):
2944 """Start the instance.
2947 instance = self.instance
2948 force = self.op.force
2950 self.cfg.MarkInstanceUp(instance.name)
2952 node_current = instance.primary_node
2954 _StartInstanceDisks(self, instance, force)
2956 result = self.rpc.call_instance_start(node_current, instance,
2957 self.hvparams, self.beparams)
2958 msg = result.RemoteFailMsg()
2960 _ShutdownInstanceDisks(self, instance)
2961 raise errors.OpExecError("Could not start instance: %s" % msg)
2964 class LURebootInstance(LogicalUnit):
2965 """Reboot an instance.
2968 HPATH = "instance-reboot"
2969 HTYPE = constants.HTYPE_INSTANCE
2970 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2973 def ExpandNames(self):
2974 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2975 constants.INSTANCE_REBOOT_HARD,
2976 constants.INSTANCE_REBOOT_FULL]:
2977 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2978 (constants.INSTANCE_REBOOT_SOFT,
2979 constants.INSTANCE_REBOOT_HARD,
2980 constants.INSTANCE_REBOOT_FULL))
2981 self._ExpandAndLockInstance()
2983 def BuildHooksEnv(self):
2986 This runs on master, primary and secondary nodes of the instance.
2990 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2991 "REBOOT_TYPE": self.op.reboot_type,
2993 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2994 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2997 def CheckPrereq(self):
2998 """Check prerequisites.
3000 This checks that the instance is in the cluster.
3003 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3004 assert self.instance is not None, \
3005 "Cannot retrieve locked instance %s" % self.op.instance_name
3007 _CheckNodeOnline(self, instance.primary_node)
3009 # check bridge existence
3010 _CheckInstanceBridgesExist(self, instance)
3012 def Exec(self, feedback_fn):
3013 """Reboot the instance.
3016 instance = self.instance
3017 ignore_secondaries = self.op.ignore_secondaries
3018 reboot_type = self.op.reboot_type
3020 node_current = instance.primary_node
3022 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3023 constants.INSTANCE_REBOOT_HARD]:
3024 for disk in instance.disks:
3025 self.cfg.SetDiskID(disk, node_current)
3026 result = self.rpc.call_instance_reboot(node_current, instance,
3028 msg = result.RemoteFailMsg()
3030 raise errors.OpExecError("Could not reboot instance: %s" % msg)
3032 result = self.rpc.call_instance_shutdown(node_current, instance)
3033 msg = result.RemoteFailMsg()
3035 raise errors.OpExecError("Could not shutdown instance for"
3036 " full reboot: %s" % msg)
3037 _ShutdownInstanceDisks(self, instance)
3038 _StartInstanceDisks(self, instance, ignore_secondaries)
3039 result = self.rpc.call_instance_start(node_current, instance, None, None)
3040 msg = result.RemoteFailMsg()
3042 _ShutdownInstanceDisks(self, instance)
3043 raise errors.OpExecError("Could not start instance for"
3044 " full reboot: %s" % msg)
3046 self.cfg.MarkInstanceUp(instance.name)
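# Summary of the reboot types handled above: SOFT and HARD are delegated
# to the hypervisor via call_instance_reboot; FULL is emulated from the
# master by shutting the instance down, recycling its disks and starting
# it again.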
3049 class LUShutdownInstance(LogicalUnit):
3050 """Shutdown an instance.
3053 HPATH = "instance-stop"
3054 HTYPE = constants.HTYPE_INSTANCE
3055 _OP_REQP = ["instance_name"]
3058 def ExpandNames(self):
3059 self._ExpandAndLockInstance()
3061 def BuildHooksEnv(self):
3064 This runs on master, primary and secondary nodes of the instance.
3067 env = _BuildInstanceHookEnvByObject(self, self.instance)
3068 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3071 def CheckPrereq(self):
3072 """Check prerequisites.
3074 This checks that the instance is in the cluster.
3077 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3078 assert self.instance is not None, \
3079 "Cannot retrieve locked instance %s" % self.op.instance_name
3080 _CheckNodeOnline(self, self.instance.primary_node)
3082 def Exec(self, feedback_fn):
3083 """Shutdown the instance.
3086 instance = self.instance
3087 node_current = instance.primary_node
3088 self.cfg.MarkInstanceDown(instance.name)
3089 result = self.rpc.call_instance_shutdown(node_current, instance)
3090 msg = result.RemoteFailMsg()
3092 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3094 _ShutdownInstanceDisks(self, instance)
3097 class LUReinstallInstance(LogicalUnit):
3098 """Reinstall an instance.
3101 HPATH = "instance-reinstall"
3102 HTYPE = constants.HTYPE_INSTANCE
3103 _OP_REQP = ["instance_name"]
3106 def ExpandNames(self):
3107 self._ExpandAndLockInstance()
3109 def BuildHooksEnv(self):
3112 This runs on master, primary and secondary nodes of the instance.
3115 env = _BuildInstanceHookEnvByObject(self, self.instance)
3116 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3119 def CheckPrereq(self):
3120 """Check prerequisites.
3122 This checks that the instance is in the cluster and is not running.
3125 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3126 assert instance is not None, \
3127 "Cannot retrieve locked instance %s" % self.op.instance_name
3128 _CheckNodeOnline(self, instance.primary_node)
3130 if instance.disk_template == constants.DT_DISKLESS:
3131 raise errors.OpPrereqError("Instance '%s' has no disks" %
3132 self.op.instance_name)
3133 if instance.admin_up:
3134 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3135 self.op.instance_name)
3136 remote_info = self.rpc.call_instance_info(instance.primary_node,
3138 instance.hypervisor)
3139 msg = remote_info.RemoteFailMsg()
3141 raise errors.OpPrereqError("Error checking node %s: %s" %
3142 (instance.primary_node, msg))
3143 if remote_info.payload:
3144 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3145 (self.op.instance_name,
3146 instance.primary_node))
3148 self.op.os_type = getattr(self.op, "os_type", None)
3149 if self.op.os_type is not None:
3151 pnode = self.cfg.GetNodeInfo(
3152 self.cfg.ExpandNodeName(instance.primary_node))
3154 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3156 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3158 if not isinstance(result.data, objects.OS):
3159 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3160 " primary node" % self.op.os_type)
3162 self.instance = instance
3164 def Exec(self, feedback_fn):
3165 """Reinstall the instance.
3168 inst = self.instance
3170 if self.op.os_type is not None:
3171 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3172 inst.os = self.op.os_type
3173 self.cfg.Update(inst)
3175 _StartInstanceDisks(self, inst, None)
3177 feedback_fn("Running the instance OS create scripts...")
3178 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3179 msg = result.RemoteFailMsg()
3181 raise errors.OpExecError("Could not install OS for instance %s"
3183 (inst.name, inst.primary_node, msg))
3185 _ShutdownInstanceDisks(self, inst)
3188 class LURenameInstance(LogicalUnit):
3189 """Rename an instance.
3192 HPATH = "instance-rename"
3193 HTYPE = constants.HTYPE_INSTANCE
3194 _OP_REQP = ["instance_name", "new_name"]
3196 def BuildHooksEnv(self):
3199 This runs on master, primary and secondary nodes of the instance.
3202 env = _BuildInstanceHookEnvByObject(self, self.instance)
3203 env["INSTANCE_NEW_NAME"] = self.op.new_name
3204 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3207 def CheckPrereq(self):
3208 """Check prerequisites.
3210 This checks that the instance is in the cluster and is not running.
3213 instance = self.cfg.GetInstanceInfo(
3214 self.cfg.ExpandInstanceName(self.op.instance_name))
3215 if instance is None:
3216 raise errors.OpPrereqError("Instance '%s' not known" %
3217 self.op.instance_name)
3218 _CheckNodeOnline(self, instance.primary_node)
3220 if instance.admin_up:
3221 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3222 self.op.instance_name)
3223 remote_info = self.rpc.call_instance_info(instance.primary_node,
3225 instance.hypervisor)
3226 msg = remote_info.RemoteFailMsg()
3228 raise errors.OpPrereqError("Error checking node %s: %s" %
3229 (instance.primary_node, msg))
3230 if remote_info.payload:
3231 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3232 (self.op.instance_name,
3233 instance.primary_node))
3234 self.instance = instance
3236 # new name verification
3237 name_info = utils.HostInfo(self.op.new_name)
3239 self.op.new_name = new_name = name_info.name
3240 instance_list = self.cfg.GetInstanceList()
3241 if new_name in instance_list:
3242 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3245 if not getattr(self.op, "ignore_ip", False):
3246 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3247 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3248 (name_info.ip, new_name))
3251 def Exec(self, feedback_fn):
3252 """Rename the instance.
3255 inst = self.instance
3256 old_name = inst.name
3258 if inst.disk_template == constants.DT_FILE:
3259 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3261 self.cfg.RenameInstance(inst.name, self.op.new_name)
3262 # Change the instance lock. This is definitely safe while we hold the BGL
3263 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3264 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3266 # re-read the instance from the configuration after rename
3267 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3269 if inst.disk_template == constants.DT_FILE:
3270 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3271 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3272 old_file_storage_dir,
3273 new_file_storage_dir)
3276 raise errors.OpExecError("Could not connect to node '%s' to rename"
3277 " directory '%s' to '%s' (but the instance"
3278 " has been renamed in Ganeti)" % (
3279 inst.primary_node, old_file_storage_dir,
3280 new_file_storage_dir))
3282 if not result.data[0]:
3283 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3284 " (but the instance has been renamed in"
3285 " Ganeti)" % (old_file_storage_dir,
3286 new_file_storage_dir))
3288 _StartInstanceDisks(self, inst, None)
3290 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3292 msg = result.RemoteFailMsg()
3294 msg = ("Could not run OS rename script for instance %s on node %s"
3295 " (but the instance has been renamed in Ganeti): %s" %
3296 (inst.name, inst.primary_node, msg))
3297 self.proc.LogWarning(msg)
3299 _ShutdownInstanceDisks(self, inst)
3302 class LURemoveInstance(LogicalUnit):
3303 """Remove an instance.
3306 HPATH = "instance-remove"
3307 HTYPE = constants.HTYPE_INSTANCE
3308 _OP_REQP = ["instance_name", "ignore_failures"]
3311 def ExpandNames(self):
3312 self._ExpandAndLockInstance()
3313 self.needed_locks[locking.LEVEL_NODE] = []
3314 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3316 def DeclareLocks(self, level):
3317 if level == locking.LEVEL_NODE:
3318 self._LockInstancesNodes()
3320 def BuildHooksEnv(self):
3323 This runs on master, primary and secondary nodes of the instance.
3326 env = _BuildInstanceHookEnvByObject(self, self.instance)
3327 nl = [self.cfg.GetMasterNode()]
3330 def CheckPrereq(self):
3331 """Check prerequisites.
3333 This checks that the instance is in the cluster.
3336 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3337 assert self.instance is not None, \
3338 "Cannot retrieve locked instance %s" % self.op.instance_name
3340 def Exec(self, feedback_fn):
3341 """Remove the instance.
3344 instance = self.instance
3345 logging.info("Shutting down instance %s on node %s",
3346 instance.name, instance.primary_node)
3348 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3349 msg = result.RemoteFailMsg()
3351 if self.op.ignore_failures:
3352 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3354 raise errors.OpExecError("Could not shutdown instance %s on"
3356 (instance.name, instance.primary_node, msg))
3358 logging.info("Removing block devices for instance %s", instance.name)
3360 if not _RemoveDisks(self, instance):
3361 if self.op.ignore_failures:
3362 feedback_fn("Warning: can't remove instance's disks")
3364 raise errors.OpExecError("Can't remove instance's disks")
3366 logging.info("Removing instance %s out of cluster config", instance.name)
3368 self.cfg.RemoveInstance(instance.name)
3369 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3372 class LUQueryInstances(NoHooksLU):
3373 """Logical unit for querying instances.
3376 _OP_REQP = ["output_fields", "names", "use_locking"]
3378 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3380 "disk_template", "ip", "mac", "bridge",
3381 "sda_size", "sdb_size", "vcpus", "tags",
3382 "network_port", "beparams",
3383 r"(disk)\.(size)/([0-9]+)",
3384 r"(disk)\.(sizes)", "disk_usage",
3385 r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3386 r"(nic)\.(macs|ips|bridges)",
3387 r"(disk|nic)\.(count)",
3388 "serial_no", "hypervisor", "hvparams",] +
3390 for name in constants.HVS_PARAMETERS] +
3392 for name in constants.BES_PARAMETERS])
3393 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3396 def ExpandNames(self):
3397 _CheckOutputFields(static=self._FIELDS_STATIC,
3398 dynamic=self._FIELDS_DYNAMIC,
3399 selected=self.op.output_fields)
3401 self.needed_locks = {}
3402 self.share_locks[locking.LEVEL_INSTANCE] = 1
3403 self.share_locks[locking.LEVEL_NODE] = 1
3406 self.wanted = _GetWantedInstances(self, self.op.names)
3408 self.wanted = locking.ALL_SET
3410 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3411 self.do_locking = self.do_node_query and self.op.use_locking
3413 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3414 self.needed_locks[locking.LEVEL_NODE] = []
3415 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3417 def DeclareLocks(self, level):
3418 if level == locking.LEVEL_NODE and self.do_locking:
3419 self._LockInstancesNodes()
3421 def CheckPrereq(self):
3422 """Check prerequisites.
3427 def Exec(self, feedback_fn):
3428 """Computes the list of instances and their attributes.
3431 all_info = self.cfg.GetAllInstancesInfo()
3432 if self.wanted == locking.ALL_SET:
3433 # caller didn't specify instance names, so ordering is not important
3435 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3437 instance_names = all_info.keys()
3438 instance_names = utils.NiceSort(instance_names)
3440 # caller did specify names, so we must keep the ordering
3442 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3444 tgt_set = all_info.keys()
3445 missing = set(self.wanted).difference(tgt_set)
3447 raise errors.OpExecError("Some instances were removed before"
3448 " retrieving their data: %s" % missing)
3449 instance_names = self.wanted
3451 instance_list = [all_info[iname] for iname in instance_names]
3453 # begin data gathering
3455 nodes = frozenset([inst.primary_node for inst in instance_list])
3456 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3460 if self.do_node_query:
3462 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3464 result = node_data[name]
3466 # offline nodes will be in both lists
3467 off_nodes.append(name)
3468 if result.failed or result.RemoteFailMsg():
3469 bad_nodes.append(name)
3472 live_data.update(result.payload)
3473 # else no instance is alive
3475 live_data = dict([(name, {}) for name in instance_names])
3477 # end data gathering
3482 for instance in instance_list:
3484 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3485 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3486 for field in self.op.output_fields:
3487 st_match = self._FIELDS_STATIC.Matches(field)
3492 elif field == "pnode":
3493 val = instance.primary_node
3494 elif field == "snodes":
3495 val = list(instance.secondary_nodes)
3496 elif field == "admin_state":
3497 val = instance.admin_up
3498 elif field == "oper_state":
3499 if instance.primary_node in bad_nodes:
3502 val = bool(live_data.get(instance.name))
3503 elif field == "status":
3504 if instance.primary_node in off_nodes:
3505 val = "ERROR_nodeoffline"
3506 elif instance.primary_node in bad_nodes:
3507 val = "ERROR_nodedown"
3509 running = bool(live_data.get(instance.name))
3511 if instance.admin_up:
3516 if instance.admin_up:
3520 elif field == "oper_ram":
3521 if instance.primary_node in bad_nodes:
3523 elif instance.name in live_data:
3524 val = live_data[instance.name].get("memory", "?")
3527 elif field == "disk_template":
3528 val = instance.disk_template
3530 val = instance.nics[0].ip
3531 elif field == "bridge":
3532 val = instance.nics[0].bridge
3533 elif field == "mac":
3534 val = instance.nics[0].mac
3535 elif field == "sda_size" or field == "sdb_size":
3536 idx = ord(field[2]) - ord('a')
3538 val = instance.FindDisk(idx).size
3539 except errors.OpPrereqError:
3541 elif field == "disk_usage": # total disk usage per node
3542 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3543 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3544 elif field == "tags":
3545 val = list(instance.GetTags())
3546 elif field == "serial_no":
3547 val = instance.serial_no
3548 elif field == "network_port":
3549 val = instance.network_port
3550 elif field == "hypervisor":
3551 val = instance.hypervisor
3552 elif field == "hvparams":
3554 elif (field.startswith(HVPREFIX) and
3555 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3556 val = i_hv.get(field[len(HVPREFIX):], None)
3557 elif field == "beparams":
3559 elif (field.startswith(BEPREFIX) and
3560 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3561 val = i_be.get(field[len(BEPREFIX):], None)
3562 elif st_match and st_match.groups():
3563 # matches a variable list
3564 st_groups = st_match.groups()
3565 if st_groups and st_groups[0] == "disk":
3566 if st_groups[1] == "count":
3567 val = len(instance.disks)
3568 elif st_groups[1] == "sizes":
3569 val = [disk.size for disk in instance.disks]
3570 elif st_groups[1] == "size":
3572 val = instance.FindDisk(st_groups[2]).size
3573 except errors.OpPrereqError:
3576 assert False, "Unhandled disk parameter"
3577 elif st_groups[0] == "nic":
3578 if st_groups[1] == "count":
3579 val = len(instance.nics)
3580 elif st_groups[1] == "macs":
3581 val = [nic.mac for nic in instance.nics]
3582 elif st_groups[1] == "ips":
3583 val = [nic.ip for nic in instance.nics]
3584 elif st_groups[1] == "bridges":
3585 val = [nic.bridge for nic in instance.nics]
3588 nic_idx = int(st_groups[2])
3589 if nic_idx >= len(instance.nics):
3592 if st_groups[1] == "mac":
3593 val = instance.nics[nic_idx].mac
3594 elif st_groups[1] == "ip":
3595 val = instance.nics[nic_idx].ip
3596 elif st_groups[1] == "bridge":
3597 val = instance.nics[nic_idx].bridge
3599 assert False, "Unhandled NIC parameter"
3601 assert False, "Unhandled variable parameter"
3603 raise errors.ParameterError(field)
3610 class LUFailoverInstance(LogicalUnit):
3611 """Failover an instance.
3614 HPATH = "instance-failover"
3615 HTYPE = constants.HTYPE_INSTANCE
3616 _OP_REQP = ["instance_name", "ignore_consistency"]
3619 def ExpandNames(self):
3620 self._ExpandAndLockInstance()
3621 self.needed_locks[locking.LEVEL_NODE] = []
3622 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3624 def DeclareLocks(self, level):
3625 if level == locking.LEVEL_NODE:
3626 self._LockInstancesNodes()
3628 def BuildHooksEnv(self):
3631 This runs on master, primary and secondary nodes of the instance.
3635 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3637 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3638 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3641 def CheckPrereq(self):
3642 """Check prerequisites.
3644 This checks that the instance is in the cluster.
3647 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3648 assert self.instance is not None, \
3649 "Cannot retrieve locked instance %s" % self.op.instance_name
3651 bep = self.cfg.GetClusterInfo().FillBE(instance)
3652 if instance.disk_template not in constants.DTS_NET_MIRROR:
3653 raise errors.OpPrereqError("Instance's disk layout is not"
3654 " network mirrored, cannot failover.")
3656 secondary_nodes = instance.secondary_nodes
3657 if not secondary_nodes:
3658 raise errors.ProgrammerError("no secondary node but using "
3659 "a mirrored disk template")
3661 target_node = secondary_nodes[0]
3662 _CheckNodeOnline(self, target_node)
3663 _CheckNodeNotDrained(self, target_node)
3664 # check memory requirements on the secondary node
3665 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3666 instance.name, bep[constants.BE_MEMORY],
3667 instance.hypervisor)
3668 # check bridge existence
3669 _CheckInstanceBridgesExist(self, instance, node=target_node)
3671 def Exec(self, feedback_fn):
3672 """Failover an instance.
3674 The failover is done by shutting it down on its present node and
3675 starting it on the secondary.
3678 instance = self.instance
3680 source_node = instance.primary_node
3681 target_node = instance.secondary_nodes[0]
3683 feedback_fn("* checking disk consistency between source and target")
3684 for dev in instance.disks:
3685 # for drbd, these are drbd over lvm
3686 if not _CheckDiskConsistency(self, dev, target_node, False):
3687 if instance.admin_up and not self.op.ignore_consistency:
3688 raise errors.OpExecError("Disk %s is degraded on target node,"
3689 " aborting failover." % dev.iv_name)
3691 feedback_fn("* shutting down instance on source node")
3692 logging.info("Shutting down instance %s on node %s",
3693 instance.name, source_node)
3695 result = self.rpc.call_instance_shutdown(source_node, instance)
3696 msg = result.RemoteFailMsg()
3698 if self.op.ignore_consistency:
3699 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3700 " Proceeding anyway. Please make sure node"
3701 " %s is down. Error details: %s",
3702 instance.name, source_node, source_node, msg)
3704 raise errors.OpExecError("Could not shutdown instance %s on"
3706 (instance.name, source_node, msg))
3708 feedback_fn("* deactivating the instance's disks on source node")
3709 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3710 raise errors.OpExecError("Can't shut down the instance's disks.")
3712 instance.primary_node = target_node
3713 # distribute new instance config to the other nodes
3714 self.cfg.Update(instance)
3716 # Only start the instance if it's marked as up
3717 if instance.admin_up:
3718 feedback_fn("* activating the instance's disks on target node")
3719 logging.info("Starting instance %s on node %s",
3720 instance.name, target_node)
3722 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3723 ignore_secondaries=True)
3725 _ShutdownInstanceDisks(self, instance)
3726 raise errors.OpExecError("Can't activate the instance's disks")
3728 feedback_fn("* starting the instance on the target node")
3729 result = self.rpc.call_instance_start(target_node, instance, None, None)
3730 msg = result.RemoteFailMsg()
3732 _ShutdownInstanceDisks(self, instance)
3733 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3734 (instance.name, target_node, msg))
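# The failover sequence above, in short: check disk consistency on the
# target, shut the instance down on the source, deactivate its disks,
# flip primary_node in the configuration, then (only if the instance was
# marked up) reactivate the disks and start it on the former secondary.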
3737 class LUMigrateInstance(LogicalUnit):
3738 """Migrate an instance.
3740 This is migration without shutting the instance down, as opposed to
3741 failover, which is done with a shutdown.
3744 HPATH = "instance-migrate"
3745 HTYPE = constants.HTYPE_INSTANCE
3746 _OP_REQP = ["instance_name", "live", "cleanup"]
3750 def ExpandNames(self):
3751 self._ExpandAndLockInstance()
3752 self.needed_locks[locking.LEVEL_NODE] = []
3753 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3755 def DeclareLocks(self, level):
3756 if level == locking.LEVEL_NODE:
3757 self._LockInstancesNodes()
3759 def BuildHooksEnv(self):
3762 This runs on master, primary and secondary nodes of the instance.
3765 env = _BuildInstanceHookEnvByObject(self, self.instance)
3766 env["MIGRATE_LIVE"] = self.op.live
3767 env["MIGRATE_CLEANUP"] = self.op.cleanup
3768 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3771 def CheckPrereq(self):
3772 """Check prerequisites.
3774 This checks that the instance is in the cluster.
3777 instance = self.cfg.GetInstanceInfo(
3778 self.cfg.ExpandInstanceName(self.op.instance_name))
3779 if instance is None:
3780 raise errors.OpPrereqError("Instance '%s' not known" %
3781 self.op.instance_name)
3783 if instance.disk_template != constants.DT_DRBD8:
3784 raise errors.OpPrereqError("Instance's disk layout is not"
3785 " drbd8, cannot migrate.")
3787 secondary_nodes = instance.secondary_nodes
3788 if not secondary_nodes:
3789 raise errors.ConfigurationError("No secondary node but using"
3790 " drbd8 disk template")
3792 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3794 target_node = secondary_nodes[0]
3795 # check memory requirements on the secondary node
3796 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3797 instance.name, i_be[constants.BE_MEMORY],
3798 instance.hypervisor)
3800 # check bridge existence
3801 _CheckInstanceBridgesExist(self, instance, node=target_node)
3803 if not self.op.cleanup:
3804 _CheckNodeNotDrained(self, target_node)
3805 result = self.rpc.call_instance_migratable(instance.primary_node,
3807 msg = result.RemoteFailMsg()
3809 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3812 self.instance = instance
3814 def _WaitUntilSync(self):
3815 """Poll with custom rpc for disk sync.
3817 This uses our own step-based rpc call.
3820 self.feedback_fn("* wait until resync is done")
3824 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3826 self.instance.disks)
3828 for node, nres in result.items():
3829 msg = nres.RemoteFailMsg()
3831 raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3833 node_done, node_percent = nres.payload
3834 all_done = all_done and node_done
3835 if node_percent is not None:
3836 min_percent = min(min_percent, node_percent)
3838 if min_percent < 100:
3839 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3842 def _EnsureSecondary(self, node):
3843 """Demote a node to secondary.
3846 self.feedback_fn("* switching node %s to secondary mode" % node)
3848 for dev in self.instance.disks:
3849 self.cfg.SetDiskID(dev, node)
3851 result = self.rpc.call_blockdev_close(node, self.instance.name,
3852 self.instance.disks)
3853 msg = result.RemoteFailMsg()
3855 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3856 " error %s" % (node, msg))
3858 def _GoStandalone(self):
3859 """Disconnect from the network.
3862 self.feedback_fn("* changing into standalone mode")
3863 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3864 self.instance.disks)
3865 for node, nres in result.items():
3866 msg = nres.RemoteFailMsg()
3868 raise errors.OpExecError("Cannot disconnect disks node %s,"
3869 " error %s" % (node, msg))
3871 def _GoReconnect(self, multimaster):
3872 """Reconnect to the network.
3878 msg = "single-master"
3879 self.feedback_fn("* changing disks into %s mode" % msg)
3880 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3881 self.instance.disks,
3882 self.instance.name, multimaster)
3883 for node, nres in result.items():
3884 msg = nres.RemoteFailMsg()
3886 raise errors.OpExecError("Cannot change disks config on node %s,"
3887 " error: %s" % (node, msg))
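# Sketch of the disconnect/reconnect dance these helpers implement, as
# used by _ExecCleanup and _ExecMigration below:
#   self._EnsureSecondary(node)  # demote the DRBD device to secondary
#   self._GoStandalone()         # drop the DRBD network connection
#   self._GoReconnect(False)     # reattach in single-master mode
#   self._WaitUntilSync()        # block until resync completes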
3889 def _ExecCleanup(self):
3890 """Try to cleanup after a failed migration.
3892 The cleanup is done by:
3893 - check that the instance is running only on one node
3894 (and update the config if needed)
3895 - change disks on its secondary node to secondary
3896 - wait until disks are fully synchronized
3897 - disconnect from the network
3898 - change disks into single-master mode
3899 - wait again until disks are fully synchronized
3902 instance = self.instance
3903 target_node = self.target_node
3904 source_node = self.source_node
3906 # check running on only one node
3907 self.feedback_fn("* checking where the instance actually runs"
3908 " (if this hangs, the hypervisor might be in a bad state)")
3910 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3911 for node, result in ins_l.items():
3912 msg = result.RemoteFailMsg()
3914 raise errors.OpExecError("Can't contact node %s: %s" % (node, msg))
3916 runningon_source = instance.name in ins_l[source_node].payload
3917 runningon_target = instance.name in ins_l[target_node].payload
3919 if runningon_source and runningon_target:
3920 raise errors.OpExecError("Instance seems to be running on two nodes,"
3921 " or the hypervisor is confused. You will have"
3922 " to ensure manually that it runs only on one"
3923 " and restart this operation.")
3925 if not (runningon_source or runningon_target):
3926 raise errors.OpExecError("Instance does not seem to be running at all."
3927 " In this case, it's safer to repair by"
3928 " running 'gnt-instance stop' to ensure disk"
3929 " shutdown, and then restarting it.")
3931 if runningon_target:
3932 # the migration has actually succeeded, we need to update the config
3933 self.feedback_fn("* instance running on secondary node (%s),"
3934 " updating config" % target_node)
3935 instance.primary_node = target_node
3936 self.cfg.Update(instance)
3937 demoted_node = source_node
3938 else:
3939 self.feedback_fn("* instance confirmed to be running on its"
3940 " primary node (%s)" % source_node)
3941 demoted_node = target_node
3943 self._EnsureSecondary(demoted_node)
3944 try:
3945 self._WaitUntilSync()
3946 except errors.OpExecError:
3947 # we ignore errors here, since if the device is standalone, it
3948 # won't be able to sync
3949 pass
3950 self._GoStandalone()
3951 self._GoReconnect(False)
3952 self._WaitUntilSync()
3954 self.feedback_fn("* done")
3956 def _RevertDiskStatus(self):
3957 """Try to revert the disk status after a failed migration.
3960 target_node = self.target_node
3961 try:
3962 self._EnsureSecondary(target_node)
3963 self._GoStandalone()
3964 self._GoReconnect(False)
3965 self._WaitUntilSync()
3966 except errors.OpExecError, err:
3967 self.LogWarning("Migration failed and I can't reconnect the"
3968 " drives: error '%s'\n"
3969 "Please look and recover the instance status" %
3972 def _AbortMigration(self):
3973 """Call the hypervisor code to abort a started migration.
3976 instance = self.instance
3977 target_node = self.target_node
3978 migration_info = self.migration_info
3980 abort_result = self.rpc.call_finalize_migration(target_node,
3981 instance,
3982 migration_info,
3983 False)
3984 abort_msg = abort_result.RemoteFailMsg()
3985 if abort_msg:
3986 logging.error("Aborting migration failed on target node %s: %s" %
3987 (target_node, abort_msg))
3988 # Don't raise an exception here, as we still have to try to revert the
3989 # disk status, even if this step failed.
3991 def _ExecMigration(self):
3992 """Migrate an instance.
3994 The migration is done by:
3995 - change the disks into dual-master mode
3996 - wait until disks are fully synchronized again
3997 - migrate the instance
3998 - change disks on the new secondary node (the old primary) to secondary
3999 - wait until disks are fully synchronized
4000 - change disks into single-master mode
4003 instance = self.instance
4004 target_node = self.target_node
4005 source_node = self.source_node
4007 self.feedback_fn("* checking disk consistency between source and target")
4008 for dev in instance.disks:
4009 if not _CheckDiskConsistency(self, dev, target_node, False):
4010 raise errors.OpExecError("Disk %s is degraded or not fully"
4011 " synchronized on target node,"
4012 " aborting migrate." % dev.iv_name)
4014 # First get the migration information from the remote node
4015 result = self.rpc.call_migration_info(source_node, instance)
4016 msg = result.RemoteFailMsg()
4017 if msg:
4018 log_err = ("Failed fetching source migration information from %s: %s" %
4019 (source_node, msg))
4020 logging.error(log_err)
4021 raise errors.OpExecError(log_err)
4023 self.migration_info = migration_info = result.payload
4025 # Then switch the disks to master/master mode
4026 self._EnsureSecondary(target_node)
4027 self._GoStandalone()
4028 self._GoReconnect(True)
4029 self._WaitUntilSync()
4031 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4032 result = self.rpc.call_accept_instance(target_node,
4033 instance,
4034 migration_info,
4035 self.nodes_ip[target_node])
4037 msg = result.RemoteFailMsg()
4038 if msg:
4039 logging.error("Instance pre-migration failed, trying to revert"
4040 " disk status: %s", msg)
4041 self._AbortMigration()
4042 self._RevertDiskStatus()
4043 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4044 (instance.name, msg))
4046 self.feedback_fn("* migrating instance to %s" % target_node)
4048 result = self.rpc.call_instance_migrate(source_node, instance,
4049 self.nodes_ip[target_node],
4050 self.op.live)
4051 msg = result.RemoteFailMsg()
4052 if msg:
4053 logging.error("Instance migration failed, trying to revert"
4054 " disk status: %s", msg)
4055 self._AbortMigration()
4056 self._RevertDiskStatus()
4057 raise errors.OpExecError("Could not migrate instance %s: %s" %
4058 (instance.name, msg))
4061 instance.primary_node = target_node
4062 # distribute new instance config to the other nodes
4063 self.cfg.Update(instance)
4065 result = self.rpc.call_finalize_migration(target_node,
4066 instance,
4067 migration_info,
4068 True)
4069 msg = result.RemoteFailMsg()
4070 if msg:
4071 logging.error("Instance migration succeeded, but finalization failed:"
4072 " %s" % msg)
4073 raise errors.OpExecError("Could not finalize instance migration: %s" %
4074 msg)
4076 self._EnsureSecondary(source_node)
4077 self._WaitUntilSync()
4078 self._GoStandalone()
4079 self._GoReconnect(False)
4080 self._WaitUntilSync()
4082 self.feedback_fn("* done")
4084 def Exec(self, feedback_fn):
4085 """Perform the migration.
4088 self.feedback_fn = feedback_fn
4090 self.source_node = self.instance.primary_node
4091 self.target_node = self.instance.secondary_nodes[0]
4092 self.all_nodes = [self.source_node, self.target_node]
4093 self.nodes_ip = {
4094 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4095 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4096 }
4097 if self.op.cleanup:
4098 return self._ExecCleanup()
4099 else:
4100 return self._ExecMigration()
4103 def _CreateBlockDev(lu, node, instance, device, force_create,
4104 info, force_open):
4105 """Create a tree of block devices on a given node.
4107 If this device type has to be created on secondaries, create it and
4108 all its children.
4110 If not, just recurse to children keeping the same 'force' value.
4112 @param lu: the lu on whose behalf we execute
4113 @param node: the node on which to create the device
4114 @type instance: L{objects.Instance}
4115 @param instance: the instance which owns the device
4116 @type device: L{objects.Disk}
4117 @param device: the device to create
4118 @type force_create: boolean
4119 @param force_create: whether to force creation of this device; this
4120 will be changed to True whenever we find a device which has
4121 CreateOnSecondary() attribute
4122 @param info: the extra 'metadata' we should attach to the device
4123 (this will be represented as a LVM tag)
4124 @type force_open: boolean
4125 @param force_open: this parameter will be passed to the
4126 L{backend.BlockdevCreate} function where it specifies
4127 whether we run on primary or not, and it affects both
4128 the child assembly and the device's own Open() execution
4131 if device.CreateOnSecondary():
4132 force_create = True
4134 if device.children:
4135 for child in device.children:
4136 _CreateBlockDev(lu, node, instance, child, force_create,
4137 info, force_open)
4139 if not force_create:
4140 return
4142 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4145 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4146 """Create a single block device on a given node.
4148 This will not recurse over children of the device, so they must be
4149 created in advance.
4151 @param lu: the lu on whose behalf we execute
4152 @param node: the node on which to create the device
4153 @type instance: L{objects.Instance}
4154 @param instance: the instance which owns the device
4155 @type device: L{objects.Disk}
4156 @param device: the device to create
4157 @param info: the extra 'metadata' we should attach to the device
4158 (this will be represented as a LVM tag)
4159 @type force_open: boolean
4160 @param force_open: this parameter will be passed to the
4161 L{backend.BlockdevCreate} function where it specifies
4162 whether we run on primary or not, and it affects both
4163 the child assembly and the device's own Open() execution
4166 lu.cfg.SetDiskID(device, node)
4167 result = lu.rpc.call_blockdev_create(node, device, device.size,
4168 instance.name, force_open, info)
4169 msg = result.RemoteFailMsg()
4170 if msg:
4171 raise errors.OpExecError("Can't create block device %s on"
4172 " node %s for instance %s: %s" %
4173 (device, node, instance.name, msg))
4174 if device.physical_id is None:
4175 device.physical_id = result.payload
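# Usage sketch (mirrors the callers further down in this module): to create
# the LVs backing a DRBD8 disk on its target node, forcing creation but not
# opening the device (we are not on the primary):
#
#   for new_lv in new_lvs:
#     _CreateBlockDev(lu, tgt_node, instance, new_lv, True,
#                     _GetInstanceInfoText(instance), False)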
4178 def _GenerateUniqueNames(lu, exts):
4179 """Generate a suitable LV name.
4181 This will generate a logical volume name for the given instance.
4183 """
4184 results = []
4185 for val in exts:
4186 new_id = lu.cfg.GenerateUniqueID()
4187 results.append("%s%s" % (new_id, val))
4188 return results
4191 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4192 p_minor, s_minor):
4193 """Generate a drbd8 device complete with its children.
4196 port = lu.cfg.AllocatePort()
4197 vgname = lu.cfg.GetVGName()
4198 shared_secret = lu.cfg.GenerateDRBDSecret()
4199 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4200 logical_id=(vgname, names[0]))
4201 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4202 logical_id=(vgname, names[1]))
4203 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4204 logical_id=(primary, secondary, port,
4205 p_minor, s_minor,
4206 shared_secret),
4207 children=[dev_data, dev_meta],
4208 iv_name=iv_name)
4209 return drbd_dev
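# The resulting device tree, for illustration: a DRBD8 device whose children
# are the data LV (same size as the disk) and a fixed 128 MB metadata LV:
#
#   drbd_dev (LD_DRBD8, size=size, minors p_minor/s_minor)
#     |- dev_data (LD_LV, size=size)
#     `- dev_meta (LD_LV, size=128)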
4212 def _GenerateDiskTemplate(lu, template_name,
4213 instance_name, primary_node,
4214 secondary_nodes, disk_info,
4215 file_storage_dir, file_driver,
4216 base_index):
4217 """Generate the entire disk layout for a given template type.
4220 #TODO: compute space requirements
4222 vgname = lu.cfg.GetVGName()
4223 disk_count = len(disk_info)
4224 disks = []
4225 if template_name == constants.DT_DISKLESS:
4226 pass
4227 elif template_name == constants.DT_PLAIN:
4228 if len(secondary_nodes) != 0:
4229 raise errors.ProgrammerError("Wrong template configuration")
4231 names = _GenerateUniqueNames(lu, [".disk%d" % i
4232 for i in range(disk_count)])
4233 for idx, disk in enumerate(disk_info):
4234 disk_index = idx + base_index
4235 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4236 logical_id=(vgname, names[idx]),
4237 iv_name="disk/%d" % disk_index,
4239 disks.append(disk_dev)
4240 elif template_name == constants.DT_DRBD8:
4241 if len(secondary_nodes) != 1:
4242 raise errors.ProgrammerError("Wrong template configuration")
4243 remote_node = secondary_nodes[0]
4244 minors = lu.cfg.AllocateDRBDMinor(
4245 [primary_node, remote_node] * len(disk_info), instance_name)
4247 names = []
4248 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4249 for i in range(disk_count)]):
4250 names.append(lv_prefix + "_data")
4251 names.append(lv_prefix + "_meta")
4252 for idx, disk in enumerate(disk_info):
4253 disk_index = idx + base_index
4254 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4255 disk["size"], names[idx*2:idx*2+2],
4256 "disk/%d" % disk_index,
4257 minors[idx*2], minors[idx*2+1])
4258 disk_dev.mode = disk["mode"]
4259 disks.append(disk_dev)
4260 elif template_name == constants.DT_FILE:
4261 if len(secondary_nodes) != 0:
4262 raise errors.ProgrammerError("Wrong template configuration")
4264 for idx, disk in enumerate(disk_info):
4265 disk_index = idx + base_index
4266 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4267 iv_name="disk/%d" % disk_index,
4268 logical_id=(file_driver,
4269 "%s/disk%d" % (file_storage_dir,
4272 disks.append(disk_dev)
4274 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4278 def _GetInstanceInfoText(instance):
4279 """Compute that text that should be added to the disk's metadata.
4282 return "originstname+%s" % instance.name
4285 def _CreateDisks(lu, instance):
4286 """Create all disks for an instance.
4288 This abstracts away some work from AddInstance.
4290 @type lu: L{LogicalUnit}
4291 @param lu: the logical unit on whose behalf we execute
4292 @type instance: L{objects.Instance}
4293 @param instance: the instance whose disks we should create
4295 @return: the success of the creation
4298 info = _GetInstanceInfoText(instance)
4299 pnode = instance.primary_node
4301 if instance.disk_template == constants.DT_FILE:
4302 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4303 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4305 if result.failed or not result.data:
4306 raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4308 if not result.data[0]:
4309 raise errors.OpExecError("Failed to create directory '%s'" %
4312 # Note: this needs to be kept in sync with adding of disks in
4313 # LUSetInstanceParams
4314 for device in instance.disks:
4315 logging.info("Creating volume %s for instance %s",
4316 device.iv_name, instance.name)
4318 for node in instance.all_nodes:
4319 f_create = node == pnode
4320 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4323 def _RemoveDisks(lu, instance):
4324 """Remove all disks for an instance.
4326 This abstracts away some work from `AddInstance()` and
4327 `RemoveInstance()`. Note that in case some of the devices couldn't
4328 be removed, the removal will continue with the other ones (compare
4329 with `_CreateDisks()`).
4331 @type lu: L{LogicalUnit}
4332 @param lu: the logical unit on whose behalf we execute
4333 @type instance: L{objects.Instance}
4334 @param instance: the instance whose disks we should remove
4336 @return: the success of the removal
4339 logging.info("Removing block devices for instance %s", instance.name)
4342 for device in instance.disks:
4343 for node, disk in device.ComputeNodeTree(instance.primary_node):
4344 lu.cfg.SetDiskID(disk, node)
4345 msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4346 if msg:
4347 lu.LogWarning("Could not remove block device %s on node %s,"
4348 " continuing anyway: %s", device.iv_name, node, msg)
4349 all_result = False
4351 if instance.disk_template == constants.DT_FILE:
4352 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4353 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4354 file_storage_dir)
4355 if result.failed or not result.data:
4356 logging.error("Could not remove directory '%s'", file_storage_dir)
4357 all_result = False
4359 return all_result
4362 def _ComputeDiskSize(disk_template, disks):
4363 """Compute disk size requirements in the volume group
4366 # Required free disk space as a function of disk and swap space
4367 req_size_dict = {
4368 constants.DT_DISKLESS: None,
4369 constants.DT_PLAIN: sum(d["size"] for d in disks),
4370 # 128 MB are added for drbd metadata for each disk
4371 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4372 constants.DT_FILE: None,
4373 }
4375 if disk_template not in req_size_dict:
4376 raise errors.ProgrammerError("Disk template '%s' size requirement"
4377 " is unknown" % disk_template)
4379 return req_size_dict[disk_template]
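# Worked example (illustration only): for two disks of 1024 MB and 2048 MB,
# DT_PLAIN needs 1024 + 2048 = 3072 MB in the volume group, while DT_DRBD8
# needs (1024 + 128) + (2048 + 128) = 3328 MB because of the per-disk DRBD
# metadata; DT_DISKLESS and DT_FILE need no LVM space at all (None).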
4382 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4383 """Hypervisor parameter validation.
4385 This function abstracts the hypervisor parameter validation to be
4386 used in both instance create and instance modify.
4388 @type lu: L{LogicalUnit}
4389 @param lu: the logical unit for which we check
4390 @type nodenames: list
4391 @param nodenames: the list of nodes on which we should check
4392 @type hvname: string
4393 @param hvname: the name of the hypervisor we should use
4394 @type hvparams: dict
4395 @param hvparams: the parameters which we need to check
4396 @raise errors.OpPrereqError: if the parameters are not valid
4399 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4400 hvname,
4401 hvparams)
4402 for node in nodenames:
4403 info = hvinfo[node]
4404 if info.offline:
4405 continue
4406 msg = info.RemoteFailMsg()
4407 if msg:
4408 raise errors.OpPrereqError("Hypervisor parameter validation"
4409 " failed on node %s: %s" % (node, msg))
4412 class LUCreateInstance(LogicalUnit):
4413 """Create an instance.
4416 HPATH = "instance-add"
4417 HTYPE = constants.HTYPE_INSTANCE
4418 _OP_REQP = ["instance_name", "disks", "disk_template",
4420 "wait_for_sync", "ip_check", "nics",
4421 "hvparams", "beparams"]
4424 def _ExpandNode(self, node):
4425 """Expands and checks one node name.
4428 node_full = self.cfg.ExpandNodeName(node)
4429 if node_full is None:
4430 raise errors.OpPrereqError("Unknown node %s" % node)
4433 def ExpandNames(self):
4434 """ExpandNames for CreateInstance.
4436 Figure out the right locks for instance creation.
4439 self.needed_locks = {}
4441 # set optional parameters to none if they don't exist
4442 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4443 if not hasattr(self.op, attr):
4444 setattr(self.op, attr, None)
4446 # cheap checks, mostly valid constants given
4448 # verify creation mode
4449 if self.op.mode not in (constants.INSTANCE_CREATE,
4450 constants.INSTANCE_IMPORT):
4451 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4454 # disk template and mirror node verification
4455 if self.op.disk_template not in constants.DISK_TEMPLATES:
4456 raise errors.OpPrereqError("Invalid disk template name")
4458 if self.op.hypervisor is None:
4459 self.op.hypervisor = self.cfg.GetHypervisorType()
4461 cluster = self.cfg.GetClusterInfo()
4462 enabled_hvs = cluster.enabled_hypervisors
4463 if self.op.hypervisor not in enabled_hvs:
4464 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4465 " cluster (%s)" % (self.op.hypervisor,
4466 ",".join(enabled_hvs)))
4468 # check hypervisor parameter syntax (locally)
4469 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4470 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4471 self.op.hvparams)
4472 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4473 hv_type.CheckParameterSyntax(filled_hvp)
4475 # fill and remember the beparams dict
4476 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4477 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4478 self.op.beparams)
4480 #### instance parameters check
4482 # instance name verification
4483 hostname1 = utils.HostInfo(self.op.instance_name)
4484 self.op.instance_name = instance_name = hostname1.name
4486 # this is just a preventive check, but someone might still add this
4487 # instance in the meantime, and creation will fail at lock-add time
4488 if instance_name in self.cfg.GetInstanceList():
4489 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4492 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4496 for idx, nic in enumerate(self.op.nics):
4497 nic_mode_req = nic.get("mode", None)
4498 nic_mode = nic_mode_req
4499 if nic_mode is None:
4500 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4502 # in routed mode, for the first nic, the default ip is 'auto'
4503 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4504 default_ip_mode = constants.VALUE_AUTO
4506 default_ip_mode = constants.VALUE_NONE
4508 # ip validity checks
4509 ip = nic.get("ip", default_ip_mode)
4510 if ip is None or ip.lower() == constants.VALUE_NONE:
4511 nic_ip = None
4512 elif ip.lower() == constants.VALUE_AUTO:
4513 nic_ip = hostname1.ip
4514 else:
4515 if not utils.IsValidIP(ip):
4516 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4517 " like a valid IP" % ip)
4520 # TODO: check the ip for uniqueness !!
4521 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4522 raise errors.OpPrereqError("Routed nic mode requires an ip address")
4524 # MAC address verification
4525 mac = nic.get("mac", constants.VALUE_AUTO)
4526 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4527 if not utils.IsValidMac(mac.lower()):
4528 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4530 # bridge verification
4531 bridge = nic.get("bridge", None)
4532 link = nic.get("link", None)
4534 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4535 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4536 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4542 nicparams[constants.NIC_MODE] = nic_mode_req
4544 nicparams[constants.NIC_LINK] = link
4546 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4548 objects.NIC.CheckParameterSyntax(check_params)
4549 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4551 # disk checks/pre-build
4552 self.disks = []
4553 for disk in self.op.disks:
4554 mode = disk.get("mode", constants.DISK_RDWR)
4555 if mode not in constants.DISK_ACCESS_SET:
4556 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4558 size = disk.get("size", None)
4560 raise errors.OpPrereqError("Missing disk size")
4564 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4565 self.disks.append({"size": size, "mode": mode})
4567 # used in CheckPrereq for ip ping check
4568 self.check_ip = hostname1.ip
4570 # file storage checks
4571 if (self.op.file_driver and
4572 not self.op.file_driver in constants.FILE_DRIVER):
4573 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4574 self.op.file_driver)
4576 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4577 raise errors.OpPrereqError("File storage directory path not absolute")
4579 ### Node/iallocator related checks
4580 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4581 raise errors.OpPrereqError("One and only one of iallocator and primary"
4582 " node must be given")
4584 if self.op.iallocator:
4585 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4586 else:
4587 self.op.pnode = self._ExpandNode(self.op.pnode)
4588 nodelist = [self.op.pnode]
4589 if self.op.snode is not None:
4590 self.op.snode = self._ExpandNode(self.op.snode)
4591 nodelist.append(self.op.snode)
4592 self.needed_locks[locking.LEVEL_NODE] = nodelist
4594 # in case of import lock the source node too
4595 if self.op.mode == constants.INSTANCE_IMPORT:
4596 src_node = getattr(self.op, "src_node", None)
4597 src_path = getattr(self.op, "src_path", None)
4599 if src_path is None:
4600 self.op.src_path = src_path = self.op.instance_name
4602 if src_node is None:
4603 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4604 self.op.src_node = None
4605 if os.path.isabs(src_path):
4606 raise errors.OpPrereqError("Importing an instance from an absolute"
4607 " path requires a source node option.")
4609 self.op.src_node = src_node = self._ExpandNode(src_node)
4610 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4611 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4612 if not os.path.isabs(src_path):
4613 self.op.src_path = src_path = \
4614 os.path.join(constants.EXPORT_DIR, src_path)
4616 else: # INSTANCE_CREATE
4617 if getattr(self.op, "os_type", None) is None:
4618 raise errors.OpPrereqError("No guest OS specified")
4620 def _RunAllocator(self):
4621 """Run the allocator based on input opcode.
4624 nics = [n.ToDict() for n in self.nics]
4625 ial = IAllocator(self,
4626 mode=constants.IALLOCATOR_MODE_ALLOC,
4627 name=self.op.instance_name,
4628 disk_template=self.op.disk_template,
4629 tags=[],
4630 os=self.op.os_type,
4631 vcpus=self.be_full[constants.BE_VCPUS],
4632 mem_size=self.be_full[constants.BE_MEMORY],
4633 disks=self.disks,
4634 nics=nics,
4635 hypervisor=self.op.hypervisor,
4636 )
4638 ial.Run(self.op.iallocator)
4640 if not ial.success:
4641 raise errors.OpPrereqError("Can't compute nodes using"
4642 " iallocator '%s': %s" % (self.op.iallocator,
4643 ial.info))
4644 if len(ial.nodes) != ial.required_nodes:
4645 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4646 " of nodes (%s), required %s" %
4647 (self.op.iallocator, len(ial.nodes),
4648 ial.required_nodes))
4649 self.op.pnode = ial.nodes[0]
4650 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4651 self.op.instance_name, self.op.iallocator,
4652 ", ".join(ial.nodes))
4653 if ial.required_nodes == 2:
4654 self.op.snode = ial.nodes[1]
4656 def BuildHooksEnv(self):
4657 """Build hooks env.
4659 This runs on master, primary and secondary nodes of the instance.
4661 """
4662 env = {
4663 "ADD_MODE": self.op.mode,
4664 }
4665 if self.op.mode == constants.INSTANCE_IMPORT:
4666 env["SRC_NODE"] = self.op.src_node
4667 env["SRC_PATH"] = self.op.src_path
4668 env["SRC_IMAGES"] = self.src_images
4670 env.update(_BuildInstanceHookEnv(
4671 name=self.op.instance_name,
4672 primary_node=self.op.pnode,
4673 secondary_nodes=self.secondaries,
4674 status=self.op.start,
4675 os_type=self.op.os_type,
4676 memory=self.be_full[constants.BE_MEMORY],
4677 vcpus=self.be_full[constants.BE_VCPUS],
4678 nics=_PreBuildNICHooksList(self, self.nics),
4679 disk_template=self.op.disk_template,
4680 disks=[(d["size"], d["mode"]) for d in self.disks],
4683 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4688 def CheckPrereq(self):
4689 """Check prerequisites.
4692 if (not self.cfg.GetVGName() and
4693 self.op.disk_template not in constants.DTS_NOT_LVM):
4694 raise errors.OpPrereqError("Cluster does not support lvm-based"
4697 if self.op.mode == constants.INSTANCE_IMPORT:
4698 src_node = self.op.src_node
4699 src_path = self.op.src_path
4701 if src_node is None:
4702 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4703 exp_list = self.rpc.call_export_list(locked_nodes)
4704 found = False
4705 for node in exp_list:
4706 if exp_list[node].RemoteFailMsg():
4707 continue
4708 if src_path in exp_list[node].payload:
4709 found = True
4710 self.op.src_node = src_node = node
4711 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4712 src_path)
4713 break
4714 if not found:
4715 raise errors.OpPrereqError("No export found for relative path %s" %
4716 src_path)
4718 _CheckNodeOnline(self, src_node)
4719 result = self.rpc.call_export_info(src_node, src_path)
4720 msg = result.RemoteFailMsg()
4721 if msg:
4722 raise errors.OpPrereqError("No export or invalid export found in"
4723 " dir %s: %s" % (src_path, msg))
4725 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4726 if not export_info.has_section(constants.INISECT_EXP):
4727 raise errors.ProgrammerError("Corrupted export config")
4729 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4730 if (int(ei_version) != constants.EXPORT_VERSION):
4731 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4732 (ei_version, constants.EXPORT_VERSION))
4734 # Check that the new instance doesn't have less disks than the export
4735 instance_disks = len(self.disks)
4736 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4737 if instance_disks < export_disks:
4738 raise errors.OpPrereqError("Not enough disks to import."
4739 " (instance: %d, export: %d)" %
4740 (instance_disks, export_disks))
4742 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4743 disk_images = []
4744 for idx in range(export_disks):
4745 option = 'disk%d_dump' % idx
4746 if export_info.has_option(constants.INISECT_INS, option):
4747 # FIXME: are the old os-es, disk sizes, etc. useful?
4748 export_name = export_info.get(constants.INISECT_INS, option)
4749 image = os.path.join(src_path, export_name)
4750 disk_images.append(image)
4751 else:
4752 disk_images.append(False)
4754 self.src_images = disk_images
4756 old_name = export_info.get(constants.INISECT_INS, 'name')
4757 # FIXME: int() here could throw a ValueError on broken exports
4758 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4759 if self.op.instance_name == old_name:
4760 for idx, nic in enumerate(self.nics):
4761 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4762 nic_mac_ini = 'nic%d_mac' % idx
4763 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4765 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4766 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4767 if self.op.start and not self.op.ip_check:
4768 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4769 " adding an instance in start mode")
4771 if self.op.ip_check:
4772 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4773 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4774 (self.check_ip, self.op.instance_name))
4776 #### mac address generation
4777 # By generating here the mac address both the allocator and the hooks get
4778 # the real final mac address rather than the 'auto' or 'generate' value.
4779 # There is a race condition between the generation and the instance object
4780 # creation, which means that we know the mac is valid now, but we're not
4781 # sure it will be when we actually add the instance. If things go bad
4782 # adding the instance will abort because of a duplicate mac, and the
4783 # creation job will fail.
4784 for nic in self.nics:
4785 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4786 nic.mac = self.cfg.GenerateMAC()
4790 if self.op.iallocator is not None:
4791 self._RunAllocator()
4793 #### node related checks
4795 # check primary node
4796 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4797 assert self.pnode is not None, \
4798 "Cannot retrieve locked node %s" % self.op.pnode
4800 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4803 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4806 self.secondaries = []
4808 # mirror node verification
4809 if self.op.disk_template in constants.DTS_NET_MIRROR:
4810 if self.op.snode is None:
4811 raise errors.OpPrereqError("The networked disk templates need"
4813 if self.op.snode == pnode.name:
4814 raise errors.OpPrereqError("The secondary node cannot be"
4815 " the primary node.")
4816 _CheckNodeOnline(self, self.op.snode)
4817 _CheckNodeNotDrained(self, self.op.snode)
4818 self.secondaries.append(self.op.snode)
4820 nodenames = [pnode.name] + self.secondaries
4822 req_size = _ComputeDiskSize(self.op.disk_template,
4823 self.disks)
4825 # Check lv size requirements
4826 if req_size is not None:
4827 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4828 self.op.hypervisor)
4829 for node in nodenames:
4830 info = nodeinfo[node]
4831 msg = info.RemoteFailMsg()
4832 if msg:
4833 raise errors.OpPrereqError("Cannot get current information"
4834 " from node %s: %s" % (node, msg))
4835 info = info.payload
4836 vg_free = info.get('vg_free', None)
4837 if not isinstance(vg_free, int):
4838 raise errors.OpPrereqError("Can't compute free disk space on"
4840 if req_size > vg_free:
4841 raise errors.OpPrereqError("Not enough disk space on target node %s."
4842 " %d MB available, %d MB required" %
4843 (node, vg_free, req_size))
4845 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4847 # os verification
4848 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4849 result.Raise()
4850 if not isinstance(result.data, objects.OS):
4851 raise errors.OpPrereqError("OS '%s' not in supported os list for"
4852 " primary node" % self.op.os_type)
4854 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4856 # memory check on primary node
4857 if self.op.start:
4858 _CheckNodeFreeMemory(self, self.pnode.name,
4859 "creating instance %s" % self.op.instance_name,
4860 self.be_full[constants.BE_MEMORY],
4861 self.op.hypervisor)
4863 def Exec(self, feedback_fn):
4864 """Create and add the instance to the cluster.
4867 instance = self.op.instance_name
4868 pnode_name = self.pnode.name
4870 ht_kind = self.op.hypervisor
4871 if ht_kind in constants.HTS_REQ_PORT:
4872 network_port = self.cfg.AllocatePort()
4873 else:
4874 network_port = None
4876 ##if self.op.vnc_bind_address is None:
4877 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4879 # this is needed because os.path.join does not accept None arguments
4880 if self.op.file_storage_dir is None:
4881 string_file_storage_dir = ""
4883 string_file_storage_dir = self.op.file_storage_dir
4885 # build the full file storage dir path
4886 file_storage_dir = os.path.normpath(os.path.join(
4887 self.cfg.GetFileStorageDir(),
4888 string_file_storage_dir, instance))
4891 disks = _GenerateDiskTemplate(self,
4892 self.op.disk_template,
4893 instance, pnode_name,
4894 self.secondaries,
4895 self.disks,
4896 file_storage_dir,
4897 self.op.file_driver,
4898 0)
4900 iobj = objects.Instance(name=instance, os=self.op.os_type,
4901 primary_node=pnode_name,
4902 nics=self.nics, disks=disks,
4903 disk_template=self.op.disk_template,
4904 admin_up=False,
4905 network_port=network_port,
4906 beparams=self.op.beparams,
4907 hvparams=self.op.hvparams,
4908 hypervisor=self.op.hypervisor,
4909 )
4911 feedback_fn("* creating instance disks...")
4913 _CreateDisks(self, iobj)
4914 except errors.OpExecError:
4915 self.LogWarning("Device creation failed, reverting...")
4917 _RemoveDisks(self, iobj)
4919 self.cfg.ReleaseDRBDMinors(instance)
4922 feedback_fn("adding instance %s to cluster config" % instance)
4924 self.cfg.AddInstance(iobj)
4925 # Declare that we don't want to remove the instance lock anymore, as we've
4926 # added the instance to the config
4927 del self.remove_locks[locking.LEVEL_INSTANCE]
4928 # Unlock all the nodes
4929 if self.op.mode == constants.INSTANCE_IMPORT:
4930 nodes_keep = [self.op.src_node]
4931 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4932 if node != self.op.src_node]
4933 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4934 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4935 else:
4936 self.context.glm.release(locking.LEVEL_NODE)
4937 del self.acquired_locks[locking.LEVEL_NODE]
4939 if self.op.wait_for_sync:
4940 disk_abort = not _WaitForSync(self, iobj)
4941 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4942 # make sure the disks are not degraded (still sync-ing is ok)
4944 feedback_fn("* checking mirrors status")
4945 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4946 else:
4947 disk_abort = False
4949 if disk_abort:
4950 _RemoveDisks(self, iobj)
4951 self.cfg.RemoveInstance(iobj.name)
4952 # Make sure the instance lock gets removed
4953 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4954 raise errors.OpExecError("There are some degraded disks for"
4957 feedback_fn("creating os for instance %s on node %s" %
4958 (instance, pnode_name))
4960 if iobj.disk_template != constants.DT_DISKLESS:
4961 if self.op.mode == constants.INSTANCE_CREATE:
4962 feedback_fn("* running the instance OS create scripts...")
4963 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4964 msg = result.RemoteFailMsg()
4966 raise errors.OpExecError("Could not add os for instance %s"
4968 (instance, pnode_name, msg))
4970 elif self.op.mode == constants.INSTANCE_IMPORT:
4971 feedback_fn("* running the instance OS import scripts...")
4972 src_node = self.op.src_node
4973 src_images = self.src_images
4974 cluster_name = self.cfg.GetClusterName()
4975 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4976 src_node, src_images,
4977 cluster_name)
4978 msg = import_result.RemoteFailMsg()
4979 if msg:
4980 self.LogWarning("Error while importing the disk images for instance"
4981 " %s on node %s: %s" % (instance, pnode_name, msg))
4982 else:
4983 # also checked in the prereq part
4984 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4988 iobj.admin_up = True
4989 self.cfg.Update(iobj)
4990 logging.info("Starting instance %s on node %s", instance, pnode_name)
4991 feedback_fn("* starting instance...")
4992 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4993 msg = result.RemoteFailMsg()
4994 if msg:
4995 raise errors.OpExecError("Could not start instance: %s" % msg)
4998 class LUConnectConsole(NoHooksLU):
4999 """Connect to an instance's console.
5001 This is somewhat special in that it returns the command line that
5002 you need to run on the master node in order to connect to the
5003 console.
5005 """
5006 _OP_REQP = ["instance_name"]
5007 REQ_BGL = False
5009 def ExpandNames(self):
5010 self._ExpandAndLockInstance()
5012 def CheckPrereq(self):
5013 """Check prerequisites.
5015 This checks that the instance is in the cluster.
5018 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5019 assert self.instance is not None, \
5020 "Cannot retrieve locked instance %s" % self.op.instance_name
5021 _CheckNodeOnline(self, self.instance.primary_node)
5023 def Exec(self, feedback_fn):
5024 """Connect to the console of an instance
5027 instance = self.instance
5028 node = instance.primary_node
5030 node_insts = self.rpc.call_instance_list([node],
5031 [instance.hypervisor])[node]
5032 msg = node_insts.RemoteFailMsg()
5033 if msg:
5034 raise errors.OpExecError("Can't get node information from %s: %s" %
5035 (node, msg))
5037 if instance.name not in node_insts.payload:
5038 raise errors.OpExecError("Instance %s is not running." % instance.name)
5040 logging.debug("Connecting to console of %s on %s", instance.name, node)
5042 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5043 cluster = self.cfg.GetClusterInfo()
5044 # beparams and hvparams are passed separately, to avoid editing the
5045 # instance and then saving the defaults in the instance itself.
5046 hvparams = cluster.FillHV(instance)
5047 beparams = cluster.FillBE(instance)
5048 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5051 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5054 class LUReplaceDisks(LogicalUnit):
5055 """Replace the disks of an instance.
5058 HPATH = "mirrors-replace"
5059 HTYPE = constants.HTYPE_INSTANCE
5060 _OP_REQP = ["instance_name", "mode", "disks"]
5063 def CheckArguments(self):
5064 if not hasattr(self.op, "remote_node"):
5065 self.op.remote_node = None
5066 if not hasattr(self.op, "iallocator"):
5067 self.op.iallocator = None
5069 # check for valid parameter combination
5070 cnt = [self.op.remote_node, self.op.iallocator].count(None)
5071 if self.op.mode == constants.REPLACE_DISK_CHG:
5072 if cnt == 2:
5073 raise errors.OpPrereqError("When changing the secondary either an"
5074 " iallocator script must be used or the"
5075 " new node given")
5076 elif cnt == 0:
5077 raise errors.OpPrereqError("Give either the iallocator or the new"
5078 " secondary, not both")
5079 else: # not replacing the secondary
5080 if cnt != 2:
5081 raise errors.OpPrereqError("The iallocator and new node options can"
5082 " be used only when changing the"
5083 " secondary")
5085 def ExpandNames(self):
5086 self._ExpandAndLockInstance()
5088 if self.op.iallocator is not None:
5089 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5090 elif self.op.remote_node is not None:
5091 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5092 if remote_node is None:
5093 raise errors.OpPrereqError("Node '%s' not known" %
5094 self.op.remote_node)
5095 self.op.remote_node = remote_node
5096 # Warning: do not remove the locking of the new secondary here
5097 # unless DRBD8.AddChildren is changed to work in parallel;
5098 # currently it doesn't since parallel invocations of
5099 # FindUnusedMinor will conflict
5100 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5101 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5102 else:
5103 self.needed_locks[locking.LEVEL_NODE] = []
5104 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5106 def DeclareLocks(self, level):
5107 # If we're not already locking all nodes in the set we have to declare the
5108 # instance's primary/secondary nodes.
5109 if (level == locking.LEVEL_NODE and
5110 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5111 self._LockInstancesNodes()
5113 def _RunAllocator(self):
5114 """Compute a new secondary node using an IAllocator.
5117 ial = IAllocator(self,
5118 mode=constants.IALLOCATOR_MODE_RELOC,
5119 name=self.op.instance_name,
5120 relocate_from=[self.sec_node])
5122 ial.Run(self.op.iallocator)
5124 if not ial.success:
5125 raise errors.OpPrereqError("Can't compute nodes using"
5126 " iallocator '%s': %s" % (self.op.iallocator,
5127 ial.info))
5128 if len(ial.nodes) != ial.required_nodes:
5129 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5130 " of nodes (%s), required %s" %
5131 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
5132 self.op.remote_node = ial.nodes[0]
5133 self.LogInfo("Selected new secondary for the instance: %s",
5134 self.op.remote_node)
5136 def BuildHooksEnv(self):
5137 """Build hooks env.
5139 This runs on the master, the primary and all the secondaries.
5141 """
5142 env = {
5143 "MODE": self.op.mode,
5144 "NEW_SECONDARY": self.op.remote_node,
5145 "OLD_SECONDARY": self.instance.secondary_nodes[0],
5146 }
5147 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5148 nl = [
5149 self.cfg.GetMasterNode(),
5150 self.instance.primary_node,
5151 ]
5152 if self.op.remote_node is not None:
5153 nl.append(self.op.remote_node)
5154 return env, nl, nl
5156 def CheckPrereq(self):
5157 """Check prerequisites.
5159 This checks that the instance is in the cluster.
5162 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5163 assert instance is not None, \
5164 "Cannot retrieve locked instance %s" % self.op.instance_name
5165 self.instance = instance
5167 if instance.disk_template != constants.DT_DRBD8:
5168 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5171 if len(instance.secondary_nodes) != 1:
5172 raise errors.OpPrereqError("The instance has a strange layout,"
5173 " expected one secondary but found %d" %
5174 len(instance.secondary_nodes))
5176 self.sec_node = instance.secondary_nodes[0]
5178 if self.op.iallocator is not None:
5179 self._RunAllocator()
5181 remote_node = self.op.remote_node
5182 if remote_node is not None:
5183 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5184 assert self.remote_node_info is not None, \
5185 "Cannot retrieve locked node %s" % remote_node
5187 self.remote_node_info = None
5188 if remote_node == instance.primary_node:
5189 raise errors.OpPrereqError("The specified node is the primary node of"
5191 elif remote_node == self.sec_node:
5192 raise errors.OpPrereqError("The specified node is already the"
5193 " secondary node of the instance.")
5195 if self.op.mode == constants.REPLACE_DISK_PRI:
5196 n1 = self.tgt_node = instance.primary_node
5197 n2 = self.oth_node = self.sec_node
5198 elif self.op.mode == constants.REPLACE_DISK_SEC:
5199 n1 = self.tgt_node = self.sec_node
5200 n2 = self.oth_node = instance.primary_node
5201 elif self.op.mode == constants.REPLACE_DISK_CHG:
5202 n1 = self.new_node = remote_node
5203 n2 = self.oth_node = instance.primary_node
5204 self.tgt_node = self.sec_node
5205 _CheckNodeNotDrained(self, remote_node)
5207 raise errors.ProgrammerError("Unhandled disk replace mode")
5209 _CheckNodeOnline(self, n1)
5210 _CheckNodeOnline(self, n2)
5212 if not self.op.disks:
5213 self.op.disks = range(len(instance.disks))
5215 for disk_idx in self.op.disks:
5216 instance.FindDisk(disk_idx)
5218 def _ExecD8DiskOnly(self, feedback_fn):
5219 """Replace a disk on the primary or secondary for dbrd8.
5221 The algorithm for replace is quite complicated:
5223 1. for each disk to be replaced:
5225 1. create new LVs on the target node with unique names
5226 1. detach old LVs from the drbd device
5227 1. rename old LVs to name_replaced.<time_t>
5228 1. rename new LVs to old LVs
5229 1. attach the new LVs (with the old names now) to the drbd device
5231 1. wait for sync across all devices
5233 1. for each modified disk:
5235 1. remove old LVs (which have the name name_replaced.<time_t>)
5237 Failures are not very well handled.
5239 """
5240 steps_total = 6
5241 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5242 instance = self.instance
5243 iv_names = {}
5244 vgname = self.cfg.GetVGName()
5245 # start of work
5246 cfg = self.cfg
5247 tgt_node = self.tgt_node
5248 oth_node = self.oth_node
5250 # Step: check device activation
5251 self.proc.LogStep(1, steps_total, "check device existence")
5252 info("checking volume groups")
5253 my_vg = cfg.GetVGName()
5254 results = self.rpc.call_vg_list([oth_node, tgt_node])
5256 raise errors.OpExecError("Can't list volume groups on the nodes")
5257 for node in oth_node, tgt_node:
5258 res = results[node]
5259 msg = res.RemoteFailMsg()
5260 if msg:
5261 raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5262 if my_vg not in res.payload:
5263 raise errors.OpExecError("Volume group '%s' not found on %s" %
5264 (my_vg, node))
5265 for idx, dev in enumerate(instance.disks):
5266 if idx not in self.op.disks:
5267 continue
5268 for node in tgt_node, oth_node:
5269 info("checking disk/%d on %s" % (idx, node))
5270 cfg.SetDiskID(dev, node)
5271 result = self.rpc.call_blockdev_find(node, dev)
5272 msg = result.RemoteFailMsg()
5273 if not msg and not result.payload:
5274 msg = "disk not found"
5276 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5279 # Step: check other node consistency
5280 self.proc.LogStep(2, steps_total, "check peer consistency")
5281 for idx, dev in enumerate(instance.disks):
5282 if idx not in self.op.disks:
5283 continue
5284 info("checking disk/%d consistency on %s" % (idx, oth_node))
5285 if not _CheckDiskConsistency(self, dev, oth_node,
5286 oth_node==instance.primary_node):
5287 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5288 " to replace disks on this node (%s)" %
5289 (oth_node, tgt_node))
5291 # Step: create new storage
5292 self.proc.LogStep(3, steps_total, "allocate new storage")
5293 for idx, dev in enumerate(instance.disks):
5294 if idx not in self.op.disks:
5295 continue
5296 size = dev.size
5297 cfg.SetDiskID(dev, tgt_node)
5298 lv_names = [".disk%d_%s" % (idx, suf)
5299 for suf in ["data", "meta"]]
5300 names = _GenerateUniqueNames(self, lv_names)
5301 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5302 logical_id=(vgname, names[0]))
5303 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5304 logical_id=(vgname, names[1]))
5305 new_lvs = [lv_data, lv_meta]
5306 old_lvs = dev.children
5307 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5308 info("creating new local storage on %s for %s" %
5309 (tgt_node, dev.iv_name))
5310 # we pass force_create=True to force the LVM creation
5311 for new_lv in new_lvs:
5312 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5313 _GetInstanceInfoText(instance), False)
5315 # Step: for each lv, detach+rename*2+attach
5316 self.proc.LogStep(4, steps_total, "change drbd configuration")
5317 for dev, old_lvs, new_lvs in iv_names.itervalues():
5318 info("detaching %s drbd from local storage" % dev.iv_name)
5319 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5320 msg = result.RemoteFailMsg()
5321 if msg:
5322 raise errors.OpExecError("Can't detach drbd from local storage on node"
5323 " %s for device %s: %s" %
5324 (tgt_node, dev.iv_name, msg))
5326 #cfg.Update(instance)
5328 # ok, we created the new LVs, so now we know we have the needed
5329 # storage; as such, we proceed on the target node to rename
5330 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5331 # using the assumption that logical_id == physical_id (which in
5332 # turn is the unique_id on that node)
5334 # FIXME(iustin): use a better name for the replaced LVs
5335 temp_suffix = int(time.time())
5336 ren_fn = lambda d, suff: (d.physical_id[0],
5337 d.physical_id[1] + "_replaced-%s" % suff)
5338 # build the rename list based on what LVs exist on the node
5339 rlist = []
5340 for to_ren in old_lvs:
5341 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5342 if not result.RemoteFailMsg() and result.payload:
5343 # device exists
5344 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5346 info("renaming the old LVs on the target node")
5347 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5348 msg = result.RemoteFailMsg()
5349 if msg:
5350 raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5351 (tgt_node, msg))
5352 # now we rename the new LVs to the old LVs
5353 info("renaming the new LVs on the target node")
5354 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5355 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5356 msg = result.RemoteFailMsg()
5357 if msg:
5358 raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5359 (tgt_node, msg))
5361 for old, new in zip(old_lvs, new_lvs):
5362 new.logical_id = old.logical_id
5363 cfg.SetDiskID(new, tgt_node)
5365 for disk in old_lvs:
5366 disk.logical_id = ren_fn(disk, temp_suffix)
5367 cfg.SetDiskID(disk, tgt_node)
5369 # now that the new lvs have the old name, we can add them to the device
5370 info("adding new mirror component on %s" % tgt_node)
5371 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5372 msg = result.RemoteFailMsg()
5373 if msg:
5374 for new_lv in new_lvs:
5375 msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5376 if msg:
5377 warning("Can't rollback device %s: %s", dev, msg,
5378 hint="cleanup manually the unused logical volumes")
5379 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5381 dev.children = new_lvs
5382 cfg.Update(instance)
5384 # Step: wait for sync
5386 # this can fail as the old devices are degraded and _WaitForSync
5387 # does a combined result over all disks, so we don't check its
5388 # return value
5389 self.proc.LogStep(5, steps_total, "sync devices")
5390 _WaitForSync(self, instance, unlock=True)
5392 # so check manually all the devices
5393 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5394 cfg.SetDiskID(dev, instance.primary_node)
5395 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5396 msg = result.RemoteFailMsg()
5397 if not msg and not result.payload:
5398 msg = "disk not found"
5400 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5402 if result.payload[5]:
5403 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5405 # Step: remove old storage
5406 self.proc.LogStep(6, steps_total, "removing old storage")
5407 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5408 info("remove logical volumes for %s" % name)
5409 for lv in old_lvs:
5410 cfg.SetDiskID(lv, tgt_node)
5411 msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5412 if msg:
5413 warning("Can't remove old LV: %s" % msg,
5414 hint="manually remove unused LVs")
5417 def _ExecD8Secondary(self, feedback_fn):
5418 """Replace the secondary node for drbd8.
5420 The algorithm for replace is quite complicated:
5421 - for all disks of the instance:
5422 - create new LVs on the new node with same names
5423 - shutdown the drbd device on the old secondary
5424 - disconnect the drbd network on the primary
5425 - create the drbd device on the new secondary
5426 - network attach the drbd on the primary, using an artifice:
5427 the drbd code for Attach() will connect to the network if it
5428 finds a device which is connected to the good local disks but
5429 not network enabled
5430 - wait for sync across all devices
5431 - remove all disks from the old secondary
5433 Failures are not very well handled.
5435 """
5436 steps_total = 6
5437 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5438 instance = self.instance
5439 iv_names = {}
5440 cfg = self.cfg
5442 old_node = self.tgt_node
5443 new_node = self.new_node
5444 pri_node = instance.primary_node
5445 nodes_ip = {
5446 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5447 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5448 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5449 }
5451 # Step: check device activation
5452 self.proc.LogStep(1, steps_total, "check device existence")
5453 info("checking volume groups")
5454 my_vg = cfg.GetVGName()
5455 results = self.rpc.call_vg_list([pri_node, new_node])
5456 for node in pri_node, new_node:
5457 res = results[node]
5458 msg = res.RemoteFailMsg()
5459 if msg:
5460 raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5461 if my_vg not in res.payload:
5462 raise errors.OpExecError("Volume group '%s' not found on %s" %
5464 for idx, dev in enumerate(instance.disks):
5465 if idx not in self.op.disks:
5466 continue
5467 info("checking disk/%d on %s" % (idx, pri_node))
5468 cfg.SetDiskID(dev, pri_node)
5469 result = self.rpc.call_blockdev_find(pri_node, dev)
5470 msg = result.RemoteFailMsg()
5471 if not msg and not result.payload:
5472 msg = "disk not found"
5474 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5475 (idx, pri_node, msg))
5477 # Step: check other node consistency
5478 self.proc.LogStep(2, steps_total, "check peer consistency")
5479 for idx, dev in enumerate(instance.disks):
5480 if idx not in self.op.disks:
5481 continue
5482 info("checking disk/%d consistency on %s" % (idx, pri_node))
5483 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5484 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5485 " unsafe to replace the secondary" %
5488 # Step: create new storage
5489 self.proc.LogStep(3, steps_total, "allocate new storage")
5490 for idx, dev in enumerate(instance.disks):
5491 info("adding new local storage on %s for disk/%d" %
5493 # we pass force_create=True to force LVM creation
5494 for new_lv in dev.children:
5495 _CreateBlockDev(self, new_node, instance, new_lv, True,
5496 _GetInstanceInfoText(instance), False)
5498 # Step 4: dbrd minors and drbd setups changes
5499 # after this, we must manually remove the drbd minors on both the
5500 # error and the success paths
5501 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5502 instance.name)
5503 logging.debug("Allocated minors %s" % (minors,))
5504 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5505 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5507 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5508 # create new devices on new_node; note that we create two IDs:
5509 # one without port, so the drbd will be activated without
5510 # networking information on the new node at this stage, and one
5511 # with network, for the latter activation in step 4
5512 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5513 if pri_node == o_node1:
5514 p_minor = o_minor1
5515 else:
5516 p_minor = o_minor2
5518 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5519 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5521 iv_names[idx] = (dev, dev.children, new_net_id)
5522 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5524 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5525 logical_id=new_alone_id,
5526 children=dev.children)
5527 try:
5528 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5529 _GetInstanceInfoText(instance), False)
5530 except errors.GenericError:
5531 self.cfg.ReleaseDRBDMinors(instance.name)
5532 raise
5534 for idx, dev in enumerate(instance.disks):
5535 # we have new devices, shutdown the drbd on the old secondary
5536 info("shutting down drbd for disk/%d on old node" % idx)
5537 cfg.SetDiskID(dev, old_node)
5538 msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5539 if msg:
5540 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5541 (idx, msg),
5542 hint="Please cleanup this device manually as soon as possible")
5544 info("detaching primary drbds from the network (=> standalone)")
5545 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5546 instance.disks)[pri_node]
5548 msg = result.RemoteFailMsg()
5549 if msg:
5550 # detaches didn't succeed (unlikely)
5551 self.cfg.ReleaseDRBDMinors(instance.name)
5552 raise errors.OpExecError("Can't detach the disks from the network on"
5553 " old node: %s" % (msg,))
5555 # if we managed to detach at least one, we update all the disks of
5556 # the instance to point to the new secondary
5557 info("updating instance configuration")
5558 for dev, _, new_logical_id in iv_names.itervalues():
5559 dev.logical_id = new_logical_id
5560 cfg.SetDiskID(dev, pri_node)
5561 cfg.Update(instance)
5563 # and now perform the drbd attach
5564 info("attaching primary drbds to new secondary (standalone => connected)")
5565 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5566 instance.disks, instance.name,
5567 False)
5568 for to_node, to_result in result.items():
5569 msg = to_result.RemoteFailMsg()
5570 if msg:
5571 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5572 hint="please do a gnt-instance info to see the"
5575 # this can fail as the old devices are degraded and _WaitForSync
5576 # does a combined result over all disks, so we don't check its
5577 # return value
5578 self.proc.LogStep(5, steps_total, "sync devices")
5579 _WaitForSync(self, instance, unlock=True)
5581 # so check manually all the devices
5582 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5583 cfg.SetDiskID(dev, pri_node)
5584 result = self.rpc.call_blockdev_find(pri_node, dev)
5585 msg = result.RemoteFailMsg()
5586 if not msg and not result.payload:
5587 msg = "disk not found"
5589 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5591 if result.payload[5]:
5592 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5594 self.proc.LogStep(6, steps_total, "removing old storage")
5595 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5596 info("remove logical volumes for disk/%d" % idx)
5597 for lv in dev.children:
5598 cfg.SetDiskID(lv, old_node)
5599 msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5600 if msg:
5601 warning("Can't remove LV on old secondary: %s", msg,
5602 hint="Cleanup stale volumes by hand")
5604 def Exec(self, feedback_fn):
5605 """Execute disk replacement.
5607 This dispatches the disk replacement to the appropriate handler.
5610 instance = self.instance
5612 # Activate the instance disks if we're replacing them on a down instance
5613 if not instance.admin_up:
5614 _StartInstanceDisks(self, instance, True)
5616 if self.op.mode == constants.REPLACE_DISK_CHG:
5617 fn = self._ExecD8Secondary
5618 else:
5619 fn = self._ExecD8DiskOnly
5621 ret = fn(feedback_fn)
5623 # Deactivate the instance disks if we're replacing them on a down instance
5624 if not instance.admin_up:
5625 _SafeShutdownInstanceDisks(self, instance)
5627 return ret
5630 class LUGrowDisk(LogicalUnit):
5631 """Grow a disk of an instance.
5634 HPATH = "disk-grow"
5635 HTYPE = constants.HTYPE_INSTANCE
5636 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5637 REQ_BGL = False
5639 def ExpandNames(self):
5640 self._ExpandAndLockInstance()
5641 self.needed_locks[locking.LEVEL_NODE] = []
5642 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5644 def DeclareLocks(self, level):
5645 if level == locking.LEVEL_NODE:
5646 self._LockInstancesNodes()
5648 def BuildHooksEnv(self):
5649 """Build hooks env.
5651 This runs on the master, the primary and all the secondaries.
5653 """
5654 env = {
5655 "DISK": self.op.disk,
5656 "AMOUNT": self.op.amount,
5657 }
5658 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5659 nl = [
5660 self.cfg.GetMasterNode(),
5661 self.instance.primary_node,
5662 ]
5663 return env, nl, nl
5665 def CheckPrereq(self):
5666 """Check prerequisites.
5668 This checks that the instance is in the cluster.
5671 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5672 assert instance is not None, \
5673 "Cannot retrieve locked instance %s" % self.op.instance_name
5674 nodenames = list(instance.all_nodes)
5675 for node in nodenames:
5676 _CheckNodeOnline(self, node)
5679 self.instance = instance
5681 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5682 raise errors.OpPrereqError("Instance's disk layout does not support"
5685 self.disk = instance.FindDisk(self.op.disk)
5687 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5688 instance.hypervisor)
5689 for node in nodenames:
5690 info = nodeinfo[node]
5691 msg = info.RemoteFailMsg()
5692 if msg:
5693 raise errors.OpPrereqError("Cannot get current information"
5694 " from node %s: %s" % (node, msg))
5695 vg_free = info.payload.get('vg_free', None)
5696 if not isinstance(vg_free, int):
5697 raise errors.OpPrereqError("Can't compute free disk space on"
5698 " node %s" % node)
5699 if self.op.amount > vg_free:
5700 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5701 " %d MiB available, %d MiB required" %
5702 (node, vg_free, self.op.amount))
5704 def Exec(self, feedback_fn):
5705 """Execute disk grow.
5708 instance = self.instance
5709 disk = self.disk
5710 for node in instance.all_nodes:
5711 self.cfg.SetDiskID(disk, node)
5712 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5713 msg = result.RemoteFailMsg()
5714 if msg:
5715 raise errors.OpExecError("Grow request failed to node %s: %s" %
5716 (node, msg))
5717 disk.RecordGrow(self.op.amount)
5718 self.cfg.Update(instance)
5719 if self.op.wait_for_sync:
5720 disk_abort = not _WaitForSync(self, instance)
5721 if disk_abort:
5722 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5723 " status.\nPlease check the instance.")
5726 class LUQueryInstanceData(NoHooksLU):
5727 """Query runtime instance data.
5730 _OP_REQP = ["instances", "static"]
5733 def ExpandNames(self):
5734 self.needed_locks = {}
5735 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5737 if not isinstance(self.op.instances, list):
5738 raise errors.OpPrereqError("Invalid argument type 'instances'")
5740 if self.op.instances:
5741 self.wanted_names = []
5742 for name in self.op.instances:
5743 full_name = self.cfg.ExpandInstanceName(name)
5744 if full_name is None:
5745 raise errors.OpPrereqError("Instance '%s' not known" % name)
5746 self.wanted_names.append(full_name)
5747 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5748 else:
5749 self.wanted_names = None
5750 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5752 self.needed_locks[locking.LEVEL_NODE] = []
5753 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5755 def DeclareLocks(self, level):
5756 if level == locking.LEVEL_NODE:
5757 self._LockInstancesNodes()
5759 def CheckPrereq(self):
5760 """Check prerequisites.
5762 This only checks the optional instance list against the existing names.
5765 if self.wanted_names is None:
5766 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5768 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5769 in self.wanted_names]
5772 def _ComputeDiskStatus(self, instance, snode, dev):
5773 """Compute block device status.
5776 static = self.op.static
5778 self.cfg.SetDiskID(dev, instance.primary_node)
5779 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5780 if dev_pstatus.offline:
5781 dev_pstatus = "(offline)"
5782 else:
5783 msg = dev_pstatus.RemoteFailMsg()
5784 if msg:
5785 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5786 (instance.name, msg))
5787 dev_pstatus = dev_pstatus.payload
5791 if dev.dev_type in constants.LDS_DRBD:
5792 # we change the snode then (otherwise we use the one passed in)
5793 if dev.logical_id[0] == instance.primary_node:
5794 snode = dev.logical_id[1]
5795 else:
5796 snode = dev.logical_id[0]
5798 if snode and not static:
5799 self.cfg.SetDiskID(dev, snode)
5800 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5801 if dev_sstatus.offline:
5802 dev_sstatus = "(offline)"
5803 else:
5804 msg = dev_sstatus.RemoteFailMsg()
5805 if msg:
5806 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5807 (instance.name, msg))
5808 dev_sstatus = dev_sstatus.payload
5809 else:
5810 dev_sstatus = None
5812 if dev.children:
5813 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5814 for child in dev.children]
5815 else:
5816 dev_children = []
5818 data = {
5819 "iv_name": dev.iv_name,
5820 "dev_type": dev.dev_type,
5821 "logical_id": dev.logical_id,
5822 "physical_id": dev.physical_id,
5823 "pstatus": dev_pstatus,
5824 "sstatus": dev_sstatus,
5825 "children": dev_children,
5831 def Exec(self, feedback_fn):
5832 """Gather and return data"""
5835 cluster = self.cfg.GetClusterInfo()
5837 for instance in self.wanted_instances:
5838 if not self.op.static:
5839 remote_info = self.rpc.call_instance_info(instance.primary_node,
5840 instance.name,
5841 instance.hypervisor)
5842 msg = remote_info.RemoteFailMsg()
5843 if msg:
5844 raise errors.OpExecError("Error checking node %s: %s" %
5845 (instance.primary_node, msg))
5846 remote_info = remote_info.payload
5847 if remote_info and "state" in remote_info:
5848 remote_state = "up"
5849 else:
5850 remote_state = "down"
5851 else:
5852 remote_state = None
5853 if instance.admin_up:
5854 config_state = "up"
5855 else:
5856 config_state = "down"
5858 disks = [self._ComputeDiskStatus(instance, None, device)
5859 for device in instance.disks]
5861 idict = {
5862 "name": instance.name,
5863 "config_state": config_state,
5864 "run_state": remote_state,
5865 "pnode": instance.primary_node,
5866 "snodes": instance.secondary_nodes,
5868 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5870 "hypervisor": instance.hypervisor,
5871 "network_port": instance.network_port,
5872 "hv_instance": instance.hvparams,
5873 "hv_actual": cluster.FillHV(instance),
5874 "be_instance": instance.beparams,
5875 "be_actual": cluster.FillBE(instance),
5878 result[instance.name] = idict
5883 class LUSetInstanceParams(LogicalUnit):
5884 """Modifies an instances's parameters.
5887 HPATH = "instance-modify"
5888 HTYPE = constants.HTYPE_INSTANCE
5889 _OP_REQP = ["instance_name"]
5892 def CheckArguments(self):
5893 if not hasattr(self.op, 'nics'):
5894 self.op.nics = []
5895 if not hasattr(self.op, 'disks'):
5896 self.op.disks = []
5897 if not hasattr(self.op, 'beparams'):
5898 self.op.beparams = {}
5899 if not hasattr(self.op, 'hvparams'):
5900 self.op.hvparams = {}
5901 self.op.force = getattr(self.op, "force", False)
5902 if not (self.op.nics or self.op.disks or
5903 self.op.hvparams or self.op.beparams):
5904 raise errors.OpPrereqError("No changes submitted")
5907 disk_addremove = 0
5908 for disk_op, disk_dict in self.op.disks:
5909 if disk_op == constants.DDM_REMOVE:
5910 disk_addremove += 1
5911 continue
5912 elif disk_op == constants.DDM_ADD:
5913 disk_addremove += 1
5914 else:
5915 if not isinstance(disk_op, int):
5916 raise errors.OpPrereqError("Invalid disk index")
5917 if disk_op == constants.DDM_ADD:
5918 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5919 if mode not in constants.DISK_ACCESS_SET:
5920 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5921 size = disk_dict.get('size', None)
5922 if size is None:
5923 raise errors.OpPrereqError("Required disk parameter size missing")
5924 try:
5925 size = int(size)
5926 except ValueError, err:
5927 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5928 str(err))
5929 disk_dict['size'] = size
5930 else:
5931 # modification of disk
5932 if 'size' in disk_dict:
5933 raise errors.OpPrereqError("Disk size change not possible, use"
5934 " grow-disk")
5936 if disk_addremove > 1:
5937 raise errors.OpPrereqError("Only one disk add or remove operation"
5938 " supported at a time")
5941 nic_addremove = 0
5942 for nic_op, nic_dict in self.op.nics:
5943 if nic_op == constants.DDM_REMOVE:
5944 nic_addremove += 1
5945 continue
5946 elif nic_op == constants.DDM_ADD:
5947 nic_addremove += 1
5948 else:
5949 if not isinstance(nic_op, int):
5950 raise errors.OpPrereqError("Invalid nic index")
5952 # nic_dict should be a dict
5953 nic_ip = nic_dict.get('ip', None)
5954 if nic_ip is not None:
5955 if nic_ip.lower() == constants.VALUE_NONE:
5956 nic_dict['ip'] = None
5957 else:
5958 if not utils.IsValidIP(nic_ip):
5959 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5961 nic_bridge = nic_dict.get('bridge', None)
5962 nic_link = nic_dict.get('link', None)
5963 if nic_bridge and nic_link:
5964 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5965 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5966 nic_dict['bridge'] = None
5967 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5968 nic_dict['link'] = None
5970 if nic_op == constants.DDM_ADD:
5971 nic_mac = nic_dict.get('mac', None)
5972 if nic_mac is None:
5973 nic_dict['mac'] = constants.VALUE_AUTO
5975 if 'mac' in nic_dict:
5976 nic_mac = nic_dict['mac']
5977 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5978 if not utils.IsValidMac(nic_mac):
5979 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5980 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5981 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5982 " modifying an existing nic")
5984 if nic_addremove > 1:
5985 raise errors.OpPrereqError("Only one NIC add or remove operation"
5986 " supported at a time")
5988 def ExpandNames(self):
5989 self._ExpandAndLockInstance()
5990 self.needed_locks[locking.LEVEL_NODE] = []
5991 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5993 def DeclareLocks(self, level):
5994 if level == locking.LEVEL_NODE:
5995 self._LockInstancesNodes()
5997 def BuildHooksEnv(self):
5998 """Build hooks env.
6000 This runs on the master, primary and secondaries.
6002 """
6003 args = dict()
6004 if constants.BE_MEMORY in self.be_new:
6005 args['memory'] = self.be_new[constants.BE_MEMORY]
6006 if constants.BE_VCPUS in self.be_new:
6007 args['vcpus'] = self.be_new[constants.BE_VCPUS]
6008 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6009 # information at all.
6011 args['nics'] = []
6012 nic_override = dict(self.op.nics)
6013 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6014 for idx, nic in enumerate(self.instance.nics):
6015 if idx in nic_override:
6016 this_nic_override = nic_override[idx]
6018 this_nic_override = {}
6019 if 'ip' in this_nic_override:
6020 ip = this_nic_override['ip']
6021 else:
6022 ip = nic.ip
6023 if 'mac' in this_nic_override:
6024 mac = this_nic_override['mac']
6025 else:
6026 mac = nic.mac
6027 if idx in self.nic_pnew:
6028 nicparams = self.nic_pnew[idx]
6029 else:
6030 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6031 mode = nicparams[constants.NIC_MODE]
6032 link = nicparams[constants.NIC_LINK]
6033 args['nics'].append((ip, mac, mode, link))
6034 if constants.DDM_ADD in nic_override:
6035 ip = nic_override[constants.DDM_ADD].get('ip', None)
6036 mac = nic_override[constants.DDM_ADD]['mac']
6037 nicparams = self.nic_pnew[constants.DDM_ADD]
6038 mode = nicparams[constants.NIC_MODE]
6039 link = nicparams[constants.NIC_LINK]
6040 args['nics'].append((ip, mac, mode, link))
6041 elif constants.DDM_REMOVE in nic_override:
6042 del args['nics'][-1]
6044 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6045 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6046 return env, nl, nl
6048 def _GetUpdatedParams(self, old_params, update_dict,
6049 default_values, parameter_types):
6050 """Return the new params dict for the given params.
6052 @type old_params: dict
6053 @param old_params: old parameters
6054 @type update_dict: dict
6055 @param update_dict: dict containing new parameter values,
6056 or constants.VALUE_DEFAULT to reset the
6057 parameter to its default value
6058 @type default_values: dict
6059 @param default_values: default values for the filled parameters
6060 @type parameter_types: dict
6061 @param parameter_types: dict mapping target dict keys to types
6062 in constants.ENFORCEABLE_TYPES
6063 @rtype: (dict, dict)
6064 @return: (new_parameters, filled_parameters)
6067 params_copy = copy.deepcopy(old_params)
6068 for key, val in update_dict.iteritems():
6069 if val == constants.VALUE_DEFAULT:
6070 try:
6071 del params_copy[key]
6072 except KeyError:
6073 pass
6074 else:
6075 params_copy[key] = val
6076 utils.ForceDictType(params_copy, parameter_types)
6077 params_filled = objects.FillDict(default_values, params_copy)
6078 return (params_copy, params_filled)
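# Illustrative sketch of _GetUpdatedParams semantics (hypothetical values):
# VALUE_DEFAULT drops a key so the filled dict falls back to the default.
#
#   old = {constants.BE_MEMORY: 512, constants.BE_VCPUS: 2}
#   update = {constants.BE_MEMORY: constants.VALUE_DEFAULT,
#             constants.BE_VCPUS: 4}
#   defaults = {constants.BE_MEMORY: 128, constants.BE_VCPUS: 1}
#   new, filled = self._GetUpdatedParams(old, update, defaults,
#                                        constants.BES_PARAMETER_TYPES)
#   # new == {constants.BE_VCPUS: 4}
#   # filled == {constants.BE_MEMORY: 128, constants.BE_VCPUS: 4}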
6080 def CheckPrereq(self):
6081 """Check prerequisites.
6083 This only checks the instance list against the existing names.
6086 force = self.force = self.op.force
6088 # checking the new params on the primary/secondary nodes
6090 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6091 cluster = self.cluster = self.cfg.GetClusterInfo()
6092 assert self.instance is not None, \
6093 "Cannot retrieve locked instance %s" % self.op.instance_name
6094 pnode = instance.primary_node
6095 nodelist = list(instance.all_nodes)
6097 # hvparams processing
6098 if self.op.hvparams:
6099 i_hvdict, hv_new = self._GetUpdatedParams(
6100 instance.hvparams, self.op.hvparams,
6101 cluster.hvparams[instance.hypervisor],
6102 constants.HVS_PARAMETER_TYPES)
6104 hypervisor.GetHypervisor(
6105 instance.hypervisor).CheckParameterSyntax(hv_new)
6106 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6107 self.hv_new = hv_new # the new actual values
6108 self.hv_inst = i_hvdict # the new dict (without defaults)
6109 else:
6110 self.hv_new = self.hv_inst = {}
6112 # beparams processing
6113 if self.op.beparams:
6114 i_bedict, be_new = self._GetUpdatedParams(
6115 instance.beparams, self.op.beparams,
6116 cluster.beparams[constants.PP_DEFAULT],
6117 constants.BES_PARAMETER_TYPES)
6118 self.be_new = be_new # the new actual values
6119 self.be_inst = i_bedict # the new dict (without defaults)
6120 else:
6121 self.be_new = self.be_inst = {}
6123 self.warn = []
6125 if constants.BE_MEMORY in self.op.beparams and not self.force:
6126 mem_check_list = [pnode]
6127 if be_new[constants.BE_AUTO_BALANCE]:
6128 # either we changed auto_balance to yes or it was from before
6129 mem_check_list.extend(instance.secondary_nodes)
6130 instance_info = self.rpc.call_instance_info(pnode, instance.name,
6131 instance.hypervisor)
6132 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6133 instance.hypervisor)
6134 pninfo = nodeinfo[pnode]
6135 msg = pninfo.RemoteFailMsg()
6136 if msg:
6137 # Assume the primary node is unreachable and go ahead
6138 self.warn.append("Can't get info from primary node %s: %s" %
6139 (pnode, msg))
6140 elif not isinstance(pninfo.payload.get('memory_free', None), int):
6141 self.warn.append("Node data from primary node %s doesn't contain"
6142 " free memory information" % pnode)
6143 elif instance_info.RemoteFailMsg():
6144 self.warn.append("Can't get instance runtime information: %s" %
6145 instance_info.RemoteFailMsg())
6146 else:
6147 if instance_info.payload:
6148 current_mem = int(instance_info.payload['memory'])
6149 else:
6150 # Assume instance not running
6151 # (there is a slight race condition here, but it's not very probable,
6152 # and we have no other way to check)
6153 current_mem = 0
6154 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6155 pninfo.payload['memory_free'])
6156 if miss_mem > 0:
6157 raise errors.OpPrereqError("This change will prevent the instance"
6158 " from starting, due to %d MB of memory"
6159 " missing on its primary node" % miss_mem)
6161 if be_new[constants.BE_AUTO_BALANCE]:
6162 for node, nres in nodeinfo.items():
6163 if node not in instance.secondary_nodes:
6164 continue
6165 msg = nres.RemoteFailMsg()
6166 if msg:
6167 self.warn.append("Can't get info from secondary node %s: %s" %
6168 (node, msg))
6169 elif not isinstance(nres.payload.get('memory_free', None), int):
6170 self.warn.append("Secondary node %s didn't return free"
6171 " memory information" % node)
6172 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6173 self.warn.append("Not enough memory to failover instance to"
6174 " secondary node %s" % node)
6177 self.nic_pnew = {}
6178 self.nic_pinst = {}
6179 for nic_op, nic_dict in self.op.nics:
6180 if nic_op == constants.DDM_REMOVE:
6181 if not instance.nics:
6182 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6183 continue
6184 if nic_op != constants.DDM_ADD:
6185 # an existing nic
6186 if nic_op < 0 or nic_op >= len(instance.nics):
6187 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6188 " are 0 to %d" %
6189 (nic_op, len(instance.nics)))
6190 old_nic_params = instance.nics[nic_op].nicparams
6191 old_nic_ip = instance.nics[nic_op].ip
6192 else:
6193 old_nic_params = cluster.nicparams[constants.PP_DEFAULT]
6194 old_nic_ip = None
6196 update_params_dict = dict([(key, nic_dict[key])
6197 for key in constants.NICS_PARAMETERS
6198 if key in nic_dict])
6200 if 'bridge' in nic_dict:
6201 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6203 new_nic_params, new_filled_nic_params = \
6204 self._GetUpdatedParams(old_nic_params, update_params_dict,
6205 cluster.nicparams[constants.PP_DEFAULT],
6206 constants.NICS_PARAMETER_TYPES)
6207 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6208 self.nic_pinst[nic_op] = new_nic_params
6209 self.nic_pnew[nic_op] = new_filled_nic_params
6210 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6212 if new_nic_mode == constants.NIC_MODE_BRIDGED:
6213 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6214 result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
6215 msg = result.RemoteFailMsg()
6216 if msg:
6217 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6218 if self.force:
6219 self.warn.append(msg)
6220 else:
6221 raise errors.OpPrereqError(msg)
6222 if new_nic_mode == constants.NIC_MODE_ROUTED:
6223 if 'ip' in nic_dict:
6224 nic_ip = nic_dict['ip']
6225 else:
6226 nic_ip = old_nic_ip
6227 if nic_ip is None:
6228 raise errors.OpPrereqError('Cannot set the nic ip to None'
6229 ' on a routed nic')
6230 if 'mac' in nic_dict:
6231 nic_mac = nic_dict['mac']
6232 if nic_mac is None:
6233 raise errors.OpPrereqError('Cannot set the nic mac to None')
6234 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6235 # otherwise generate the mac
6236 nic_dict['mac'] = self.cfg.GenerateMAC()
6237 else:
6238 # or validate/reserve the current one
6239 if self.cfg.IsMacInUse(nic_mac):
6240 raise errors.OpPrereqError("MAC address %s already in use"
6241 " in cluster" % nic_mac)
6244 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6245 raise errors.OpPrereqError("Disk operations not supported for"
6246 " diskless instances")
6247 for disk_op, disk_dict in self.op.disks:
6248 if disk_op == constants.DDM_REMOVE:
6249 if len(instance.disks) == 1:
6250 raise errors.OpPrereqError("Cannot remove the last disk of"
6251 " an instance")
6252 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6253 ins_l = ins_l[pnode]
6254 msg = ins_l.RemoteFailMsg()
6255 if msg:
6256 raise errors.OpPrereqError("Can't contact node %s: %s" %
6257 (pnode, msg))
6258 if instance.name in ins_l.payload:
6259 raise errors.OpPrereqError("Instance is running, can't remove"
6260 " disks.")
6262 if (disk_op == constants.DDM_ADD and
6263 len(instance.disks) >= constants.MAX_DISKS):
6264 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6265 " add more" % constants.MAX_DISKS)
6266 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6268 if disk_op < 0 or disk_op >= len(instance.disks):
6269 raise errors.OpPrereqError("Invalid disk index %s, valid values"
6270 " are 0 to %d" %
6271 (disk_op, len(instance.disks)))
6275 def Exec(self, feedback_fn):
6276 """Modifies an instance.
6278 All parameters take effect only at the next restart of the instance.
6281 # Process here the warnings from CheckPrereq, as we don't have a
6282 # feedback_fn there.
6283 for warn in self.warn:
6284 feedback_fn("WARNING: %s" % warn)
6286 result = []
6287 instance = self.instance
6288 cluster = self.cluster
6290 for disk_op, disk_dict in self.op.disks:
6291 if disk_op == constants.DDM_REMOVE:
6292 # remove the last disk
6293 device = instance.disks.pop()
6294 device_idx = len(instance.disks)
6295 for node, disk in device.ComputeNodeTree(instance.primary_node):
6296 self.cfg.SetDiskID(disk, node)
6297 msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6298 if msg:
6299 self.LogWarning("Could not remove disk/%d on node %s: %s,"
6300 " continuing anyway", device_idx, node, msg)
6301 result.append(("disk/%d" % device_idx, "remove"))
6302 elif disk_op == constants.DDM_ADD:
6304 if instance.disk_template == constants.DT_FILE:
6305 file_driver, file_path = instance.disks[0].logical_id
6306 file_path = os.path.dirname(file_path)
6307 else:
6308 file_driver = file_path = None
6309 disk_idx_base = len(instance.disks)
6310 new_disk = _GenerateDiskTemplate(self,
6311 instance.disk_template,
6312 instance.name, instance.primary_node,
6313 instance.secondary_nodes,
6314 [disk_dict],
6315 file_path,
6316 file_driver,
6317 disk_idx_base)[0]
6318 instance.disks.append(new_disk)
6319 info = _GetInstanceInfoText(instance)
6321 logging.info("Creating volume %s for instance %s",
6322 new_disk.iv_name, instance.name)
6323 # Note: this needs to be kept in sync with _CreateDisks
6325 for node in instance.all_nodes:
6326 f_create = node == instance.primary_node
6327 try:
6328 _CreateBlockDev(self, node, instance, new_disk,
6329 f_create, info, f_create)
6330 except errors.OpExecError, err:
6331 self.LogWarning("Failed to create volume %s (%s) on"
6332 " node %s: %s",
6333 new_disk.iv_name, new_disk, node, err)
6334 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6335 (new_disk.size, new_disk.mode)))
6336 else:
6337 # change a given disk
6338 instance.disks[disk_op].mode = disk_dict['mode']
6339 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6341 for nic_op, nic_dict in self.op.nics:
6342 if nic_op == constants.DDM_REMOVE:
6343 # remove the last nic
6344 del instance.nics[-1]
6345 result.append(("nic.%d" % len(instance.nics), "remove"))
6346 elif nic_op == constants.DDM_ADD:
6347 # mac and bridge should be set, by now
6348 mac = nic_dict['mac']
6349 ip = nic_dict.get('ip', None)
6350 nicparams = self.nic_pinst[constants.DDM_ADD]
6351 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6352 instance.nics.append(new_nic)
6353 result.append(("nic.%d" % (len(instance.nics) - 1),
6354 "add:mac=%s,ip=%s,mode=%s,link=%s" %
6355 (new_nic.mac, new_nic.ip,
6356 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6357 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6358 )))
6359 else:
6360 for key in 'mac', 'ip':
6361 if key in nic_dict:
6362 setattr(instance.nics[nic_op], key, nic_dict[key])
6363 if nic_op in self.nic_pnew:
6364 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6365 for key, val in nic_dict.iteritems():
6366 result.append(("nic.%s/%d" % (key, nic_op), val))
6369 if self.op.hvparams:
6370 instance.hvparams = self.hv_inst
6371 for key, val in self.op.hvparams.iteritems():
6372 result.append(("hv/%s" % key, val))
6375 if self.op.beparams:
6376 instance.beparams = self.be_inst
6377 for key, val in self.op.beparams.iteritems():
6378 result.append(("be/%s" % key, val))
6380 self.cfg.Update(instance)
6382 return result
6385 class LUQueryExports(NoHooksLU):
6386 """Query the exports list
6389 _OP_REQP = ['nodes']
6392 def ExpandNames(self):
6393 self.needed_locks = {}
6394 self.share_locks[locking.LEVEL_NODE] = 1
6395 if not self.op.nodes:
6396 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6398 self.needed_locks[locking.LEVEL_NODE] = \
6399 _GetWantedNodes(self, self.op.nodes)
6401 def CheckPrereq(self):
6402 """Check prerequisites.
6405 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6407 def Exec(self, feedback_fn):
6408 """Compute the list of all the exported system images.
6411 @return: a dictionary with the structure node->(export-list)
6412 where export-list is a list of the instances exported on
6413 that node.
6415 """
6416 rpcresult = self.rpc.call_export_list(self.nodes)
6417 result = {}
6418 for node in rpcresult:
6419 if rpcresult[node].RemoteFailMsg():
6420 result[node] = False
6422 result[node] = rpcresult[node].payload
6424 return result
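# Illustrative sketch of the value returned by this LU (node names are
# made up); False marks a node whose export list could not be fetched:
#
#   {"node1.example.com": ["inst1.example.com", "inst2.example.com"],
#    "node2.example.com": False}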
6427 class LUExportInstance(LogicalUnit):
6428 """Export an instance to an image in the cluster.
6431 HPATH = "instance-export"
6432 HTYPE = constants.HTYPE_INSTANCE
6433 _OP_REQP = ["instance_name", "target_node", "shutdown"]
6436 def ExpandNames(self):
6437 self._ExpandAndLockInstance()
6438 # FIXME: lock only instance primary and destination node
6440 # Sad but true, for now we have to lock all nodes, as we don't know where
6441 # the previous export might be, and in this LU we search for it and
6442 # remove it from its current node. In the future we could fix this by:
6443 # - making a tasklet to search (share-lock all), then create the new one,
6444 # then one to remove, after
6445 # - removing the removal operation altogether
6446 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6448 def DeclareLocks(self, level):
6449 """Last minute lock declaration."""
6450 # All nodes are locked anyway, so nothing to do here.
6452 def BuildHooksEnv(self):
6455 This will run on the master, primary node and target node.
6459 "EXPORT_NODE": self.op.target_node,
6460 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6462 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6463 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6464 self.op.target_node]
6465 return env, nl, nl
6467 def CheckPrereq(self):
6468 """Check prerequisites.
6470 This checks that the instance and node names are valid.
6473 instance_name = self.op.instance_name
6474 self.instance = self.cfg.GetInstanceInfo(instance_name)
6475 assert self.instance is not None, \
6476 "Cannot retrieve locked instance %s" % self.op.instance_name
6477 _CheckNodeOnline(self, self.instance.primary_node)
6479 self.dst_node = self.cfg.GetNodeInfo(
6480 self.cfg.ExpandNodeName(self.op.target_node))
6482 if self.dst_node is None:
6483 # This is a wrong node name, not a non-locked node
6484 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6485 _CheckNodeOnline(self, self.dst_node.name)
6486 _CheckNodeNotDrained(self, self.dst_node.name)
6488 # instance disk type verification
6489 for disk in self.instance.disks:
6490 if disk.dev_type == constants.LD_FILE:
6491 raise errors.OpPrereqError("Export not supported for instances with"
6492 " file-based disks")
6494 def Exec(self, feedback_fn):
6495 """Export an instance to an image in the cluster.
6498 instance = self.instance
6499 dst_node = self.dst_node
6500 src_node = instance.primary_node
6501 if self.op.shutdown:
6502 # shutdown the instance, but not the disks
6503 result = self.rpc.call_instance_shutdown(src_node, instance)
6504 msg = result.RemoteFailMsg()
6505 if msg:
6506 raise errors.OpExecError("Could not shutdown instance %s on"
6507 " node %s: %s" %
6508 (instance.name, src_node, msg))
6510 vgname = self.cfg.GetVGName()
6512 snap_disks = []
6514 # set the disks ID correctly since call_instance_start needs the
6515 # correct drbd minor to create the symlinks
6516 for disk in instance.disks:
6517 self.cfg.SetDiskID(disk, src_node)
6519 try:
6520 for disk in instance.disks:
6521 # result.payload will be a snapshot of an lvm leaf of the one we passed
6522 result = self.rpc.call_blockdev_snapshot(src_node, disk)
6523 msg = result.RemoteFailMsg()
6524 if msg:
6525 self.LogWarning("Could not snapshot block device %s on node %s: %s",
6526 disk.logical_id[1], src_node, msg)
6527 snap_disks.append(False)
6528 else:
6529 disk_id = (vgname, result.payload)
6530 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6531 logical_id=disk_id, physical_id=disk_id,
6532 iv_name=disk.iv_name)
6533 snap_disks.append(new_dev)
6535 finally:
6536 if self.op.shutdown and instance.admin_up:
6537 result = self.rpc.call_instance_start(src_node, instance, None, None)
6538 msg = result.RemoteFailMsg()
6539 if msg:
6540 _ShutdownInstanceDisks(self, instance)
6541 raise errors.OpExecError("Could not start instance: %s" % msg)
6543 # TODO: check for size
6545 cluster_name = self.cfg.GetClusterName()
6546 for idx, dev in enumerate(snap_disks):
6547 if dev:
6548 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6549 instance, cluster_name, idx)
6550 msg = result.RemoteFailMsg()
6551 if msg:
6552 self.LogWarning("Could not export block device %s from node %s to"
6553 " node %s: %s", dev.logical_id[1], src_node,
6554 dst_node.name, msg)
6555 msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6556 if msg:
6557 self.LogWarning("Could not remove snapshot block device %s from node"
6558 " %s: %s", dev.logical_id[1], src_node, msg)
6560 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6561 msg = result.RemoteFailMsg()
6563 self.LogWarning("Could not finalize export for instance %s"
6564 " on node %s: %s", instance.name, dst_node.name, msg)
6566 nodelist = self.cfg.GetNodeList()
6567 nodelist.remove(dst_node.name)
6569 # on one-node clusters nodelist will be empty after the removal
6570 # if we proceed the backup would be removed because OpQueryExports
6571 # substitutes an empty list with the full cluster node list.
6572 iname = instance.name
6573 if nodelist:
6574 exportlist = self.rpc.call_export_list(nodelist)
6575 for node in exportlist:
6576 if exportlist[node].RemoteFailMsg():
6577 continue
6578 if iname in exportlist[node].payload:
6579 msg = self.rpc.call_export_remove(node, iname).RemoteFailMsg()
6580 if msg:
6581 self.LogWarning("Could not remove older export for instance %s"
6582 " on node %s: %s", iname, node, msg)
6585 class LURemoveExport(NoHooksLU):
6586 """Remove exports related to the named instance.
6589 _OP_REQP = ["instance_name"]
6592 def ExpandNames(self):
6593 self.needed_locks = {}
6594 # We need all nodes to be locked in order for RemoveExport to work, but we
6595 # don't need to lock the instance itself, as nothing will happen to it (and
6596 # we can remove exports also for a removed instance)
6597 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6599 def CheckPrereq(self):
6600 """Check prerequisites.
6604 def Exec(self, feedback_fn):
6605 """Remove any export.
6608 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6609 # If the instance was not found we'll try with the name that was passed in.
6610 # This will only work if it was an FQDN, though.
6611 fqdn_warn = False
6612 if not instance_name:
6613 fqdn_warn = True
6614 instance_name = self.op.instance_name
6616 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6617 exportlist = self.rpc.call_export_list(locked_nodes)
6618 found = False
6619 for node in exportlist:
6620 msg = exportlist[node].RemoteFailMsg()
6621 if msg:
6622 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6623 continue
6624 if instance_name in exportlist[node].payload:
6625 found = True
6626 result = self.rpc.call_export_remove(node, instance_name)
6627 msg = result.RemoteFailMsg()
6628 if msg:
6629 logging.error("Could not remove export for instance %s"
6630 " on node %s: %s", instance_name, node, msg)
6632 if fqdn_warn and not found:
6633 feedback_fn("Export not found. If trying to remove an export belonging"
6634 " to a deleted instance please use its Fully Qualified"
6638 class TagsLU(NoHooksLU):
6639 """Generic tags LU.
6641 This is an abstract class which is the parent of all the other tags LUs.
6645 def ExpandNames(self):
6646 self.needed_locks = {}
6647 if self.op.kind == constants.TAG_NODE:
6648 name = self.cfg.ExpandNodeName(self.op.name)
6649 if name is None:
6650 raise errors.OpPrereqError("Invalid node name (%s)" %
6651 (self.op.name,))
6652 self.op.name = name
6653 self.needed_locks[locking.LEVEL_NODE] = name
6654 elif self.op.kind == constants.TAG_INSTANCE:
6655 name = self.cfg.ExpandInstanceName(self.op.name)
6656 if name is None:
6657 raise errors.OpPrereqError("Invalid instance name (%s)" %
6658 (self.op.name,))
6659 self.op.name = name
6660 self.needed_locks[locking.LEVEL_INSTANCE] = name
6662 def CheckPrereq(self):
6663 """Check prerequisites.
6666 if self.op.kind == constants.TAG_CLUSTER:
6667 self.target = self.cfg.GetClusterInfo()
6668 elif self.op.kind == constants.TAG_NODE:
6669 self.target = self.cfg.GetNodeInfo(self.op.name)
6670 elif self.op.kind == constants.TAG_INSTANCE:
6671 self.target = self.cfg.GetInstanceInfo(self.op.name)
6672 else:
6673 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6674 str(self.op.kind))
6677 class LUGetTags(TagsLU):
6678 """Returns the tags of a given object.
6681 _OP_REQP = ["kind", "name"]
6684 def Exec(self, feedback_fn):
6685 """Returns the tag list.
6688 return list(self.target.GetTags())
6691 class LUSearchTags(NoHooksLU):
6692 """Searches the tags for a given pattern.
6695 _OP_REQP = ["pattern"]
6698 def ExpandNames(self):
6699 self.needed_locks = {}
6701 def CheckPrereq(self):
6702 """Check prerequisites.
6704 This checks the pattern passed for validity by compiling it.
6707 try:
6708 self.re = re.compile(self.op.pattern)
6709 except re.error, err:
6710 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6711 (self.op.pattern, err))
6713 def Exec(self, feedback_fn):
6714 """Returns the tag list.
6717 cfg = self.cfg
6718 tgts = [("/cluster", cfg.GetClusterInfo())]
6719 ilist = cfg.GetAllInstancesInfo().values()
6720 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6721 nlist = cfg.GetAllNodesInfo().values()
6722 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6723 results = []
6724 for path, target in tgts:
6725 for tag in target.GetTags():
6726 if self.re.search(tag):
6727 results.append((path, tag))
6728 return results
6731 class LUAddTags(TagsLU):
6732 """Sets a tag on a given object.
6735 _OP_REQP = ["kind", "name", "tags"]
6738 def CheckPrereq(self):
6739 """Check prerequisites.
6741 This checks the type and length of the tag name and value.
6744 TagsLU.CheckPrereq(self)
6745 for tag in self.op.tags:
6746 objects.TaggableObject.ValidateTag(tag)
6748 def Exec(self, feedback_fn):
6752 try:
6753 for tag in self.op.tags:
6754 self.target.AddTag(tag)
6755 except errors.TagError, err:
6756 raise errors.OpExecError("Error while setting tag: %s" % str(err))
6757 try:
6758 self.cfg.Update(self.target)
6759 except errors.ConfigurationError:
6760 raise errors.OpRetryError("There has been a modification to the"
6761 " config file and the operation has been"
6762 " aborted. Please retry.")
6765 class LUDelTags(TagsLU):
6766 """Delete a list of tags from a given object.
6769 _OP_REQP = ["kind", "name", "tags"]
6772 def CheckPrereq(self):
6773 """Check prerequisites.
6775 This checks that we have the given tag.
6778 TagsLU.CheckPrereq(self)
6779 for tag in self.op.tags:
6780 objects.TaggableObject.ValidateTag(tag)
6781 del_tags = frozenset(self.op.tags)
6782 cur_tags = self.target.GetTags()
6783 if not del_tags <= cur_tags:
6784 diff_tags = del_tags - cur_tags
6785 diff_names = ["'%s'" % tag for tag in diff_tags]
6787 raise errors.OpPrereqError("Tag(s) %s not found" %
6788 (",".join(diff_names)))
6790 def Exec(self, feedback_fn):
6791 """Remove the tag from the object.
6794 for tag in self.op.tags:
6795 self.target.RemoveTag(tag)
6796 try:
6797 self.cfg.Update(self.target)
6798 except errors.ConfigurationError:
6799 raise errors.OpRetryError("There has been a modification to the"
6800 " config file and the operation has been"
6801 " aborted. Please retry.")
6804 class LUTestDelay(NoHooksLU):
6805 """Sleep for a specified amount of time.
6807 This LU sleeps on the master and/or nodes for a specified amount of
6808 time.
6811 _OP_REQP = ["duration", "on_master", "on_nodes"]
6814 def ExpandNames(self):
6815 """Expand names and set required locks.
6817 This expands the node list, if any.
6820 self.needed_locks = {}
6821 if self.op.on_nodes:
6822 # _GetWantedNodes can be used here, but is not always appropriate to use
6823 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6824 # more details.
6825 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6826 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6828 def CheckPrereq(self):
6829 """Check prerequisites.
6833 def Exec(self, feedback_fn):
6834 """Do the actual sleep.
6837 if self.op.on_master:
6838 if not utils.TestDelay(self.op.duration):
6839 raise errors.OpExecError("Error during master delay test")
6840 if self.op.on_nodes:
6841 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6842 if not result:
6843 raise errors.OpExecError("Complete failure from rpc call")
6844 for node, node_result in result.items():
6846 if not node_result.data:
6847 raise errors.OpExecError("Failure during rpc call to node %s,"
6848 " result: %s" % (node, node_result.data))
6851 class IAllocator(object):
6852 """IAllocator framework.
6854 An IAllocator instance has four sets of attributes:
6855 - cfg that is needed to query the cluster
6856 - input data (all members of the _KEYS class attribute are required)
6857 - four buffer attributes (in|out_data|text), that represent the
6858 input (to the external script) in text and data structure format,
6859 and the output from it, again in two formats
6860 - the result variables from the script (success, info, nodes) for
6865 "mem_size", "disks", "disk_template",
6866 "os", "tags", "nics", "vcpus", "hypervisor",
6872 def __init__(self, lu, mode, name, **kwargs):
6873 self.lu = lu
6874 # init buffer variables
6875 self.in_text = self.out_text = self.in_data = self.out_data = None
6876 # init all input fields so that pylint is happy
6877 self.mode = mode
6878 self.name = name
6879 self.mem_size = self.disks = self.disk_template = None
6880 self.os = self.tags = self.nics = self.vcpus = None
6881 self.hypervisor = None
6882 self.relocate_from = None
6884 self.required_nodes = None
6885 # init result fields
6886 self.success = self.info = self.nodes = None
6887 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6888 keyset = self._ALLO_KEYS
6889 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6890 keyset = self._RELO_KEYS
6892 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6893 " IAllocator" % self.mode)
6894 for key in kwargs:
6895 if key not in keyset:
6896 raise errors.ProgrammerError("Invalid input parameter '%s' to"
6897 " IAllocator" % key)
6898 setattr(self, key, kwargs[key])
6899 for key in keyset:
6900 if key not in kwargs:
6901 raise errors.ProgrammerError("Missing input parameter '%s' to"
6902 " IAllocator" % key)
6903 self._BuildInputData()
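# Illustrative construction sketch (hypothetical values): an allocation
# request must pass every key in _ALLO_KEYS as a keyword argument; the
# "hail" allocator name below is just an example script name.
#
#   ial = IAllocator(self, constants.IALLOCATOR_MODE_ALLOC,
#                    "inst1.example.com",
#                    mem_size=512, disks=[{"size": 1024, "mode": "w"}],
#                    disk_template=constants.DT_DRBD8,
#                    os="debian-etch", tags=[], nics=[], vcpus=1,
#                    hypervisor=constants.HT_XEN_PVM)
#   ial.Run("hail")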
6905 def _ComputeClusterData(self):
6906 """Compute the generic allocator input data.
6908 This is the data that is independent of the actual operation.
6910 """
6911 cfg = self.lu.cfg
6912 cluster_info = cfg.GetClusterInfo()
6914 data = {
6915 "version": constants.IALLOCATOR_VERSION,
6916 "cluster_name": cfg.GetClusterName(),
6917 "cluster_tags": list(cluster_info.GetTags()),
6918 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6919 # we don't have job IDs
6921 iinfo = cfg.GetAllInstancesInfo().values()
6922 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6926 node_list = cfg.GetNodeList()
6928 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6929 hypervisor_name = self.hypervisor
6930 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6931 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6933 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6934 hypervisor_name)
6935 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6936 cluster_info.enabled_hypervisors)
6937 for nname, nresult in node_data.items():
6938 # first fill in static (config-based) values
6939 ninfo = cfg.GetNodeInfo(nname)
6940 pnr = {
6941 "tags": list(ninfo.GetTags()),
6942 "primary_ip": ninfo.primary_ip,
6943 "secondary_ip": ninfo.secondary_ip,
6944 "offline": ninfo.offline,
6945 "drained": ninfo.drained,
6946 "master_candidate": ninfo.master_candidate,
6949 if not ninfo.offline:
6950 msg = nresult.RemoteFailMsg()
6951 if msg:
6952 raise errors.OpExecError("Can't get data for node %s: %s" %
6953 (nname, msg))
6954 msg = node_iinfo[nname].RemoteFailMsg()
6955 if msg:
6956 raise errors.OpExecError("Can't get node instance info"
6957 " from node %s: %s" % (nname, msg))
6958 remote_info = nresult.payload
6959 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6960 'vg_size', 'vg_free', 'cpu_total']:
6961 if attr not in remote_info:
6962 raise errors.OpExecError("Node '%s' didn't return attribute"
6963 " '%s'" % (nname, attr))
6964 if not isinstance(remote_info[attr], int):
6965 raise errors.OpExecError("Node '%s' returned invalid value"
6966 " for '%s': %s" %
6967 (nname, attr, remote_info[attr]))
6968 # compute memory used by primary instances
6969 i_p_mem = i_p_up_mem = 0
6970 for iinfo, beinfo in i_list:
6971 if iinfo.primary_node == nname:
6972 i_p_mem += beinfo[constants.BE_MEMORY]
6973 if iinfo.name not in node_iinfo[nname].payload:
6974 i_used_mem = 0
6975 else:
6976 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6977 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6978 remote_info['memory_free'] -= max(0, i_mem_diff)
6980 if iinfo.admin_up:
6981 i_p_up_mem += beinfo[constants.BE_MEMORY]
6983 # compute memory used by instances
6984 pnr.update({
6985 "total_memory": remote_info['memory_total'],
6986 "reserved_memory": remote_info['memory_dom0'],
6987 "free_memory": remote_info['memory_free'],
6988 "total_disk": remote_info['vg_size'],
6989 "free_disk": remote_info['vg_free'],
6990 "total_cpus": remote_info['cpu_total'],
6991 "i_pri_memory": i_p_mem,
6992 "i_pri_up_memory": i_p_up_mem,
6996 node_results[nname] = pnr
6997 data["nodes"] = node_results
7001 for iinfo, beinfo in i_list:
7003 for nic in iinfo.nics:
7004 filled_params = objects.FillDict(
7005 cluster_info.nicparams[constants.PP_DEFAULT],
7006 nic.nicparams)
7007 nic_dict = {"mac": nic.mac,
7008 "ip": nic.ip,
7009 "mode": filled_params[constants.NIC_MODE],
7010 "link": filled_params[constants.NIC_LINK],
7011 }
7012 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7013 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7014 nic_data.append(nic_dict)
7015 pir = {
7016 "tags": list(iinfo.GetTags()),
7017 "admin_up": iinfo.admin_up,
7018 "vcpus": beinfo[constants.BE_VCPUS],
7019 "memory": beinfo[constants.BE_MEMORY],
7021 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7023 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7024 "disk_template": iinfo.disk_template,
7025 "hypervisor": iinfo.hypervisor,
7027 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7029 instance_data[iinfo.name] = pir
7031 data["instances"] = instance_data
7035 def _AddNewInstance(self):
7036 """Add new instance data to allocator structure.
7038 This in combination with _AllocatorGetClusterData will create the
7039 correct structure needed as input for the allocator.
7041 The checks for the completeness of the opcode must have already been
7045 data = self.in_data
7047 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7049 if self.disk_template in constants.DTS_NET_MIRROR:
7050 self.required_nodes = 2
7051 else:
7052 self.required_nodes = 1
7056 "disk_template": self.disk_template,
7059 "vcpus": self.vcpus,
7060 "memory": self.mem_size,
7061 "disks": self.disks,
7062 "disk_space_total": disk_space,
7064 "required_nodes": self.required_nodes,
7066 data["request"] = request
7068 def _AddRelocateInstance(self):
7069 """Add relocate instance data to allocator structure.
7071 This in combination with _IAllocatorGetClusterData will create the
7072 correct structure needed as input for the allocator.
7074 The checks for the completeness of the opcode must have already been
7078 instance = self.lu.cfg.GetInstanceInfo(self.name)
7079 if instance is None:
7080 raise errors.ProgrammerError("Unknown instance '%s' passed to"
7081 " IAllocator" % self.name)
7083 if instance.disk_template not in constants.DTS_NET_MIRROR:
7084 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7086 if len(instance.secondary_nodes) != 1:
7087 raise errors.OpPrereqError("Instance does not have exactly one secondary node")
7089 self.required_nodes = 1
7090 disk_sizes = [{'size': disk.size} for disk in instance.disks]
7091 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7096 "disk_space_total": disk_space,
7097 "required_nodes": self.required_nodes,
7098 "relocate_from": self.relocate_from,
7100 self.in_data["request"] = request
7102 def _BuildInputData(self):
7103 """Build input data structures.
7106 self._ComputeClusterData()
7108 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7109 self._AddNewInstance()
7110 else:
7111 self._AddRelocateInstance()
7113 self.in_text = serializer.Dump(self.in_data)
7115 def Run(self, name, validate=True, call_fn=None):
7116 """Run an instance allocator and return the results.
7119 if call_fn is None:
7120 call_fn = self.lu.rpc.call_iallocator_runner
7123 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7126 if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7127 raise errors.OpExecError("Invalid result from master iallocator runner")
7129 rcode, stdout, stderr, fail = result.data
7131 if rcode == constants.IARUN_NOTFOUND:
7132 raise errors.OpExecError("Can't find allocator '%s'" % name)
7133 elif rcode == constants.IARUN_FAILURE:
7134 raise errors.OpExecError("Instance allocator call failed: %s,"
7135 " output: %s" % (fail, stdout+stderr))
7136 self.out_text = stdout
7138 self._ValidateResult()
7140 def _ValidateResult(self):
7141 """Process the allocator results.
7143 This will process and if successful save the result in
7144 self.out_data and the other parameters.
7148 rdict = serializer.Load(self.out_text)
7149 except Exception, err:
7150 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7152 if not isinstance(rdict, dict):
7153 raise errors.OpExecError("Can't parse iallocator results: not a dict")
7155 for key in "success", "info", "nodes":
7156 if key not in rdict:
7157 raise errors.OpExecError("Can't parse iallocator results:"
7158 " missing key '%s'" % key)
7159 setattr(self, key, rdict[key])
7161 if not isinstance(rdict["nodes"], list):
7162 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7163 " not a list")
7164 self.out_data = rdict
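# Illustrative sketch of an allocator reply that passes the validation
# above (node names made up; the document is parsed with serializer.Load):
#
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node2.example.com", "node3.example.com"]}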
7167 class LUTestAllocator(NoHooksLU):
7168 """Run allocator tests.
7170 This LU runs the allocator tests
7173 _OP_REQP = ["direction", "mode", "name"]
7175 def CheckPrereq(self):
7176 """Check prerequisites.
7178 This checks the opcode parameters depending on the director and mode test.
7181 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7182 for attr in ["name", "mem_size", "disks", "disk_template",
7183 "os", "tags", "nics", "vcpus"]:
7184 if not hasattr(self.op, attr):
7185 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7186 attr)
7187 iname = self.cfg.ExpandInstanceName(self.op.name)
7188 if iname is not None:
7189 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7190 iname)
7191 if not isinstance(self.op.nics, list):
7192 raise errors.OpPrereqError("Invalid parameter 'nics'")
7193 for row in self.op.nics:
7194 if (not isinstance(row, dict) or
7195 "mac" not in row or
7196 "ip" not in row or
7197 "bridge" not in row):
7198 raise errors.OpPrereqError("Invalid contents of the"
7199 " 'nics' parameter")
7200 if not isinstance(self.op.disks, list):
7201 raise errors.OpPrereqError("Invalid parameter 'disks'")
7202 for row in self.op.disks:
7203 if (not isinstance(row, dict) or
7204 "size" not in row or
7205 not isinstance(row["size"], int) or
7206 "mode" not in row or
7207 row["mode"] not in ['r', 'w']):
7208 raise errors.OpPrereqError("Invalid contents of the"
7209 " 'disks' parameter")
7210 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7211 self.op.hypervisor = self.cfg.GetHypervisorType()
7212 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7213 if not hasattr(self.op, "name"):
7214 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7215 fname = self.cfg.ExpandInstanceName(self.op.name)
7216 if fname is None:
7217 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7218 self.op.name)
7219 self.op.name = fname
7220 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7221 else:
7222 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7223 self.op.mode)
7225 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7226 if not hasattr(self.op, "allocator") or self.op.allocator is None:
7227 raise errors.OpPrereqError("Missing allocator name")
7228 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7229 raise errors.OpPrereqError("Wrong allocator test '%s'" %
7230 self.op.direction)
7232 def Exec(self, feedback_fn):
7233 """Run the allocator test.
7236 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7237 ial = IAllocator(self,
7238 mode=self.op.mode,
7239 name=self.op.name,
7240 mem_size=self.op.mem_size,
7241 disks=self.op.disks,
7242 disk_template=self.op.disk_template,
7243 os=self.op.os,
7244 tags=self.op.tags,
7245 nics=self.op.nics,
7246 vcpus=self.op.vcpus,
7247 hypervisor=self.op.hypervisor,
7248 )
7249 else:
7250 ial = IAllocator(self,
7251 mode=self.op.mode,
7252 name=self.op.name,
7253 relocate_from=list(self.relocate_from),
7254 )
7256 if self.op.direction == constants.IALLOCATOR_DIR_IN:
7257 result = ial.in_text
7258 else:
7259 ial.Run(self.op.allocator, validate=False)
7260 result = ial.out_text
7261 return result