4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement ExpandNames
53 - implement CheckPrereq
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements:
58 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60 Note that all commands require root permissions.
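  A minimal (hypothetical) subclass sketch, using the NoHooksLU helper
  defined further below, could look like::

    class LUDoNothing(NoHooksLU):
      _OP_REQP = []

      def ExpandNames(self):
        self.needed_locks = {}

      def CheckPrereq(self):
        pass

      def Exec(self, feedback_fn):
        feedback_fn("nothing to do")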
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overridden in derived classes in order to check op validity.
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
93 for attr_name in self._OP_REQP:
94 attr_val = getattr(op, attr_name, None)
96 raise errors.OpPrereqError("Required parameter '%s' missing" %
101 """Returns the SshRunner object
105 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
108 ssh = property(fget=__GetSSH)
110 def CheckArguments(self):
111 """Check syntactic validity for the opcode arguments.
113 This method is for doing a simple syntactic check and ensuring
114 validity of opcode parameters, without any cluster-related
115 checks. While the same can be accomplished in ExpandNames and/or
116 CheckPrereq, doing these separately is better because:
118 - ExpandNames is left as purely a lock-related function
119 - CheckPrereq is run after we have acquired locks (and possible
122 The function is allowed to change the self.op attribute so that
123 later methods no longer need to worry about missing parameters.
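# A sketch of what a concrete CheckArguments might do (the opcode field
# name below is purely illustrative; LUSetClusterParams.CheckArguments
# further down is a real in-tree example):
#   if not hasattr(self.op, "force"):
#     self.op.force = False
#   if not isinstance(self.op.force, bool):
#     raise errors.OpPrereqError("Parameter 'force' must be a boolean")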
128 def ExpandNames(self):
129 """Expand names for this LU.
131 This method is called before starting to execute the opcode, and it should
132 update all the parameters of the opcode to their canonical form (e.g. a
133 short node name must be fully expanded after this method has successfully
134 completed). This way locking, hooks, logging, etc. can work correctly.
136 LUs which implement this method must also populate the self.needed_locks
137 member, as a dict with lock levels as keys, and a list of needed lock names
140 - use an empty dict if you don't need any lock
141 - if you don't need any lock at a particular level omit that level
142 - don't put anything for the BGL level
143 - if you want all locks at a level use locking.ALL_SET as a value
145 If you need to share locks (rather than acquire them exclusively) at one
146 level you can modify self.share_locks, setting a true value (usually 1) for
147 that level. By default locks are not shared.
151 # Acquire all nodes and one instance
152 self.needed_locks = {
153 locking.LEVEL_NODE: locking.ALL_SET,
154 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156 # Acquire just two nodes
157 self.needed_locks = {
158 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
161 self.needed_locks = {} # No, you can't leave it to the default value None
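# If the locks should be acquired shared rather than exclusively, set
# self.share_locks as well; for example LUVerifyCluster.ExpandNames below
# shares every level:
#   self.share_locks = dict(((i, 1) for i in locking.LEVELS))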
164 # The implementation of this method is mandatory only if the new LU is
165 # concurrent, so that old LUs don't need to be changed all at the same time.
168 self.needed_locks = {} # Exclusive LUs don't need locks.
170 raise NotImplementedError
172 def DeclareLocks(self, level):
173 """Declare LU locking needs for a level
175 While most LUs can just declare their locking needs at ExpandNames time,
176 sometimes there's the need to calculate some locks after having acquired
177 the ones before. This function is called just before acquiring locks at a
178 particular level, but after acquiring the ones at lower levels, and permits
179 such calculations. It can be used to modify self.needed_locks, and by
180 default it does nothing.
182 This function is only called if you have something already set in
183 self.needed_locks for the level.
185 @param level: Locking level which is going to be locked
186 @type level: member of ganeti.locking.LEVELS
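# A concrete pattern is documented in _LockInstancesNodes below: once the
# instance locks of the lower level are held, recompute
# self.needed_locks[locking.LEVEL_NODE] from the acquired instances.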
190 def CheckPrereq(self):
191 """Check prerequisites for this LU.
193 This method should check that the prerequisites for the execution
194 of this LU are fulfilled. It can do internode communication, but
195 it should be idempotent - no cluster or system changes are
198 The method should raise errors.OpPrereqError in case something is
199 not fulfilled. Its return value is ignored.
201 This method should also update all the parameters of the opcode to
202 their canonical form if it hasn't been done by ExpandNames before.
205 raise NotImplementedError
207 def Exec(self, feedback_fn):
210 This method should implement the actual work. It should raise
211 errors.OpExecError for failures that are somewhat dealt with in
215 raise NotImplementedError
217 def BuildHooksEnv(self):
218 """Build hooks environment for this LU.
220 This method should return a three-element tuple consisting of: a dict
221 containing the environment that will be used for running the
222 specific hook for this LU, a list of node names on which the hook
223 should run before the execution, and a list of node names on which
224 the hook should run after the execution.
226 The keys of the dict must not be prefixed with 'GANETI_', as this will
227 be handled in the hooks runner. Also note additional keys will be
228 added by the hooks runner. If the LU doesn't define any
229 environment, an empty dict (and not None) should be returned.
231 No nodes should be returned as an empty list (and not None).
233 Note that if the HPATH for a LU class is None, this function will not be called.
237 raise NotImplementedError
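# LURenameCluster.BuildHooksEnv below is a small concrete example: it
# builds an env dict with OP_TARGET/NEW_NAME and returns
# (env, [master_node], [master_node]).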
239 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240 """Notify the LU about the results of its hooks.
242 This method is called every time a hooks phase is executed, and notifies
243 the Logical Unit about the hooks' result. The LU can then use it to alter
244 its result based on the hooks. By default the method does nothing and the
245 previous result is passed back unchanged but any LU can define it if it
246 wants to use the local cluster hook-scripts somehow.
248 @param phase: one of L{constants.HOOKS_PHASE_POST} or
249 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250 @param hook_results: the results of the multi-node hooks rpc call
251 @param feedback_fn: function used to send feedback back to the caller
252 @param lu_result: the previous Exec result this LU had, or None
254 @return: the new Exec result, based on the previous result
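# The default behaviour described above is simply to hand lu_result back
# unchanged; LUVerifyCluster.HooksCallBack below is an override that
# post-processes the hook output and adjusts the verification result.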
260 def _ExpandAndLockInstance(self):
261 """Helper function to expand and lock an instance.
263 Many LUs that work on an instance take its name in self.op.instance_name
264 and need to expand it and then declare the expanded name for locking. This
265 function does it, and then updates self.op.instance_name to the expanded
266 name. It also initializes needed_locks as a dict, if this hasn't been done
270 if self.needed_locks is None:
271 self.needed_locks = {}
273 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274 "_ExpandAndLockInstance called with instance-level locks set"
275 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276 if expanded_name is None:
277 raise errors.OpPrereqError("Instance '%s' not known" %
278 self.op.instance_name)
279 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280 self.op.instance_name = expanded_name
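# A hypothetical instance-level LU would typically combine this helper
# with _LockInstancesNodes below, along these lines (sketch only):
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()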
282 def _LockInstancesNodes(self, primary_only=False):
283 """Helper function to declare instances' nodes for locking.
285 This function should be called after locking one or more instances to lock
286 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287 with all primary or secondary nodes for instances already locked and
288 present in self.needed_locks[locking.LEVEL_INSTANCE].
290 It should be called from DeclareLocks, and for safety only works if
291 self.recalculate_locks[locking.LEVEL_NODE] is set.
293 In the future it may grow parameters to just lock some instances' nodes, or
294 to just lock primary or secondary nodes, if needed.
296 It should be called from DeclareLocks in a way similar to::
298 if level == locking.LEVEL_NODE:
299 self._LockInstancesNodes()
301 @type primary_only: boolean
302 @param primary_only: only lock primary nodes of locked instances
305 assert locking.LEVEL_NODE in self.recalculate_locks, \
306 "_LockInstancesNodes helper function called with no nodes to recalculate"
308 # TODO: check if we've really been called with the instance locks held
310 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311 # future we might want to have different behaviors depending on the value
312 # of self.recalculate_locks[locking.LEVEL_NODE]
314 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315 instance = self.context.cfg.GetInstanceInfo(instance_name)
316 wanted_nodes.append(instance.primary_node)
318 wanted_nodes.extend(instance.secondary_nodes)
320 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325 del self.recalculate_locks[locking.LEVEL_NODE]
328 class NoHooksLU(LogicalUnit):
329 """Simple LU which runs no hooks.
331 This LU is intended as a parent for other LogicalUnits which will
332 run no hooks, in order to reduce duplicate code.
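# Concrete users of this base class in this module include LUDestroyCluster,
# LUVerifyDisks and LURedistributeConfig below.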
339 def _GetWantedNodes(lu, nodes):
340 """Returns list of checked and expanded node names.
342 @type lu: L{LogicalUnit}
343 @param lu: the logical unit on whose behalf we execute
345 @param nodes: list of node names or None for all nodes
347 @return: the list of nodes, sorted
348 @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
351 if not isinstance(nodes, list):
352 raise errors.OpPrereqError("Invalid argument type 'nodes'")
355 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356 " non-empty list of nodes whose name is to be expanded.")
360 node = lu.cfg.ExpandNodeName(name)
362 raise errors.OpPrereqError("No such node name '%s'" % name)
365 return utils.NiceSort(wanted)
368 def _GetWantedInstances(lu, instances):
369 """Returns list of checked and expanded instance names.
371 @type lu: L{LogicalUnit}
372 @param lu: the logical unit on whose behalf we execute
373 @type instances: list
374 @param instances: list of instance names or None for all instances
376 @return: the list of instances, sorted
377 @raise errors.OpPrereqError: if the instances parameter is wrong type
378 @raise errors.OpPrereqError: if any of the passed instances is not found
381 if not isinstance(instances, list):
382 raise errors.OpPrereqError("Invalid argument type 'instances'")
387 for name in instances:
388 instance = lu.cfg.ExpandInstanceName(name)
390 raise errors.OpPrereqError("No such instance name '%s'" % name)
391 wanted.append(instance)
394 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
398 def _CheckOutputFields(static, dynamic, selected):
399 """Checks whether all selected fields are valid.
401 @type static: L{utils.FieldSet}
402 @param static: static fields set
403 @type dynamic: L{utils.FieldSet}
404 @param dynamic: dynamic fields set
411 delta = f.NonMatching(selected)
413 raise errors.OpPrereqError("Unknown output fields selected: %s"
417 def _CheckBooleanOpField(op, name):
418 """Validates boolean opcode parameters.
420 This will ensure that an opcode parameter is either a boolean value,
421 or None (but that it always exists).
424 val = getattr(op, name, None)
425 if not (val is None or isinstance(val, bool)):
426 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428 setattr(op, name, val)
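# Typical (hypothetical) use from an LU's CheckArguments, for an optional
# boolean opcode flag:
#   _CheckBooleanOpField(self.op, "force")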
431 def _CheckNodeOnline(lu, node):
432 """Ensure that a given node is online.
434 @param lu: the LU on behalf of which we make the check
435 @param node: the node to check
436 @raise errors.OpPrereqError: if the node is offline
439 if lu.cfg.GetNodeInfo(node).offline:
440 raise errors.OpPrereqError("Can't use offline node %s" % node)
443 def _CheckNodeNotDrained(lu, node):
444 """Ensure that a given node is not drained.
446 @param lu: the LU on behalf of which we make the check
447 @param node: the node to check
448 @raise errors.OpPrereqError: if the node is drained
451 if lu.cfg.GetNodeInfo(node).drained:
452 raise errors.OpPrereqError("Can't use drained node %s" % node)
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456 memory, vcpus, nics, disk_template, disks):
457 """Builds instance related env variables for hooks
459 This builds the hook environment from individual variables.
462 @param name: the name of the instance
463 @type primary_node: string
464 @param primary_node: the name of the instance's primary node
465 @type secondary_nodes: list
466 @param secondary_nodes: list of secondary nodes as strings
467 @type os_type: string
468 @param os_type: the name of the instance's OS
469 @type status: boolean
470 @param status: the should_run status of the instance
472 @param memory: the memory size of the instance
474 @param vcpus: the count of VCPUs the instance has
476 @param nics: list of tuples (ip, bridge, mac) representing
477 the NICs the instance has
478 @type disk_template: string
479 @param disk_template: the disk template of the instance
481 @param disks: the list of (size, mode) pairs
483 @return: the hook environment for this instance
492 "INSTANCE_NAME": name,
493 "INSTANCE_PRIMARY": primary_node,
494 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495 "INSTANCE_OS_TYPE": os_type,
496 "INSTANCE_STATUS": str_status,
497 "INSTANCE_MEMORY": memory,
498 "INSTANCE_VCPUS": vcpus,
499 "INSTANCE_DISK_TEMPLATE": disk_template,
503 nic_count = len(nics)
504 for idx, (ip, mac, mode, link) in enumerate(nics):
507 env["INSTANCE_NIC%d_IP" % idx] = ip
508 env["INSTANCE_NIC%d_MAC" % idx] = mac
509 env["INSTANCE_NIC%d_MODE" % idx] = mode
510 env["INSTANCE_NIC%d_LINK" % idx] = link
511 if mode == constants.NIC_MODE_BRIDGED:
512 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
516 env["INSTANCE_NIC_COUNT"] = nic_count
519 disk_count = len(disks)
520 for idx, (size, mode) in enumerate(disks):
521 env["INSTANCE_DISK%d_SIZE" % idx] = size
522 env["INSTANCE_DISK%d_MODE" % idx] = mode
526 env["INSTANCE_DISK_COUNT"] = disk_count
530 def _PreBuildNICHooksList(lu, nics):
531 """Build a list of nic information tuples.
533 This list is suitable to be passed to _BuildInstanceHookEnv.
535 @type lu: L{LogicalUnit}
536 @param lu: the logical unit on whose behalf we execute
537 @type nics: list of L{objects.NIC}
538 @param nics: list of nics to convert to hooks tuples
542 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
546 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
547 mode = filled_params[constants.NIC_MODE]
548 link = filled_params[constants.NIC_LINK]
549 hooks_nics.append((ip, mac, mode, link))
552 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
553 """Builds instance related env variables for hooks from an object.
555 @type lu: L{LogicalUnit}
556 @param lu: the logical unit on whose behalf we execute
557 @type instance: L{objects.Instance}
558 @param instance: the instance for which we should build the
561 @param override: dictionary with key/values that will override
564 @return: the hook environment dictionary
567 bep = lu.cfg.GetClusterInfo().FillBE(instance)
569 'name': instance.name,
570 'primary_node': instance.primary_node,
571 'secondary_nodes': instance.secondary_nodes,
572 'os_type': instance.os,
573 'status': instance.admin_up,
574 'memory': bep[constants.BE_MEMORY],
575 'vcpus': bep[constants.BE_VCPUS],
576 'nics': _PreBuildNICHooksList(lu, instance.nics),
577 'disk_template': instance.disk_template,
578 'disks': [(disk.size, disk.mode) for disk in instance.disks],
581 args.update(override)
582 return _BuildInstanceHookEnv(**args)
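# For a single-NIC, single-disk instance the helpers above produce an
# environment roughly like the following sketch (all values illustrative):
#   {"INSTANCE_NAME": "instance1.example.tld",
#    "INSTANCE_PRIMARY": "node1.example.tld",
#    "INSTANCE_SECONDARIES": "",
#    "INSTANCE_OS_TYPE": "debian-etch",
#    "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1,
#    "INSTANCE_DISK_TEMPLATE": "plain",
#    "INSTANCE_NIC_COUNT": 1, "INSTANCE_NIC0_MAC": "aa:00:00:00:00:01",
#    "INSTANCE_DISK_COUNT": 1, "INSTANCE_DISK0_SIZE": 1024,
#    "INSTANCE_DISK0_MODE": "rw"}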
585 def _AdjustCandidatePool(lu):
586 """Adjust the candidate pool after node operations.
589 mod_list = lu.cfg.MaintainCandidatePool()
591 lu.LogInfo("Promoted nodes to master candidate role: %s",
592 ", ".join(node.name for node in mod_list))
593 for name in mod_list:
594 lu.context.ReaddNode(name)
595 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
597 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
601 def _CheckNicsBridgesExist(lu, target_nics, target_node,
602 profile=constants.PP_DEFAULT):
603 """Check that the brigdes needed by a list of nics exist.
606 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
607 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
608 for nic in target_nics]
609 brlist = [params[constants.NIC_LINK] for params in paramslist
610 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
612 result = lu.rpc.call_bridges_exist(target_node, brlist)
613 result.Raise("Error checking bridges on destination node '%s'" %
614 target_node, prereq=True)
617 def _CheckInstanceBridgesExist(lu, instance, node=None):
618 """Check that the brigdes needed by an instance exist.
622 node=instance.primary_node
623 _CheckNicsBridgesExist(lu, instance.nics, node)
626 class LUDestroyCluster(NoHooksLU):
627 """Logical unit for destroying the cluster.
632 def CheckPrereq(self):
633 """Check prerequisites.
635 This checks whether the cluster is empty.
637 Any errors are signalled by raising errors.OpPrereqError.
640 master = self.cfg.GetMasterNode()
642 nodelist = self.cfg.GetNodeList()
643 if len(nodelist) != 1 or nodelist[0] != master:
644 raise errors.OpPrereqError("There are still %d node(s) in"
645 " this cluster." % (len(nodelist) - 1))
646 instancelist = self.cfg.GetInstanceList()
648 raise errors.OpPrereqError("There are still %d instance(s) in"
649 " this cluster." % len(instancelist))
651 def Exec(self, feedback_fn):
652 """Destroys the cluster.
655 master = self.cfg.GetMasterNode()
656 result = self.rpc.call_node_stop_master(master, False)
657 result.Raise("Could not disable the master role")
658 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
659 utils.CreateBackup(priv_key)
660 utils.CreateBackup(pub_key)
664 class LUVerifyCluster(LogicalUnit):
665 """Verifies the cluster status.
668 HPATH = "cluster-verify"
669 HTYPE = constants.HTYPE_CLUSTER
670 _OP_REQP = ["skip_checks"]
673 def ExpandNames(self):
674 self.needed_locks = {
675 locking.LEVEL_NODE: locking.ALL_SET,
676 locking.LEVEL_INSTANCE: locking.ALL_SET,
678 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
680 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
681 node_result, feedback_fn, master_files,
683 """Run multiple tests against a node.
687 - compares ganeti version
688 - checks vg existence and size > 20G
689 - checks config file checksum
690 - checks ssh to other nodes
692 @type nodeinfo: L{objects.Node}
693 @param nodeinfo: the node to check
694 @param file_list: required list of files
695 @param local_cksum: dictionary of local files and their checksums
696 @param node_result: the results from the node
697 @param feedback_fn: function used to accumulate results
698 @param master_files: list of files that only masters should have
699 @param drbd_map: the used DRBD minors for this node, in the
700 form of minor: (instance, must_exist), which correspond to instances
701 and their running status
702 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
707 # main result, node_result should be a non-empty dict
708 if not node_result or not isinstance(node_result, dict):
709 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
712 # compares ganeti version
713 local_version = constants.PROTOCOL_VERSION
714 remote_version = node_result.get('version', None)
715 if not (remote_version and isinstance(remote_version, (list, tuple)) and
716 len(remote_version) == 2):
717 feedback_fn(" - ERROR: connection to %s failed" % (node))
720 if local_version != remote_version[0]:
721 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
722 " node %s %s" % (local_version, node, remote_version[0]))
725 # node seems compatible, we can actually try to look into its results
729 # full package version
730 if constants.RELEASE_VERSION != remote_version[1]:
731 feedback_fn(" - WARNING: software version mismatch: master %s,"
733 (constants.RELEASE_VERSION, node, remote_version[1]))
735 # checks vg existence and size > 20G
736 if vg_name is not None:
737 vglist = node_result.get(constants.NV_VGLIST, None)
739 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
743 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
744 constants.MIN_VG_SIZE)
746 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
749 # checks config file checksum
751 remote_cksum = node_result.get(constants.NV_FILELIST, None)
752 if not isinstance(remote_cksum, dict):
754 feedback_fn(" - ERROR: node hasn't returned file checksum data")
756 for file_name in file_list:
757 node_is_mc = nodeinfo.master_candidate
758 must_have_file = file_name not in master_files
759 if file_name not in remote_cksum:
760 if node_is_mc or must_have_file:
762 feedback_fn(" - ERROR: file '%s' missing" % file_name)
763 elif remote_cksum[file_name] != local_cksum[file_name]:
764 if node_is_mc or must_have_file:
766 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
768 # not candidate and this is not a must-have file
770 feedback_fn(" - ERROR: non master-candidate has old/wrong file"
773 # all good, except non-master/non-must have combination
774 if not node_is_mc and not must_have_file:
775 feedback_fn(" - ERROR: file '%s' should not exist on non master"
776 " candidates" % file_name)
780 if constants.NV_NODELIST not in node_result:
782 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
784 if node_result[constants.NV_NODELIST]:
786 for node in node_result[constants.NV_NODELIST]:
787 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
788 (node, node_result[constants.NV_NODELIST][node]))
790 if constants.NV_NODENETTEST not in node_result:
792 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
794 if node_result[constants.NV_NODENETTEST]:
796 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
798 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
799 (node, node_result[constants.NV_NODENETTEST][node]))
801 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
802 if isinstance(hyp_result, dict):
803 for hv_name, hv_result in hyp_result.iteritems():
804 if hv_result is not None:
805 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
806 (hv_name, hv_result))
808 # check used drbd list
809 if vg_name is not None:
810 used_minors = node_result.get(constants.NV_DRBDLIST, [])
811 if not isinstance(used_minors, (tuple, list)):
812 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
815 for minor, (iname, must_exist) in drbd_map.items():
816 if minor not in used_minors and must_exist:
817 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
818 " not active" % (minor, iname))
820 for minor in used_minors:
821 if minor not in drbd_map:
822 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
828 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
829 node_instance, feedback_fn, n_offline):
830 """Verify an instance.
832 This function checks to see if the required block devices are
833 available on the instance's node.
838 node_current = instanceconfig.primary_node
841 instanceconfig.MapLVsByNode(node_vol_should)
843 for node in node_vol_should:
844 if node in n_offline:
845 # ignore missing volumes on offline nodes
847 for volume in node_vol_should[node]:
848 if node not in node_vol_is or volume not in node_vol_is[node]:
849 feedback_fn(" - ERROR: volume %s missing on node %s" %
853 if instanceconfig.admin_up:
854 if ((node_current not in node_instance or
855 not instance in node_instance[node_current]) and
856 node_current not in n_offline):
857 feedback_fn(" - ERROR: instance %s not running on node %s" %
858 (instance, node_current))
861 for node in node_instance:
862 if (not node == node_current):
863 if instance in node_instance[node]:
864 feedback_fn(" - ERROR: instance %s should not run on node %s" %
870 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
871 """Verify if there are any unknown volumes in the cluster.
873 The .os, .swap and backup volumes are ignored. All other volumes are reported as unknown.
879 for node in node_vol_is:
880 for volume in node_vol_is[node]:
881 if node not in node_vol_should or volume not in node_vol_should[node]:
882 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
887 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
888 """Verify the list of running instances.
890 This checks what instances are running but unknown to the cluster.
894 for node in node_instance:
895 for runninginstance in node_instance[node]:
896 if runninginstance not in instancelist:
897 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
898 (runninginstance, node))
902 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
903 """Verify N+1 Memory Resilience.
905 Check that if one single node dies we can still start all the instances it was primary for.
911 for node, nodeinfo in node_info.iteritems():
912 # This code checks that every node which is now listed as secondary has
913 # enough memory to host all instances it is supposed to, should a single
914 # other node in the cluster fail.
915 # FIXME: not ready for failover to an arbitrary node
916 # FIXME: does not support file-backed instances
917 # WARNING: we currently take into account down instances as well as up
918 # ones, considering that even if they're down someone might want to start
919 # them even in the event of a node failure.
920 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
922 for instance in instances:
923 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
924 if bep[constants.BE_AUTO_BALANCE]:
925 needed_mem += bep[constants.BE_MEMORY]
926 if nodeinfo['mfree'] < needed_mem:
927 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
928 " failovers should node %s fail" % (node, prinode))
932 def CheckPrereq(self):
933 """Check prerequisites.
935 Transform the list of checks we're going to skip into a set and check that
936 all its members are valid.
939 self.skip_set = frozenset(self.op.skip_checks)
940 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
941 raise errors.OpPrereqError("Invalid checks to be skipped specified")
943 def BuildHooksEnv(self):
946 Cluster-Verify hooks are run only in the post phase; when they fail, their
947 output is logged in the verify output and the verification fails.
950 all_nodes = self.cfg.GetNodeList()
952 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
954 for node in self.cfg.GetAllNodesInfo().values():
955 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
957 return env, [], all_nodes
959 def Exec(self, feedback_fn):
960 """Verify integrity of cluster, performing various test on nodes.
964 feedback_fn("* Verifying global settings")
965 for msg in self.cfg.VerifyConfig():
966 feedback_fn(" - ERROR: %s" % msg)
968 vg_name = self.cfg.GetVGName()
969 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
970 nodelist = utils.NiceSort(self.cfg.GetNodeList())
971 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
972 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
973 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
974 for iname in instancelist)
975 i_non_redundant = [] # Non redundant instances
976 i_non_a_balanced = [] # Non auto-balanced instances
977 n_offline = [] # List of offline nodes
978 n_drained = [] # List of nodes being drained
984 # FIXME: verify OS list
986 master_files = [constants.CLUSTER_CONF_FILE]
988 file_names = ssconf.SimpleStore().GetFileList()
989 file_names.append(constants.SSL_CERT_FILE)
990 file_names.append(constants.RAPI_CERT_FILE)
991 file_names.extend(master_files)
993 local_checksums = utils.FingerprintFiles(file_names)
995 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
996 node_verify_param = {
997 constants.NV_FILELIST: file_names,
998 constants.NV_NODELIST: [node.name for node in nodeinfo
999 if not node.offline],
1000 constants.NV_HYPERVISOR: hypervisors,
1001 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1002 node.secondary_ip) for node in nodeinfo
1003 if not node.offline],
1004 constants.NV_INSTANCELIST: hypervisors,
1005 constants.NV_VERSION: None,
1006 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1008 if vg_name is not None:
1009 node_verify_param[constants.NV_VGLIST] = None
1010 node_verify_param[constants.NV_LVLIST] = vg_name
1011 node_verify_param[constants.NV_DRBDLIST] = None
1012 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1013 self.cfg.GetClusterName())
1015 cluster = self.cfg.GetClusterInfo()
1016 master_node = self.cfg.GetMasterNode()
1017 all_drbd_map = self.cfg.ComputeDRBDMap()
1019 for node_i in nodeinfo:
1023 feedback_fn("* Skipping offline node %s" % (node,))
1024 n_offline.append(node)
1027 if node == master_node:
1029 elif node_i.master_candidate:
1030 ntype = "master candidate"
1031 elif node_i.drained:
1033 n_drained.append(node)
1036 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1038 msg = all_nvinfo[node].fail_msg
1040 feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg))
1044 nresult = all_nvinfo[node].payload
1046 for minor, instance in all_drbd_map[node].items():
1047 if instance not in instanceinfo:
1048 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1050 # ghost instance should not be running, but otherwise we
1051 # don't give double warnings (both ghost instance and
1052 # unallocated minor in use)
1053 node_drbd[minor] = (instance, False)
1055 instance = instanceinfo[instance]
1056 node_drbd[minor] = (instance.name, instance.admin_up)
1057 result = self._VerifyNode(node_i, file_names, local_checksums,
1058 nresult, feedback_fn, master_files,
1062 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1064 node_volume[node] = {}
1065 elif isinstance(lvdata, basestring):
1066 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1067 (node, utils.SafeEncode(lvdata)))
1069 node_volume[node] = {}
1070 elif not isinstance(lvdata, dict):
1071 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1075 node_volume[node] = lvdata
1078 idata = nresult.get(constants.NV_INSTANCELIST, None)
1079 if not isinstance(idata, list):
1080 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1085 node_instance[node] = idata
1088 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1089 if not isinstance(nodeinfo, dict):
1090 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1096 "mfree": int(nodeinfo['memory_free']),
1099 # dictionary holding all instances this node is secondary for,
1100 # grouped by their primary node. Each key is a cluster node, and each
1101 # value is a list of instances which have the key as primary and the
1102 # current node as secondary. this is handy to calculate N+1 memory
1103 # availability if you can only failover from a primary to its secondary.
1105 "sinst-by-pnode": {},
1107 # FIXME: devise a free space model for file based instances as well
1108 if vg_name is not None:
1109 if (constants.NV_VGLIST not in nresult or
1110 vg_name not in nresult[constants.NV_VGLIST]):
1111 feedback_fn(" - ERROR: node %s didn't return data for the"
1112 " volume group '%s' - it is either missing or broken" %
1116 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1117 except (ValueError, KeyError):
1118 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1119 " from node %s" % (node,))
1123 node_vol_should = {}
1125 for instance in instancelist:
1126 feedback_fn("* Verifying instance %s" % instance)
1127 inst_config = instanceinfo[instance]
1128 result = self._VerifyInstance(instance, inst_config, node_volume,
1129 node_instance, feedback_fn, n_offline)
1131 inst_nodes_offline = []
1133 inst_config.MapLVsByNode(node_vol_should)
1135 instance_cfg[instance] = inst_config
1137 pnode = inst_config.primary_node
1138 if pnode in node_info:
1139 node_info[pnode]['pinst'].append(instance)
1140 elif pnode not in n_offline:
1141 feedback_fn(" - ERROR: instance %s, connection to primary node"
1142 " %s failed" % (instance, pnode))
1145 if pnode in n_offline:
1146 inst_nodes_offline.append(pnode)
1148 # If the instance is non-redundant we cannot survive losing its primary
1149 # node, so we are not N+1 compliant. On the other hand we have no disk
1150 # templates with more than one secondary so that situation is not well supported.
1152 # FIXME: does not support file-backed instances
1153 if len(inst_config.secondary_nodes) == 0:
1154 i_non_redundant.append(instance)
1155 elif len(inst_config.secondary_nodes) > 1:
1156 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1159 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1160 i_non_a_balanced.append(instance)
1162 for snode in inst_config.secondary_nodes:
1163 if snode in node_info:
1164 node_info[snode]['sinst'].append(instance)
1165 if pnode not in node_info[snode]['sinst-by-pnode']:
1166 node_info[snode]['sinst-by-pnode'][pnode] = []
1167 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1168 elif snode not in n_offline:
1169 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1170 " %s failed" % (instance, snode))
1172 if snode in n_offline:
1173 inst_nodes_offline.append(snode)
1175 if inst_nodes_offline:
1176 # warn that the instance lives on offline nodes, and set bad=True
1177 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1178 ", ".join(inst_nodes_offline))
1181 feedback_fn("* Verifying orphan volumes")
1182 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1186 feedback_fn("* Verifying remaining instances")
1187 result = self._VerifyOrphanInstances(instancelist, node_instance,
1191 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1192 feedback_fn("* Verifying N+1 Memory redundancy")
1193 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1196 feedback_fn("* Other Notes")
1198 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1199 % len(i_non_redundant))
1201 if i_non_a_balanced:
1202 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1203 % len(i_non_a_balanced))
1206 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1209 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1213 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1214 """Analize the post-hooks' result
1216 This method analyses the hook result, handles it, and sends some
1217 nicely-formatted feedback back to the user.
1219 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1220 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1221 @param hooks_results: the results of the multi-node hooks rpc call
1222 @param feedback_fn: function used to send feedback back to the caller
1223 @param lu_result: previous Exec result
1224 @return: the new Exec result, based on the previous result
1228 # We only really run POST phase hooks, and are only interested in
1230 if phase == constants.HOOKS_PHASE_POST:
1231 # Used to change hooks' output to proper indentation
1232 indent_re = re.compile('^', re.M)
1233 feedback_fn("* Hooks Results")
1234 if not hooks_results:
1235 feedback_fn(" - ERROR: general communication failure")
1238 for node_name in hooks_results:
1239 show_node_header = True
1240 res = hooks_results[node_name]
1244 # no need to warn or set fail return value
1246 feedback_fn(" Communication failure in hooks execution: %s" %
1250 for script, hkr, output in res.payload:
1251 if hkr == constants.HKR_FAIL:
1252 # The node header is only shown once, if there are
1253 # failing hooks on that node
1254 if show_node_header:
1255 feedback_fn(" Node %s:" % node_name)
1256 show_node_header = False
1257 feedback_fn(" ERROR: Script %s failed, output:" % script)
1258 output = indent_re.sub(' ', output)
1259 feedback_fn("%s" % output)
1265 class LUVerifyDisks(NoHooksLU):
1266 """Verifies the cluster disks status.
1272 def ExpandNames(self):
1273 self.needed_locks = {
1274 locking.LEVEL_NODE: locking.ALL_SET,
1275 locking.LEVEL_INSTANCE: locking.ALL_SET,
1277 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1279 def CheckPrereq(self):
1280 """Check prerequisites.
1282 This has no prerequisites.
1287 def Exec(self, feedback_fn):
1288 """Verify integrity of cluster disks.
1290 @rtype: tuple of three items
1291 @return: a tuple of (dict of node-to-node_error, list of instances
1292 which need activate-disks, dict of instance: (node, volume) for
1296 result = res_nodes, res_instances, res_missing = {}, [], {}
1298 vg_name = self.cfg.GetVGName()
1299 nodes = utils.NiceSort(self.cfg.GetNodeList())
1300 instances = [self.cfg.GetInstanceInfo(name)
1301 for name in self.cfg.GetInstanceList()]
1304 for inst in instances:
1306 if (not inst.admin_up or
1307 inst.disk_template not in constants.DTS_NET_MIRROR):
1309 inst.MapLVsByNode(inst_lvs)
1310 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1311 for node, vol_list in inst_lvs.iteritems():
1312 for vol in vol_list:
1313 nv_dict[(node, vol)] = inst
1318 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1323 node_res = node_lvs[node]
1324 if node_res.offline:
1326 msg = node_res.fail_msg
1328 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1329 res_nodes[node] = msg
1332 lvs = node_res.payload
1333 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1334 inst = nv_dict.pop((node, lv_name), None)
1335 if (not lv_online and inst is not None
1336 and inst.name not in res_instances):
1337 res_instances.append(inst.name)
1339 # any leftover items in nv_dict are missing LVs, let's arrange the
1341 for key, inst in nv_dict.iteritems():
1342 if inst.name not in res_missing:
1343 res_missing[inst.name] = []
1344 res_missing[inst.name].append(key)
1349 class LURenameCluster(LogicalUnit):
1350 """Rename the cluster.
1353 HPATH = "cluster-rename"
1354 HTYPE = constants.HTYPE_CLUSTER
1357 def BuildHooksEnv(self):
1362 "OP_TARGET": self.cfg.GetClusterName(),
1363 "NEW_NAME": self.op.name,
1365 mn = self.cfg.GetMasterNode()
1366 return env, [mn], [mn]
1368 def CheckPrereq(self):
1369 """Verify that the passed name is a valid one.
1372 hostname = utils.HostInfo(self.op.name)
1374 new_name = hostname.name
1375 self.ip = new_ip = hostname.ip
1376 old_name = self.cfg.GetClusterName()
1377 old_ip = self.cfg.GetMasterIP()
1378 if new_name == old_name and new_ip == old_ip:
1379 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1380 " cluster has changed")
1381 if new_ip != old_ip:
1382 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1383 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1384 " reachable on the network. Aborting." %
1387 self.op.name = new_name
1389 def Exec(self, feedback_fn):
1390 """Rename the cluster.
1393 clustername = self.op.name
1396 # shutdown the master IP
1397 master = self.cfg.GetMasterNode()
1398 result = self.rpc.call_node_stop_master(master, False)
1399 result.Raise("Could not disable the master role")
1402 cluster = self.cfg.GetClusterInfo()
1403 cluster.cluster_name = clustername
1404 cluster.master_ip = ip
1405 self.cfg.Update(cluster)
1407 # update the known hosts file
1408 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1409 node_list = self.cfg.GetNodeList()
1411 node_list.remove(master)
1414 result = self.rpc.call_upload_file(node_list,
1415 constants.SSH_KNOWN_HOSTS_FILE)
1416 for to_node, to_result in result.iteritems():
1417 msg = to_result.fail_msg
1419 msg = ("Copy of file %s to node %s failed: %s" %
1420 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1421 self.proc.LogWarning(msg)
1424 result = self.rpc.call_node_start_master(master, False)
1425 msg = result.fail_msg
1427 self.LogWarning("Could not re-enable the master role on"
1428 " the master, please restart manually: %s", msg)
1431 def _RecursiveCheckIfLVMBased(disk):
1432 """Check if the given disk or its children are lvm-based.
1434 @type disk: L{objects.Disk}
1435 @param disk: the disk to check
1437 @return: boolean indicating whether a LD_LV dev_type was found or not
1441 for chdisk in disk.children:
1442 if _RecursiveCheckIfLVMBased(chdisk):
1444 return disk.dev_type == constants.LD_LV
1447 class LUSetClusterParams(LogicalUnit):
1448 """Change the parameters of the cluster.
1451 HPATH = "cluster-modify"
1452 HTYPE = constants.HTYPE_CLUSTER
1456 def CheckArguments(self):
1460 if not hasattr(self.op, "candidate_pool_size"):
1461 self.op.candidate_pool_size = None
1462 if self.op.candidate_pool_size is not None:
1464 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1465 except (ValueError, TypeError), err:
1466 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1468 if self.op.candidate_pool_size < 1:
1469 raise errors.OpPrereqError("At least one master candidate needed")
1471 def ExpandNames(self):
1472 # FIXME: in the future maybe other cluster params won't require checking on
1473 # all nodes to be modified.
1474 self.needed_locks = {
1475 locking.LEVEL_NODE: locking.ALL_SET,
1477 self.share_locks[locking.LEVEL_NODE] = 1
1479 def BuildHooksEnv(self):
1484 "OP_TARGET": self.cfg.GetClusterName(),
1485 "NEW_VG_NAME": self.op.vg_name,
1487 mn = self.cfg.GetMasterNode()
1488 return env, [mn], [mn]
1490 def CheckPrereq(self):
1491 """Check prerequisites.
1493 This checks whether the given params don't conflict and
1494 if the given volume group is valid.
1497 if self.op.vg_name is not None and not self.op.vg_name:
1498 instances = self.cfg.GetAllInstancesInfo().values()
1499 for inst in instances:
1500 for disk in inst.disks:
1501 if _RecursiveCheckIfLVMBased(disk):
1502 raise errors.OpPrereqError("Cannot disable lvm storage while"
1503 " lvm-based instances exist")
1505 node_list = self.acquired_locks[locking.LEVEL_NODE]
1507 # if vg_name not None, checks given volume group on all nodes
1509 vglist = self.rpc.call_vg_list(node_list)
1510 for node in node_list:
1511 msg = vglist[node].fail_msg
1513 # ignoring down node
1514 self.LogWarning("Error while gathering data on node %s"
1515 " (ignoring node): %s", node, msg)
1517 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1519 constants.MIN_VG_SIZE)
1521 raise errors.OpPrereqError("Error on node '%s': %s" %
1524 self.cluster = cluster = self.cfg.GetClusterInfo()
1525 # validate params changes
1526 if self.op.beparams:
1527 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1528 self.new_beparams = objects.FillDict(
1529 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1531 if self.op.nicparams:
1532 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1533 self.new_nicparams = objects.FillDict(
1534 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1535 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1537 # hypervisor list/parameters
1538 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1539 if self.op.hvparams:
1540 if not isinstance(self.op.hvparams, dict):
1541 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1542 for hv_name, hv_dict in self.op.hvparams.items():
1543 if hv_name not in self.new_hvparams:
1544 self.new_hvparams[hv_name] = hv_dict
1546 self.new_hvparams[hv_name].update(hv_dict)
1548 if self.op.enabled_hypervisors is not None:
1549 self.hv_list = self.op.enabled_hypervisors
1551 self.hv_list = cluster.enabled_hypervisors
1553 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1554 # either the enabled list has changed, or the parameters have, validate
1555 for hv_name, hv_params in self.new_hvparams.items():
1556 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1557 (self.op.enabled_hypervisors and
1558 hv_name in self.op.enabled_hypervisors)):
1559 # either this is a new hypervisor, or its parameters have changed
1560 hv_class = hypervisor.GetHypervisor(hv_name)
1561 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1562 hv_class.CheckParameterSyntax(hv_params)
1563 _CheckHVParams(self, node_list, hv_name, hv_params)
1565 def Exec(self, feedback_fn):
1566 """Change the parameters of the cluster.
1569 if self.op.vg_name is not None:
1570 new_volume = self.op.vg_name
1573 if new_volume != self.cfg.GetVGName():
1574 self.cfg.SetVGName(new_volume)
1576 feedback_fn("Cluster LVM configuration already in desired"
1577 " state, not changing")
1578 if self.op.hvparams:
1579 self.cluster.hvparams = self.new_hvparams
1580 if self.op.enabled_hypervisors is not None:
1581 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1582 if self.op.beparams:
1583 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1584 if self.op.nicparams:
1585 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1587 if self.op.candidate_pool_size is not None:
1588 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1590 self.cfg.Update(self.cluster)
1592 # we want to update nodes after the cluster so that if any errors
1593 # happen, we have recorded and saved the cluster info
1594 if self.op.candidate_pool_size is not None:
1595 _AdjustCandidatePool(self)
1598 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1599 """Distribute additional files which are part of the cluster configuration.
1601 ConfigWriter takes care of distributing the config and ssconf files, but
1602 there are more files which should be distributed to all nodes. This function
1603 makes sure those are copied.
1605 @param lu: calling logical unit
1606 @param additional_nodes: list of nodes not in the config to distribute to
1609 # 1. Gather target nodes
1610 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1611 dist_nodes = lu.cfg.GetNodeList()
1612 if additional_nodes is not None:
1613 dist_nodes.extend(additional_nodes)
1614 if myself.name in dist_nodes:
1615 dist_nodes.remove(myself.name)
1616 # 2. Gather files to distribute
1617 dist_files = set([constants.ETC_HOSTS,
1618 constants.SSH_KNOWN_HOSTS_FILE,
1619 constants.RAPI_CERT_FILE,
1620 constants.RAPI_USERS_FILE,
1623 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1624 for hv_name in enabled_hypervisors:
1625 hv_class = hypervisor.GetHypervisor(hv_name)
1626 dist_files.update(hv_class.GetAncillaryFiles())
1628 # 3. Perform the files upload
1629 for fname in dist_files:
1630 if os.path.exists(fname):
1631 result = lu.rpc.call_upload_file(dist_nodes, fname)
1632 for to_node, to_result in result.items():
1633 msg = to_result.fail_msg
1635 msg = ("Copy of file %s to node %s failed: %s" %
1636 (fname, to_node, msg))
1637 lu.proc.LogWarning(msg)
1640 class LURedistributeConfig(NoHooksLU):
1641 """Force the redistribution of cluster configuration.
1643 This is a very simple LU.
1649 def ExpandNames(self):
1650 self.needed_locks = {
1651 locking.LEVEL_NODE: locking.ALL_SET,
1653 self.share_locks[locking.LEVEL_NODE] = 1
1655 def CheckPrereq(self):
1656 """Check prerequisites.
1660 def Exec(self, feedback_fn):
1661 """Redistribute the configuration.
1664 self.cfg.Update(self.cfg.GetClusterInfo())
1665 _RedistributeAncillaryFiles(self)
1668 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1669 """Sleep and poll for an instance's disk to sync.
1672 if not instance.disks:
1676 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1678 node = instance.primary_node
1680 for dev in instance.disks:
1681 lu.cfg.SetDiskID(dev, node)
1687 cumul_degraded = False
1688 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1689 msg = rstats.fail_msg
1691 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1694 raise errors.RemoteError("Can't contact node %s for mirror data,"
1695 " aborting." % node)
1698 rstats = rstats.payload
1700 for i, mstat in enumerate(rstats):
1702 lu.LogWarning("Can't compute data for node %s/%s",
1703 node, instance.disks[i].iv_name)
1705 # we ignore the ldisk parameter
1706 perc_done, est_time, is_degraded, _ = mstat
1707 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1708 if perc_done is not None:
1710 if est_time is not None:
1711 rem_time = "%d estimated seconds remaining" % est_time
1714 rem_time = "no time estimate"
1715 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1716 (instance.disks[i].iv_name, perc_done, rem_time))
1720 time.sleep(min(60, max_time))
1723 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1724 return not cumul_degraded
1727 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1728 """Check that mirrors are not degraded.
1730 The ldisk parameter, if True, will change the test from the
1731 is_degraded attribute (which represents overall non-ok status for
1732 the device(s)) to the ldisk (representing the local storage status).
1735 lu.cfg.SetDiskID(dev, node)
1742 if on_primary or dev.AssembleOnSecondary():
1743 rstats = lu.rpc.call_blockdev_find(node, dev)
1744 msg = rstats.fail_msg
1746 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1748 elif not rstats.payload:
1749 lu.LogWarning("Can't find disk on node %s", node)
1752 result = result and (not rstats.payload[idx])
1754 for child in dev.children:
1755 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1760 class LUDiagnoseOS(NoHooksLU):
1761 """Logical unit for OS diagnose/query.
1764 _OP_REQP = ["output_fields", "names"]
1766 _FIELDS_STATIC = utils.FieldSet()
1767 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1769 def ExpandNames(self):
1771 raise errors.OpPrereqError("Selective OS query not supported")
1773 _CheckOutputFields(static=self._FIELDS_STATIC,
1774 dynamic=self._FIELDS_DYNAMIC,
1775 selected=self.op.output_fields)
1777 # Lock all nodes, in shared mode
1778 # Temporary removal of locks, should be reverted later
1779 # TODO: reintroduce locks when they are lighter-weight
1780 self.needed_locks = {}
1781 #self.share_locks[locking.LEVEL_NODE] = 1
1782 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1784 def CheckPrereq(self):
1785 """Check prerequisites.
1790 def _DiagnoseByOS(node_list, rlist):
1791 """Remaps a per-node return list into an a per-os per-node dictionary
1793 @param node_list: a list with the names of all nodes
1794 @param rlist: a map with node names as keys and OS objects as values
1797 @return: a dictionary with osnames as keys and as value another map, with
1798 nodes as keys and tuples of (path, status, diagnose) as values, eg::
1800 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1801 (/srv/..., False, "invalid api")],
1802 "node2": [(/srv/..., True, "")]}
1807 # we build here the list of nodes that didn't fail the RPC (at RPC
1808 # level), so that nodes with a non-responding node daemon don't
1809 # make all OSes invalid
1810 good_nodes = [node_name for node_name in rlist
1811 if not rlist[node_name].fail_msg]
1812 for node_name, nr in rlist.items():
1813 if nr.fail_msg or not nr.payload:
1815 for name, path, status, diagnose in nr.payload:
1816 if name not in all_os:
1817 # build a list of nodes for this os containing empty lists
1818 # for each node in node_list
1820 for nname in good_nodes:
1821 all_os[name][nname] = []
1822 all_os[name][node_name].append((path, status, diagnose))
1825 def Exec(self, feedback_fn):
1826 """Compute the list of OSes.
1829 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1830 node_data = self.rpc.call_os_diagnose(valid_nodes)
1831 pol = self._DiagnoseByOS(valid_nodes, node_data)
1833 for os_name, os_data in pol.items():
1835 for field in self.op.output_fields:
1838 elif field == "valid":
1839 val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1840 elif field == "node_status":
1841 # this is just a copy of the dict
1843 for node_name, nos_list in os_data.items():
1844 val[node_name] = nos_list
1846 raise errors.ParameterError(field)
1853 class LURemoveNode(LogicalUnit):
1854 """Logical unit for removing a node.
1857 HPATH = "node-remove"
1858 HTYPE = constants.HTYPE_NODE
1859 _OP_REQP = ["node_name"]
1861 def BuildHooksEnv(self):
1864 This doesn't run on the target node in the pre phase as a failed
1865 node would then be impossible to remove.
1869 "OP_TARGET": self.op.node_name,
1870 "NODE_NAME": self.op.node_name,
1872 all_nodes = self.cfg.GetNodeList()
1873 all_nodes.remove(self.op.node_name)
1874 return env, all_nodes, all_nodes
1876 def CheckPrereq(self):
1877 """Check prerequisites.
1880 - the node exists in the configuration
1881 - it does not have primary or secondary instances
1882 - it's not the master
1884 Any errors are signalled by raising errors.OpPrereqError.
1887 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1889 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1891 instance_list = self.cfg.GetInstanceList()
1893 masternode = self.cfg.GetMasterNode()
1894 if node.name == masternode:
1895 raise errors.OpPrereqError("Node is the master node,"
1896 " you need to failover first.")
1898 for instance_name in instance_list:
1899 instance = self.cfg.GetInstanceInfo(instance_name)
1900 if node.name in instance.all_nodes:
1901 raise errors.OpPrereqError("Instance %s is still running on the node,"
1902 " please remove first." % instance_name)
1903 self.op.node_name = node.name
1906 def Exec(self, feedback_fn):
1907 """Removes the node from the cluster.
1911 logging.info("Stopping the node daemon and removing configs from node %s",
1914 self.context.RemoveNode(node.name)
1916 result = self.rpc.call_node_leave_cluster(node.name)
1917 msg = result.fail_msg
1919 self.LogWarning("Errors encountered on the remote node while leaving"
1920 " the cluster: %s", msg)
1922 # Promote nodes to master candidate as needed
1923 _AdjustCandidatePool(self)
1926 class LUQueryNodes(NoHooksLU):
1927 """Logical unit for querying nodes.
1930 _OP_REQP = ["output_fields", "names", "use_locking"]
1932 _FIELDS_DYNAMIC = utils.FieldSet(
1934 "mtotal", "mnode", "mfree",
1936 "ctotal", "cnodes", "csockets",
1939 _FIELDS_STATIC = utils.FieldSet(
1940 "name", "pinst_cnt", "sinst_cnt",
1941 "pinst_list", "sinst_list",
1942 "pip", "sip", "tags",
1950 def ExpandNames(self):
1951 _CheckOutputFields(static=self._FIELDS_STATIC,
1952 dynamic=self._FIELDS_DYNAMIC,
1953 selected=self.op.output_fields)
1955 self.needed_locks = {}
1956 self.share_locks[locking.LEVEL_NODE] = 1
1959 self.wanted = _GetWantedNodes(self, self.op.names)
1961 self.wanted = locking.ALL_SET
1963 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1964 self.do_locking = self.do_node_query and self.op.use_locking
1966 # if we don't request only static fields, we need to lock the nodes
1967 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1970 def CheckPrereq(self):
1971 """Check prerequisites.
1974 # The validation of the node list is done in _GetWantedNodes if the list
1975 # is non-empty; if it is empty, there is no validation to do
1978 def Exec(self, feedback_fn):
1979 """Computes the list of nodes and their attributes.
1982 all_info = self.cfg.GetAllNodesInfo()
1984 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1985 elif self.wanted != locking.ALL_SET:
1986 nodenames = self.wanted
1987 missing = set(nodenames).difference(all_info.keys())
1989 raise errors.OpExecError(
1990 "Some nodes were removed before retrieving their data: %s" % missing)
1992 nodenames = all_info.keys()
1994 nodenames = utils.NiceSort(nodenames)
1995 nodelist = [all_info[name] for name in nodenames]
1997 # begin data gathering
1999 if self.do_node_query:
2001 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2002 self.cfg.GetHypervisorType())
2003 for name in nodenames:
2004 nodeinfo = node_data[name]
2005 if not nodeinfo.fail_msg and nodeinfo.payload:
2006 nodeinfo = nodeinfo.payload
2007 fn = utils.TryConvert
2009 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2010 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2011 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2012 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2013 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2014 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2015 "bootid": nodeinfo.get('bootid', None),
2016 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2017 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2020 live_data[name] = {}
2022 live_data = dict.fromkeys(nodenames, {})
2024 node_to_primary = dict([(name, set()) for name in nodenames])
2025 node_to_secondary = dict([(name, set()) for name in nodenames])
2027 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2028 "sinst_cnt", "sinst_list"))
2029 if inst_fields & frozenset(self.op.output_fields):
2030 instancelist = self.cfg.GetInstanceList()
2032 for instance_name in instancelist:
2033 inst = self.cfg.GetInstanceInfo(instance_name)
2034 if inst.primary_node in node_to_primary:
2035 node_to_primary[inst.primary_node].add(inst.name)
2036 for secnode in inst.secondary_nodes:
2037 if secnode in node_to_secondary:
2038 node_to_secondary[secnode].add(inst.name)
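# Illustrative note: at this point the two maps associate every requested
# node with the set of instance names it hosts, e.g. (made-up values):
#   node_to_primary   == {"node1": set(["inst1", "inst2"]), "node2": set()}
#   node_to_secondary == {"node1": set(), "node2": set(["inst1"])}
# The pinst_*/sinst_* fields below are derived directly from these sets.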
2040 master_node = self.cfg.GetMasterNode()
2042 # end data gathering
2045 for node in nodelist:
2047 for field in self.op.output_fields:
2050 elif field == "pinst_list":
2051 val = list(node_to_primary[node.name])
2052 elif field == "sinst_list":
2053 val = list(node_to_secondary[node.name])
2054 elif field == "pinst_cnt":
2055 val = len(node_to_primary[node.name])
2056 elif field == "sinst_cnt":
2057 val = len(node_to_secondary[node.name])
2058 elif field == "pip":
2059 val = node.primary_ip
2060 elif field == "sip":
2061 val = node.secondary_ip
2062 elif field == "tags":
2063 val = list(node.GetTags())
2064 elif field == "serial_no":
2065 val = node.serial_no
2066 elif field == "master_candidate":
2067 val = node.master_candidate
2068 elif field == "master":
2069 val = node.name == master_node
2070 elif field == "offline":
2072 elif field == "drained":
2074 elif self._FIELDS_DYNAMIC.Matches(field):
2075 val = live_data[node.name].get(field, None)
2077 raise errors.ParameterError(field)
2078 node_output.append(val)
2079 output.append(node_output)
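# Illustrative note: each entry appended to 'output' is one row per node,
# with the values ordered exactly as in self.op.output_fields; for
# output_fields == ["name", "pinst_cnt"] a row could look like
# ["node1.example.com", 2] (made-up values).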
2084 class LUQueryNodeVolumes(NoHooksLU):
2085 """Logical unit for getting volumes on node(s).
2088 _OP_REQP = ["nodes", "output_fields"]
2090 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2091 _FIELDS_STATIC = utils.FieldSet("node")
2093 def ExpandNames(self):
2094 _CheckOutputFields(static=self._FIELDS_STATIC,
2095 dynamic=self._FIELDS_DYNAMIC,
2096 selected=self.op.output_fields)
2098 self.needed_locks = {}
2099 self.share_locks[locking.LEVEL_NODE] = 1
2100 if not self.op.nodes:
2101 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2103 self.needed_locks[locking.LEVEL_NODE] = \
2104 _GetWantedNodes(self, self.op.nodes)
2106 def CheckPrereq(self):
2107 """Check prerequisites.
2109 This checks that the fields required are valid output fields.
2112 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2114 def Exec(self, feedback_fn):
2115 """Computes the list of nodes and their attributes.
2118 nodenames = self.nodes
2119 volumes = self.rpc.call_node_volumes(nodenames)
2121 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2122 in self.cfg.GetInstanceList()]
2124 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2127 for node in nodenames:
2128 nresult = volumes[node]
2131 msg = nresult.fail_msg
2133 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2136 node_vols = nresult.payload[:]
2137 node_vols.sort(key=lambda vol: vol['dev'])
2139 for vol in node_vols:
2141 for field in self.op.output_fields:
2144 elif field == "phys":
2148 elif field == "name":
2150 elif field == "size":
2151 val = int(float(vol['size']))
2152 elif field == "instance":
2154 if node not in lv_by_node[inst]:
2156 if vol['name'] in lv_by_node[inst][node]:
2162 raise errors.ParameterError(field)
2163 node_output.append(str(val))
2165 output.append(node_output)
2170 class LUAddNode(LogicalUnit):
2171 """Logical unit for adding node to the cluster.
2175 HTYPE = constants.HTYPE_NODE
2176 _OP_REQP = ["node_name"]
2178 def BuildHooksEnv(self):
2181 This will run on all nodes before, and on all nodes + the new node after.
2185 "OP_TARGET": self.op.node_name,
2186 "NODE_NAME": self.op.node_name,
2187 "NODE_PIP": self.op.primary_ip,
2188 "NODE_SIP": self.op.secondary_ip,
2190 nodes_0 = self.cfg.GetNodeList()
2191 nodes_1 = nodes_0 + [self.op.node_name, ]
2192 return env, nodes_0, nodes_1
2194 def CheckPrereq(self):
2195 """Check prerequisites.
2198 - the new node is not already in the config
2200 - its parameters (single/dual homed) match the cluster
2202 Any errors are signalled by raising errors.OpPrereqError.
2205 node_name = self.op.node_name
2208 dns_data = utils.HostInfo(node_name)
2210 node = dns_data.name
2211 primary_ip = self.op.primary_ip = dns_data.ip
2212 secondary_ip = getattr(self.op, "secondary_ip", None)
2213 if secondary_ip is None:
2214 secondary_ip = primary_ip
2215 if not utils.IsValidIP(secondary_ip):
2216 raise errors.OpPrereqError("Invalid secondary IP given")
2217 self.op.secondary_ip = secondary_ip
2219 node_list = cfg.GetNodeList()
2220 if not self.op.readd and node in node_list:
2221 raise errors.OpPrereqError("Node %s is already in the configuration" %
2223 elif self.op.readd and node not in node_list:
2224 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2226 for existing_node_name in node_list:
2227 existing_node = cfg.GetNodeInfo(existing_node_name)
2229 if self.op.readd and node == existing_node_name:
2230 if (existing_node.primary_ip != primary_ip or
2231 existing_node.secondary_ip != secondary_ip):
2232 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2233 " address configuration as before")
2236 if (existing_node.primary_ip == primary_ip or
2237 existing_node.secondary_ip == primary_ip or
2238 existing_node.primary_ip == secondary_ip or
2239 existing_node.secondary_ip == secondary_ip):
2240 raise errors.OpPrereqError("New node ip address(es) conflict with"
2241 " existing node %s" % existing_node.name)
2243 # check that the type of the node (single versus dual homed) is the
2244 # same as for the master
2245 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2246 master_singlehomed = myself.secondary_ip == myself.primary_ip
2247 newbie_singlehomed = secondary_ip == primary_ip
2248 if master_singlehomed != newbie_singlehomed:
2249 if master_singlehomed:
2250 raise errors.OpPrereqError("The master has no private ip but the"
2251 " new node has one")
2253 raise errors.OpPrereqError("The master has a private ip but the"
2254 " new node doesn't have one")
2256 # checks reachability
2257 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2258 raise errors.OpPrereqError("Node not reachable by ping")
2260 if not newbie_singlehomed:
2261 # check reachability from my secondary ip to newbie's secondary ip
2262 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2263 source=myself.secondary_ip):
2264 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2265 " based ping to noded port")
2267 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2268 mc_now, _ = self.cfg.GetMasterCandidateStats()
2269 master_candidate = mc_now < cp_size
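# Descriptive note: the new node is promoted to master candidate only while
# the current number of candidates (mc_now) is below the cluster's
# candidate_pool_size; once the pool is full it joins as a regular node.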
2271 self.new_node = objects.Node(name=node,
2272 primary_ip=primary_ip,
2273 secondary_ip=secondary_ip,
2274 master_candidate=master_candidate,
2275 offline=False, drained=False)
2277 def Exec(self, feedback_fn):
2278 """Adds the new node to the cluster.
2281 new_node = self.new_node
2282 node = new_node.name
2284 # check connectivity
2285 result = self.rpc.call_version([node])[node]
2286 result.Raise("Can't get version information from node %s" % node)
2287 if constants.PROTOCOL_VERSION == result.payload:
2288 logging.info("Communication to node %s fine, sw version %s match",
2289 node, result.payload)
2291 raise errors.OpExecError("Version mismatch master version %s,"
2292 " node version %s" %
2293 (constants.PROTOCOL_VERSION, result.payload))
2296 logging.info("Copy ssh key to node %s", node)
2297 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2299 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2300 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2306 keyarray.append(f.read())
2310 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2312 keyarray[3], keyarray[4], keyarray[5])
2313 result.Raise("Cannot transfer ssh keys to the new node")
2315 # Add node to our /etc/hosts, and add key to known_hosts
2316 if self.cfg.GetClusterInfo().modify_etc_hosts:
2317 utils.AddHostToEtcHosts(new_node.name)
2319 if new_node.secondary_ip != new_node.primary_ip:
2320 result = self.rpc.call_node_has_ip_address(new_node.name,
2321 new_node.secondary_ip)
2322 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2324 if not result.payload:
2325 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2326 " you gave (%s). Please fix and re-run this"
2327 " command." % new_node.secondary_ip)
2329 node_verify_list = [self.cfg.GetMasterNode()]
2330 node_verify_param = {
2332 # TODO: do a node-net-test as well?
2335 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2336 self.cfg.GetClusterName())
2337 for verifier in node_verify_list:
2338 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2339 nl_payload = result[verifier].payload['nodelist']
2341 for failed in nl_payload:
2342 feedback_fn("ssh/hostname verification failed %s -> %s" %
2343 (verifier, nl_payload[failed]))
2344 raise errors.OpExecError("ssh/hostname verification failed.")
2347 _RedistributeAncillaryFiles(self)
2348 self.context.ReaddNode(new_node)
2350 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2351 self.context.AddNode(new_node)
2354 class LUSetNodeParams(LogicalUnit):
2355 """Modifies the parameters of a node.
2358 HPATH = "node-modify"
2359 HTYPE = constants.HTYPE_NODE
2360 _OP_REQP = ["node_name"]
2363 def CheckArguments(self):
2364 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2365 if node_name is None:
2366 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2367 self.op.node_name = node_name
2368 _CheckBooleanOpField(self.op, 'master_candidate')
2369 _CheckBooleanOpField(self.op, 'offline')
2370 _CheckBooleanOpField(self.op, 'drained')
2371 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2372 if all_mods.count(None) == 3:
2373 raise errors.OpPrereqError("Please pass at least one modification")
2374 if all_mods.count(True) > 1:
2375 raise errors.OpPrereqError("Can't set the node into more than one"
2376 " state at the same time")
2378 def ExpandNames(self):
2379 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2381 def BuildHooksEnv(self):
2384 This runs on the master node.
2388 "OP_TARGET": self.op.node_name,
2389 "MASTER_CANDIDATE": str(self.op.master_candidate),
2390 "OFFLINE": str(self.op.offline),
2391 "DRAINED": str(self.op.drained),
2393 nl = [self.cfg.GetMasterNode(),
2397 def CheckPrereq(self):
2398 """Check prerequisites.
2400 This checks the requested node state changes against the current cluster state.
2403 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2405 if ((self.op.master_candidate == False or self.op.offline == True or
2406 self.op.drained == True) and node.master_candidate):
2407 # we will demote the node from master_candidate
2408 if self.op.node_name == self.cfg.GetMasterNode():
2409 raise errors.OpPrereqError("The master node has to be a"
2410 " master candidate, online and not drained")
2411 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2412 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2413 if num_candidates <= cp_size:
2414 msg = ("Not enough master candidates (desired"
2415 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2417 self.LogWarning(msg)
2419 raise errors.OpPrereqError(msg)
2421 if (self.op.master_candidate == True and
2422 ((node.offline and not self.op.offline == False) or
2423 (node.drained and not self.op.drained == False))):
2424 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2425 " to master_candidate" % node.name)
2429 def Exec(self, feedback_fn):
2438 if self.op.offline is not None:
2439 node.offline = self.op.offline
2440 result.append(("offline", str(self.op.offline)))
2441 if self.op.offline == True:
2442 if node.master_candidate:
2443 node.master_candidate = False
2445 result.append(("master_candidate", "auto-demotion due to offline"))
2447 node.drained = False
2448 result.append(("drained", "clear drained status due to offline"))
2450 if self.op.master_candidate is not None:
2451 node.master_candidate = self.op.master_candidate
2453 result.append(("master_candidate", str(self.op.master_candidate)))
2454 if self.op.master_candidate == False:
2455 rrc = self.rpc.call_node_demote_from_mc(node.name)
2458 self.LogWarning("Node failed to demote itself: %s" % msg)
2460 if self.op.drained is not None:
2461 node.drained = self.op.drained
2462 result.append(("drained", str(self.op.drained)))
2463 if self.op.drained == True:
2464 if node.master_candidate:
2465 node.master_candidate = False
2467 result.append(("master_candidate", "auto-demotion due to drain"))
2469 node.offline = False
2470 result.append(("offline", "clear offline status due to drain"))
2472 # this will trigger configuration file update, if needed
2473 self.cfg.Update(node)
2474 # this will trigger job queue propagation or cleanup
2476 self.context.ReaddNode(node)
2481 class LUPowercycleNode(NoHooksLU):
2482 """Powercycles a node.
2485 _OP_REQP = ["node_name", "force"]
2488 def CheckArguments(self):
2489 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2490 if node_name is None:
2491 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2492 self.op.node_name = node_name
2493 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2494 raise errors.OpPrereqError("The node is the master and the force"
2495 " parameter was not set")
2497 def ExpandNames(self):
2498 """Locking for PowercycleNode.
2500 This is a last-resort option and shouldn't block on other
2501 jobs. Therefore, we grab no locks.
2504 self.needed_locks = {}
2506 def CheckPrereq(self):
2507 """Check prerequisites.
2509 This LU has no prereqs.
2514 def Exec(self, feedback_fn):
2518 result = self.rpc.call_node_powercycle(self.op.node_name,
2519 self.cfg.GetHypervisorType())
2520 result.Raise("Failed to schedule the reboot")
2521 return result.payload
2524 class LUQueryClusterInfo(NoHooksLU):
2525 """Query cluster configuration.
2531 def ExpandNames(self):
2532 self.needed_locks = {}
2534 def CheckPrereq(self):
2535 """No prerequsites needed for this LU.
2540 def Exec(self, feedback_fn):
2541 """Return cluster config.
2544 cluster = self.cfg.GetClusterInfo()
2546 "software_version": constants.RELEASE_VERSION,
2547 "protocol_version": constants.PROTOCOL_VERSION,
2548 "config_version": constants.CONFIG_VERSION,
2549 "os_api_version": constants.OS_API_VERSION,
2550 "export_version": constants.EXPORT_VERSION,
2551 "architecture": (platform.architecture()[0], platform.machine()),
2552 "name": cluster.cluster_name,
2553 "master": cluster.master_node,
2554 "default_hypervisor": cluster.default_hypervisor,
2555 "enabled_hypervisors": cluster.enabled_hypervisors,
2556 "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2557 for hypervisor in cluster.enabled_hypervisors]),
2558 "beparams": cluster.beparams,
2559 "nicparams": cluster.nicparams,
2560 "candidate_pool_size": cluster.candidate_pool_size,
2561 "master_netdev": cluster.master_netdev,
2562 "volume_group_name": cluster.volume_group_name,
2563 "file_storage_dir": cluster.file_storage_dir,
2569 class LUQueryConfigValues(NoHooksLU):
2570 """Return configuration values.
2575 _FIELDS_DYNAMIC = utils.FieldSet()
2576 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2578 def ExpandNames(self):
2579 self.needed_locks = {}
2581 _CheckOutputFields(static=self._FIELDS_STATIC,
2582 dynamic=self._FIELDS_DYNAMIC,
2583 selected=self.op.output_fields)
2585 def CheckPrereq(self):
2586 """No prerequisites.
2591 def Exec(self, feedback_fn):
2592 """Dump a representation of the cluster config to the standard output.
2596 for field in self.op.output_fields:
2597 if field == "cluster_name":
2598 entry = self.cfg.GetClusterName()
2599 elif field == "master_node":
2600 entry = self.cfg.GetMasterNode()
2601 elif field == "drain_flag":
2602 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2604 raise errors.ParameterError(field)
2605 values.append(entry)
2609 class LUActivateInstanceDisks(NoHooksLU):
2610 """Bring up an instance's disks.
2613 _OP_REQP = ["instance_name"]
2616 def ExpandNames(self):
2617 self._ExpandAndLockInstance()
2618 self.needed_locks[locking.LEVEL_NODE] = []
2619 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2621 def DeclareLocks(self, level):
2622 if level == locking.LEVEL_NODE:
2623 self._LockInstancesNodes()
2625 def CheckPrereq(self):
2626 """Check prerequisites.
2628 This checks that the instance is in the cluster.
2631 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2632 assert self.instance is not None, \
2633 "Cannot retrieve locked instance %s" % self.op.instance_name
2634 _CheckNodeOnline(self, self.instance.primary_node)
2636 def Exec(self, feedback_fn):
2637 """Activate the disks.
2640 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2642 raise errors.OpExecError("Cannot activate block devices")
2647 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2648 """Prepare the block devices for an instance.
2650 This sets up the block devices on all nodes.
2652 @type lu: L{LogicalUnit}
2653 @param lu: the logical unit on whose behalf we execute
2654 @type instance: L{objects.Instance}
2655 @param instance: the instance for whose disks we assemble
2656 @type ignore_secondaries: boolean
2657 @param ignore_secondaries: if true, errors on secondary nodes
2658 won't result in an error return from the function
2659 @return: False if the operation failed, otherwise a list of
2660 (host, instance_visible_name, node_visible_name)
2661 with the mapping from node devices to instance devices
2666 iname = instance.name
2667 # With the two passes mechanism we try to reduce the window of
2668 # opportunity for the race condition of switching DRBD to primary
2669 # before handshaking occurred, but we do not eliminate it
2671 # The proper fix would be to wait (with some limits) until the
2672 # connection has been made and drbd transitions from WFConnection
2673 # into any other network-connected state (Connected, SyncTarget,
2676 # 1st pass, assemble on all nodes in secondary mode
2677 for inst_disk in instance.disks:
2678 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2679 lu.cfg.SetDiskID(node_disk, node)
2680 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2681 msg = result.fail_msg
2683 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2684 " (is_primary=False, pass=1): %s",
2685 inst_disk.iv_name, node, msg)
2686 if not ignore_secondaries:
2689 # FIXME: race condition on drbd migration to primary
2691 # 2nd pass, do only the primary node
2692 for inst_disk in instance.disks:
2693 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2694 if node != instance.primary_node:
2696 lu.cfg.SetDiskID(node_disk, node)
2697 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2698 msg = result.fail_msg
2700 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2701 " (is_primary=True, pass=2): %s",
2702 inst_disk.iv_name, node, msg)
2704 device_info.append((instance.primary_node, inst_disk.iv_name,
2707 # leave the disks configured for the primary node
2708 # this is a workaround that would be fixed better by
2709 # improving the logical/physical id handling
2710 for disk in instance.disks:
2711 lu.cfg.SetDiskID(disk, instance.primary_node)
2713 return disks_ok, device_info
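# A minimal usage sketch (comment only, mirroring callers such as
# _StartInstanceDisks and LUActivateInstanceDisks.Exec); the function
# returns a (disks_ok, device_info) tuple, device_info holding
# (host, instance_visible_name, node_visible_name) entries for the disks
# on the primary node:
#
#   disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
#   if not disks_ok:
#     _ShutdownInstanceDisks(lu, instance)
#     raise errors.OpExecError("Cannot activate block devices")
#   for host, iv_name, node_dev in device_info:
#     lu.LogInfo("%s on %s is visible as %s" % (iv_name, host, node_dev))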
2716 def _StartInstanceDisks(lu, instance, force):
2717 """Start the disks of an instance.
2720 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2721 ignore_secondaries=force)
2723 _ShutdownInstanceDisks(lu, instance)
2724 if force is not None and not force:
2725 lu.proc.LogWarning("", hint="If the message above refers to a"
2727 " you can retry the operation using '--force'.")
2728 raise errors.OpExecError("Disk consistency error")
2731 class LUDeactivateInstanceDisks(NoHooksLU):
2732 """Shutdown an instance's disks.
2735 _OP_REQP = ["instance_name"]
2738 def ExpandNames(self):
2739 self._ExpandAndLockInstance()
2740 self.needed_locks[locking.LEVEL_NODE] = []
2741 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2743 def DeclareLocks(self, level):
2744 if level == locking.LEVEL_NODE:
2745 self._LockInstancesNodes()
2747 def CheckPrereq(self):
2748 """Check prerequisites.
2750 This checks that the instance is in the cluster.
2753 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2754 assert self.instance is not None, \
2755 "Cannot retrieve locked instance %s" % self.op.instance_name
2757 def Exec(self, feedback_fn):
2758 """Deactivate the disks
2761 instance = self.instance
2762 _SafeShutdownInstanceDisks(self, instance)
2765 def _SafeShutdownInstanceDisks(lu, instance):
2766 """Shutdown block devices of an instance.
2768 This function checks if an instance is running, before calling
2769 _ShutdownInstanceDisks.
2772 pnode = instance.primary_node
2773 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2774 ins_l.Raise("Can't contact node %s" % pnode)
2776 if instance.name in ins_l.payload:
2777 raise errors.OpExecError("Instance is running, can't shutdown"
2780 _ShutdownInstanceDisks(lu, instance)
2783 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2784 """Shutdown block devices of an instance.
2786 This does the shutdown on all nodes of the instance.
2788 If ignore_primary is false, errors on the primary node are
2793 for disk in instance.disks:
2794 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2795 lu.cfg.SetDiskID(top_disk, node)
2796 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2797 msg = result.fail_msg
2799 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2800 disk.iv_name, node, msg)
2801 if not ignore_primary or node != instance.primary_node:
2806 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2807 """Checks if a node has enough free memory.
2809 This function checks if a given node has the needed amount of free
2810 memory. In case the node has less memory or we cannot get the
2811 information from the node, this function raises an OpPrereqError
2814 @type lu: C{LogicalUnit}
2815 @param lu: a logical unit from which we get configuration data
2817 @param node: the node to check
2818 @type reason: C{str}
2819 @param reason: string to use in the error message
2820 @type requested: C{int}
2821 @param requested: the amount of memory in MiB to check for
2822 @type hypervisor_name: C{str}
2823 @param hypervisor_name: the hypervisor to ask for memory stats
2824 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2825 we cannot check the node
2828 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2829 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2830 free_mem = nodeinfo[node].payload.get('memory_free', None)
2831 if not isinstance(free_mem, int):
2832 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2833 " was '%s'" % (node, free_mem))
2834 if requested > free_mem:
2835 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2836 " needed %s MiB, available %s MiB" %
2837 (node, reason, requested, free_mem))
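# Illustrative call (comment only), matching how LUStartupInstance and
# LUFailoverInstance use this helper; 'bep' is assumed to be the instance's
# filled beparams dict:
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)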
2840 class LUStartupInstance(LogicalUnit):
2841 """Starts an instance.
2844 HPATH = "instance-start"
2845 HTYPE = constants.HTYPE_INSTANCE
2846 _OP_REQP = ["instance_name", "force"]
2849 def ExpandNames(self):
2850 self._ExpandAndLockInstance()
2852 def BuildHooksEnv(self):
2855 This runs on master, primary and secondary nodes of the instance.
2859 "FORCE": self.op.force,
2861 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2862 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2865 def CheckPrereq(self):
2866 """Check prerequisites.
2868 This checks that the instance is in the cluster.
2871 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2872 assert self.instance is not None, \
2873 "Cannot retrieve locked instance %s" % self.op.instance_name
2876 self.beparams = getattr(self.op, "beparams", {})
2878 if not isinstance(self.beparams, dict):
2879 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2880 " dict" % (type(self.beparams), ))
2881 # fill the beparams dict
2882 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2883 self.op.beparams = self.beparams
2886 self.hvparams = getattr(self.op, "hvparams", {})
2888 if not isinstance(self.hvparams, dict):
2889 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2890 " dict" % (type(self.hvparams), ))
2892 # check hypervisor parameter syntax (locally)
2893 cluster = self.cfg.GetClusterInfo()
2894 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2895 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2897 filled_hvp.update(self.hvparams)
2898 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2899 hv_type.CheckParameterSyntax(filled_hvp)
2900 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2901 self.op.hvparams = self.hvparams
2903 _CheckNodeOnline(self, instance.primary_node)
2905 bep = self.cfg.GetClusterInfo().FillBE(instance)
2906 # check bridges existence
2907 _CheckInstanceBridgesExist(self, instance)
2909 remote_info = self.rpc.call_instance_info(instance.primary_node,
2911 instance.hypervisor)
2912 remote_info.Raise("Error checking node %s" % instance.primary_node,
2914 if not remote_info.payload: # not running already
2915 _CheckNodeFreeMemory(self, instance.primary_node,
2916 "starting instance %s" % instance.name,
2917 bep[constants.BE_MEMORY], instance.hypervisor)
2919 def Exec(self, feedback_fn):
2920 """Start the instance.
2923 instance = self.instance
2924 force = self.op.force
2926 self.cfg.MarkInstanceUp(instance.name)
2928 node_current = instance.primary_node
2930 _StartInstanceDisks(self, instance, force)
2932 result = self.rpc.call_instance_start(node_current, instance,
2933 self.hvparams, self.beparams)
2934 msg = result.fail_msg
2936 _ShutdownInstanceDisks(self, instance)
2937 raise errors.OpExecError("Could not start instance: %s" % msg)
2940 class LURebootInstance(LogicalUnit):
2941 """Reboot an instance.
2944 HPATH = "instance-reboot"
2945 HTYPE = constants.HTYPE_INSTANCE
2946 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2949 def ExpandNames(self):
2950 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2951 constants.INSTANCE_REBOOT_HARD,
2952 constants.INSTANCE_REBOOT_FULL]:
2953 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2954 (constants.INSTANCE_REBOOT_SOFT,
2955 constants.INSTANCE_REBOOT_HARD,
2956 constants.INSTANCE_REBOOT_FULL))
2957 self._ExpandAndLockInstance()
2959 def BuildHooksEnv(self):
2962 This runs on master, primary and secondary nodes of the instance.
2966 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2967 "REBOOT_TYPE": self.op.reboot_type,
2969 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2970 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2973 def CheckPrereq(self):
2974 """Check prerequisites.
2976 This checks that the instance is in the cluster.
2979 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2980 assert self.instance is not None, \
2981 "Cannot retrieve locked instance %s" % self.op.instance_name
2983 _CheckNodeOnline(self, instance.primary_node)
2985 # check bridges existence
2986 _CheckInstanceBridgesExist(self, instance)
2988 def Exec(self, feedback_fn):
2989 """Reboot the instance.
2992 instance = self.instance
2993 ignore_secondaries = self.op.ignore_secondaries
2994 reboot_type = self.op.reboot_type
2996 node_current = instance.primary_node
2998 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2999 constants.INSTANCE_REBOOT_HARD]:
3000 for disk in instance.disks:
3001 self.cfg.SetDiskID(disk, node_current)
3002 result = self.rpc.call_instance_reboot(node_current, instance,
3004 result.Raise("Could not reboot instance")
3006 result = self.rpc.call_instance_shutdown(node_current, instance)
3007 result.Raise("Could not shutdown instance for full reboot")
3008 _ShutdownInstanceDisks(self, instance)
3009 _StartInstanceDisks(self, instance, ignore_secondaries)
3010 result = self.rpc.call_instance_start(node_current, instance, None, None)
3011 msg = result.fail_msg
3013 _ShutdownInstanceDisks(self, instance)
3014 raise errors.OpExecError("Could not start instance for"
3015 " full reboot: %s" % msg)
3017 self.cfg.MarkInstanceUp(instance.name)
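# Summary (descriptive comment): SOFT and HARD reboots are delegated to the
# hypervisor via call_instance_reboot, while a FULL reboot is emulated by a
# complete shutdown, disk deactivation/reactivation and a fresh start; in
# every case the instance is marked up in the configuration afterwards.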
3020 class LUShutdownInstance(LogicalUnit):
3021 """Shutdown an instance.
3024 HPATH = "instance-stop"
3025 HTYPE = constants.HTYPE_INSTANCE
3026 _OP_REQP = ["instance_name"]
3029 def ExpandNames(self):
3030 self._ExpandAndLockInstance()
3032 def BuildHooksEnv(self):
3035 This runs on master, primary and secondary nodes of the instance.
3038 env = _BuildInstanceHookEnvByObject(self, self.instance)
3039 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3042 def CheckPrereq(self):
3043 """Check prerequisites.
3045 This checks that the instance is in the cluster.
3048 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3049 assert self.instance is not None, \
3050 "Cannot retrieve locked instance %s" % self.op.instance_name
3051 _CheckNodeOnline(self, self.instance.primary_node)
3053 def Exec(self, feedback_fn):
3054 """Shutdown the instance.
3057 instance = self.instance
3058 node_current = instance.primary_node
3059 self.cfg.MarkInstanceDown(instance.name)
3060 result = self.rpc.call_instance_shutdown(node_current, instance)
3061 msg = result.fail_msg
3063 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3065 _ShutdownInstanceDisks(self, instance)
3068 class LUReinstallInstance(LogicalUnit):
3069 """Reinstall an instance.
3072 HPATH = "instance-reinstall"
3073 HTYPE = constants.HTYPE_INSTANCE
3074 _OP_REQP = ["instance_name"]
3077 def ExpandNames(self):
3078 self._ExpandAndLockInstance()
3080 def BuildHooksEnv(self):
3083 This runs on master, primary and secondary nodes of the instance.
3086 env = _BuildInstanceHookEnvByObject(self, self.instance)
3087 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3090 def CheckPrereq(self):
3091 """Check prerequisites.
3093 This checks that the instance is in the cluster and is not running.
3096 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3097 assert instance is not None, \
3098 "Cannot retrieve locked instance %s" % self.op.instance_name
3099 _CheckNodeOnline(self, instance.primary_node)
3101 if instance.disk_template == constants.DT_DISKLESS:
3102 raise errors.OpPrereqError("Instance '%s' has no disks" %
3103 self.op.instance_name)
3104 if instance.admin_up:
3105 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3106 self.op.instance_name)
3107 remote_info = self.rpc.call_instance_info(instance.primary_node,
3109 instance.hypervisor)
3110 remote_info.Raise("Error checking node %s" % instance.primary_node,
3112 if remote_info.payload:
3113 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3114 (self.op.instance_name,
3115 instance.primary_node))
3117 self.op.os_type = getattr(self.op, "os_type", None)
3118 if self.op.os_type is not None:
3120 pnode = self.cfg.GetNodeInfo(
3121 self.cfg.ExpandNodeName(instance.primary_node))
3123 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3125 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3126 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3127 (self.op.os_type, pnode.name), prereq=True)
3129 self.instance = instance
3131 def Exec(self, feedback_fn):
3132 """Reinstall the instance.
3135 inst = self.instance
3137 if self.op.os_type is not None:
3138 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3139 inst.os = self.op.os_type
3140 self.cfg.Update(inst)
3142 _StartInstanceDisks(self, inst, None)
3144 feedback_fn("Running the instance OS create scripts...")
3145 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3146 result.Raise("Could not install OS for instance %s on node %s" %
3147 (inst.name, inst.primary_node))
3149 _ShutdownInstanceDisks(self, inst)
3152 class LURenameInstance(LogicalUnit):
3153 """Rename an instance.
3156 HPATH = "instance-rename"
3157 HTYPE = constants.HTYPE_INSTANCE
3158 _OP_REQP = ["instance_name", "new_name"]
3160 def BuildHooksEnv(self):
3163 This runs on master, primary and secondary nodes of the instance.
3166 env = _BuildInstanceHookEnvByObject(self, self.instance)
3167 env["INSTANCE_NEW_NAME"] = self.op.new_name
3168 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3171 def CheckPrereq(self):
3172 """Check prerequisites.
3174 This checks that the instance is in the cluster and is not running.
3177 instance = self.cfg.GetInstanceInfo(
3178 self.cfg.ExpandInstanceName(self.op.instance_name))
3179 if instance is None:
3180 raise errors.OpPrereqError("Instance '%s' not known" %
3181 self.op.instance_name)
3182 _CheckNodeOnline(self, instance.primary_node)
3184 if instance.admin_up:
3185 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3186 self.op.instance_name)
3187 remote_info = self.rpc.call_instance_info(instance.primary_node,
3189 instance.hypervisor)
3190 remote_info.Raise("Error checking node %s" % instance.primary_node,
3192 if remote_info.payload:
3193 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3194 (self.op.instance_name,
3195 instance.primary_node))
3196 self.instance = instance
3198 # new name verification
3199 name_info = utils.HostInfo(self.op.new_name)
3201 self.op.new_name = new_name = name_info.name
3202 instance_list = self.cfg.GetInstanceList()
3203 if new_name in instance_list:
3204 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3207 if not getattr(self.op, "ignore_ip", False):
3208 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3209 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3210 (name_info.ip, new_name))
3213 def Exec(self, feedback_fn):
3214 """Reinstall the instance.
3217 inst = self.instance
3218 old_name = inst.name
3220 if inst.disk_template == constants.DT_FILE:
3221 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3223 self.cfg.RenameInstance(inst.name, self.op.new_name)
3224 # Change the instance lock. This is definitely safe while we hold the BGL
3225 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3226 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3228 # re-read the instance from the configuration after rename
3229 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3231 if inst.disk_template == constants.DT_FILE:
3232 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3233 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3234 old_file_storage_dir,
3235 new_file_storage_dir)
3236 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3237 " (but the instance has been renamed in Ganeti)" %
3238 (inst.primary_node, old_file_storage_dir,
3239 new_file_storage_dir))
3241 _StartInstanceDisks(self, inst, None)
3243 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3245 msg = result.fail_msg
3247 msg = ("Could not run OS rename script for instance %s on node %s"
3248 " (but the instance has been renamed in Ganeti): %s" %
3249 (inst.name, inst.primary_node, msg))
3250 self.proc.LogWarning(msg)
3252 _ShutdownInstanceDisks(self, inst)
3255 class LURemoveInstance(LogicalUnit):
3256 """Remove an instance.
3259 HPATH = "instance-remove"
3260 HTYPE = constants.HTYPE_INSTANCE
3261 _OP_REQP = ["instance_name", "ignore_failures"]
3264 def ExpandNames(self):
3265 self._ExpandAndLockInstance()
3266 self.needed_locks[locking.LEVEL_NODE] = []
3267 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3269 def DeclareLocks(self, level):
3270 if level == locking.LEVEL_NODE:
3271 self._LockInstancesNodes()
3273 def BuildHooksEnv(self):
3276 This runs on master, primary and secondary nodes of the instance.
3279 env = _BuildInstanceHookEnvByObject(self, self.instance)
3280 nl = [self.cfg.GetMasterNode()]
3283 def CheckPrereq(self):
3284 """Check prerequisites.
3286 This checks that the instance is in the cluster.
3289 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3290 assert self.instance is not None, \
3291 "Cannot retrieve locked instance %s" % self.op.instance_name
3293 def Exec(self, feedback_fn):
3294 """Remove the instance.
3297 instance = self.instance
3298 logging.info("Shutting down instance %s on node %s",
3299 instance.name, instance.primary_node)
3301 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3302 msg = result.fail_msg
3304 if self.op.ignore_failures:
3305 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3307 raise errors.OpExecError("Could not shutdown instance %s on"
3309 (instance.name, instance.primary_node, msg))
3311 logging.info("Removing block devices for instance %s", instance.name)
3313 if not _RemoveDisks(self, instance):
3314 if self.op.ignore_failures:
3315 feedback_fn("Warning: can't remove instance's disks")
3317 raise errors.OpExecError("Can't remove instance's disks")
3319 logging.info("Removing instance %s out of cluster config", instance.name)
3321 self.cfg.RemoveInstance(instance.name)
3322 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3325 class LUQueryInstances(NoHooksLU):
3326 """Logical unit for querying instances.
3329 _OP_REQP = ["output_fields", "names", "use_locking"]
3331 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3333 "disk_template", "ip", "mac", "bridge",
3334 "sda_size", "sdb_size", "vcpus", "tags",
3335 "network_port", "beparams",
3336 r"(disk)\.(size)/([0-9]+)",
3337 r"(disk)\.(sizes)", "disk_usage",
3338 r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3339 r"(nic)\.(macs|ips|bridges)",
3340 r"(disk|nic)\.(count)",
3341 "serial_no", "hypervisor", "hvparams",] +
3343 for name in constants.HVS_PARAMETERS] +
3345 for name in constants.BES_PARAMETERS])
3346 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3349 def ExpandNames(self):
3350 _CheckOutputFields(static=self._FIELDS_STATIC,
3351 dynamic=self._FIELDS_DYNAMIC,
3352 selected=self.op.output_fields)
3354 self.needed_locks = {}
3355 self.share_locks[locking.LEVEL_INSTANCE] = 1
3356 self.share_locks[locking.LEVEL_NODE] = 1
3359 self.wanted = _GetWantedInstances(self, self.op.names)
3361 self.wanted = locking.ALL_SET
3363 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3364 self.do_locking = self.do_node_query and self.op.use_locking
3366 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3367 self.needed_locks[locking.LEVEL_NODE] = []
3368 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3370 def DeclareLocks(self, level):
3371 if level == locking.LEVEL_NODE and self.do_locking:
3372 self._LockInstancesNodes()
3374 def CheckPrereq(self):
3375 """Check prerequisites.
3380 def Exec(self, feedback_fn):
3381 """Computes the list of nodes and their attributes.
3384 all_info = self.cfg.GetAllInstancesInfo()
3385 if self.wanted == locking.ALL_SET:
3386 # caller didn't specify instance names, so ordering is not important
3388 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3390 instance_names = all_info.keys()
3391 instance_names = utils.NiceSort(instance_names)
3393 # caller did specify names, so we must keep the ordering
3395 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3397 tgt_set = all_info.keys()
3398 missing = set(self.wanted).difference(tgt_set)
3400 raise errors.OpExecError("Some instances were removed before"
3401 " retrieving their data: %s" % missing)
3402 instance_names = self.wanted
3404 instance_list = [all_info[iname] for iname in instance_names]
3406 # begin data gathering
3408 nodes = frozenset([inst.primary_node for inst in instance_list])
3409 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3413 if self.do_node_query:
3415 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3417 result = node_data[name]
3419 # offline nodes will be in both lists
3420 off_nodes.append(name)
3421 if result.failed or result.fail_msg:
3422 bad_nodes.append(name)
3425 live_data.update(result.payload)
3426 # else no instance is alive
3428 live_data = dict([(name, {}) for name in instance_names])
3430 # end data gathering
3435 for instance in instance_list:
3437 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3438 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3439 for field in self.op.output_fields:
3440 st_match = self._FIELDS_STATIC.Matches(field)
3445 elif field == "pnode":
3446 val = instance.primary_node
3447 elif field == "snodes":
3448 val = list(instance.secondary_nodes)
3449 elif field == "admin_state":
3450 val = instance.admin_up
3451 elif field == "oper_state":
3452 if instance.primary_node in bad_nodes:
3455 val = bool(live_data.get(instance.name))
3456 elif field == "status":
3457 if instance.primary_node in off_nodes:
3458 val = "ERROR_nodeoffline"
3459 elif instance.primary_node in bad_nodes:
3460 val = "ERROR_nodedown"
3462 running = bool(live_data.get(instance.name))
3464 if instance.admin_up:
3469 if instance.admin_up:
3473 elif field == "oper_ram":
3474 if instance.primary_node in bad_nodes:
3476 elif instance.name in live_data:
3477 val = live_data[instance.name].get("memory", "?")
3480 elif field == "disk_template":
3481 val = instance.disk_template
3483 val = instance.nics[0].ip
3484 elif field == "bridge":
3485 val = instance.nics[0].bridge
3486 elif field == "mac":
3487 val = instance.nics[0].mac
3488 elif field == "sda_size" or field == "sdb_size":
3489 idx = ord(field[2]) - ord('a')
3491 val = instance.FindDisk(idx).size
3492 except errors.OpPrereqError:
3494 elif field == "disk_usage": # total disk usage per node
3495 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3496 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3497 elif field == "tags":
3498 val = list(instance.GetTags())
3499 elif field == "serial_no":
3500 val = instance.serial_no
3501 elif field == "network_port":
3502 val = instance.network_port
3503 elif field == "hypervisor":
3504 val = instance.hypervisor
3505 elif field == "hvparams":
3507 elif (field.startswith(HVPREFIX) and
3508 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3509 val = i_hv.get(field[len(HVPREFIX):], None)
3510 elif field == "beparams":
3512 elif (field.startswith(BEPREFIX) and
3513 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3514 val = i_be.get(field[len(BEPREFIX):], None)
3515 elif st_match and st_match.groups():
3516 # matches a variable list
3517 st_groups = st_match.groups()
3518 if st_groups and st_groups[0] == "disk":
3519 if st_groups[1] == "count":
3520 val = len(instance.disks)
3521 elif st_groups[1] == "sizes":
3522 val = [disk.size for disk in instance.disks]
3523 elif st_groups[1] == "size":
3525 val = instance.FindDisk(st_groups[2]).size
3526 except errors.OpPrereqError:
3529 assert False, "Unhandled disk parameter"
3530 elif st_groups[0] == "nic":
3531 if st_groups[1] == "count":
3532 val = len(instance.nics)
3533 elif st_groups[1] == "macs":
3534 val = [nic.mac for nic in instance.nics]
3535 elif st_groups[1] == "ips":
3536 val = [nic.ip for nic in instance.nics]
3537 elif st_groups[1] == "bridges":
3538 val = [nic.bridge for nic in instance.nics]
3541 nic_idx = int(st_groups[2])
3542 if nic_idx >= len(instance.nics):
3545 if st_groups[1] == "mac":
3546 val = instance.nics[nic_idx].mac
3547 elif st_groups[1] == "ip":
3548 val = instance.nics[nic_idx].ip
3549 elif st_groups[1] == "bridge":
3550 val = instance.nics[nic_idx].bridge
3552 assert False, "Unhandled NIC parameter"
3554 assert False, "Unhandled variable parameter"
3556 raise errors.ParameterError(field)
3563 class LUFailoverInstance(LogicalUnit):
3564 """Failover an instance.
3567 HPATH = "instance-failover"
3568 HTYPE = constants.HTYPE_INSTANCE
3569 _OP_REQP = ["instance_name", "ignore_consistency"]
3572 def ExpandNames(self):
3573 self._ExpandAndLockInstance()
3574 self.needed_locks[locking.LEVEL_NODE] = []
3575 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3577 def DeclareLocks(self, level):
3578 if level == locking.LEVEL_NODE:
3579 self._LockInstancesNodes()
3581 def BuildHooksEnv(self):
3584 This runs on master, primary and secondary nodes of the instance.
3588 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3590 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3591 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3594 def CheckPrereq(self):
3595 """Check prerequisites.
3597 This checks that the instance is in the cluster.
3600 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3601 assert self.instance is not None, \
3602 "Cannot retrieve locked instance %s" % self.op.instance_name
3604 bep = self.cfg.GetClusterInfo().FillBE(instance)
3605 if instance.disk_template not in constants.DTS_NET_MIRROR:
3606 raise errors.OpPrereqError("Instance's disk layout is not"
3607 " network mirrored, cannot failover.")
3609 secondary_nodes = instance.secondary_nodes
3610 if not secondary_nodes:
3611 raise errors.ProgrammerError("no secondary node but using "
3612 "a mirrored disk template")
3614 target_node = secondary_nodes[0]
3615 _CheckNodeOnline(self, target_node)
3616 _CheckNodeNotDrained(self, target_node)
3617 # check memory requirements on the secondary node
3618 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3619 instance.name, bep[constants.BE_MEMORY],
3620 instance.hypervisor)
3621 # check bridge existence
3622 _CheckInstanceBridgesExist(self, instance, node=target_node)
3624 def Exec(self, feedback_fn):
3625 """Failover an instance.
3627 The failover is done by shutting it down on its present node and
3628 starting it on the secondary.
3631 instance = self.instance
3633 source_node = instance.primary_node
3634 target_node = instance.secondary_nodes[0]
3636 feedback_fn("* checking disk consistency between source and target")
3637 for dev in instance.disks:
3638 # for drbd, these are drbd over lvm
3639 if not _CheckDiskConsistency(self, dev, target_node, False):
3640 if instance.admin_up and not self.op.ignore_consistency:
3641 raise errors.OpExecError("Disk %s is degraded on target node,"
3642 " aborting failover." % dev.iv_name)
3644 feedback_fn("* shutting down instance on source node")
3645 logging.info("Shutting down instance %s on node %s",
3646 instance.name, source_node)
3648 result = self.rpc.call_instance_shutdown(source_node, instance)
3649 msg = result.fail_msg
3651 if self.op.ignore_consistency:
3652 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3653 " Proceeding anyway. Please make sure node"
3654 " %s is down. Error details: %s",
3655 instance.name, source_node, source_node, msg)
3657 raise errors.OpExecError("Could not shutdown instance %s on"
3659 (instance.name, source_node, msg))
3661 feedback_fn("* deactivating the instance's disks on source node")
3662 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3663 raise errors.OpExecError("Can't shut down the instance's disks.")
3665 instance.primary_node = target_node
3666 # distribute new instance config to the other nodes
3667 self.cfg.Update(instance)
3669 # Only start the instance if it's marked as up
3670 if instance.admin_up:
3671 feedback_fn("* activating the instance's disks on target node")
3672 logging.info("Starting instance %s on node %s",
3673 instance.name, target_node)
3675 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3676 ignore_secondaries=True)
3678 _ShutdownInstanceDisks(self, instance)
3679 raise errors.OpExecError("Can't activate the instance's disks")
3681 feedback_fn("* starting the instance on the target node")
3682 result = self.rpc.call_instance_start(target_node, instance, None, None)
3683 msg = result.fail_msg
3685 _ShutdownInstanceDisks(self, instance)
3686 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3687 (instance.name, target_node, msg))
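# Note on ordering (descriptive comment): the configuration is updated
# (primary_node switched to the old secondary) before the instance is
# started on the target node, so even if the final start step fails the
# configuration already points at the node that holds the disks.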
3690 class LUMigrateInstance(LogicalUnit):
3691 """Migrate an instance.
3693 This is migration without shutting down, compared to the failover,
3694 which is done with shutdown.
3697 HPATH = "instance-migrate"
3698 HTYPE = constants.HTYPE_INSTANCE
3699 _OP_REQP = ["instance_name", "live", "cleanup"]
3703 def ExpandNames(self):
3704 self._ExpandAndLockInstance()
3705 self.needed_locks[locking.LEVEL_NODE] = []
3706 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3708 def DeclareLocks(self, level):
3709 if level == locking.LEVEL_NODE:
3710 self._LockInstancesNodes()
3712 def BuildHooksEnv(self):
3715 This runs on master, primary and secondary nodes of the instance.
3718 env = _BuildInstanceHookEnvByObject(self, self.instance)
3719 env["MIGRATE_LIVE"] = self.op.live
3720 env["MIGRATE_CLEANUP"] = self.op.cleanup
3721 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3724 def CheckPrereq(self):
3725 """Check prerequisites.
3727 This checks that the instance is in the cluster.
3730 instance = self.cfg.GetInstanceInfo(
3731 self.cfg.ExpandInstanceName(self.op.instance_name))
3732 if instance is None:
3733 raise errors.OpPrereqError("Instance '%s' not known" %
3734 self.op.instance_name)
3736 if instance.disk_template != constants.DT_DRBD8:
3737 raise errors.OpPrereqError("Instance's disk layout is not"
3738 " drbd8, cannot migrate.")
3740 secondary_nodes = instance.secondary_nodes
3741 if not secondary_nodes:
3742 raise errors.ConfigurationError("No secondary node but using"
3743 " drbd8 disk template")
3745 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3747 target_node = secondary_nodes[0]
3748 # check memory requirements on the secondary node
3749 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3750 instance.name, i_be[constants.BE_MEMORY],
3751 instance.hypervisor)
3753 # check bridge existence
3754 _CheckInstanceBridgesExist(self, instance, node=target_node)
3756 if not self.op.cleanup:
3757 _CheckNodeNotDrained(self, target_node)
3758 result = self.rpc.call_instance_migratable(instance.primary_node,
3760 result.Raise("Can't migrate, please use failover", prereq=True)
3762 self.instance = instance
3764 def _WaitUntilSync(self):
3765 """Poll with custom rpc for disk sync.
3767 This uses our own step-based rpc call.
3770 self.feedback_fn("* wait until resync is done")
3774 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3776 self.instance.disks)
3778 for node, nres in result.items():
3779 nres.Raise("Cannot resync disks on node %s" % node)
3780 node_done, node_percent = nres.payload
3781 all_done = all_done and node_done
3782 if node_percent is not None:
3783 min_percent = min(min_percent, node_percent)
3785 if min_percent < 100:
3786 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3789 def _EnsureSecondary(self, node):
3790 """Demote a node to secondary.
3793 self.feedback_fn("* switching node %s to secondary mode" % node)
3795 for dev in self.instance.disks:
3796 self.cfg.SetDiskID(dev, node)
3798 result = self.rpc.call_blockdev_close(node, self.instance.name,
3799 self.instance.disks)
3800 result.Raise("Cannot change disk to secondary on node %s" % node)
3802 def _GoStandalone(self):
3803 """Disconnect from the network.
3806 self.feedback_fn("* changing into standalone mode")
3807 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3808 self.instance.disks)
3809 for node, nres in result.items():
3810 nres.Raise("Cannot disconnect disks on node %s" % node)
3812 def _GoReconnect(self, multimaster):
3813 """Reconnect to the network.
3819 msg = "single-master"
3820 self.feedback_fn("* changing disks into %s mode" % msg)
3821 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3822 self.instance.disks,
3823 self.instance.name, multimaster)
3824 for node, nres in result.items():
3825 nres.Raise("Cannot change disks config on node %s" % node)
3827 def _ExecCleanup(self):
3828 """Try to cleanup after a failed migration.
3830 The cleanup is done by:
3831 - check that the instance is running only on one node
3832 (and update the config if needed)
3833 - change disks on its secondary node to secondary
3834 - wait until disks are fully synchronized
3835 - disconnect from the network
3836 - change disks into single-master mode
3837 - wait again until disks are fully synchronized
3840 instance = self.instance
3841 target_node = self.target_node
3842 source_node = self.source_node
3844 # check running on only one node
3845 self.feedback_fn("* checking where the instance actually runs"
3846 " (if this hangs, the hypervisor might be in"
3848 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3849 for node, result in ins_l.items():
3850 result.Raise("Can't contact node %s" % node)
3852 runningon_source = instance.name in ins_l[source_node].payload
3853 runningon_target = instance.name in ins_l[target_node].payload
3855 if runningon_source and runningon_target:
3856 raise errors.OpExecError("Instance seems to be running on two nodes,"
3857 " or the hypervisor is confused. You will have"
3858 " to ensure manually that it runs only on one"
3859 " and restart this operation.")
3861 if not (runningon_source or runningon_target):
3862 raise errors.OpExecError("Instance does not seem to be running at all."
3863 " In this case, it's safer to repair by"
3864 " running 'gnt-instance stop' to ensure disk"
3865 " shutdown, and then restarting it.")
3867 if runningon_target:
3868 # the migration has actually succeeded, we need to update the config
3869 self.feedback_fn("* instance running on secondary node (%s),"
3870 " updating config" % target_node)
3871 instance.primary_node = target_node
3872 self.cfg.Update(instance)
3873 demoted_node = source_node
3875 self.feedback_fn("* instance confirmed to be running on its"
3876 " primary node (%s)" % source_node)
3877 demoted_node = target_node
3879 self._EnsureSecondary(demoted_node)
3881 self._WaitUntilSync()
3882 except errors.OpExecError:
3883 # we ignore errors here, since if the device is standalone, it
3884 # won't be able to sync
3886 self._GoStandalone()
3887 self._GoReconnect(False)
3888 self._WaitUntilSync()
3890 self.feedback_fn("* done")
3892 def _RevertDiskStatus(self):
3893 """Try to revert the disk status after a failed migration.
3896 target_node = self.target_node
3898 self._EnsureSecondary(target_node)
3899 self._GoStandalone()
3900 self._GoReconnect(False)
3901 self._WaitUntilSync()
3902 except errors.OpExecError, err:
3903 self.LogWarning("Migration failed and I can't reconnect the"
3904 " drives: error '%s'\n"
3905 "Please look and recover the instance status" %
3908 def _AbortMigration(self):
3909 """Call the hypervisor code to abort a started migration.
3912 instance = self.instance
3913 target_node = self.target_node
3914 migration_info = self.migration_info
3916 abort_result = self.rpc.call_finalize_migration(target_node,
3920 abort_msg = abort_result.fail_msg
3922 logging.error("Aborting migration failed on target node %s: %s" %
3923 (target_node, abort_msg))
3924 # Don't raise an exception here, as we still have to try to revert the
3925 # disk status, even if this step failed.
3927 def _ExecMigration(self):
3928 """Migrate an instance.
3930 The migration is done by:
3931 - change the disks into dual-master mode
3932 - wait until disks are fully synchronized again
3933 - migrate the instance
3934 - change disks on the new secondary node (the old primary) to secondary
3935 - wait until disks are fully synchronized
3936 - change disks into single-master mode
3939 instance = self.instance
3940 target_node = self.target_node
3941 source_node = self.source_node
3943 self.feedback_fn("* checking disk consistency between source and target")
3944 for dev in instance.disks:
3945 if not _CheckDiskConsistency(self, dev, target_node, False):
3946 raise errors.OpExecError("Disk %s is degraded or not fully"
3947 " synchronized on target node,"
3948 " aborting migrate." % dev.iv_name)
3950 # First get the migration information from the remote node
3951 result = self.rpc.call_migration_info(source_node, instance)
3952 msg = result.fail_msg
3954 log_err = ("Failed fetching source migration information from %s: %s" %
3956 logging.error(log_err)
3957 raise errors.OpExecError(log_err)
3959 self.migration_info = migration_info = result.payload
3961 # Then switch the disks to master/master mode
3962 self._EnsureSecondary(target_node)
3963 self._GoStandalone()
3964 self._GoReconnect(True)
3965 self._WaitUntilSync()
3967 self.feedback_fn("* preparing %s to accept the instance" % target_node)
3968 result = self.rpc.call_accept_instance(target_node,
3969 instance,
3970 migration_info,
3971 self.nodes_ip[target_node])
3973 msg = result.fail_msg
3975 logging.error("Instance pre-migration failed, trying to revert"
3976 " disk status: %s", msg)
3977 self._AbortMigration()
3978 self._RevertDiskStatus()
3979 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3980 (instance.name, msg))
3982 self.feedback_fn("* migrating instance to %s" % target_node)
3984 result = self.rpc.call_instance_migrate(source_node, instance,
3985 self.nodes_ip[target_node],
3986 self.op.live)
3987 msg = result.fail_msg
3989 logging.error("Instance migration failed, trying to revert"
3990 " disk status: %s", msg)
3991 self._AbortMigration()
3992 self._RevertDiskStatus()
3993 raise errors.OpExecError("Could not migrate instance %s: %s" %
3994 (instance.name, msg))
3997 instance.primary_node = target_node
3998 # distribute new instance config to the other nodes
3999 self.cfg.Update(instance)
4001 result = self.rpc.call_finalize_migration(target_node,
4002 instance,
4003 migration_info,
4004 True)
4005 msg = result.fail_msg
4006 if msg:
4007 logging.error("Instance migration succeeded, but finalization failed:"
4008 " %s" % msg)
4009 raise errors.OpExecError("Could not finalize instance migration: %s" %
4010 msg)
4012 self._EnsureSecondary(source_node)
4013 self._WaitUntilSync()
4014 self._GoStandalone()
4015 self._GoReconnect(False)
4016 self._WaitUntilSync()
4018 self.feedback_fn("* done")
4020 def Exec(self, feedback_fn):
4021 """Perform the migration.
4024 self.feedback_fn = feedback_fn
4026 self.source_node = self.instance.primary_node
4027 self.target_node = self.instance.secondary_nodes[0]
4028 self.all_nodes = [self.source_node, self.target_node]
4030 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4031 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4034 return self._ExecCleanup()
4036 return self._ExecMigration()
4039 def _CreateBlockDev(lu, node, instance, device, force_create,
4041 """Create a tree of block devices on a given node.
4043 If this device type has to be created on secondaries, create it and
4044 all its children.
4046 If not, just recurse to children keeping the same 'force' value.
4048 @param lu: the lu on whose behalf we execute
4049 @param node: the node on which to create the device
4050 @type instance: L{objects.Instance}
4051 @param instance: the instance which owns the device
4052 @type device: L{objects.Disk}
4053 @param device: the device to create
4054 @type force_create: boolean
4055 @param force_create: whether to force creation of this device; this
4056 will be changed to True whenever we find a device which has the
4057 CreateOnSecondary() attribute
4058 @param info: the extra 'metadata' we should attach to the device
4059 (this will be represented as a LVM tag)
4060 @type force_open: boolean
4061 @param force_open: this parameter will be passed to the
4062 L{backend.BlockdevCreate} function where it specifies
4063 whether we run on primary or not, and it affects both
4064 the child assembly and the device's own Open() execution
4067 if device.CreateOnSecondary():
4071 for child in device.children:
4072 _CreateBlockDev(lu, node, instance, child, force_create,
4075 if not force_create:
4078 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
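# Illustrative sketch (not part of cmdlib): how the force_create flag
# propagates down a disk tree, as _CreateBlockDev does above. The _ExampleDev
# class is a stand-in for objects.Disk, introduced only for this example.
class _ExampleDev(object):
  def __init__(self, create_on_secondary, children=None):
    self.create_on_secondary = create_on_secondary
    self.children = children or []

def _example_create_tree(dev, force_create, created):
  """Recurse over a device tree, recording which devices get created."""
  if dev.create_on_secondary:
    # once a device requires creation on secondaries, force it for the
    # whole subtree below it
    force_create = True
  for child in dev.children:
    _example_create_tree(child, force_create, created)
  if force_create:
    created.append(dev)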
4081 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4082 """Create a single block device on a given node.
4084 This will not recurse over children of the device, so they must be
4085 created in advance.
4087 @param lu: the lu on whose behalf we execute
4088 @param node: the node on which to create the device
4089 @type instance: L{objects.Instance}
4090 @param instance: the instance which owns the device
4091 @type device: L{objects.Disk}
4092 @param device: the device to create
4093 @param info: the extra 'metadata' we should attach to the device
4094 (this will be represented as a LVM tag)
4095 @type force_open: boolean
4096 @param force_open: this parameter will be passed to the
4097 L{backend.BlockdevCreate} function where it specifies
4098 whether we run on primary or not, and it affects both
4099 the child assembly and the device's own Open() execution
4102 lu.cfg.SetDiskID(device, node)
4103 result = lu.rpc.call_blockdev_create(node, device, device.size,
4104 instance.name, force_open, info)
4105 result.Raise("Can't create block device %s on"
4106 " node %s for instance %s" % (device, node, instance.name))
4107 if device.physical_id is None:
4108 device.physical_id = result.payload
4111 def _GenerateUniqueNames(lu, exts):
4112 """Generate a suitable LV name.
4114 This will generate a logical volume name for the given instance.
4117 results = []
4118 for val in exts:
4119 new_id = lu.cfg.GenerateUniqueID()
4120 results.append("%s%s" % (new_id, val))
4121 return results
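# Illustrative sketch (not part of cmdlib): what the generated LV names look
# like. uuid4 is used here as a stand-in for cfg.GenerateUniqueID(), which is
# an assumption made for this example only.
def _example_unique_names(exts):
  """Return one '<unique-id><ext>' name per requested extension."""
  import uuid
  return ["%s%s" % (uuid.uuid4(), ext) for ext in exts]

# e.g. _example_unique_names([".disk0_data", ".disk0_meta"]) yields two names,
# one for the data LV and one for the metadata LV, each with its own unique id.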
4124 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4126 """Generate a drbd8 device complete with its children.
4129 port = lu.cfg.AllocatePort()
4130 vgname = lu.cfg.GetVGName()
4131 shared_secret = lu.cfg.GenerateDRBDSecret()
4132 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4133 logical_id=(vgname, names[0]))
4134 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4135 logical_id=(vgname, names[1]))
4136 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4137 logical_id=(primary, secondary, port,
4140 children=[dev_data, dev_meta],
4145 def _GenerateDiskTemplate(lu, template_name,
4146 instance_name, primary_node,
4147 secondary_nodes, disk_info,
4148 file_storage_dir, file_driver,
4150 """Generate the entire disk layout for a given template type.
4153 #TODO: compute space requirements
4155 vgname = lu.cfg.GetVGName()
4156 disk_count = len(disk_info)
4158 if template_name == constants.DT_DISKLESS:
4160 elif template_name == constants.DT_PLAIN:
4161 if len(secondary_nodes) != 0:
4162 raise errors.ProgrammerError("Wrong template configuration")
4164 names = _GenerateUniqueNames(lu, [".disk%d" % i
4165 for i in range(disk_count)])
4166 for idx, disk in enumerate(disk_info):
4167 disk_index = idx + base_index
4168 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4169 logical_id=(vgname, names[idx]),
4170 iv_name="disk/%d" % disk_index,
4172 disks.append(disk_dev)
4173 elif template_name == constants.DT_DRBD8:
4174 if len(secondary_nodes) != 1:
4175 raise errors.ProgrammerError("Wrong template configuration")
4176 remote_node = secondary_nodes[0]
4177 minors = lu.cfg.AllocateDRBDMinor(
4178 [primary_node, remote_node] * len(disk_info), instance_name)
4181 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4182 for i in range(disk_count)]):
4183 names.append(lv_prefix + "_data")
4184 names.append(lv_prefix + "_meta")
4185 for idx, disk in enumerate(disk_info):
4186 disk_index = idx + base_index
4187 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4188 disk["size"], names[idx*2:idx*2+2],
4189 "disk/%d" % disk_index,
4190 minors[idx*2], minors[idx*2+1])
4191 disk_dev.mode = disk["mode"]
4192 disks.append(disk_dev)
4193 elif template_name == constants.DT_FILE:
4194 if len(secondary_nodes) != 0:
4195 raise errors.ProgrammerError("Wrong template configuration")
4197 for idx, disk in enumerate(disk_info):
4198 disk_index = idx + base_index
4199 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4200 iv_name="disk/%d" % disk_index,
4201 logical_id=(file_driver,
4202 "%s/disk%d" % (file_storage_dir,
4205 disks.append(disk_dev)
4207 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
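# Illustrative sketch (not part of cmdlib): for the DRBD8 template, every
# instance disk needs one data LV, one 128 MB metadata LV and one DRBD minor
# on each of the two nodes, as generated above. The plain dicts returned here
# are a simplification of objects.Disk, used only for this example.
def _example_drbd8_layout(disk_sizes):
  """Return a rough per-disk layout description for a two-node DRBD8 setup."""
  layout = []
  for idx, size in enumerate(disk_sizes):
    layout.append({
      "iv_name": "disk/%d" % idx,
      "data_lv_size": size,
      "meta_lv_size": 128,   # fixed metadata LV size, in MB
      "minors_needed": 2,    # one minor on the primary, one on the secondary
    })
  return layout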
4211 def _GetInstanceInfoText(instance):
4212 """Compute that text that should be added to the disk's metadata.
4215 return "originstname+%s" % instance.name
4218 def _CreateDisks(lu, instance):
4219 """Create all disks for an instance.
4221 This abstracts away some work from AddInstance.
4223 @type lu: L{LogicalUnit}
4224 @param lu: the logical unit on whose behalf we execute
4225 @type instance: L{objects.Instance}
4226 @param instance: the instance whose disks we should create
4228 @return: the success of the creation
4231 info = _GetInstanceInfoText(instance)
4232 pnode = instance.primary_node
4234 if instance.disk_template == constants.DT_FILE:
4235 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4236 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4238 result.Raise("Failed to create directory '%s' on"
4239 " node %s: %s" % (file_storage_dir, pnode))
4241 # Note: this needs to be kept in sync with adding of disks in
4242 # LUSetInstanceParams
4243 for device in instance.disks:
4244 logging.info("Creating volume %s for instance %s",
4245 device.iv_name, instance.name)
4247 for node in instance.all_nodes:
4248 f_create = node == pnode
4249 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4252 def _RemoveDisks(lu, instance):
4253 """Remove all disks for an instance.
4255 This abstracts away some work from `AddInstance()` and
4256 `RemoveInstance()`. Note that in case some of the devices couldn't
4257 be removed, the removal will continue with the other ones (compare
4258 with `_CreateDisks()`).
4260 @type lu: L{LogicalUnit}
4261 @param lu: the logical unit on whose behalf we execute
4262 @type instance: L{objects.Instance}
4263 @param instance: the instance whose disks we should remove
4265 @return: the success of the removal
4268 logging.info("Removing block devices for instance %s", instance.name)
4271 for device in instance.disks:
4272 for node, disk in device.ComputeNodeTree(instance.primary_node):
4273 lu.cfg.SetDiskID(disk, node)
4274 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4276 lu.LogWarning("Could not remove block device %s on node %s,"
4277 " continuing anyway: %s", device.iv_name, node, msg)
4280 if instance.disk_template == constants.DT_FILE:
4281 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4282 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4284 msg = result.fail_msg
4286 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4287 file_storage_dir, instance.primary_node, msg)
4293 def _ComputeDiskSize(disk_template, disks):
4294 """Compute disk size requirements in the volume group
4297 # Required free disk space as a function of disk and swap space
4299 constants.DT_DISKLESS: None,
4300 constants.DT_PLAIN: sum(d["size"] for d in disks),
4301 # 128 MB are added for drbd metadata for each disk
4302 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4303 constants.DT_FILE: None,
4306 if disk_template not in req_size_dict:
4307 raise errors.ProgrammerError("Disk template '%s' size requirement"
4308 " is unknown" % disk_template)
4310 return req_size_dict[disk_template]
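# Worked example (illustrative only): for two disks of 1024 MB and 2048 MB,
# the plain template needs 1024 + 2048 = 3072 MB in the volume group, while
# drbd8 needs (1024 + 128) + (2048 + 128) = 3328 MB because each disk carries
# an extra 128 MB of DRBD metadata; diskless and file templates need no VG
# space at all (None). The helper below just recomputes the same sums.
def _example_disk_size_check():
  """Recompute the plain vs. drbd8 requirements for two sample disks."""
  disks = [{"size": 1024}, {"size": 2048}]
  plain = sum(d["size"] for d in disks)        # 3072 MB
  drbd8 = sum(d["size"] + 128 for d in disks)  # 3328 MB (metadata overhead)
  return plain, drbd8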
4313 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4314 """Hypervisor parameter validation.
4316 This function abstracts the hypervisor parameter validation so it can be
4317 used by both instance creation and instance modification.
4319 @type lu: L{LogicalUnit}
4320 @param lu: the logical unit for which we check
4321 @type nodenames: list
4322 @param nodenames: the list of nodes on which we should check
4323 @type hvname: string
4324 @param hvname: the name of the hypervisor we should use
4325 @type hvparams: dict
4326 @param hvparams: the parameters which we need to check
4327 @raise errors.OpPrereqError: if the parameters are not valid
4330 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4333 for node in nodenames:
4337 info.Raise("Hypervisor parameter validation failed on node %s" % node)
4340 class LUCreateInstance(LogicalUnit):
4341 """Create an instance.
4344 HPATH = "instance-add"
4345 HTYPE = constants.HTYPE_INSTANCE
4346 _OP_REQP = ["instance_name", "disks", "disk_template",
4348 "wait_for_sync", "ip_check", "nics",
4349 "hvparams", "beparams"]
4352 def _ExpandNode(self, node):
4353 """Expands and checks one node name.
4356 node_full = self.cfg.ExpandNodeName(node)
4357 if node_full is None:
4358 raise errors.OpPrereqError("Unknown node %s" % node)
4361 def ExpandNames(self):
4362 """ExpandNames for CreateInstance.
4364 Figure out the right locks for instance creation.
4367 self.needed_locks = {}
4369 # set optional parameters to none if they don't exist
4370 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4371 if not hasattr(self.op, attr):
4372 setattr(self.op, attr, None)
4374 # cheap checks, mostly valid constants given
4376 # verify creation mode
4377 if self.op.mode not in (constants.INSTANCE_CREATE,
4378 constants.INSTANCE_IMPORT):
4379 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4382 # disk template and mirror node verification
4383 if self.op.disk_template not in constants.DISK_TEMPLATES:
4384 raise errors.OpPrereqError("Invalid disk template name")
4386 if self.op.hypervisor is None:
4387 self.op.hypervisor = self.cfg.GetHypervisorType()
4389 cluster = self.cfg.GetClusterInfo()
4390 enabled_hvs = cluster.enabled_hypervisors
4391 if self.op.hypervisor not in enabled_hvs:
4392 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4393 " cluster (%s)" % (self.op.hypervisor,
4394 ",".join(enabled_hvs)))
4396 # check hypervisor parameter syntax (locally)
4397 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4398 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4400 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4401 hv_type.CheckParameterSyntax(filled_hvp)
4403 # fill and remember the beparams dict
4404 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4405 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4408 #### instance parameters check
4410 # instance name verification
4411 hostname1 = utils.HostInfo(self.op.instance_name)
4412 self.op.instance_name = instance_name = hostname1.name
4414 # this is just a preventive check, but someone might still add this
4415 # instance in the meantime, and creation will fail at lock-add time
4416 if instance_name in self.cfg.GetInstanceList():
4417 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4420 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4424 for idx, nic in enumerate(self.op.nics):
4425 nic_mode_req = nic.get("mode", None)
4426 nic_mode = nic_mode_req
4427 if nic_mode is None:
4428 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4430 # in routed mode, for the first nic, the default ip is 'auto'
4431 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4432 default_ip_mode = constants.VALUE_AUTO
4434 default_ip_mode = constants.VALUE_NONE
4436 # ip validity checks
4437 ip = nic.get("ip", default_ip_mode)
4438 if ip is None or ip.lower() == constants.VALUE_NONE:
4440 elif ip.lower() == constants.VALUE_AUTO:
4441 nic_ip = hostname1.ip
4443 if not utils.IsValidIP(ip):
4444 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4445 " like a valid IP" % ip)
4448 # TODO: check the ip for uniqueness !!
4449 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4450 raise errors.OpPrereqError("Routed nic mode requires an ip address")
4452 # MAC address verification
4453 mac = nic.get("mac", constants.VALUE_AUTO)
4454 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4455 if not utils.IsValidMac(mac.lower()):
4456 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4458 # bridge verification
4459 bridge = nic.get("bridge", None)
4460 link = nic.get("link", None)
4462 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4463 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4464 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4470 nicparams[constants.NIC_MODE] = nic_mode_req
4472 nicparams[constants.NIC_LINK] = link
4474 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4476 objects.NIC.CheckParameterSyntax(check_params)
4477 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4479 # disk checks/pre-build
4481 for disk in self.op.disks:
4482 mode = disk.get("mode", constants.DISK_RDWR)
4483 if mode not in constants.DISK_ACCESS_SET:
4484 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4486 size = disk.get("size", None)
4488 raise errors.OpPrereqError("Missing disk size")
4492 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4493 self.disks.append({"size": size, "mode": mode})
4495 # used in CheckPrereq for ip ping check
4496 self.check_ip = hostname1.ip
4498 # file storage checks
4499 if (self.op.file_driver and
4500 not self.op.file_driver in constants.FILE_DRIVER):
4501 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4502 self.op.file_driver)
4504 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4505 raise errors.OpPrereqError("File storage directory path not absolute")
4507 ### Node/iallocator related checks
4508 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4509 raise errors.OpPrereqError("One and only one of iallocator and primary"
4510 " node must be given")
4512 if self.op.iallocator:
4513 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4515 self.op.pnode = self._ExpandNode(self.op.pnode)
4516 nodelist = [self.op.pnode]
4517 if self.op.snode is not None:
4518 self.op.snode = self._ExpandNode(self.op.snode)
4519 nodelist.append(self.op.snode)
4520 self.needed_locks[locking.LEVEL_NODE] = nodelist
4522 # in case of import lock the source node too
4523 if self.op.mode == constants.INSTANCE_IMPORT:
4524 src_node = getattr(self.op, "src_node", None)
4525 src_path = getattr(self.op, "src_path", None)
4527 if src_path is None:
4528 self.op.src_path = src_path = self.op.instance_name
4530 if src_node is None:
4531 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4532 self.op.src_node = None
4533 if os.path.isabs(src_path):
4534 raise errors.OpPrereqError("Importing an instance from an absolute"
4535 " path requires a source node option.")
4537 self.op.src_node = src_node = self._ExpandNode(src_node)
4538 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4539 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4540 if not os.path.isabs(src_path):
4541 self.op.src_path = src_path = \
4542 os.path.join(constants.EXPORT_DIR, src_path)
4544 else: # INSTANCE_CREATE
4545 if getattr(self.op, "os_type", None) is None:
4546 raise errors.OpPrereqError("No guest OS specified")
4548 def _RunAllocator(self):
4549 """Run the allocator based on input opcode.
4552 nics = [n.ToDict() for n in self.nics]
4553 ial = IAllocator(self,
4554 mode=constants.IALLOCATOR_MODE_ALLOC,
4555 name=self.op.instance_name,
4556 disk_template=self.op.disk_template,
4559 vcpus=self.be_full[constants.BE_VCPUS],
4560 mem_size=self.be_full[constants.BE_MEMORY],
4563 hypervisor=self.op.hypervisor,
4566 ial.Run(self.op.iallocator)
4569 raise errors.OpPrereqError("Can't compute nodes using"
4570 " iallocator '%s': %s" % (self.op.iallocator,
4572 if len(ial.nodes) != ial.required_nodes:
4573 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4574 " of nodes (%s), required %s" %
4575 (self.op.iallocator, len(ial.nodes),
4576 ial.required_nodes))
4577 self.op.pnode = ial.nodes[0]
4578 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4579 self.op.instance_name, self.op.iallocator,
4580 ", ".join(ial.nodes))
4581 if ial.required_nodes == 2:
4582 self.op.snode = ial.nodes[1]
4584 def BuildHooksEnv(self):
4587 This runs on master, primary and secondary nodes of the instance.
4591 "ADD_MODE": self.op.mode,
4593 if self.op.mode == constants.INSTANCE_IMPORT:
4594 env["SRC_NODE"] = self.op.src_node
4595 env["SRC_PATH"] = self.op.src_path
4596 env["SRC_IMAGES"] = self.src_images
4598 env.update(_BuildInstanceHookEnv(
4599 name=self.op.instance_name,
4600 primary_node=self.op.pnode,
4601 secondary_nodes=self.secondaries,
4602 status=self.op.start,
4603 os_type=self.op.os_type,
4604 memory=self.be_full[constants.BE_MEMORY],
4605 vcpus=self.be_full[constants.BE_VCPUS],
4606 nics=_PreBuildNICHooksList(self, self.nics),
4607 disk_template=self.op.disk_template,
4608 disks=[(d["size"], d["mode"]) for d in self.disks],
4611 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4616 def CheckPrereq(self):
4617 """Check prerequisites.
4620 if (not self.cfg.GetVGName() and
4621 self.op.disk_template not in constants.DTS_NOT_LVM):
4622 raise errors.OpPrereqError("Cluster does not support lvm-based"
4625 if self.op.mode == constants.INSTANCE_IMPORT:
4626 src_node = self.op.src_node
4627 src_path = self.op.src_path
4629 if src_node is None:
4630 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4631 exp_list = self.rpc.call_export_list(locked_nodes)
4633 for node in exp_list:
4634 if exp_list[node].fail_msg:
4636 if src_path in exp_list[node].payload:
4638 self.op.src_node = src_node = node
4639 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4643 raise errors.OpPrereqError("No export found for relative path %s" %
4646 _CheckNodeOnline(self, src_node)
4647 result = self.rpc.call_export_info(src_node, src_path)
4648 result.Raise("No export or invalid export found in dir %s" % src_path)
4650 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4651 if not export_info.has_section(constants.INISECT_EXP):
4652 raise errors.ProgrammerError("Corrupted export config")
4654 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4655 if (int(ei_version) != constants.EXPORT_VERSION):
4656 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4657 (ei_version, constants.EXPORT_VERSION))
4659 # Check that the new instance doesn't have fewer disks than the export
4660 instance_disks = len(self.disks)
4661 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4662 if instance_disks < export_disks:
4663 raise errors.OpPrereqError("Not enough disks to import."
4664 " (instance: %d, export: %d)" %
4665 (instance_disks, export_disks))
4667 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4669 for idx in range(export_disks):
4670 option = 'disk%d_dump' % idx
4671 if export_info.has_option(constants.INISECT_INS, option):
4672 # FIXME: are the old os-es, disk sizes, etc. useful?
4673 export_name = export_info.get(constants.INISECT_INS, option)
4674 image = os.path.join(src_path, export_name)
4675 disk_images.append(image)
4677 disk_images.append(False)
4679 self.src_images = disk_images
4681 old_name = export_info.get(constants.INISECT_INS, 'name')
4682 # FIXME: int() here could throw a ValueError on broken exports
4683 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4684 if self.op.instance_name == old_name:
4685 for idx, nic in enumerate(self.nics):
4686 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4687 nic_mac_ini = 'nic%d_mac' % idx
4688 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4690 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4691 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4692 if self.op.start and not self.op.ip_check:
4693 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4694 " adding an instance in start mode")
4696 if self.op.ip_check:
4697 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4698 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4699 (self.check_ip, self.op.instance_name))
4701 #### mac address generation
4702 # By generating the MAC address here, both the allocator and the hooks get
4703 # the real final MAC address rather than the 'auto' or 'generate' value.
4704 # There is a race condition between the generation and the instance object
4705 # creation, which means that we know the MAC is valid now, but we're not
4706 # sure it will be when we actually add the instance. If things go bad,
4707 # adding the instance will abort because of a duplicate MAC, and the
4708 # creation job will fail.
4709 for nic in self.nics:
4710 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4711 nic.mac = self.cfg.GenerateMAC()
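# Illustrative sketch (not part of the LU): one way a MAC address could be
# generated. The real implementation is cfg.GenerateMAC(); the prefix used
# below is an assumption for this example, not the cluster's configured
# MAC prefix.
def _example_generate_mac(prefix="aa:00:00"):
  """Return a pseudo-random MAC address under the given three-byte prefix."""
  import random
  return "%s:%02x:%02x:%02x" % (prefix,
                                random.randint(0x00, 0xff),
                                random.randint(0x00, 0xff),
                                random.randint(0x00, 0xff))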
4715 if self.op.iallocator is not None:
4716 self._RunAllocator()
4718 #### node related checks
4720 # check primary node
4721 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4722 assert self.pnode is not None, \
4723 "Cannot retrieve locked node %s" % self.op.pnode
4725 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4728 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4731 self.secondaries = []
4733 # mirror node verification
4734 if self.op.disk_template in constants.DTS_NET_MIRROR:
4735 if self.op.snode is None:
4736 raise errors.OpPrereqError("The networked disk templates need"
4738 if self.op.snode == pnode.name:
4739 raise errors.OpPrereqError("The secondary node cannot be"
4740 " the primary node.")
4741 _CheckNodeOnline(self, self.op.snode)
4742 _CheckNodeNotDrained(self, self.op.snode)
4743 self.secondaries.append(self.op.snode)
4745 nodenames = [pnode.name] + self.secondaries
4747 req_size = _ComputeDiskSize(self.op.disk_template,
4750 # Check lv size requirements
4751 if req_size is not None:
4752 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4754 for node in nodenames:
4755 info = nodeinfo[node]
4756 info.Raise("Cannot get current information from node %s" % node)
4758 vg_free = info.get('vg_free', None)
4759 if not isinstance(vg_free, int):
4760 raise errors.OpPrereqError("Can't compute free disk space on"
4762 if req_size > vg_free:
4763 raise errors.OpPrereqError("Not enough disk space on target node %s."
4764 " %d MB available, %d MB required" %
4765 (node, vg_free, req_size))
4767 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4770 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4771 result.Raise("OS '%s' not in supported os list for primary node %s" %
4772 (self.op.os_type, pnode.name), prereq=True)
4774 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4776 # memory check on primary node
4778 _CheckNodeFreeMemory(self, self.pnode.name,
4779 "creating instance %s" % self.op.instance_name,
4780 self.be_full[constants.BE_MEMORY],
4783 def Exec(self, feedback_fn):
4784 """Create and add the instance to the cluster.
4787 instance = self.op.instance_name
4788 pnode_name = self.pnode.name
4790 ht_kind = self.op.hypervisor
4791 if ht_kind in constants.HTS_REQ_PORT:
4792 network_port = self.cfg.AllocatePort()
4796 ##if self.op.vnc_bind_address is None:
4797 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4799 # this is needed because os.path.join does not accept None arguments
4800 if self.op.file_storage_dir is None:
4801 string_file_storage_dir = ""
4803 string_file_storage_dir = self.op.file_storage_dir
4805 # build the full file storage dir path
4806 file_storage_dir = os.path.normpath(os.path.join(
4807 self.cfg.GetFileStorageDir(),
4808 string_file_storage_dir, instance))
4811 disks = _GenerateDiskTemplate(self,
4812 self.op.disk_template,
4813 instance, pnode_name,
4817 self.op.file_driver,
4820 iobj = objects.Instance(name=instance, os=self.op.os_type,
4821 primary_node=pnode_name,
4822 nics=self.nics, disks=disks,
4823 disk_template=self.op.disk_template,
4825 network_port=network_port,
4826 beparams=self.op.beparams,
4827 hvparams=self.op.hvparams,
4828 hypervisor=self.op.hypervisor,
4831 feedback_fn("* creating instance disks...")
4833 _CreateDisks(self, iobj)
4834 except errors.OpExecError:
4835 self.LogWarning("Device creation failed, reverting...")
4837 _RemoveDisks(self, iobj)
4839 self.cfg.ReleaseDRBDMinors(instance)
4842 feedback_fn("adding instance %s to cluster config" % instance)
4844 self.cfg.AddInstance(iobj)
4845 # Declare that we don't want to remove the instance lock anymore, as we've
4846 # added the instance to the config
4847 del self.remove_locks[locking.LEVEL_INSTANCE]
4848 # Unlock all the nodes
4849 if self.op.mode == constants.INSTANCE_IMPORT:
4850 nodes_keep = [self.op.src_node]
4851 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4852 if node != self.op.src_node]
4853 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4854 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4856 self.context.glm.release(locking.LEVEL_NODE)
4857 del self.acquired_locks[locking.LEVEL_NODE]
4859 if self.op.wait_for_sync:
4860 disk_abort = not _WaitForSync(self, iobj)
4861 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4862 # make sure the disks are not degraded (still sync-ing is ok)
4864 feedback_fn("* checking mirrors status")
4865 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4870 _RemoveDisks(self, iobj)
4871 self.cfg.RemoveInstance(iobj.name)
4872 # Make sure the instance lock gets removed
4873 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4874 raise errors.OpExecError("There are some degraded disks for"
4877 feedback_fn("creating os for instance %s on node %s" %
4878 (instance, pnode_name))
4880 if iobj.disk_template != constants.DT_DISKLESS:
4881 if self.op.mode == constants.INSTANCE_CREATE:
4882 feedback_fn("* running the instance OS create scripts...")
4883 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4884 result.Raise("Could not add os for instance %s"
4885 " on node %s" % (instance, pnode_name))
4887 elif self.op.mode == constants.INSTANCE_IMPORT:
4888 feedback_fn("* running the instance OS import scripts...")
4889 src_node = self.op.src_node
4890 src_images = self.src_images
4891 cluster_name = self.cfg.GetClusterName()
4892 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4893 src_node, src_images,
4895 msg = import_result.fail_msg
4897 self.LogWarning("Error while importing the disk images for instance"
4898 " %s on node %s: %s" % (instance, pnode_name, msg))
4900 # also checked in the prereq part
4901 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4905 iobj.admin_up = True
4906 self.cfg.Update(iobj)
4907 logging.info("Starting instance %s on node %s", instance, pnode_name)
4908 feedback_fn("* starting instance...")
4909 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4910 result.Raise("Could not start instance")
4913 class LUConnectConsole(NoHooksLU):
4914 """Connect to an instance's console.
4916 This is somewhat special in that it returns the command line that
4917 you need to run on the master node in order to connect to the
4918 console.
4921 _OP_REQP = ["instance_name"]
4924 def ExpandNames(self):
4925 self._ExpandAndLockInstance()
4927 def CheckPrereq(self):
4928 """Check prerequisites.
4930 This checks that the instance is in the cluster.
4933 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4934 assert self.instance is not None, \
4935 "Cannot retrieve locked instance %s" % self.op.instance_name
4936 _CheckNodeOnline(self, self.instance.primary_node)
4938 def Exec(self, feedback_fn):
4939 """Connect to the console of an instance
4942 instance = self.instance
4943 node = instance.primary_node
4945 node_insts = self.rpc.call_instance_list([node],
4946 [instance.hypervisor])[node]
4947 node_insts.Raise("Can't get node information from %s" % node)
4949 if instance.name not in node_insts.payload:
4950 raise errors.OpExecError("Instance %s is not running." % instance.name)
4952 logging.debug("Connecting to console of %s on %s", instance.name, node)
4954 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4955 cluster = self.cfg.GetClusterInfo()
4956 # beparams and hvparams are passed separately, to avoid editing the
4957 # instance and then saving the defaults in the instance itself.
4958 hvparams = cluster.FillHV(instance)
4959 beparams = cluster.FillBE(instance)
4960 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4963 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4966 class LUReplaceDisks(LogicalUnit):
4967 """Replace the disks of an instance.
4970 HPATH = "mirrors-replace"
4971 HTYPE = constants.HTYPE_INSTANCE
4972 _OP_REQP = ["instance_name", "mode", "disks"]
4975 def CheckArguments(self):
4976 if not hasattr(self.op, "remote_node"):
4977 self.op.remote_node = None
4978 if not hasattr(self.op, "iallocator"):
4979 self.op.iallocator = None
4981 # check for valid parameter combination
4982 cnt = [self.op.remote_node, self.op.iallocator].count(None)
4983 if self.op.mode == constants.REPLACE_DISK_CHG:
4985 raise errors.OpPrereqError("When changing the secondary either an"
4986 " iallocator script must be used or the"
4989 raise errors.OpPrereqError("Give either the iallocator or the new"
4990 " secondary, not both")
4991 else: # not replacing the secondary
4993 raise errors.OpPrereqError("The iallocator and new node options can"
4994 " be used only when changing the"
4997 def ExpandNames(self):
4998 self._ExpandAndLockInstance()
5000 if self.op.iallocator is not None:
5001 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5002 elif self.op.remote_node is not None:
5003 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5004 if remote_node is None:
5005 raise errors.OpPrereqError("Node '%s' not known" %
5006 self.op.remote_node)
5007 self.op.remote_node = remote_node
5008 # Warning: do not remove the locking of the new secondary here
5009 # unless DRBD8.AddChildren is changed to work in parallel;
5010 # currently it doesn't since parallel invocations of
5011 # FindUnusedMinor will conflict
5012 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5013 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5015 self.needed_locks[locking.LEVEL_NODE] = []
5016 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5018 def DeclareLocks(self, level):
5019 # If we're not already locking all nodes in the set we have to declare the
5020 # instance's primary/secondary nodes.
5021 if (level == locking.LEVEL_NODE and
5022 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5023 self._LockInstancesNodes()
5025 def _RunAllocator(self):
5026 """Compute a new secondary node using an IAllocator.
5029 ial = IAllocator(self,
5030 mode=constants.IALLOCATOR_MODE_RELOC,
5031 name=self.op.instance_name,
5032 relocate_from=[self.sec_node])
5034 ial.Run(self.op.iallocator)
5037 raise errors.OpPrereqError("Can't compute nodes using"
5038 " iallocator '%s': %s" % (self.op.iallocator,
5040 if len(ial.nodes) != ial.required_nodes:
5041 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5042 " of nodes (%s), required %s" %
5043 (len(ial.nodes), ial.required_nodes))
5044 self.op.remote_node = ial.nodes[0]
5045 self.LogInfo("Selected new secondary for the instance: %s",
5046 self.op.remote_node)
5048 def BuildHooksEnv(self):
5051 This runs on the master, the primary and all the secondaries.
5055 "MODE": self.op.mode,
5056 "NEW_SECONDARY": self.op.remote_node,
5057 "OLD_SECONDARY": self.instance.secondary_nodes[0],
5059 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5061 self.cfg.GetMasterNode(),
5062 self.instance.primary_node,
5064 if self.op.remote_node is not None:
5065 nl.append(self.op.remote_node)
5068 def CheckPrereq(self):
5069 """Check prerequisites.
5071 This checks that the instance is in the cluster.
5074 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5075 assert instance is not None, \
5076 "Cannot retrieve locked instance %s" % self.op.instance_name
5077 self.instance = instance
5079 if instance.disk_template != constants.DT_DRBD8:
5080 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5083 if len(instance.secondary_nodes) != 1:
5084 raise errors.OpPrereqError("The instance has a strange layout,"
5085 " expected one secondary but found %d" %
5086 len(instance.secondary_nodes))
5088 self.sec_node = instance.secondary_nodes[0]
5090 if self.op.iallocator is not None:
5091 self._RunAllocator()
5093 remote_node = self.op.remote_node
5094 if remote_node is not None:
5095 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5096 assert self.remote_node_info is not None, \
5097 "Cannot retrieve locked node %s" % remote_node
5099 self.remote_node_info = None
5100 if remote_node == instance.primary_node:
5101 raise errors.OpPrereqError("The specified node is the primary node of"
5103 elif remote_node == self.sec_node:
5104 raise errors.OpPrereqError("The specified node is already the"
5105 " secondary node of the instance.")
5107 if self.op.mode == constants.REPLACE_DISK_PRI:
5108 n1 = self.tgt_node = instance.primary_node
5109 n2 = self.oth_node = self.sec_node
5110 elif self.op.mode == constants.REPLACE_DISK_SEC:
5111 n1 = self.tgt_node = self.sec_node
5112 n2 = self.oth_node = instance.primary_node
5113 elif self.op.mode == constants.REPLACE_DISK_CHG:
5114 n1 = self.new_node = remote_node
5115 n2 = self.oth_node = instance.primary_node
5116 self.tgt_node = self.sec_node
5117 _CheckNodeNotDrained(self, remote_node)
5119 raise errors.ProgrammerError("Unhandled disk replace mode")
5121 _CheckNodeOnline(self, n1)
5122 _CheckNodeOnline(self, n2)
5124 if not self.op.disks:
5125 self.op.disks = range(len(instance.disks))
5127 for disk_idx in self.op.disks:
5128 instance.FindDisk(disk_idx)
5130 def _ExecD8DiskOnly(self, feedback_fn):
5131 """Replace a disk on the primary or secondary for dbrd8.
5133 The algorithm for replace is quite complicated:
5135 1. for each disk to be replaced:
5137 1. create new LVs on the target node with unique names
5138 1. detach old LVs from the drbd device
5139 1. rename old LVs to name_replaced.<time_t>
5140 1. rename new LVs to old LVs
5141 1. attach the new LVs (with the old names now) to the drbd device
5143 1. wait for sync across all devices
5145 1. for each modified disk:
5147 1. remove old LVs (which have the name name_replaces.<time_t>)
5149 Failures are not very well handled.
5153 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5154 instance = self.instance
5156 vgname = self.cfg.GetVGName()
5159 tgt_node = self.tgt_node
5160 oth_node = self.oth_node
5162 # Step: check device activation
5163 self.proc.LogStep(1, steps_total, "check device existence")
5164 info("checking volume groups")
5165 my_vg = cfg.GetVGName()
5166 results = self.rpc.call_vg_list([oth_node, tgt_node])
5168 raise errors.OpExecError("Can't list volume groups on the nodes")
5169 for node in oth_node, tgt_node:
5171 res.Raise("Error checking node %s" % node)
5172 if my_vg not in res.payload:
5173 raise errors.OpExecError("Volume group '%s' not found on %s" %
5175 for idx, dev in enumerate(instance.disks):
5176 if idx not in self.op.disks:
5178 for node in tgt_node, oth_node:
5179 info("checking disk/%d on %s" % (idx, node))
5180 cfg.SetDiskID(dev, node)
5181 result = self.rpc.call_blockdev_find(node, dev)
5182 msg = result.fail_msg
5183 if not msg and not result.payload:
5184 msg = "disk not found"
5186 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5189 # Step: check other node consistency
5190 self.proc.LogStep(2, steps_total, "check peer consistency")
5191 for idx, dev in enumerate(instance.disks):
5192 if idx not in self.op.disks:
5194 info("checking disk/%d consistency on %s" % (idx, oth_node))
5195 if not _CheckDiskConsistency(self, dev, oth_node,
5196 oth_node==instance.primary_node):
5197 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5198 " to replace disks on this node (%s)" %
5199 (oth_node, tgt_node))
5201 # Step: create new storage
5202 self.proc.LogStep(3, steps_total, "allocate new storage")
5203 for idx, dev in enumerate(instance.disks):
5204 if idx not in self.op.disks:
5207 cfg.SetDiskID(dev, tgt_node)
5208 lv_names = [".disk%d_%s" % (idx, suf)
5209 for suf in ["data", "meta"]]
5210 names = _GenerateUniqueNames(self, lv_names)
5211 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5212 logical_id=(vgname, names[0]))
5213 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5214 logical_id=(vgname, names[1]))
5215 new_lvs = [lv_data, lv_meta]
5216 old_lvs = dev.children
5217 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5218 info("creating new local storage on %s for %s" %
5219 (tgt_node, dev.iv_name))
5220 # we pass force_create=True to force the LVM creation
5221 for new_lv in new_lvs:
5222 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5223 _GetInstanceInfoText(instance), False)
5225 # Step: for each lv, detach+rename*2+attach
5226 self.proc.LogStep(4, steps_total, "change drbd configuration")
5227 for dev, old_lvs, new_lvs in iv_names.itervalues():
5228 info("detaching %s drbd from local storage" % dev.iv_name)
5229 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5230 result.Raise("Can't detach drbd from local storage on node"
5231 " %s for device %s" % (tgt_node, dev.iv_name))
5233 #cfg.Update(instance)
5235 # ok, we created the new LVs, so now we know we have the needed
5236 # storage; as such, we proceed on the target node to rename
5237 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5238 # using the assumption that logical_id == physical_id (which in
5239 # turn is the unique_id on that node)
5241 # FIXME(iustin): use a better name for the replaced LVs
5242 temp_suffix = int(time.time())
5243 ren_fn = lambda d, suff: (d.physical_id[0],
5244 d.physical_id[1] + "_replaced-%s" % suff)
5245 # build the rename list based on what LVs exist on the node
5247 for to_ren in old_lvs:
5248 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5249 if not result.fail_msg and result.payload:
5251 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5253 info("renaming the old LVs on the target node")
5254 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5255 result.Raise("Can't rename old LVs on node %s" % tgt_node)
5256 # now we rename the new LVs to the old LVs
5257 info("renaming the new LVs on the target node")
5258 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5259 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5260 result.Raise("Can't rename new LVs on node %s" % tgt_node)
5262 for old, new in zip(old_lvs, new_lvs):
5263 new.logical_id = old.logical_id
5264 cfg.SetDiskID(new, tgt_node)
5266 for disk in old_lvs:
5267 disk.logical_id = ren_fn(disk, temp_suffix)
5268 cfg.SetDiskID(disk, tgt_node)
5270 # now that the new lvs have the old name, we can add them to the device
5271 info("adding new mirror component on %s" % tgt_node)
5272 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5273 msg = result.fail_msg
5275 for new_lv in new_lvs:
5276 msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg
5278 warning("Can't rollback device %s: %s", dev, msg2,
5279 hint="cleanup manually the unused logical volumes")
5280 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5282 dev.children = new_lvs
5283 cfg.Update(instance)
5285 # Step: wait for sync
5287 # this can fail as the old devices are degraded and _WaitForSync
5288 # does a combined result over all disks, so we don't check its return value
5290 self.proc.LogStep(5, steps_total, "sync devices")
5291 _WaitForSync(self, instance, unlock=True)
5293 # so check manually all the devices
5294 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5295 cfg.SetDiskID(dev, instance.primary_node)
5296 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5297 msg = result.fail_msg
5298 if not msg and not result.payload:
5299 msg = "disk not found"
5301 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5303 if result.payload[5]:
5304 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5306 # Step: remove old storage
5307 self.proc.LogStep(6, steps_total, "removing old storage")
5308 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5309 info("remove logical volumes for %s" % name)
5311 cfg.SetDiskID(lv, tgt_node)
5312 msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg
5314 warning("Can't remove old LV: %s" % msg,
5315 hint="manually remove unused LVs")
5318 def _ExecD8Secondary(self, feedback_fn):
5319 """Replace the secondary node for drbd8.
5321 The algorithm for replace is quite complicated:
5322 - for all disks of the instance:
5323 - create new LVs on the new node with same names
5324 - shutdown the drbd device on the old secondary
5325 - disconnect the drbd network on the primary
5326 - create the drbd device on the new secondary
5327 - network attach the drbd on the primary, using an artifice:
5328 the drbd code for Attach() will connect to the network if it
5329 finds a device which is connected to the good local disks but
5330 not network enabled
5331 - wait for sync across all devices
5332 - remove all disks from the old secondary
5334 Failures are not very well handled.
5338 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5339 instance = self.instance
5343 old_node = self.tgt_node
5344 new_node = self.new_node
5345 pri_node = instance.primary_node
5347 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5348 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5349 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5352 # Step: check device activation
5353 self.proc.LogStep(1, steps_total, "check device existence")
5354 info("checking volume groups")
5355 my_vg = cfg.GetVGName()
5356 results = self.rpc.call_vg_list([pri_node, new_node])
5357 for node in pri_node, new_node:
5359 res.Raise("Error checking node %s" % node)
5360 if my_vg not in res.payload:
5361 raise errors.OpExecError("Volume group '%s' not found on %s" %
5363 for idx, dev in enumerate(instance.disks):
5364 if idx not in self.op.disks:
5366 info("checking disk/%d on %s" % (idx, pri_node))
5367 cfg.SetDiskID(dev, pri_node)
5368 result = self.rpc.call_blockdev_find(pri_node, dev)
5369 msg = result.fail_msg
5370 if not msg and not result.payload:
5371 msg = "disk not found"
5373 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5374 (idx, pri_node, msg))
5376 # Step: check other node consistency
5377 self.proc.LogStep(2, steps_total, "check peer consistency")
5378 for idx, dev in enumerate(instance.disks):
5379 if idx not in self.op.disks:
5381 info("checking disk/%d consistency on %s" % (idx, pri_node))
5382 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5383 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5384 " unsafe to replace the secondary" %
5387 # Step: create new storage
5388 self.proc.LogStep(3, steps_total, "allocate new storage")
5389 for idx, dev in enumerate(instance.disks):
5390 info("adding new local storage on %s for disk/%d" %
5392 # we pass force_create=True to force LVM creation
5393 for new_lv in dev.children:
5394 _CreateBlockDev(self, new_node, instance, new_lv, True,
5395 _GetInstanceInfoText(instance), False)
5397 # Step 4: drbd minors and drbd setup changes
5398 # after this, we must manually remove the drbd minors on both the
5399 # error and the success paths
5400 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5402 logging.debug("Allocated minors %s" % (minors,))
5403 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5404 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5406 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5407 # create new devices on new_node; note that we create two IDs:
5408 # one without port, so the drbd will be activated without
5409 # networking information on the new node at this stage, and one
5410 # with network, for the latter activation in step 4
5411 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5412 if pri_node == o_node1:
5417 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5418 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5420 iv_names[idx] = (dev, dev.children, new_net_id)
5421 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5423 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5424 logical_id=new_alone_id,
5425 children=dev.children)
5427 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5428 _GetInstanceInfoText(instance), False)
5429 except errors.GenericError:
5430 self.cfg.ReleaseDRBDMinors(instance.name)
5433 for idx, dev in enumerate(instance.disks):
5434 # we have new devices, shutdown the drbd on the old secondary
5435 info("shutting down drbd for disk/%d on old node" % idx)
5436 cfg.SetDiskID(dev, old_node)
5437 msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg
5439 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5441 hint="Please cleanup this device manually as soon as possible")
5443 info("detaching primary drbds from the network (=> standalone)")
5444 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5445 instance.disks)[pri_node]
5447 msg = result.fail_msg
5449 # detaches didn't succeed (unlikely)
5450 self.cfg.ReleaseDRBDMinors(instance.name)
5451 raise errors.OpExecError("Can't detach the disks from the network on"
5452 " old node: %s" % (msg,))
5454 # if we managed to detach at least one, we update all the disks of
5455 # the instance to point to the new secondary
5456 info("updating instance configuration")
5457 for dev, _, new_logical_id in iv_names.itervalues():
5458 dev.logical_id = new_logical_id
5459 cfg.SetDiskID(dev, pri_node)
5460 cfg.Update(instance)
5462 # and now perform the drbd attach
5463 info("attaching primary drbds to new secondary (standalone => connected)")
5464 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5465 instance.disks, instance.name,
5467 for to_node, to_result in result.items():
5468 msg = to_result.fail_msg
5470 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5471 hint="please do a gnt-instance info to see the"
5474 # this can fail as the old devices are degraded and _WaitForSync
5475 # does a combined result over all disks, so we don't check its return value
5477 self.proc.LogStep(5, steps_total, "sync devices")
5478 _WaitForSync(self, instance, unlock=True)
5480 # so check manually all the devices
5481 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5482 cfg.SetDiskID(dev, pri_node)
5483 result = self.rpc.call_blockdev_find(pri_node, dev)
5484 msg = result.fail_msg
5485 if not msg and not result.payload:
5486 msg = "disk not found"
5488 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5490 if result.payload[5]:
5491 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5493 self.proc.LogStep(6, steps_total, "removing old storage")
5494 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5495 info("remove logical volumes for disk/%d" % idx)
5497 cfg.SetDiskID(lv, old_node)
5498 msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg
5500 warning("Can't remove LV on old secondary: %s", msg,
5501 hint="Cleanup stale volumes by hand")
5503 def Exec(self, feedback_fn):
5504 """Execute disk replacement.
5506 This dispatches the disk replacement to the appropriate handler.
5509 instance = self.instance
5511 # Activate the instance disks if we're replacing them on a down instance
5512 if not instance.admin_up:
5513 _StartInstanceDisks(self, instance, True)
5515 if self.op.mode == constants.REPLACE_DISK_CHG:
5516 fn = self._ExecD8Secondary
5518 fn = self._ExecD8DiskOnly
5520 ret = fn(feedback_fn)
5522 # Deactivate the instance disks if we're replacing them on a down instance
5523 if not instance.admin_up:
5524 _SafeShutdownInstanceDisks(self, instance)
5529 class LUGrowDisk(LogicalUnit):
5530 """Grow a disk of an instance.
5534 HTYPE = constants.HTYPE_INSTANCE
5535 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5538 def ExpandNames(self):
5539 self._ExpandAndLockInstance()
5540 self.needed_locks[locking.LEVEL_NODE] = []
5541 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5543 def DeclareLocks(self, level):
5544 if level == locking.LEVEL_NODE:
5545 self._LockInstancesNodes()
5547 def BuildHooksEnv(self):
5550 This runs on the master, the primary and all the secondaries.
5554 "DISK": self.op.disk,
5555 "AMOUNT": self.op.amount,
5557 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5559 self.cfg.GetMasterNode(),
5560 self.instance.primary_node,
5564 def CheckPrereq(self):
5565 """Check prerequisites.
5567 This checks that the instance is in the cluster.
5570 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5571 assert instance is not None, \
5572 "Cannot retrieve locked instance %s" % self.op.instance_name
5573 nodenames = list(instance.all_nodes)
5574 for node in nodenames:
5575 _CheckNodeOnline(self, node)
5578 self.instance = instance
5580 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5581 raise errors.OpPrereqError("Instance's disk layout does not support"
5584 self.disk = instance.FindDisk(self.op.disk)
5586 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5587 instance.hypervisor)
5588 for node in nodenames:
5589 info = nodeinfo[node]
5590 info.Raise("Cannot get current information from node %s" % node)
5591 vg_free = info.payload.get('vg_free', None)
5592 if not isinstance(vg_free, int):
5593 raise errors.OpPrereqError("Can't compute free disk space on"
5595 if self.op.amount > vg_free:
5596 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5597 " %d MiB available, %d MiB required" %
5598 (node, vg_free, self.op.amount))
5600 def Exec(self, feedback_fn):
5601 """Execute disk grow.
5604 instance = self.instance
5606 for node in instance.all_nodes:
5607 self.cfg.SetDiskID(disk, node)
5608 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5609 result.Raise("Grow request failed to node %s" % node)
5610 disk.RecordGrow(self.op.amount)
5611 self.cfg.Update(instance)
5612 if self.op.wait_for_sync:
5613 disk_abort = not _WaitForSync(self, instance)
5615 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5616 " status.\nPlease check the instance.")
5619 class LUQueryInstanceData(NoHooksLU):
5620 """Query runtime instance data.
5623 _OP_REQP = ["instances", "static"]
5626 def ExpandNames(self):
5627 self.needed_locks = {}
5628 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5630 if not isinstance(self.op.instances, list):
5631 raise errors.OpPrereqError("Invalid argument type 'instances'")
5633 if self.op.instances:
5634 self.wanted_names = []
5635 for name in self.op.instances:
5636 full_name = self.cfg.ExpandInstanceName(name)
5637 if full_name is None:
5638 raise errors.OpPrereqError("Instance '%s' not known" % name)
5639 self.wanted_names.append(full_name)
5640 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5642 self.wanted_names = None
5643 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5645 self.needed_locks[locking.LEVEL_NODE] = []
5646 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5648 def DeclareLocks(self, level):
5649 if level == locking.LEVEL_NODE:
5650 self._LockInstancesNodes()
5652 def CheckPrereq(self):
5653 """Check prerequisites.
5655 This only checks the optional instance list against the existing names.
5658 if self.wanted_names is None:
5659 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5661 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5662 in self.wanted_names]
5665 def _ComputeDiskStatus(self, instance, snode, dev):
5666 """Compute block device status.
5669 static = self.op.static
5671 self.cfg.SetDiskID(dev, instance.primary_node)
5672 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5673 if dev_pstatus.offline:
5676 dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
5677 dev_pstatus = dev_pstatus.payload
5681 if dev.dev_type in constants.LDS_DRBD:
5682 # we change the snode then (otherwise we use the one passed in)
5683 if dev.logical_id[0] == instance.primary_node:
5684 snode = dev.logical_id[1]
5686 snode = dev.logical_id[0]
5688 if snode and not static:
5689 self.cfg.SetDiskID(dev, snode)
5690 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5691 if dev_sstatus.offline:
5694 dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
5695 dev_sstatus = dev_sstatus.payload
5700 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5701 for child in dev.children]
5706 "iv_name": dev.iv_name,
5707 "dev_type": dev.dev_type,
5708 "logical_id": dev.logical_id,
5709 "physical_id": dev.physical_id,
5710 "pstatus": dev_pstatus,
5711 "sstatus": dev_sstatus,
5712 "children": dev_children,
5718 def Exec(self, feedback_fn):
5719 """Gather and return data"""
5722 cluster = self.cfg.GetClusterInfo()
5724 for instance in self.wanted_instances:
5725 if not self.op.static:
5726 remote_info = self.rpc.call_instance_info(instance.primary_node,
5728 instance.hypervisor)
5729 remote_info.Raise("Error checking node %s" % instance.primary_node)
5730 remote_info = remote_info.payload
5731 if remote_info and "state" in remote_info:
5734 remote_state = "down"
5737 if instance.admin_up:
5740 config_state = "down"
5742 disks = [self._ComputeDiskStatus(instance, None, device)
5743 for device in instance.disks]
5746 "name": instance.name,
5747 "config_state": config_state,
5748 "run_state": remote_state,
5749 "pnode": instance.primary_node,
5750 "snodes": instance.secondary_nodes,
5752 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5754 "hypervisor": instance.hypervisor,
5755 "network_port": instance.network_port,
5756 "hv_instance": instance.hvparams,
5757 "hv_actual": cluster.FillHV(instance),
5758 "be_instance": instance.beparams,
5759 "be_actual": cluster.FillBE(instance),
5762 result[instance.name] = idict
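# Sketch of one entry of the mapping returned by this Exec (keys as assembled
# in idict above; values invented):
#
#   result["inst1.example.com"] = {
#       "name": "inst1.example.com", "config_state": "up",
#       "run_state": "down", "pnode": "node1.example.com",
#       "snodes": ["node2.example.com"], "nics": [(mac, ip, bridge), ...],
#       "disks": [...], "hypervisor": "xen-pvm", "network_port": None,
#       "hv_instance": {...}, "hv_actual": {...},
#       "be_instance": {...}, "be_actual": {...},
#   }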
5767 class LUSetInstanceParams(LogicalUnit):
5768 """Modifies an instances's parameters.
5771 HPATH = "instance-modify"
5772 HTYPE = constants.HTYPE_INSTANCE
5773 _OP_REQP = ["instance_name"]
5776 def CheckArguments(self):
5777 if not hasattr(self.op, 'nics'):
5778 self.op.nics = []
5779 if not hasattr(self.op, 'disks'):
5780 self.op.disks = []
5781 if not hasattr(self.op, 'beparams'):
5782 self.op.beparams = {}
5783 if not hasattr(self.op, 'hvparams'):
5784 self.op.hvparams = {}
5785 self.op.force = getattr(self.op, "force", False)
5786 if not (self.op.nics or self.op.disks or
5787 self.op.hvparams or self.op.beparams):
5788 raise errors.OpPrereqError("No changes submitted")
5792 for disk_op, disk_dict in self.op.disks:
5793 if disk_op == constants.DDM_REMOVE:
5796 elif disk_op == constants.DDM_ADD:
5799 if not isinstance(disk_op, int):
5800 raise errors.OpPrereqError("Invalid disk index")
5801 if disk_op == constants.DDM_ADD:
5802 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5803 if mode not in constants.DISK_ACCESS_SET:
5804 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5805 size = disk_dict.get('size', None)
5806 if size is None:
5807 raise errors.OpPrereqError("Required disk parameter size missing")
5808 try:
5809 size = int(size)
5810 except ValueError, err:
5811 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5812 str(err))
5813 disk_dict['size'] = size
5815 # modification of disk
5816 if 'size' in disk_dict:
5817 raise errors.OpPrereqError("Disk size change not possible, use"
5820 if disk_addremove > 1:
5821 raise errors.OpPrereqError("Only one disk add or remove operation"
5822 " supported at a time")
5826 for nic_op, nic_dict in self.op.nics:
5827 if nic_op == constants.DDM_REMOVE:
5830 elif nic_op == constants.DDM_ADD:
5833 if not isinstance(nic_op, int):
5834 raise errors.OpPrereqError("Invalid nic index")
5836 # nic_dict should be a dict
5837 nic_ip = nic_dict.get('ip', None)
5838 if nic_ip is not None:
5839 if nic_ip.lower() == constants.VALUE_NONE:
5840 nic_dict['ip'] = None
5842 if not utils.IsValidIP(nic_ip):
5843 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5845 nic_bridge = nic_dict.get('bridge', None)
5846 nic_link = nic_dict.get('link', None)
5847 if nic_bridge and nic_link:
5848 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5849 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5850 nic_dict['bridge'] = None
5851 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5852 nic_dict['link'] = None
5854 if nic_op == constants.DDM_ADD:
5855 nic_mac = nic_dict.get('mac', None)
5856 if nic_mac is None:
5857 nic_dict['mac'] = constants.VALUE_AUTO
5859 if 'mac' in nic_dict:
5860 nic_mac = nic_dict['mac']
5861 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5862 if not utils.IsValidMac(nic_mac):
5863 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5864 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5865 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5866 " modifying an existing nic")
5868 if nic_addremove > 1:
5869 raise errors.OpPrereqError("Only one NIC add or remove operation"
5870 " supported at a time")
5872 def ExpandNames(self):
5873 self._ExpandAndLockInstance()
5874 self.needed_locks[locking.LEVEL_NODE] = []
5875 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5877 def DeclareLocks(self, level):
5878 if level == locking.LEVEL_NODE:
5879 self._LockInstancesNodes()
5881 def BuildHooksEnv(self):
5884 This runs on the master, primary and secondaries.
5888 if constants.BE_MEMORY in self.be_new:
5889 args['memory'] = self.be_new[constants.BE_MEMORY]
5890 if constants.BE_VCPUS in self.be_new:
5891 args['vcpus'] = self.be_new[constants.BE_VCPUS]
5892 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5893 # information at all.
5896 nic_override = dict(self.op.nics)
5897 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
5898 for idx, nic in enumerate(self.instance.nics):
5899 if idx in nic_override:
5900 this_nic_override = nic_override[idx]
5902 this_nic_override = {}
5903 if 'ip' in this_nic_override:
5904 ip = this_nic_override['ip']
5907 if 'mac' in this_nic_override:
5908 mac = this_nic_override['mac']
5911 if idx in self.nic_pnew:
5912 nicparams = self.nic_pnew[idx]
5914 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
5915 mode = nicparams[constants.NIC_MODE]
5916 link = nicparams[constants.NIC_LINK]
5917 args['nics'].append((ip, mac, mode, link))
5918 if constants.DDM_ADD in nic_override:
5919 ip = nic_override[constants.DDM_ADD].get('ip', None)
5920 mac = nic_override[constants.DDM_ADD]['mac']
5921 nicparams = self.nic_pnew[constants.DDM_ADD]
5922 mode = nicparams[constants.NIC_MODE]
5923 link = nicparams[constants.NIC_LINK]
5924 args['nics'].append((ip, mac, mode, link))
5925 elif constants.DDM_REMOVE in nic_override:
5926 del args['nics'][-1]
5928 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5929 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5932 def _GetUpdatedParams(self, old_params, update_dict,
5933 default_values, parameter_types):
5934 """Return the new params dict for the given params.
5936 @type old_params: dict
5937 @param old_params: old parameters
5938 @type update_dict: dict
5939 @param update_dict: dict containing new parameter values,
5940 or constants.VALUE_DEFAULT to reset the
5941 parameter to its default value
5942 @type default_values: dict
5943 @param default_values: default values for the filled parameters
5944 @type parameter_types: dict
5945 @param parameter_types: dict mapping target dict keys to types
5946 in constants.ENFORCEABLE_TYPES
5947 @rtype: (dict, dict)
5948 @return: (new_parameters, filled_parameters)
5951 params_copy = copy.deepcopy(old_params)
5952 for key, val in update_dict.iteritems():
5953 if val == constants.VALUE_DEFAULT:
5955 del params_copy[key]
5959 params_copy[key] = val
5960 utils.ForceDictType(params_copy, parameter_types)
5961 params_filled = objects.FillDict(default_values, params_copy)
5962 return (params_copy, params_filled)
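# A small worked example of the merge above (all values invented):
#
#   old      = {'memory': 512, 'vcpus': 2}
#   update   = {'memory': constants.VALUE_DEFAULT, 'auto_balance': False}
#   defaults = {'memory': 128, 'vcpus': 1, 'auto_balance': True}
#
# would yield new_parameters {'vcpus': 2, 'auto_balance': False} (the
# per-instance overrides, with 'memory' reset to its default) and
# filled_parameters {'memory': 128, 'vcpus': 2, 'auto_balance': False}
# (the overrides completed with the cluster defaults).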
5964 def CheckPrereq(self):
5965 """Check prerequisites.
5967 This only checks the instance list against the existing names.
5970 force = self.force = self.op.force
5972 # checking the new params on the primary/secondary nodes
5974 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5975 cluster = self.cluster = self.cfg.GetClusterInfo()
5976 assert self.instance is not None, \
5977 "Cannot retrieve locked instance %s" % self.op.instance_name
5978 pnode = instance.primary_node
5979 nodelist = list(instance.all_nodes)
5981 # hvparams processing
5982 if self.op.hvparams:
5983 i_hvdict, hv_new = self._GetUpdatedParams(
5984 instance.hvparams, self.op.hvparams,
5985 cluster.hvparams[instance.hypervisor],
5986 constants.HVS_PARAMETER_TYPES)
5988 hypervisor.GetHypervisor(
5989 instance.hypervisor).CheckParameterSyntax(hv_new)
5990 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5991 self.hv_new = hv_new # the new actual values
5992 self.hv_inst = i_hvdict # the new dict (without defaults)
5994 self.hv_new = self.hv_inst = {}
5996 # beparams processing
5997 if self.op.beparams:
5998 i_bedict, be_new = self._GetUpdatedParams(
5999 instance.beparams, self.op.beparams,
6000 cluster.beparams[constants.PP_DEFAULT],
6001 constants.BES_PARAMETER_TYPES)
6002 self.be_new = be_new # the new actual values
6003 self.be_inst = i_bedict # the new dict (without defaults)
6005 self.be_new = self.be_inst = {}
6009 if constants.BE_MEMORY in self.op.beparams and not self.force:
6010 mem_check_list = [pnode]
6011 if be_new[constants.BE_AUTO_BALANCE]:
6012 # either we changed auto_balance to yes or it was from before
6013 mem_check_list.extend(instance.secondary_nodes)
6014 instance_info = self.rpc.call_instance_info(pnode, instance.name,
6015 instance.hypervisor)
6016 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6017 instance.hypervisor)
6018 pninfo = nodeinfo[pnode]
6019 msg = pninfo.fail_msg
6021 # Assume the primary node is unreachable and go ahead
6022 self.warn.append("Can't get info from primary node %s: %s" %
6024 elif not isinstance(pninfo.payload.get('memory_free', None), int):
6025 self.warn.append("Node data from primary node %s doesn't contain"
6026 " free memory information" % pnode)
6027 elif instance_info.fail_msg:
6028 self.warn.append("Can't get instance runtime information: %s" %
6029 instance_info.fail_msg)
6031 if instance_info.payload:
6032 current_mem = int(instance_info.payload['memory'])
6034 # Assume instance not running
6035 # (there is a slight race condition here, but it's not very probable,
6036 # and we have no other way to check)
6037 current_mem = 0
6038 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6039 pninfo.payload['memory_free'])
6040 if miss_mem > 0:
6041 raise errors.OpPrereqError("This change will prevent the instance"
6042 " from starting, due to %d MB of memory"
6043 " missing on its primary node" % miss_mem)
6045 if be_new[constants.BE_AUTO_BALANCE]:
6046 for node, nres in nodeinfo.items():
6047 if node not in instance.secondary_nodes:
6051 self.warn.append("Can't get info from secondary node %s: %s" %
6053 elif not isinstance(nres.payload.get('memory_free', None), int):
6054 self.warn.append("Secondary node %s didn't return free"
6055 " memory information" % node)
6056 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6057 self.warn.append("Not enough memory to failover instance to"
6058 " secondary node %s" % node)
6063 for nic_op, nic_dict in self.op.nics:
6064 if nic_op == constants.DDM_REMOVE:
6065 if not instance.nics:
6066 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6068 if nic_op != constants.DDM_ADD:
6070 if nic_op < 0 or nic_op >= len(instance.nics):
6071 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6073 (nic_op, len(instance.nics)))
6074 old_nic_params = instance.nics[nic_op].nicparams
6075 old_nic_ip = instance.nics[nic_op].ip
6080 update_params_dict = dict([(key, nic_dict[key])
6081 for key in constants.NICS_PARAMETERS
6082 if key in nic_dict])
6084 if 'bridge' in nic_dict:
6085 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6087 new_nic_params, new_filled_nic_params = \
6088 self._GetUpdatedParams(old_nic_params, update_params_dict,
6089 cluster.nicparams[constants.PP_DEFAULT],
6090 constants.NICS_PARAMETER_TYPES)
6091 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6092 self.nic_pinst[nic_op] = new_nic_params
6093 self.nic_pnew[nic_op] = new_filled_nic_params
6094 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6096 if new_nic_mode == constants.NIC_MODE_BRIDGED:
6097 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6098 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6099 if msg:
6100 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6101 if self.force:
6102 self.warn.append(msg)
6103 else:
6104 raise errors.OpPrereqError(msg)
6105 if new_nic_mode == constants.NIC_MODE_ROUTED:
6106 if 'ip' in nic_dict:
6107 nic_ip = nic_dict['ip']
6111 raise errors.OpPrereqError('Cannot set the nic ip to None'
6113 if 'mac' in nic_dict:
6114 nic_mac = nic_dict['mac']
6116 raise errors.OpPrereqError('Cannot set the nic mac to None')
6117 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6118 # otherwise generate the mac
6119 nic_dict['mac'] = self.cfg.GenerateMAC()
6121 # or validate/reserve the current one
6122 if self.cfg.IsMacInUse(nic_mac):
6123 raise errors.OpPrereqError("MAC address %s already in use"
6124 " in cluster" % nic_mac)
6127 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6128 raise errors.OpPrereqError("Disk operations not supported for"
6129 " diskless instances")
6130 for disk_op, disk_dict in self.op.disks:
6131 if disk_op == constants.DDM_REMOVE:
6132 if len(instance.disks) == 1:
6133 raise errors.OpPrereqError("Cannot remove the last disk of"
6135 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6136 ins_l = ins_l[pnode]
6137 msg = ins_l.fail_msg
6139 raise errors.OpPrereqError("Can't contact node %s: %s" %
6141 if instance.name in ins_l.payload:
6142 raise errors.OpPrereqError("Instance is running, can't remove"
6145 if (disk_op == constants.DDM_ADD and
6146 len(instance.disks) >= constants.MAX_DISKS):
6147 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6148 " add more" % constants.MAX_DISKS)
6149 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6151 if disk_op < 0 or disk_op >= len(instance.disks):
6152 raise errors.OpPrereqError("Invalid disk index %s, valid values"
6154 (disk_op, len(instance.disks)))
6158 def Exec(self, feedback_fn):
6159 """Modifies an instance.
6161 All parameters take effect only at the next restart of the instance.
6164 # Process here the warnings from CheckPrereq, as we don't have a
6165 # feedback_fn there.
6166 for warn in self.warn:
6167 feedback_fn("WARNING: %s" % warn)
6170 instance = self.instance
6171 cluster = self.cluster
6173 for disk_op, disk_dict in self.op.disks:
6174 if disk_op == constants.DDM_REMOVE:
6175 # remove the last disk
6176 device = instance.disks.pop()
6177 device_idx = len(instance.disks)
6178 for node, disk in device.ComputeNodeTree(instance.primary_node):
6179 self.cfg.SetDiskID(disk, node)
6180 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6182 self.LogWarning("Could not remove disk/%d on node %s: %s,"
6183 " continuing anyway", device_idx, node, msg)
6184 result.append(("disk/%d" % device_idx, "remove"))
6185 elif disk_op == constants.DDM_ADD:
6187 if instance.disk_template == constants.DT_FILE:
6188 file_driver, file_path = instance.disks[0].logical_id
6189 file_path = os.path.dirname(file_path)
6191 file_driver = file_path = None
6192 disk_idx_base = len(instance.disks)
6193 new_disk = _GenerateDiskTemplate(self,
6194 instance.disk_template,
6195 instance.name, instance.primary_node,
6196 instance.secondary_nodes,
6201 instance.disks.append(new_disk)
6202 info = _GetInstanceInfoText(instance)
6204 logging.info("Creating volume %s for instance %s",
6205 new_disk.iv_name, instance.name)
6206 # Note: this needs to be kept in sync with _CreateDisks
6208 for node in instance.all_nodes:
6209 f_create = node == instance.primary_node
6211 _CreateBlockDev(self, node, instance, new_disk,
6212 f_create, info, f_create)
6213 except errors.OpExecError, err:
6214 self.LogWarning("Failed to create volume %s (%s) on"
6216 new_disk.iv_name, new_disk, node, err)
6217 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6218 (new_disk.size, new_disk.mode)))
6220 # change a given disk
6221 instance.disks[disk_op].mode = disk_dict['mode']
6222 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6224 for nic_op, nic_dict in self.op.nics:
6225 if nic_op == constants.DDM_REMOVE:
6226 # remove the last nic
6227 del instance.nics[-1]
6228 result.append(("nic.%d" % len(instance.nics), "remove"))
6229 elif nic_op == constants.DDM_ADD:
6230 # mac and bridge should be set by now
6231 mac = nic_dict['mac']
6232 ip = nic_dict.get('ip', None)
6233 nicparams = self.nic_pinst[constants.DDM_ADD]
6234 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6235 instance.nics.append(new_nic)
6236 result.append(("nic.%d" % (len(instance.nics) - 1),
6237 "add:mac=%s,ip=%s,mode=%s,link=%s" %
6238 (new_nic.mac, new_nic.ip,
6239 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6240 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6243 for key in 'mac', 'ip':
6244 if key in nic_dict:
6245 setattr(instance.nics[nic_op], key, nic_dict[key])
6246 if nic_op in self.nic_pnew:
6247 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6248 for key, val in nic_dict.iteritems():
6249 result.append(("nic.%s/%d" % (key, nic_op), val))
6252 if self.op.hvparams:
6253 instance.hvparams = self.hv_inst
6254 for key, val in self.op.hvparams.iteritems():
6255 result.append(("hv/%s" % key, val))
6258 if self.op.beparams:
6259 instance.beparams = self.be_inst
6260 for key, val in self.op.beparams.iteritems():
6261 result.append(("be/%s" % key, val))
6263 self.cfg.Update(instance)
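# Exec above accumulates and returns a list of (parameter, change) pairs
# describing what was modified; an illustrative (invented) result for a
# memory change plus a NIC removal would be:
#
#   [("be/memory", 1024), ("nic.1", "remove")]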
6268 class LUQueryExports(NoHooksLU):
6269 """Query the exports list
6272 _OP_REQP = ['nodes']
6275 def ExpandNames(self):
6276 self.needed_locks = {}
6277 self.share_locks[locking.LEVEL_NODE] = 1
6278 if not self.op.nodes:
6279 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6281 self.needed_locks[locking.LEVEL_NODE] = \
6282 _GetWantedNodes(self, self.op.nodes)
6284 def CheckPrereq(self):
6285 """Check prerequisites.
6288 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6290 def Exec(self, feedback_fn):
6291 """Compute the list of all the exported system images.
6294 @return: a dictionary with the structure node->(export-list)
6295 where export-list is a list of the instances exported on
6299 rpcresult = self.rpc.call_export_list(self.nodes)
6301 for node in rpcresult:
6302 if rpcresult[node].fail_msg:
6303 result[node] = False
6305 result[node] = rpcresult[node].payload
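# Sketch of the node -> export-list mapping produced above (names invented;
# False marks a node that could not be queried):
#
#   {"node1.example.com": ["inst1.example.com", "inst2.example.com"],
#    "node2.example.com": False}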
6310 class LUExportInstance(LogicalUnit):
6311 """Export an instance to an image in the cluster.
6314 HPATH = "instance-export"
6315 HTYPE = constants.HTYPE_INSTANCE
6316 _OP_REQP = ["instance_name", "target_node", "shutdown"]
6319 def ExpandNames(self):
6320 self._ExpandAndLockInstance()
6321 # FIXME: lock only instance primary and destination node
6323 # Sad but true, for now we have to lock all nodes, as we don't know where
6324 # the previous export might be, and in this LU we search for it and
6325 # remove it from its current node. In the future we could fix this by:
6326 # - making a tasklet to search (share-lock all), then create the new one,
6327 # then one to remove, after
6328 # - removing the removal operation altogether
6329 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6331 def DeclareLocks(self, level):
6332 """Last minute lock declaration."""
6333 # All nodes are locked anyway, so nothing to do here.
6335 def BuildHooksEnv(self):
6338 This will run on the master, primary node and target node.
6342 "EXPORT_NODE": self.op.target_node,
6343 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6345 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6346 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6347 self.op.target_node]
6350 def CheckPrereq(self):
6351 """Check prerequisites.
6353 This checks that the instance and node names are valid.
6356 instance_name = self.op.instance_name
6357 self.instance = self.cfg.GetInstanceInfo(instance_name)
6358 assert self.instance is not None, \
6359 "Cannot retrieve locked instance %s" % self.op.instance_name
6360 _CheckNodeOnline(self, self.instance.primary_node)
6362 self.dst_node = self.cfg.GetNodeInfo(
6363 self.cfg.ExpandNodeName(self.op.target_node))
6365 if self.dst_node is None:
6366 # This is a wrong node name, not a non-locked node
6367 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6368 _CheckNodeOnline(self, self.dst_node.name)
6369 _CheckNodeNotDrained(self, self.dst_node.name)
6371 # instance disk type verification
6372 for disk in self.instance.disks:
6373 if disk.dev_type == constants.LD_FILE:
6374 raise errors.OpPrereqError("Export not supported for instances with"
6375 " file-based disks")
6377 def Exec(self, feedback_fn):
6378 """Export an instance to an image in the cluster.
6381 instance = self.instance
6382 dst_node = self.dst_node
6383 src_node = instance.primary_node
6384 if self.op.shutdown:
6385 # shutdown the instance, but not the disks
6386 result = self.rpc.call_instance_shutdown(src_node, instance)
6387 result.Raise("Could not shutdown instance %s on"
6388 " node %s" % (instance.name, src_node))
6390 vgname = self.cfg.GetVGName()
6394 # set the disks ID correctly since call_instance_start needs the
6395 # correct drbd minor to create the symlinks
6396 for disk in instance.disks:
6397 self.cfg.SetDiskID(disk, src_node)
6400 for disk in instance.disks:
6401 # result.payload will be a snapshot of the LVM leaf of the disk we passed
6402 result = self.rpc.call_blockdev_snapshot(src_node, disk)
6403 msg = result.fail_msg
6405 self.LogWarning("Could not snapshot block device %s on node %s: %s",
6406 disk.logical_id[1], src_node, msg)
6407 snap_disks.append(False)
6409 disk_id = (vgname, result.payload)
6410 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6411 logical_id=disk_id, physical_id=disk_id,
6412 iv_name=disk.iv_name)
6413 snap_disks.append(new_dev)
6416 if self.op.shutdown and instance.admin_up:
6417 result = self.rpc.call_instance_start(src_node, instance, None, None)
6418 msg = result.fail_msg
6420 _ShutdownInstanceDisks(self, instance)
6421 raise errors.OpExecError("Could not start instance: %s" % msg)
6423 # TODO: check for size
6425 cluster_name = self.cfg.GetClusterName()
6426 for idx, dev in enumerate(snap_disks):
6428 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6429 instance, cluster_name, idx)
6430 msg = result.fail_msg
6432 self.LogWarning("Could not export block device %s from node %s to"
6433 " node %s: %s", dev.logical_id[1], src_node,
6435 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
6437 self.LogWarning("Could not remove snapshot block device %s from node"
6438 " %s: %s", dev.logical_id[1], src_node, msg)
6440 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6441 msg = result.fail_msg
6443 self.LogWarning("Could not finalize export for instance %s"
6444 " on node %s: %s", instance.name, dst_node.name, msg)
6446 nodelist = self.cfg.GetNodeList()
6447 nodelist.remove(dst_node.name)
6449 # on one-node clusters nodelist will be empty after the removal;
6450 # if we proceed, the backup would be removed because OpQueryExports
6451 # substitutes an empty list with the full cluster node list.
6452 iname = instance.name
6454 exportlist = self.rpc.call_export_list(nodelist)
6455 for node in exportlist:
6456 if exportlist[node].fail_msg:
6458 if iname in exportlist[node].payload:
6459 msg = self.rpc.call_export_remove(node, iname).fail_msg
6461 self.LogWarning("Could not remove older export for instance %s"
6462 " on node %s: %s", iname, node, msg)
6465 class LURemoveExport(NoHooksLU):
6466 """Remove exports related to the named instance.
6469 _OP_REQP = ["instance_name"]
6472 def ExpandNames(self):
6473 self.needed_locks = {}
6474 # We need all nodes to be locked in order for RemoveExport to work, but we
6475 # don't need to lock the instance itself, as nothing will happen to it (and
6476 # we can also remove exports for a removed instance)
6477 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6479 def CheckPrereq(self):
6480 """Check prerequisites.
6484 def Exec(self, feedback_fn):
6485 """Remove any export.
6488 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6489 # If the instance was not found we'll try with the name that was passed in.
6490 # This will only work if it was an FQDN, though.
6492 if not instance_name:
6494 instance_name = self.op.instance_name
6496 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6497 exportlist = self.rpc.call_export_list(locked_nodes)
6499 for node in exportlist:
6500 msg = exportlist[node].fail_msg
6502 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6504 if instance_name in exportlist[node].payload:
6506 result = self.rpc.call_export_remove(node, instance_name)
6507 msg = result.fail_msg
6509 logging.error("Could not remove export for instance %s"
6510 " on node %s: %s", instance_name, node, msg)
6512 if fqdn_warn and not found:
6513 feedback_fn("Export not found. If trying to remove an export belonging"
6514 " to a deleted instance please use its Fully Qualified"
6518 class TagsLU(NoHooksLU):
6521 This is an abstract class which is the parent of all the other tags LUs.
6525 def ExpandNames(self):
6526 self.needed_locks = {}
6527 if self.op.kind == constants.TAG_NODE:
6528 name = self.cfg.ExpandNodeName(self.op.name)
6530 raise errors.OpPrereqError("Invalid node name (%s)" %
6533 self.needed_locks[locking.LEVEL_NODE] = name
6534 elif self.op.kind == constants.TAG_INSTANCE:
6535 name = self.cfg.ExpandInstanceName(self.op.name)
6537 raise errors.OpPrereqError("Invalid instance name (%s)" %
6540 self.needed_locks[locking.LEVEL_INSTANCE] = name
6542 def CheckPrereq(self):
6543 """Check prerequisites.
6546 if self.op.kind == constants.TAG_CLUSTER:
6547 self.target = self.cfg.GetClusterInfo()
6548 elif self.op.kind == constants.TAG_NODE:
6549 self.target = self.cfg.GetNodeInfo(self.op.name)
6550 elif self.op.kind == constants.TAG_INSTANCE:
6551 self.target = self.cfg.GetInstanceInfo(self.op.name)
6553 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6557 class LUGetTags(TagsLU):
6558 """Returns the tags of a given object.
6561 _OP_REQP = ["kind", "name"]
6564 def Exec(self, feedback_fn):
6565 """Returns the tag list.
6568 return list(self.target.GetTags())
6571 class LUSearchTags(NoHooksLU):
6572 """Searches the tags for a given pattern.
6575 _OP_REQP = ["pattern"]
6578 def ExpandNames(self):
6579 self.needed_locks = {}
6581 def CheckPrereq(self):
6582 """Check prerequisites.
6584 This checks the pattern passed for validity by compiling it.
6588 self.re = re.compile(self.op.pattern)
6589 except re.error, err:
6590 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6591 (self.op.pattern, err))
6593 def Exec(self, feedback_fn):
6594 """Returns the tag list.
6598 tgts = [("/cluster", cfg.GetClusterInfo())]
6599 ilist = cfg.GetAllInstancesInfo().values()
6600 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6601 nlist = cfg.GetAllNodesInfo().values()
6602 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6604 for path, target in tgts:
6605 for tag in target.GetTags():
6606 if self.re.search(tag):
6607 results.append((path, tag))
6611 class LUAddTags(TagsLU):
6612 """Sets a tag on a given object.
6615 _OP_REQP = ["kind", "name", "tags"]
6618 def CheckPrereq(self):
6619 """Check prerequisites.
6621 This checks the type and length of the tag name and value.
6624 TagsLU.CheckPrereq(self)
6625 for tag in self.op.tags:
6626 objects.TaggableObject.ValidateTag(tag)
6628 def Exec(self, feedback_fn):
6633 for tag in self.op.tags:
6634 self.target.AddTag(tag)
6635 except errors.TagError, err:
6636 raise errors.OpExecError("Error while setting tag: %s" % str(err))
6638 self.cfg.Update(self.target)
6639 except errors.ConfigurationError:
6640 raise errors.OpRetryError("There has been a modification to the"
6641 " config file and the operation has been"
6642 " aborted. Please retry.")
6645 class LUDelTags(TagsLU):
6646 """Delete a list of tags from a given object.
6649 _OP_REQP = ["kind", "name", "tags"]
6652 def CheckPrereq(self):
6653 """Check prerequisites.
6655 This checks that we have the given tag.
6658 TagsLU.CheckPrereq(self)
6659 for tag in self.op.tags:
6660 objects.TaggableObject.ValidateTag(tag)
6661 del_tags = frozenset(self.op.tags)
6662 cur_tags = self.target.GetTags()
6663 if not del_tags <= cur_tags:
6664 diff_tags = del_tags - cur_tags
6665 diff_names = ["'%s'" % tag for tag in diff_tags]
6667 raise errors.OpPrereqError("Tag(s) %s not found" %
6668 (",".join(diff_names)))
6670 def Exec(self, feedback_fn):
6671 """Remove the tag from the object.
6674 for tag in self.op.tags:
6675 self.target.RemoveTag(tag)
6677 self.cfg.Update(self.target)
6678 except errors.ConfigurationError:
6679 raise errors.OpRetryError("There has been a modification to the"
6680 " config file and the operation has been"
6681 " aborted. Please retry.")
6684 class LUTestDelay(NoHooksLU):
6685 """Sleep for a specified amount of time.
6687 This LU sleeps on the master and/or nodes for a specified amount of
6691 _OP_REQP = ["duration", "on_master", "on_nodes"]
6694 def ExpandNames(self):
6695 """Expand names and set required locks.
6697 This expands the node list, if any.
6700 self.needed_locks = {}
6701 if self.op.on_nodes:
6702 # _GetWantedNodes can be used here, but is not always appropriate to use
6703 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6705 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6706 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6708 def CheckPrereq(self):
6709 """Check prerequisites.
6713 def Exec(self, feedback_fn):
6714 """Do the actual sleep.
6717 if self.op.on_master:
6718 if not utils.TestDelay(self.op.duration):
6719 raise errors.OpExecError("Error during master delay test")
6720 if self.op.on_nodes:
6721 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6722 for node, node_result in result.items():
6723 node_result.Raise("Failure during rpc call to node %s" % node)
6726 class IAllocator(object):
6727 """IAllocator framework.
6729 An IAllocator instance has four sets of attributes:
6730 - cfg that is needed to query the cluster
6731 - input data (all members of the _KEYS class attribute are required)
6732 - four buffer attributes (in_data, in_text, out_data, out_text) that represent the
6733 input (to the external script) in text and data structure format,
6734 and the output from it, again in two formats
6735 - the result variables from the script (success, info, nodes) for
6740 "mem_size", "disks", "disk_template",
6741 "os", "tags", "nics", "vcpus", "hypervisor",
6747 def __init__(self, lu, mode, name, **kwargs):
6749 # init buffer variables
6750 self.in_text = self.out_text = self.in_data = self.out_data = None
6751 # init all input fields so that pylint is happy
6754 self.mem_size = self.disks = self.disk_template = None
6755 self.os = self.tags = self.nics = self.vcpus = None
6756 self.hypervisor = None
6757 self.relocate_from = None
6759 self.required_nodes = None
6760 # init result fields
6761 self.success = self.info = self.nodes = None
6762 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6763 keyset = self._ALLO_KEYS
6764 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6765 keyset = self._RELO_KEYS
6767 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6768 " IAllocator" % self.mode)
6769 for key in kwargs:
6770 if key not in keyset:
6771 raise errors.ProgrammerError("Invalid input parameter '%s' to"
6772 " IAllocator" % key)
6773 setattr(self, key, kwargs[key])
6774 for key in keyset:
6775 if key not in kwargs:
6776 raise errors.ProgrammerError("Missing input parameter '%s' to"
6777 " IAllocator" % key)
6778 self._BuildInputData()
6780 def _ComputeClusterData(self):
6781 """Compute the generic allocator input data.
6783 This is the data that is independent of the actual operation.
6787 cluster_info = cfg.GetClusterInfo()
6790 "version": constants.IALLOCATOR_VERSION,
6791 "cluster_name": cfg.GetClusterName(),
6792 "cluster_tags": list(cluster_info.GetTags()),
6793 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6794 # we don't have job IDs
6796 iinfo = cfg.GetAllInstancesInfo().values()
6797 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6801 node_list = cfg.GetNodeList()
6803 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6804 hypervisor_name = self.hypervisor
6805 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6806 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6808 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6810 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6811 cluster_info.enabled_hypervisors)
6812 for nname, nresult in node_data.items():
6813 # first fill in static (config-based) values
6814 ninfo = cfg.GetNodeInfo(nname)
6816 "tags": list(ninfo.GetTags()),
6817 "primary_ip": ninfo.primary_ip,
6818 "secondary_ip": ninfo.secondary_ip,
6819 "offline": ninfo.offline,
6820 "drained": ninfo.drained,
6821 "master_candidate": ninfo.master_candidate,
6824 if not ninfo.offline:
6825 nresult.Raise("Can't get data for node %s" % nname)
6826 node_iinfo[nname].Raise("Can't get node instance info from node %s" %
6828 remote_info = nresult.payload
6829 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6830 'vg_size', 'vg_free', 'cpu_total']:
6831 if attr not in remote_info:
6832 raise errors.OpExecError("Node '%s' didn't return attribute"
6833 " '%s'" % (nname, attr))
6834 if not isinstance(remote_info[attr], int):
6835 raise errors.OpExecError("Node '%s' returned invalid value"
6837 (nname, attr, remote_info[attr]))
6838 # compute memory used by primary instances
6839 i_p_mem = i_p_up_mem = 0
6840 for iinfo, beinfo in i_list:
6841 if iinfo.primary_node == nname:
6842 i_p_mem += beinfo[constants.BE_MEMORY]
6843 if iinfo.name not in node_iinfo[nname].payload:
6846 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6847 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6848 remote_info['memory_free'] -= max(0, i_mem_diff)
6851 i_p_up_mem += beinfo[constants.BE_MEMORY]
6853 # compute memory used by instances
6855 "total_memory": remote_info['memory_total'],
6856 "reserved_memory": remote_info['memory_dom0'],
6857 "free_memory": remote_info['memory_free'],
6858 "total_disk": remote_info['vg_size'],
6859 "free_disk": remote_info['vg_free'],
6860 "total_cpus": remote_info['cpu_total'],
6861 "i_pri_memory": i_p_mem,
6862 "i_pri_up_memory": i_p_up_mem,
6866 node_results[nname] = pnr
6867 data["nodes"] = node_results
6871 for iinfo, beinfo in i_list:
6873 for nic in iinfo.nics:
6874 filled_params = objects.FillDict(
6875 cluster_info.nicparams[constants.PP_DEFAULT],
6877 nic_dict = {"mac": nic.mac,
6879 "mode": filled_params[constants.NIC_MODE],
6880 "link": filled_params[constants.NIC_LINK],
6882 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
6883 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
6884 nic_data.append(nic_dict)
6886 "tags": list(iinfo.GetTags()),
6887 "admin_up": iinfo.admin_up,
6888 "vcpus": beinfo[constants.BE_VCPUS],
6889 "memory": beinfo[constants.BE_MEMORY],
6891 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6893 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6894 "disk_template": iinfo.disk_template,
6895 "hypervisor": iinfo.hypervisor,
6897 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6899 instance_data[iinfo.name] = pir
6901 data["instances"] = instance_data
6905 def _AddNewInstance(self):
6906 """Add new instance data to allocator structure.
6908 This in combination with _ComputeClusterData will create the
6909 correct structure needed as input for the allocator.
6911 The checks for the completeness of the opcode must have already been
6917 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6919 if self.disk_template in constants.DTS_NET_MIRROR:
6920 self.required_nodes = 2
6922 self.required_nodes = 1
6926 "disk_template": self.disk_template,
6929 "vcpus": self.vcpus,
6930 "memory": self.mem_size,
6931 "disks": self.disks,
6932 "disk_space_total": disk_space,
6934 "required_nodes": self.required_nodes,
6936 data["request"] = request
6938 def _AddRelocateInstance(self):
6939 """Add relocate instance data to allocator structure.
6941 This in combination with _ComputeClusterData will create the
6942 correct structure needed as input for the allocator.
6944 The checks for the completeness of the opcode must have already been
6948 instance = self.lu.cfg.GetInstanceInfo(self.name)
6949 if instance is None:
6950 raise errors.ProgrammerError("Unknown instance '%s' passed to"
6951 " IAllocator" % self.name)
6953 if instance.disk_template not in constants.DTS_NET_MIRROR:
6954 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6956 if len(instance.secondary_nodes) != 1:
6957 raise errors.OpPrereqError("Instance has not exactly one secondary node")
6959 self.required_nodes = 1
6960 disk_sizes = [{'size': disk.size} for disk in instance.disks]
6961 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6966 "disk_space_total": disk_space,
6967 "required_nodes": self.required_nodes,
6968 "relocate_from": self.relocate_from,
6970 self.in_data["request"] = request
6972 def _BuildInputData(self):
6973 """Build input data structures.
6976 self._ComputeClusterData()
6978 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6979 self._AddNewInstance()
6981 self._AddRelocateInstance()
6983 self.in_text = serializer.Dump(self.in_data)
6985 def Run(self, name, validate=True, call_fn=None):
6986 """Run an instance allocator and return the results.
6990 call_fn = self.lu.rpc.call_iallocator_runner
6993 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6994 result.Raise("Failure while running the iallocator script")
6996 self.out_text = result.payload
6998 self._ValidateResult()
7000 def _ValidateResult(self):
7001 """Process the allocator results.
7003 This will process and, if successful, save the result in
7004 self.out_data and the other parameters.
7008 rdict = serializer.Load(self.out_text)
7009 except Exception, err:
7010 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7012 if not isinstance(rdict, dict):
7013 raise errors.OpExecError("Can't parse iallocator results: not a dict")
7015 for key in "success", "info", "nodes":
7016 if key not in rdict:
7017 raise errors.OpExecError("Can't parse iallocator results:"
7018 " missing key '%s'" % key)
7019 setattr(self, key, rdict[key])
7021 if not isinstance(rdict["nodes"], list):
7022 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7024 self.out_data = rdict
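# A minimal well-formed reply from the external allocator, as validated above
# (shown in the JSON form that serializer.Load parses; content invented):
#
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node2.example.com", "node3.example.com"]}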
7027 class LUTestAllocator(NoHooksLU):
7028 """Run allocator tests.
7030 This LU runs the allocator tests
7033 _OP_REQP = ["direction", "mode", "name"]
7035 def CheckPrereq(self):
7036 """Check prerequisites.
7038 This checks the opcode parameters depending on the direction and mode test.
7041 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7042 for attr in ["name", "mem_size", "disks", "disk_template",
7043 "os", "tags", "nics", "vcpus"]:
7044 if not hasattr(self.op, attr):
7045 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7047 iname = self.cfg.ExpandInstanceName(self.op.name)
7048 if iname is not None:
7049 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7051 if not isinstance(self.op.nics, list):
7052 raise errors.OpPrereqError("Invalid parameter 'nics'")
7053 for row in self.op.nics:
7054 if (not isinstance(row, dict) or
7057 "bridge" not in row):
7058 raise errors.OpPrereqError("Invalid contents of the"
7059 " 'nics' parameter")
7060 if not isinstance(self.op.disks, list):
7061 raise errors.OpPrereqError("Invalid parameter 'disks'")
7062 for row in self.op.disks:
7063 if (not isinstance(row, dict) or
7064 "size" not in row or
7065 not isinstance(row["size"], int) or
7066 "mode" not in row or
7067 row["mode"] not in ['r', 'w']):
7068 raise errors.OpPrereqError("Invalid contents of the"
7069 " 'disks' parameter")
7070 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7071 self.op.hypervisor = self.cfg.GetHypervisorType()
7072 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7073 if not hasattr(self.op, "name"):
7074 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7075 fname = self.cfg.ExpandInstanceName(self.op.name)
7077 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7079 self.op.name = fname
7080 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7082 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7085 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7086 if not hasattr(self.op, "allocator") or self.op.allocator is None:
7087 raise errors.OpPrereqError("Missing allocator name")
7088 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7089 raise errors.OpPrereqError("Wrong allocator test '%s'" %
7092 def Exec(self, feedback_fn):
7093 """Run the allocator test.
7096 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7097 ial = IAllocator(self,
7100 mem_size=self.op.mem_size,
7101 disks=self.op.disks,
7102 disk_template=self.op.disk_template,
7106 vcpus=self.op.vcpus,
7107 hypervisor=self.op.hypervisor,
7110 ial = IAllocator(self,
7113 relocate_from=list(self.relocate_from),
7116 if self.op.direction == constants.IALLOCATOR_DIR_IN:
7117 result = ial.in_text
7119 ial.Run(self.op.allocator, validate=False)
7120 result = ial.out_text