# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201

import os
import time
import re
import logging

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()
101 """Returns the SshRunner object
105 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
108 ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separate is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
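
  # Illustrative sketch (not from the original module): a CheckArguments
  # implementation for a hypothetical LU with an optional integer opcode
  # parameter; the "retries" field is invented for the example.
  #
  #   def CheckArguments(self):
  #     retries = getattr(self.op, "retries", 0)
  #     try:
  #       self.op.retries = int(retries)
  #     except (ValueError, TypeError), err:
  #       raise errors.OpPrereqError("Invalid retries value: %s" % str(err))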

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError
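
  # For illustration, a concurrent LU (hypothetical name) would instead set
  # REQ_BGL = False and declare its own locks, e.g. acquiring all node locks
  # in shared mode as several LUs in this module do:
  #
  #   class LUExampleQuery(NoHooksLU):
  #     REQ_BGL = False
  #
  #     def ExpandNames(self):
  #       self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
  #       self.share_locks[locking.LEVEL_NODE] = 1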

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If a hook should run on no nodes, an empty list (and not None)
    should be returned for that element.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we really have been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
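
  # Putting the two helpers above together, an instance LU typically does
  # the following (a sketch of the usual pattern, not tied to a specific LU):
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()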


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @type bep: dict
  @param bep: the backend parameters for the instance
  @type hvp: dict
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor: string
  @param hypervisor: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env
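
# For illustration, an instance with one bridged NIC and one disk would get
# an environment along these lines (values invented; the hooks runner later
# prefixes every key with 'GANETI_'):
#
#   INSTANCE_NAME=instance1.example.tld
#   INSTANCE_STATUS=up
#   INSTANCE_NIC_COUNT=1
#   INSTANCE_NIC0_MAC=aa:00:00:11:22:33
#   INSTANCE_NIC0_MODE=bridged
#   INSTANCE_NIC0_BRIDGE=xen-br0
#   INSTANCE_DISK_COUNT=1
#   INSTANCE_DISK0_SIZE=10240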


def _PreBuildNICHooksList(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _PreBuildNICHooksList(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
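
# Illustrative call site (a sketch only): an instance LU's BuildHooksEnv can
# delegate to the helper above and run the hooks on the master plus all of
# the instance's nodes:
#
#   env = _BuildInstanceHookEnvByObject(self, self.instance)
#   nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
#   return env, nl, nl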


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                    (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)
            bad = True

    # checks ssh to any
    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
           not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      if msg:
        feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
        bad = True
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          msg = res.fail_msg
          if msg:
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution: %s" %
                        msg)
            lu_result = 1
            continue
          for script, hkr, output in res.payload:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
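
  # For illustration, a possible return value (all names invented): one node
  # whose LV enumeration failed, one instance needing activate-disks and one
  # instance with a missing volume:
  #
  #   ({"node3.example.tld": "Error enumerating LVs ..."},
  #    ["instance1.example.tld"],
  #    {"instance2.example.tld": [("node2.example.tld", "xenvg/disk0")]})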


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)

    finally:
      result = self.rpc.call_node_start_master(master, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = objects.FillDict(
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)

    # hypervisor list/parameters
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size

    self.cfg.Update(self.cluster)

    # we want to update nodes after the cluster so that if any errors
    # happen, we have recorded and saved the cluster info
    if self.op.candidate_pool_size is not None:
      _AdjustCandidatePool(self)


def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetNodeList()
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)
  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                   ])
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())
  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)


class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo())
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
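
# Typical use (a sketch of how instance LUs call the helper above): wait for
# sync and abort on persistently degraded disks:
#
#   disk_abort = not _WaitForSync(self, instance)
#   if disk_abort:
#     raise errors.OpExecError("There are some degraded disks for"
#                              " this instance")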


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      result = result and (not rstats.payload[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
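
# Illustrative use (a sketch): before a failover, an LU can check every disk
# of the instance on the target node:
#
#   for dev in instance.disks:
#     if not _CheckDiskConsistency(self, dev, target_node, False):
#       raise errors.OpExecError("Disk %s is degraded on target node,"
#                                " aborting failover." % dev.iv_name)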


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1813 """Check prerequisites.
1818 def _DiagnoseByOS(node_list, rlist):
1819 """Remaps a per-node return list into an a per-os per-node dictionary
1821 @param node_list: a list with the names of all nodes
1822 @param rlist: a map with node names as keys and OS objects as values
1825 @return: a dictionary with osnames as keys and as value another map, with
1826 nodes as keys and tuples of (path, status, diagnose) as values, eg::
1828 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1829 (/srv/..., False, "invalid api")],
1830 "node2": [(/srv/..., True, "")]}
1835 # we build here the list of nodes that didn't fail the RPC (at RPC
1836 # level), so that nodes with a non-responding node daemon don't
1837 # make all OSes invalid
1838 good_nodes = [node_name for node_name in rlist
1839 if not rlist[node_name].fail_msg]
1840 for node_name, nr in rlist.items():
1841 if nr.fail_msg or not nr.payload:
1843 for name, path, status, diagnose in nr.payload:
1844 if name not in all_os:
1845 # build a list of nodes for this os containing empty lists
1846 # for each node in node_list
1848 for nname in good_nodes:
1849 all_os[name][nname] = []
1850 all_os[name][node_name].append((path, status, diagnose))

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    output = []
    for os_name, os_data in pol.items():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0][1] for osl in os_data.values()])
        elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the node exists in the configuration
      - it does not have primary or secondary instances
      - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    result = self.rpc.call_node_leave_cluster(node.name)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)
1954 class LUQueryNodes(NoHooksLU):
1955 """Logical unit for querying nodes.
1958 _OP_REQP = ["output_fields", "names", "use_locking"]
1960 _FIELDS_DYNAMIC = utils.FieldSet(
1962 "mtotal", "mnode", "mfree",
1964 "ctotal", "cnodes", "csockets",
1967 _FIELDS_STATIC = utils.FieldSet(
1968 "name", "pinst_cnt", "sinst_cnt",
1969 "pinst_list", "sinst_list",
1970 "pip", "sip", "tags",
1978 def ExpandNames(self):
1979 _CheckOutputFields(static=self._FIELDS_STATIC,
1980 dynamic=self._FIELDS_DYNAMIC,
1981 selected=self.op.output_fields)
1983 self.needed_locks = {}
1984 self.share_locks[locking.LEVEL_NODE] = 1
1987 self.wanted = _GetWantedNodes(self, self.op.names)
1989 self.wanted = locking.ALL_SET
1991 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1992 self.do_locking = self.do_node_query and self.op.use_locking
1994 # if we don't request only static fields, we need to lock the nodes
1995 self.needed_locks[locking.LEVEL_NODE] = self.wanted
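# Annotation (hypothetical values): the lock declaration above ends up as
# either
#   self.needed_locks = {locking.LEVEL_NODE: ["node1.example.com"]}
# when specific names were requested, or
#   self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
# when the query targets all nodes.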
1998 def CheckPrereq(self):
1999 """Check prerequisites.
2002 # The validation of the node list is done in _GetWantedNodes if the
2003 # list is non-empty; if it is empty, there is no validation to do
2006 def Exec(self, feedback_fn):
2007 """Computes the list of nodes and their attributes.
2010 all_info = self.cfg.GetAllNodesInfo()
2012 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2013 elif self.wanted != locking.ALL_SET:
2014 nodenames = self.wanted
2015 missing = set(nodenames).difference(all_info.keys())
2017 raise errors.OpExecError(
2018 "Some nodes were removed before retrieving their data: %s" % missing)
2020 nodenames = all_info.keys()
2022 nodenames = utils.NiceSort(nodenames)
2023 nodelist = [all_info[name] for name in nodenames]
2025 # begin data gathering
2027 if self.do_node_query:
2029 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2030 self.cfg.GetHypervisorType())
2031 for name in nodenames:
2032 nodeinfo = node_data[name]
2033 if not nodeinfo.fail_msg and nodeinfo.payload:
2034 nodeinfo = nodeinfo.payload
2035 fn = utils.TryConvert
2037 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2038 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2039 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2040 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2041 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2042 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2043 "bootid": nodeinfo.get('bootid', None),
2044 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2045 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2048 live_data[name] = {}
2050 live_data = dict.fromkeys(nodenames, {})
2052 node_to_primary = dict([(name, set()) for name in nodenames])
2053 node_to_secondary = dict([(name, set()) for name in nodenames])
2055 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2056 "sinst_cnt", "sinst_list"))
2057 if inst_fields & frozenset(self.op.output_fields):
2058 instancelist = self.cfg.GetInstanceList()
2060 for instance_name in instancelist:
2061 inst = self.cfg.GetInstanceInfo(instance_name)
2062 if inst.primary_node in node_to_primary:
2063 node_to_primary[inst.primary_node].add(inst.name)
2064 for secnode in inst.secondary_nodes:
2065 if secnode in node_to_secondary:
2066 node_to_secondary[secnode].add(inst.name)
2068 master_node = self.cfg.GetMasterNode()
2070 # end data gathering
2073 for node in nodelist:
2075 for field in self.op.output_fields:
2078 elif field == "pinst_list":
2079 val = list(node_to_primary[node.name])
2080 elif field == "sinst_list":
2081 val = list(node_to_secondary[node.name])
2082 elif field == "pinst_cnt":
2083 val = len(node_to_primary[node.name])
2084 elif field == "sinst_cnt":
2085 val = len(node_to_secondary[node.name])
2086 elif field == "pip":
2087 val = node.primary_ip
2088 elif field == "sip":
2089 val = node.secondary_ip
2090 elif field == "tags":
2091 val = list(node.GetTags())
2092 elif field == "serial_no":
2093 val = node.serial_no
2094 elif field == "master_candidate":
2095 val = node.master_candidate
2096 elif field == "master":
2097 val = node.name == master_node
2098 elif field == "offline":
2100 elif field == "drained":
2102 elif self._FIELDS_DYNAMIC.Matches(field):
2103 val = live_data[node.name].get(field, None)
2105 raise errors.ParameterError(field)
2106 node_output.append(val)
2107 output.append(node_output)
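# Annotation (hypothetical values): the result is one row per node, with
# the values ordered as in self.op.output_fields; e.g. for
# output_fields = ["name", "pinst_cnt", "mfree"] the output could be
#   [["node1.example.com", 2, 3768], ["node2.example.com", 0, 7936]]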
2112 class LUQueryNodeVolumes(NoHooksLU):
2113 """Logical unit for getting volumes on node(s).
2116 _OP_REQP = ["nodes", "output_fields"]
2118 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2119 _FIELDS_STATIC = utils.FieldSet("node")
2121 def ExpandNames(self):
2122 _CheckOutputFields(static=self._FIELDS_STATIC,
2123 dynamic=self._FIELDS_DYNAMIC,
2124 selected=self.op.output_fields)
2126 self.needed_locks = {}
2127 self.share_locks[locking.LEVEL_NODE] = 1
2128 if not self.op.nodes:
2129 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2131 self.needed_locks[locking.LEVEL_NODE] = \
2132 _GetWantedNodes(self, self.op.nodes)
2134 def CheckPrereq(self):
2135 """Check prerequisites.
2137 This checks that the fields required are valid output fields.
2140 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2142 def Exec(self, feedback_fn):
2143 """Computes the list of nodes and their attributes.
2146 nodenames = self.nodes
2147 volumes = self.rpc.call_node_volumes(nodenames)
2149 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2150 in self.cfg.GetInstanceList()]
2152 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2155 for node in nodenames:
2156 nresult = volumes[node]
2159 msg = nresult.fail_msg
2161 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2164 node_vols = nresult.payload[:]
2165 node_vols.sort(key=lambda vol: vol['dev'])
2167 for vol in node_vols:
2169 for field in self.op.output_fields:
2172 elif field == "phys":
2176 elif field == "name":
2178 elif field == "size":
2179 val = int(float(vol['size']))
2180 elif field == "instance":
2182 if node not in lv_by_node[inst]:
2184 if vol['name'] in lv_by_node[inst][node]:
2190 raise errors.ParameterError(field)
2191 node_output.append(str(val))
2193 output.append(node_output)
2198 class LUAddNode(LogicalUnit):
2199 """Logical unit for adding node to the cluster.
2203 HTYPE = constants.HTYPE_NODE
2204 _OP_REQP = ["node_name"]
2206 def BuildHooksEnv(self):
2209 This will run on all nodes before, and on all nodes + the new node after.
2213 "OP_TARGET": self.op.node_name,
2214 "NODE_NAME": self.op.node_name,
2215 "NODE_PIP": self.op.primary_ip,
2216 "NODE_SIP": self.op.secondary_ip,
2218 nodes_0 = self.cfg.GetNodeList()
2219 nodes_1 = nodes_0 + [self.op.node_name, ]
2220 return env, nodes_0, nodes_1
2222 def CheckPrereq(self):
2223 """Check prerequisites.
2226 - the new node is not already in the config
2228 - its parameters (single/dual homed) match the cluster
2230 Any errors are signalled by raising errors.OpPrereqError.
2233 node_name = self.op.node_name
2236 dns_data = utils.HostInfo(node_name)
2238 node = dns_data.name
2239 primary_ip = self.op.primary_ip = dns_data.ip
2240 secondary_ip = getattr(self.op, "secondary_ip", None)
2241 if secondary_ip is None:
2242 secondary_ip = primary_ip
2243 if not utils.IsValidIP(secondary_ip):
2244 raise errors.OpPrereqError("Invalid secondary IP given")
2245 self.op.secondary_ip = secondary_ip
2247 node_list = cfg.GetNodeList()
2248 if not self.op.readd and node in node_list:
2249 raise errors.OpPrereqError("Node %s is already in the configuration" %
2251 elif self.op.readd and node not in node_list:
2252 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2254 for existing_node_name in node_list:
2255 existing_node = cfg.GetNodeInfo(existing_node_name)
2257 if self.op.readd and node == existing_node_name:
2258 if (existing_node.primary_ip != primary_ip or
2259 existing_node.secondary_ip != secondary_ip):
2260 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2261 " address configuration as before")
2264 if (existing_node.primary_ip == primary_ip or
2265 existing_node.secondary_ip == primary_ip or
2266 existing_node.primary_ip == secondary_ip or
2267 existing_node.secondary_ip == secondary_ip):
2268 raise errors.OpPrereqError("New node ip address(es) conflict with"
2269 " existing node %s" % existing_node.name)
2271 # check that the type of the node (single versus dual homed) is the
2272 # same as for the master
2273 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2274 master_singlehomed = myself.secondary_ip == myself.primary_ip
2275 newbie_singlehomed = secondary_ip == primary_ip
2276 if master_singlehomed != newbie_singlehomed:
2277 if master_singlehomed:
2278 raise errors.OpPrereqError("The master has no private ip but the"
2279 " new node has one")
2281 raise errors.OpPrereqError("The master has a private ip but the"
2282 " new node doesn't have one")
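# Annotation: the check above enforces homing symmetry between the master
# and the new node; the four possible cases are:
#   master single-homed, new node single-homed -> accepted
#   master single-homed, new node dual-homed   -> rejected (first branch)
#   master dual-homed,   new node single-homed -> rejected (second branch)
#   master dual-homed,   new node dual-homed   -> accepted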
2284 # checks reachability
2285 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2286 raise errors.OpPrereqError("Node not reachable by ping")
2288 if not newbie_singlehomed:
2289 # check reachability from my secondary ip to newbie's secondary ip
2290 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2291 source=myself.secondary_ip):
2292 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2293 " based ping to noded port")
2295 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2296 mc_now, _ = self.cfg.GetMasterCandidateStats()
2297 master_candidate = mc_now < cp_size
2299 self.new_node = objects.Node(name=node,
2300 primary_ip=primary_ip,
2301 secondary_ip=secondary_ip,
2302 master_candidate=master_candidate,
2303 offline=False, drained=False)
2305 def Exec(self, feedback_fn):
2306 """Adds the new node to the cluster.
2309 new_node = self.new_node
2310 node = new_node.name
2312 # check connectivity
2313 result = self.rpc.call_version([node])[node]
2314 result.Raise("Can't get version information from node %s" % node)
2315 if constants.PROTOCOL_VERSION == result.payload:
2316 logging.info("Communication to node %s fine, sw version %s match",
2317 node, result.payload)
2319 raise errors.OpExecError("Version mismatch master version %s,"
2320 " node version %s" %
2321 (constants.PROTOCOL_VERSION, result.payload))
2324 logging.info("Copy ssh key to node %s", node)
2325 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2327 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2328 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2334 keyarray.append(f.read())
2338 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2340 keyarray[3], keyarray[4], keyarray[5])
2341 result.Raise("Cannot transfer ssh keys to the new node")
2343 # Add node to our /etc/hosts, and add key to known_hosts
2344 if self.cfg.GetClusterInfo().modify_etc_hosts:
2345 utils.AddHostToEtcHosts(new_node.name)
2347 if new_node.secondary_ip != new_node.primary_ip:
2348 result = self.rpc.call_node_has_ip_address(new_node.name,
2349 new_node.secondary_ip)
2350 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2352 if not result.payload:
2353 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2354 " you gave (%s). Please fix and re-run this"
2355 " command." % new_node.secondary_ip)
2357 node_verify_list = [self.cfg.GetMasterNode()]
2358 node_verify_param = {
2360 # TODO: do a node-net-test as well?
2363 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2364 self.cfg.GetClusterName())
2365 for verifier in node_verify_list:
2366 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2367 nl_payload = result[verifier].payload['nodelist']
2369 for failed in nl_payload:
2370 feedback_fn("ssh/hostname verification failed %s -> %s" %
2371 (verifier, nl_payload[failed]))
2372 raise errors.OpExecError("ssh/hostname verification failed.")
2375 _RedistributeAncillaryFiles(self)
2376 self.context.ReaddNode(new_node)
2378 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2379 self.context.AddNode(new_node)
2382 class LUSetNodeParams(LogicalUnit):
2383 """Modifies the parameters of a node.
2386 HPATH = "node-modify"
2387 HTYPE = constants.HTYPE_NODE
2388 _OP_REQP = ["node_name"]
2391 def CheckArguments(self):
2392 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2393 if node_name is None:
2394 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2395 self.op.node_name = node_name
2396 _CheckBooleanOpField(self.op, 'master_candidate')
2397 _CheckBooleanOpField(self.op, 'offline')
2398 _CheckBooleanOpField(self.op, 'drained')
2399 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2400 if all_mods.count(None) == 3:
2401 raise errors.OpPrereqError("Please pass at least one modification")
2402 if all_mods.count(True) > 1:
2403 raise errors.OpPrereqError("Can't set the node into more than one"
2404 " state at the same time")
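# Annotation (hypothetical opcode values): examples of how the two checks
# above treat modification combinations:
#   offline=None, master_candidate=None, drained=None -> rejected (no change)
#   offline=True, drained=True                        -> rejected (two states)
#   offline=True, master_candidate=None, drained=None -> accepted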
2406 def ExpandNames(self):
2407 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2409 def BuildHooksEnv(self):
2412 This runs on the master node.
2416 "OP_TARGET": self.op.node_name,
2417 "MASTER_CANDIDATE": str(self.op.master_candidate),
2418 "OFFLINE": str(self.op.offline),
2419 "DRAINED": str(self.op.drained),
2421 nl = [self.cfg.GetMasterNode(),
2425 def CheckPrereq(self):
2426 """Check prerequisites.
2428 This only checks the instance list against the existing names.
2431 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2433 if ((self.op.master_candidate == False or self.op.offline == True or
2434 self.op.drained == True) and node.master_candidate):
2435 # we will demote the node from master_candidate
2436 if self.op.node_name == self.cfg.GetMasterNode():
2437 raise errors.OpPrereqError("The master node has to be a"
2438 " master candidate, online and not drained")
2439 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2440 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2441 if num_candidates <= cp_size:
2442 msg = ("Not enough master candidates (desired"
2443 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2445 self.LogWarning(msg)
2447 raise errors.OpPrereqError(msg)
2449 if (self.op.master_candidate == True and
2450 ((node.offline and not self.op.offline == False) or
2451 (node.drained and not self.op.drained == False))):
2452 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2453 " to master_candidate" % node.name)
2457 def Exec(self, feedback_fn):
2466 if self.op.offline is not None:
2467 node.offline = self.op.offline
2468 result.append(("offline", str(self.op.offline)))
2469 if self.op.offline == True:
2470 if node.master_candidate:
2471 node.master_candidate = False
2473 result.append(("master_candidate", "auto-demotion due to offline"))
2475 node.drained = False
2476 result.append(("drained", "clear drained status due to offline"))
2478 if self.op.master_candidate is not None:
2479 node.master_candidate = self.op.master_candidate
2481 result.append(("master_candidate", str(self.op.master_candidate)))
2482 if self.op.master_candidate == False:
2483 rrc = self.rpc.call_node_demote_from_mc(node.name)
2486 self.LogWarning("Node failed to demote itself: %s" % msg)
2488 if self.op.drained is not None:
2489 node.drained = self.op.drained
2490 result.append(("drained", str(self.op.drained)))
2491 if self.op.drained == True:
2492 if node.master_candidate:
2493 node.master_candidate = False
2495 result.append(("master_candidate", "auto-demotion due to drain"))
2497 node.offline = False
2498 result.append(("offline", "clear offline status due to drain"))
2500 # this will trigger configuration file update, if needed
2501 self.cfg.Update(node)
2502 # this will trigger job queue propagation or cleanup
2504 self.context.ReaddNode(node)
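# Annotation (hypothetical values): the result list built above holds
# (parameter, new value) pairs describing the applied changes, e.g.
#   [("offline", "True"), ("master_candidate", "auto-demotion due to offline")]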
2509 class LUPowercycleNode(NoHooksLU):
2510 """Powercycles a node.
2513 _OP_REQP = ["node_name", "force"]
2516 def CheckArguments(self):
2517 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2518 if node_name is None:
2519 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2520 self.op.node_name = node_name
2521 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2522 raise errors.OpPrereqError("The node is the master and the force"
2523 " parameter was not set")
2525 def ExpandNames(self):
2526 """Locking for PowercycleNode.
2528 This is a last-resort option and shouldn't block on other
2529 jobs. Therefore, we grab no locks.
2532 self.needed_locks = {}
2534 def CheckPrereq(self):
2535 """Check prerequisites.
2537 This LU has no prereqs.
2542 def Exec(self, feedback_fn):
2546 result = self.rpc.call_node_powercycle(self.op.node_name,
2547 self.cfg.GetHypervisorType())
2548 result.Raise("Failed to schedule the reboot")
2549 return result.payload
2552 class LUQueryClusterInfo(NoHooksLU):
2553 """Query cluster configuration.
2559 def ExpandNames(self):
2560 self.needed_locks = {}
2562 def CheckPrereq(self):
2563 """No prerequisites needed for this LU.
2568 def Exec(self, feedback_fn):
2569 """Return cluster config.
2572 cluster = self.cfg.GetClusterInfo()
2574 "software_version": constants.RELEASE_VERSION,
2575 "protocol_version": constants.PROTOCOL_VERSION,
2576 "config_version": constants.CONFIG_VERSION,
2577 "os_api_version": constants.OS_API_VERSION,
2578 "export_version": constants.EXPORT_VERSION,
2579 "architecture": (platform.architecture()[0], platform.machine()),
2580 "name": cluster.cluster_name,
2581 "master": cluster.master_node,
2582 "default_hypervisor": cluster.default_hypervisor,
2583 "enabled_hypervisors": cluster.enabled_hypervisors,
2584 "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2585 for hypervisor in cluster.enabled_hypervisors]),
2586 "beparams": cluster.beparams,
2587 "nicparams": cluster.nicparams,
2588 "candidate_pool_size": cluster.candidate_pool_size,
2589 "master_netdev": cluster.master_netdev,
2590 "volume_group_name": cluster.volume_group_name,
2591 "file_storage_dir": cluster.file_storage_dir,
2597 class LUQueryConfigValues(NoHooksLU):
2598 """Return configuration values.
2603 _FIELDS_DYNAMIC = utils.FieldSet()
2604 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2606 def ExpandNames(self):
2607 self.needed_locks = {}
2609 _CheckOutputFields(static=self._FIELDS_STATIC,
2610 dynamic=self._FIELDS_DYNAMIC,
2611 selected=self.op.output_fields)
2613 def CheckPrereq(self):
2614 """No prerequisites.
2619 def Exec(self, feedback_fn):
2620 """Dump a representation of the cluster config to the standard output.
2624 for field in self.op.output_fields:
2625 if field == "cluster_name":
2626 entry = self.cfg.GetClusterName()
2627 elif field == "master_node":
2628 entry = self.cfg.GetMasterNode()
2629 elif field == "drain_flag":
2630 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2632 raise errors.ParameterError(field)
2633 values.append(entry)
2637 class LUActivateInstanceDisks(NoHooksLU):
2638 """Bring up an instance's disks.
2641 _OP_REQP = ["instance_name"]
2644 def ExpandNames(self):
2645 self._ExpandAndLockInstance()
2646 self.needed_locks[locking.LEVEL_NODE] = []
2647 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2649 def DeclareLocks(self, level):
2650 if level == locking.LEVEL_NODE:
2651 self._LockInstancesNodes()
2653 def CheckPrereq(self):
2654 """Check prerequisites.
2656 This checks that the instance is in the cluster.
2659 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2660 assert self.instance is not None, \
2661 "Cannot retrieve locked instance %s" % self.op.instance_name
2662 _CheckNodeOnline(self, self.instance.primary_node)
2664 def Exec(self, feedback_fn):
2665 """Activate the disks.
2668 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2670 raise errors.OpExecError("Cannot activate block devices")
2675 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2676 """Prepare the block devices for an instance.
2678 This sets up the block devices on all nodes.
2680 @type lu: L{LogicalUnit}
2681 @param lu: the logical unit on whose behalf we execute
2682 @type instance: L{objects.Instance}
2683 @param instance: the instance for whose disks we assemble
2684 @type ignore_secondaries: boolean
2685 @param ignore_secondaries: if true, errors on secondary nodes
2686 won't result in an error return from the function
2687 @return: False if the operation failed, otherwise a list of
2688 (host, instance_visible_name, node_visible_name)
2689 with the mapping from node devices to instance devices
2694 iname = instance.name
2695 # With the two-pass mechanism we try to reduce the window of
2696 # opportunity for the race condition of switching DRBD to primary
2697 # before handshaking has occurred, but we do not eliminate it
2699 # The proper fix would be to wait (with some limits) until the
2700 # connection has been made and drbd transitions from WFConnection
2701 # into any other network-connected state (Connected, SyncTarget,
2704 # 1st pass, assemble on all nodes in secondary mode
2705 for inst_disk in instance.disks:
2706 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2707 lu.cfg.SetDiskID(node_disk, node)
2708 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2709 msg = result.fail_msg
2711 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2712 " (is_primary=False, pass=1): %s",
2713 inst_disk.iv_name, node, msg)
2714 if not ignore_secondaries:
2717 # FIXME: race condition on drbd migration to primary
2719 # 2nd pass, do only the primary node
2720 for inst_disk in instance.disks:
2721 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2722 if node != instance.primary_node:
2724 lu.cfg.SetDiskID(node_disk, node)
2725 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2726 msg = result.fail_msg
2728 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2729 " (is_primary=True, pass=2): %s",
2730 inst_disk.iv_name, node, msg)
2732 device_info.append((instance.primary_node, inst_disk.iv_name,
2735 # leave the disks configured for the primary node
2736 # this is a workaround that would be better fixed by
2737 # improving the logical/physical id handling
2738 for disk in instance.disks:
2739 lu.cfg.SetDiskID(disk, instance.primary_node)
2741 return disks_ok, device_info
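# Annotation (hypothetical values): on success this returns something like
#   (True, [("node1.example.com", "disk/0", "/dev/drbd0")])
# i.e. disks_ok plus one (host, iv_name, device) tuple per disk on the
# primary node; disks_ok is False if a needed assembly step failed.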
2744 def _StartInstanceDisks(lu, instance, force):
2745 """Start the disks of an instance.
2748 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2749 ignore_secondaries=force)
2751 _ShutdownInstanceDisks(lu, instance)
2752 if force is not None and not force:
2753 lu.proc.LogWarning("", hint="If the message above refers to a"
2755 " you can retry the operation using '--force'.")
2756 raise errors.OpExecError("Disk consistency error")
2759 class LUDeactivateInstanceDisks(NoHooksLU):
2760 """Shutdown an instance's disks.
2763 _OP_REQP = ["instance_name"]
2766 def ExpandNames(self):
2767 self._ExpandAndLockInstance()
2768 self.needed_locks[locking.LEVEL_NODE] = []
2769 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2771 def DeclareLocks(self, level):
2772 if level == locking.LEVEL_NODE:
2773 self._LockInstancesNodes()
2775 def CheckPrereq(self):
2776 """Check prerequisites.
2778 This checks that the instance is in the cluster.
2781 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2782 assert self.instance is not None, \
2783 "Cannot retrieve locked instance %s" % self.op.instance_name
2785 def Exec(self, feedback_fn):
2786 """Deactivate the disks
2789 instance = self.instance
2790 _SafeShutdownInstanceDisks(self, instance)
2793 def _SafeShutdownInstanceDisks(lu, instance):
2794 """Shutdown block devices of an instance.
2796 This function checks if an instance is running, before calling
2797 _ShutdownInstanceDisks.
2800 pnode = instance.primary_node
2801 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2802 ins_l.Raise("Can't contact node %s" % pnode)
2804 if instance.name in ins_l.payload:
2805 raise errors.OpExecError("Instance is running, can't shutdown"
2808 _ShutdownInstanceDisks(lu, instance)
2811 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2812 """Shutdown block devices of an instance.
2814 This does the shutdown on all nodes of the instance.
2816 If ignore_primary is false, errors on the primary node are
2821 for disk in instance.disks:
2822 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2823 lu.cfg.SetDiskID(top_disk, node)
2824 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2825 msg = result.fail_msg
2827 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2828 disk.iv_name, node, msg)
2829 if not ignore_primary or node != instance.primary_node:
2834 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2835 """Checks if a node has enough free memory.
2837 This function checks if a given node has the needed amount of free
2838 memory. In case the node has less memory or we cannot get the
2839 information from the node, this function raises an OpPrereqError
2842 @type lu: C{LogicalUnit}
2843 @param lu: a logical unit from which we get configuration data
2845 @param node: the node to check
2846 @type reason: C{str}
2847 @param reason: string to use in the error message
2848 @type requested: C{int}
2849 @param requested: the amount of memory in MiB to check for
2850 @type hypervisor_name: C{str}
2851 @param hypervisor_name: the hypervisor to ask for memory stats
2852 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2853 we cannot check the node
2856 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2857 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2858 free_mem = nodeinfo[node].payload.get('memory_free', None)
2859 if not isinstance(free_mem, int):
2860 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2861 " was '%s'" % (node, free_mem))
2862 if requested > free_mem:
2863 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2864 " needed %s MiB, available %s MiB" %
2865 (node, reason, requested, free_mem))
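# Illustrative call sketch (mirrors the usage in LUStartupInstance below):
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)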
2868 class LUStartupInstance(LogicalUnit):
2869 """Starts an instance.
2872 HPATH = "instance-start"
2873 HTYPE = constants.HTYPE_INSTANCE
2874 _OP_REQP = ["instance_name", "force"]
2877 def ExpandNames(self):
2878 self._ExpandAndLockInstance()
2880 def BuildHooksEnv(self):
2883 This runs on master, primary and secondary nodes of the instance.
2887 "FORCE": self.op.force,
2889 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2890 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2893 def CheckPrereq(self):
2894 """Check prerequisites.
2896 This checks that the instance is in the cluster.
2899 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2900 assert self.instance is not None, \
2901 "Cannot retrieve locked instance %s" % self.op.instance_name
2904 self.beparams = getattr(self.op, "beparams", {})
2906 if not isinstance(self.beparams, dict):
2907 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2908 " dict" % (type(self.beparams), ))
2909 # fill the beparams dict
2910 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2911 self.op.beparams = self.beparams
2914 self.hvparams = getattr(self.op, "hvparams", {})
2916 if not isinstance(self.hvparams, dict):
2917 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2918 " dict" % (type(self.hvparams), ))
2920 # check hypervisor parameter syntax (locally)
2921 cluster = self.cfg.GetClusterInfo()
2922 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2923 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2925 filled_hvp.update(self.hvparams)
2926 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2927 hv_type.CheckParameterSyntax(filled_hvp)
2928 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2929 self.op.hvparams = self.hvparams
2931 _CheckNodeOnline(self, instance.primary_node)
2933 bep = self.cfg.GetClusterInfo().FillBE(instance)
2934 # check bridges existence
2935 _CheckInstanceBridgesExist(self, instance)
2937 remote_info = self.rpc.call_instance_info(instance.primary_node,
2939 instance.hypervisor)
2940 remote_info.Raise("Error checking node %s" % instance.primary_node,
2942 if not remote_info.payload: # not running already
2943 _CheckNodeFreeMemory(self, instance.primary_node,
2944 "starting instance %s" % instance.name,
2945 bep[constants.BE_MEMORY], instance.hypervisor)
2947 def Exec(self, feedback_fn):
2948 """Start the instance.
2951 instance = self.instance
2952 force = self.op.force
2954 self.cfg.MarkInstanceUp(instance.name)
2956 node_current = instance.primary_node
2958 _StartInstanceDisks(self, instance, force)
2960 result = self.rpc.call_instance_start(node_current, instance,
2961 self.hvparams, self.beparams)
2962 msg = result.fail_msg
2964 _ShutdownInstanceDisks(self, instance)
2965 raise errors.OpExecError("Could not start instance: %s" % msg)
2968 class LURebootInstance(LogicalUnit):
2969 """Reboot an instance.
2972 HPATH = "instance-reboot"
2973 HTYPE = constants.HTYPE_INSTANCE
2974 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2977 def ExpandNames(self):
2978 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2979 constants.INSTANCE_REBOOT_HARD,
2980 constants.INSTANCE_REBOOT_FULL]:
2981 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2982 (constants.INSTANCE_REBOOT_SOFT,
2983 constants.INSTANCE_REBOOT_HARD,
2984 constants.INSTANCE_REBOOT_FULL))
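# Annotation: as Exec below shows, SOFT and HARD reboots are delegated to
# the hypervisor via call_instance_reboot, while FULL does a shutdown
# followed by a disk deactivation/activation cycle and a fresh start.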
2985 self._ExpandAndLockInstance()
2987 def BuildHooksEnv(self):
2990 This runs on master, primary and secondary nodes of the instance.
2994 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2995 "REBOOT_TYPE": self.op.reboot_type,
2997 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2998 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3001 def CheckPrereq(self):
3002 """Check prerequisites.
3004 This checks that the instance is in the cluster.
3007 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3008 assert self.instance is not None, \
3009 "Cannot retrieve locked instance %s" % self.op.instance_name
3011 _CheckNodeOnline(self, instance.primary_node)
3013 # check bridges existence
3014 _CheckInstanceBridgesExist(self, instance)
3016 def Exec(self, feedback_fn):
3017 """Reboot the instance.
3020 instance = self.instance
3021 ignore_secondaries = self.op.ignore_secondaries
3022 reboot_type = self.op.reboot_type
3024 node_current = instance.primary_node
3026 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3027 constants.INSTANCE_REBOOT_HARD]:
3028 for disk in instance.disks:
3029 self.cfg.SetDiskID(disk, node_current)
3030 result = self.rpc.call_instance_reboot(node_current, instance,
3032 result.Raise("Could not reboot instance")
3034 result = self.rpc.call_instance_shutdown(node_current, instance)
3035 result.Raise("Could not shutdown instance for full reboot")
3036 _ShutdownInstanceDisks(self, instance)
3037 _StartInstanceDisks(self, instance, ignore_secondaries)
3038 result = self.rpc.call_instance_start(node_current, instance, None, None)
3039 msg = result.fail_msg
3041 _ShutdownInstanceDisks(self, instance)
3042 raise errors.OpExecError("Could not start instance for"
3043 " full reboot: %s" % msg)
3045 self.cfg.MarkInstanceUp(instance.name)
3048 class LUShutdownInstance(LogicalUnit):
3049 """Shutdown an instance.
3052 HPATH = "instance-stop"
3053 HTYPE = constants.HTYPE_INSTANCE
3054 _OP_REQP = ["instance_name"]
3057 def ExpandNames(self):
3058 self._ExpandAndLockInstance()
3060 def BuildHooksEnv(self):
3063 This runs on master, primary and secondary nodes of the instance.
3066 env = _BuildInstanceHookEnvByObject(self, self.instance)
3067 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3070 def CheckPrereq(self):
3071 """Check prerequisites.
3073 This checks that the instance is in the cluster.
3076 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3077 assert self.instance is not None, \
3078 "Cannot retrieve locked instance %s" % self.op.instance_name
3079 _CheckNodeOnline(self, self.instance.primary_node)
3081 def Exec(self, feedback_fn):
3082 """Shutdown the instance.
3085 instance = self.instance
3086 node_current = instance.primary_node
3087 self.cfg.MarkInstanceDown(instance.name)
3088 result = self.rpc.call_instance_shutdown(node_current, instance)
3089 msg = result.fail_msg
3091 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3093 _ShutdownInstanceDisks(self, instance)
3096 class LUReinstallInstance(LogicalUnit):
3097 """Reinstall an instance.
3100 HPATH = "instance-reinstall"
3101 HTYPE = constants.HTYPE_INSTANCE
3102 _OP_REQP = ["instance_name"]
3105 def ExpandNames(self):
3106 self._ExpandAndLockInstance()
3108 def BuildHooksEnv(self):
3111 This runs on master, primary and secondary nodes of the instance.
3114 env = _BuildInstanceHookEnvByObject(self, self.instance)
3115 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3118 def CheckPrereq(self):
3119 """Check prerequisites.
3121 This checks that the instance is in the cluster and is not running.
3124 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3125 assert instance is not None, \
3126 "Cannot retrieve locked instance %s" % self.op.instance_name
3127 _CheckNodeOnline(self, instance.primary_node)
3129 if instance.disk_template == constants.DT_DISKLESS:
3130 raise errors.OpPrereqError("Instance '%s' has no disks" %
3131 self.op.instance_name)
3132 if instance.admin_up:
3133 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3134 self.op.instance_name)
3135 remote_info = self.rpc.call_instance_info(instance.primary_node,
3137 instance.hypervisor)
3138 remote_info.Raise("Error checking node %s" % instance.primary_node,
3140 if remote_info.payload:
3141 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3142 (self.op.instance_name,
3143 instance.primary_node))
3145 self.op.os_type = getattr(self.op, "os_type", None)
3146 if self.op.os_type is not None:
3148 pnode = self.cfg.GetNodeInfo(
3149 self.cfg.ExpandNodeName(instance.primary_node))
3151 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3153 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3154 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3155 (self.op.os_type, pnode.name), prereq=True)
3157 self.instance = instance
3159 def Exec(self, feedback_fn):
3160 """Reinstall the instance.
3163 inst = self.instance
3165 if self.op.os_type is not None:
3166 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3167 inst.os = self.op.os_type
3168 self.cfg.Update(inst)
3170 _StartInstanceDisks(self, inst, None)
3172 feedback_fn("Running the instance OS create scripts...")
3173 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3174 result.Raise("Could not install OS for instance %s on node %s" %
3175 (inst.name, inst.primary_node))
3177 _ShutdownInstanceDisks(self, inst)
3180 class LURenameInstance(LogicalUnit):
3181 """Rename an instance.
3184 HPATH = "instance-rename"
3185 HTYPE = constants.HTYPE_INSTANCE
3186 _OP_REQP = ["instance_name", "new_name"]
3188 def BuildHooksEnv(self):
3191 This runs on master, primary and secondary nodes of the instance.
3194 env = _BuildInstanceHookEnvByObject(self, self.instance)
3195 env["INSTANCE_NEW_NAME"] = self.op.new_name
3196 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3199 def CheckPrereq(self):
3200 """Check prerequisites.
3202 This checks that the instance is in the cluster and is not running.
3205 instance = self.cfg.GetInstanceInfo(
3206 self.cfg.ExpandInstanceName(self.op.instance_name))
3207 if instance is None:
3208 raise errors.OpPrereqError("Instance '%s' not known" %
3209 self.op.instance_name)
3210 _CheckNodeOnline(self, instance.primary_node)
3212 if instance.admin_up:
3213 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3214 self.op.instance_name)
3215 remote_info = self.rpc.call_instance_info(instance.primary_node,
3217 instance.hypervisor)
3218 remote_info.Raise("Error checking node %s" % instance.primary_node,
3220 if remote_info.payload:
3221 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3222 (self.op.instance_name,
3223 instance.primary_node))
3224 self.instance = instance
3226 # new name verification
3227 name_info = utils.HostInfo(self.op.new_name)
3229 self.op.new_name = new_name = name_info.name
3230 instance_list = self.cfg.GetInstanceList()
3231 if new_name in instance_list:
3232 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3235 if not getattr(self.op, "ignore_ip", False):
3236 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3237 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3238 (name_info.ip, new_name))
3241 def Exec(self, feedback_fn):
3242 """Rename the instance.
3245 inst = self.instance
3246 old_name = inst.name
3248 if inst.disk_template == constants.DT_FILE:
3249 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3251 self.cfg.RenameInstance(inst.name, self.op.new_name)
3252 # Change the instance lock. This is definitely safe while we hold the BGL
3253 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3254 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3256 # re-read the instance from the configuration after rename
3257 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3259 if inst.disk_template == constants.DT_FILE:
3260 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3261 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3262 old_file_storage_dir,
3263 new_file_storage_dir)
3264 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3265 " (but the instance has been renamed in Ganeti)" %
3266 (inst.primary_node, old_file_storage_dir,
3267 new_file_storage_dir))
3269 _StartInstanceDisks(self, inst, None)
3271 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3273 msg = result.fail_msg
3275 msg = ("Could not run OS rename script for instance %s on node %s"
3276 " (but the instance has been renamed in Ganeti): %s" %
3277 (inst.name, inst.primary_node, msg))
3278 self.proc.LogWarning(msg)
3280 _ShutdownInstanceDisks(self, inst)
3283 class LURemoveInstance(LogicalUnit):
3284 """Remove an instance.
3287 HPATH = "instance-remove"
3288 HTYPE = constants.HTYPE_INSTANCE
3289 _OP_REQP = ["instance_name", "ignore_failures"]
3292 def ExpandNames(self):
3293 self._ExpandAndLockInstance()
3294 self.needed_locks[locking.LEVEL_NODE] = []
3295 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3297 def DeclareLocks(self, level):
3298 if level == locking.LEVEL_NODE:
3299 self._LockInstancesNodes()
3301 def BuildHooksEnv(self):
3304 This runs on master, primary and secondary nodes of the instance.
3307 env = _BuildInstanceHookEnvByObject(self, self.instance)
3308 nl = [self.cfg.GetMasterNode()]
3311 def CheckPrereq(self):
3312 """Check prerequisites.
3314 This checks that the instance is in the cluster.
3317 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3318 assert self.instance is not None, \
3319 "Cannot retrieve locked instance %s" % self.op.instance_name
3321 def Exec(self, feedback_fn):
3322 """Remove the instance.
3325 instance = self.instance
3326 logging.info("Shutting down instance %s on node %s",
3327 instance.name, instance.primary_node)
3329 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3330 msg = result.fail_msg
3332 if self.op.ignore_failures:
3333 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3335 raise errors.OpExecError("Could not shutdown instance %s on"
3337 (instance.name, instance.primary_node, msg))
3339 logging.info("Removing block devices for instance %s", instance.name)
3341 if not _RemoveDisks(self, instance):
3342 if self.op.ignore_failures:
3343 feedback_fn("Warning: can't remove instance's disks")
3345 raise errors.OpExecError("Can't remove instance's disks")
3347 logging.info("Removing instance %s out of cluster config", instance.name)
3349 self.cfg.RemoveInstance(instance.name)
3350 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3353 class LUQueryInstances(NoHooksLU):
3354 """Logical unit for querying instances.
3357 _OP_REQP = ["output_fields", "names", "use_locking"]
3359 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3361 "disk_template", "ip", "mac", "bridge",
3362 "sda_size", "sdb_size", "vcpus", "tags",
3363 "network_port", "beparams",
3364 r"(disk)\.(size)/([0-9]+)",
3365 r"(disk)\.(sizes)", "disk_usage",
3366 r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3367 r"(nic)\.(macs|ips|bridges)",
3368 r"(disk|nic)\.(count)",
3369 "serial_no", "hypervisor", "hvparams",] +
3371 for name in constants.HVS_PARAMETERS] +
3373 for name in constants.BES_PARAMETERS])
3374 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
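# Annotation (example field names): besides plain fields, the static set
# accepts parameterized fields matched by the regexps above, e.g.
#   "disk.count", "disk.sizes", "disk.size/0",
#   "nic.macs", "nic.mac/1", "nic.bridge/0"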
3377 def ExpandNames(self):
3378 _CheckOutputFields(static=self._FIELDS_STATIC,
3379 dynamic=self._FIELDS_DYNAMIC,
3380 selected=self.op.output_fields)
3382 self.needed_locks = {}
3383 self.share_locks[locking.LEVEL_INSTANCE] = 1
3384 self.share_locks[locking.LEVEL_NODE] = 1
3387 self.wanted = _GetWantedInstances(self, self.op.names)
3389 self.wanted = locking.ALL_SET
3391 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3392 self.do_locking = self.do_node_query and self.op.use_locking
3394 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3395 self.needed_locks[locking.LEVEL_NODE] = []
3396 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3398 def DeclareLocks(self, level):
3399 if level == locking.LEVEL_NODE and self.do_locking:
3400 self._LockInstancesNodes()
3402 def CheckPrereq(self):
3403 """Check prerequisites.
3408 def Exec(self, feedback_fn):
3409 """Computes the list of nodes and their attributes.
3412 all_info = self.cfg.GetAllInstancesInfo()
3413 if self.wanted == locking.ALL_SET:
3414 # caller didn't specify instance names, so ordering is not important
3416 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3418 instance_names = all_info.keys()
3419 instance_names = utils.NiceSort(instance_names)
3421 # caller did specify names, so we must keep the ordering
3423 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3425 tgt_set = all_info.keys()
3426 missing = set(self.wanted).difference(tgt_set)
3428 raise errors.OpExecError("Some instances were removed before"
3429 " retrieving their data: %s" % missing)
3430 instance_names = self.wanted
3432 instance_list = [all_info[iname] for iname in instance_names]
3434 # begin data gathering
3436 nodes = frozenset([inst.primary_node for inst in instance_list])
3437 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3441 if self.do_node_query:
3443 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3445 result = node_data[name]
3447 # offline nodes will be in both lists
3448 off_nodes.append(name)
3449 if result.failed or result.fail_msg:
3450 bad_nodes.append(name)
3453 live_data.update(result.payload)
3454 # else no instance is alive
3456 live_data = dict([(name, {}) for name in instance_names])
3458 # end data gathering
3463 for instance in instance_list:
3465 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3466 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3467 for field in self.op.output_fields:
3468 st_match = self._FIELDS_STATIC.Matches(field)
3473 elif field == "pnode":
3474 val = instance.primary_node
3475 elif field == "snodes":
3476 val = list(instance.secondary_nodes)
3477 elif field == "admin_state":
3478 val = instance.admin_up
3479 elif field == "oper_state":
3480 if instance.primary_node in bad_nodes:
3483 val = bool(live_data.get(instance.name))
3484 elif field == "status":
3485 if instance.primary_node in off_nodes:
3486 val = "ERROR_nodeoffline"
3487 elif instance.primary_node in bad_nodes:
3488 val = "ERROR_nodedown"
3490 running = bool(live_data.get(instance.name))
3492 if instance.admin_up:
3497 if instance.admin_up:
3501 elif field == "oper_ram":
3502 if instance.primary_node in bad_nodes:
3504 elif instance.name in live_data:
3505 val = live_data[instance.name].get("memory", "?")
3508 elif field == "disk_template":
3509 val = instance.disk_template
3512 val = instance.nics[0].ip
3515 elif field == "bridge":
3517 val = instance.nics[0].bridge
3520 elif field == "mac":
3522 val = instance.nics[0].mac
3525 elif field == "sda_size" or field == "sdb_size":
3526 idx = ord(field[2]) - ord('a')
3528 val = instance.FindDisk(idx).size
3529 except errors.OpPrereqError:
3531 elif field == "disk_usage": # total disk usage per node
3532 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3533 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3534 elif field == "tags":
3535 val = list(instance.GetTags())
3536 elif field == "serial_no":
3537 val = instance.serial_no
3538 elif field == "network_port":
3539 val = instance.network_port
3540 elif field == "hypervisor":
3541 val = instance.hypervisor
3542 elif field == "hvparams":
3544 elif (field.startswith(HVPREFIX) and
3545 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3546 val = i_hv.get(field[len(HVPREFIX):], None)
3547 elif field == "beparams":
3549 elif (field.startswith(BEPREFIX) and
3550 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3551 val = i_be.get(field[len(BEPREFIX):], None)
3552 elif st_match and st_match.groups():
3553 # matches a variable list
3554 st_groups = st_match.groups()
3555 if st_groups and st_groups[0] == "disk":
3556 if st_groups[1] == "count":
3557 val = len(instance.disks)
3558 elif st_groups[1] == "sizes":
3559 val = [disk.size for disk in instance.disks]
3560 elif st_groups[1] == "size":
3562 val = instance.FindDisk(st_groups[2]).size
3563 except errors.OpPrereqError:
3566 assert False, "Unhandled disk parameter"
3567 elif st_groups[0] == "nic":
3568 if st_groups[1] == "count":
3569 val = len(instance.nics)
3570 elif st_groups[1] == "macs":
3571 val = [nic.mac for nic in instance.nics]
3572 elif st_groups[1] == "ips":
3573 val = [nic.ip for nic in instance.nics]
3574 elif st_groups[1] == "bridges":
3575 val = [nic.bridge for nic in instance.nics]
3578 nic_idx = int(st_groups[2])
3579 if nic_idx >= len(instance.nics):
3582 if st_groups[1] == "mac":
3583 val = instance.nics[nic_idx].mac
3584 elif st_groups[1] == "ip":
3585 val = instance.nics[nic_idx].ip
3586 elif st_groups[1] == "bridge":
3587 val = instance.nics[nic_idx].bridge
3589 assert False, "Unhandled NIC parameter"
3591 assert False, "Unhandled variable parameter"
3593 raise errors.ParameterError(field)
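# Annotation (hypothetical field): for parameterized fields the regexp
# match drives the dispatch above; e.g. field "nic.mac/1" yields
# st_groups == ("nic", "mac", "1") and returns instance.nics[1].mac.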
3600 class LUFailoverInstance(LogicalUnit):
3601 """Failover an instance.
3604 HPATH = "instance-failover"
3605 HTYPE = constants.HTYPE_INSTANCE
3606 _OP_REQP = ["instance_name", "ignore_consistency"]
3609 def ExpandNames(self):
3610 self._ExpandAndLockInstance()
3611 self.needed_locks[locking.LEVEL_NODE] = []
3612 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3614 def DeclareLocks(self, level):
3615 if level == locking.LEVEL_NODE:
3616 self._LockInstancesNodes()
3618 def BuildHooksEnv(self):
3621 This runs on master, primary and secondary nodes of the instance.
3625 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3627 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3628 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3631 def CheckPrereq(self):
3632 """Check prerequisites.
3634 This checks that the instance is in the cluster.
3637 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3638 assert self.instance is not None, \
3639 "Cannot retrieve locked instance %s" % self.op.instance_name
3641 bep = self.cfg.GetClusterInfo().FillBE(instance)
3642 if instance.disk_template not in constants.DTS_NET_MIRROR:
3643 raise errors.OpPrereqError("Instance's disk layout is not"
3644 " network mirrored, cannot failover.")
3646 secondary_nodes = instance.secondary_nodes
3647 if not secondary_nodes:
3648 raise errors.ProgrammerError("no secondary node but using "
3649 "a mirrored disk template")
3651 target_node = secondary_nodes[0]
3652 _CheckNodeOnline(self, target_node)
3653 _CheckNodeNotDrained(self, target_node)
3654 if instance.admin_up:
3655 # check memory requirements on the secondary node
3656 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3657 instance.name, bep[constants.BE_MEMORY],
3658 instance.hypervisor)
3660 self.LogInfo("Not checking memory on the secondary node as"
3661 " instance will not be started")
3663 # check bridge existence
3664 _CheckInstanceBridgesExist(self, instance, node=target_node)
3666 def Exec(self, feedback_fn):
3667 """Failover an instance.
3669 The failover is done by shutting it down on its present node and
3670 starting it on the secondary.
3673 instance = self.instance
3675 source_node = instance.primary_node
3676 target_node = instance.secondary_nodes[0]
3678 feedback_fn("* checking disk consistency between source and target")
3679 for dev in instance.disks:
3680 # for drbd, these are drbd over lvm
3681 if not _CheckDiskConsistency(self, dev, target_node, False):
3682 if instance.admin_up and not self.op.ignore_consistency:
3683 raise errors.OpExecError("Disk %s is degraded on target node,"
3684 " aborting failover." % dev.iv_name)
3686 feedback_fn("* shutting down instance on source node")
3687 logging.info("Shutting down instance %s on node %s",
3688 instance.name, source_node)
3690 result = self.rpc.call_instance_shutdown(source_node, instance)
3691 msg = result.fail_msg
3693 if self.op.ignore_consistency:
3694 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3695 " Proceeding anyway. Please make sure node"
3696 " %s is down. Error details: %s",
3697 instance.name, source_node, source_node, msg)
3699 raise errors.OpExecError("Could not shutdown instance %s on"
3701 (instance.name, source_node, msg))
3703 feedback_fn("* deactivating the instance's disks on source node")
3704 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3705 raise errors.OpExecError("Can't shut down the instance's disks.")
3707 instance.primary_node = target_node
3708 # distribute new instance config to the other nodes
3709 self.cfg.Update(instance)
3711 # Only start the instance if it's marked as up
3712 if instance.admin_up:
3713 feedback_fn("* activating the instance's disks on target node")
3714 logging.info("Starting instance %s on node %s",
3715 instance.name, target_node)
3717 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3718 ignore_secondaries=True)
3720 _ShutdownInstanceDisks(self, instance)
3721 raise errors.OpExecError("Can't activate the instance's disks")
3723 feedback_fn("* starting the instance on the target node")
3724 result = self.rpc.call_instance_start(target_node, instance, None, None)
3725 msg = result.fail_msg
3727 _ShutdownInstanceDisks(self, instance)
3728 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3729 (instance.name, target_node, msg))
3732 class LUMigrateInstance(LogicalUnit):
3733 """Migrate an instance.
3735 This is migration without shutting down, as opposed to failover,
3736 which is done with a shutdown.
3739 HPATH = "instance-migrate"
3740 HTYPE = constants.HTYPE_INSTANCE
3741 _OP_REQP = ["instance_name", "live", "cleanup"]
3745 def ExpandNames(self):
3746 self._ExpandAndLockInstance()
3747 self.needed_locks[locking.LEVEL_NODE] = []
3748 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3750 def DeclareLocks(self, level):
3751 if level == locking.LEVEL_NODE:
3752 self._LockInstancesNodes()
3754 def BuildHooksEnv(self):
3757 This runs on master, primary and secondary nodes of the instance.
3760 env = _BuildInstanceHookEnvByObject(self, self.instance)
3761 env["MIGRATE_LIVE"] = self.op.live
3762 env["MIGRATE_CLEANUP"] = self.op.cleanup
3763 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3766 def CheckPrereq(self):
3767 """Check prerequisites.
3769 This checks that the instance is in the cluster.
3772 instance = self.cfg.GetInstanceInfo(
3773 self.cfg.ExpandInstanceName(self.op.instance_name))
3774 if instance is None:
3775 raise errors.OpPrereqError("Instance '%s' not known" %
3776 self.op.instance_name)
3778 if instance.disk_template != constants.DT_DRBD8:
3779 raise errors.OpPrereqError("Instance's disk layout is not"
3780 " drbd8, cannot migrate.")
3782 secondary_nodes = instance.secondary_nodes
3783 if not secondary_nodes:
3784 raise errors.ConfigurationError("No secondary node but using"
3785 " drbd8 disk template")
3787 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3789 target_node = secondary_nodes[0]
3790 # check memory requirements on the secondary node
3791 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3792 instance.name, i_be[constants.BE_MEMORY],
3793 instance.hypervisor)
3795 # check bridge existence
3796 _CheckInstanceBridgesExist(self, instance, node=target_node)
3798 if not self.op.cleanup:
3799 _CheckNodeNotDrained(self, target_node)
3800 result = self.rpc.call_instance_migratable(instance.primary_node,
3802 result.Raise("Can't migrate, please use failover", prereq=True)
3804 self.instance = instance
3806 def _WaitUntilSync(self):
3807 """Poll with custom rpc for disk sync.
3809 This uses our own step-based rpc call.
3812 self.feedback_fn("* wait until resync is done")
3816 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3818 self.instance.disks)
3820 for node, nres in result.items():
3821 nres.Raise("Cannot resync disks on node %s" % node)
3822 node_done, node_percent = nres.payload
3823 all_done = all_done and node_done
3824 if node_percent is not None:
3825 min_percent = min(min_percent, node_percent)
3827 if min_percent < 100:
3828 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3831 def _EnsureSecondary(self, node):
3832 """Demote a node to secondary.
3835 self.feedback_fn("* switching node %s to secondary mode" % node)
3837 for dev in self.instance.disks:
3838 self.cfg.SetDiskID(dev, node)
3840 result = self.rpc.call_blockdev_close(node, self.instance.name,
3841 self.instance.disks)
3842 result.Raise("Cannot change disk to secondary on node %s" % node)
3844 def _GoStandalone(self):
3845 """Disconnect from the network.
3848 self.feedback_fn("* changing into standalone mode")
3849 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3850 self.instance.disks)
3851 for node, nres in result.items():
3852 nres.Raise("Cannot disconnect disks node %s" % node)
3854 def _GoReconnect(self, multimaster):
3855 """Reconnect to the network.
3861 msg = "single-master"
3862 self.feedback_fn("* changing disks into %s mode" % msg)
3863 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3864 self.instance.disks,
3865 self.instance.name, multimaster)
3866 for node, nres in result.items():
3867 nres.Raise("Cannot change disks config on node %s" % node)
3869 def _ExecCleanup(self):
3870 """Try to cleanup after a failed migration.
3872 The cleanup is done by:
3873 - check that the instance is running only on one node
3874 (and update the config if needed)
3875 - change disks on its secondary node to secondary
3876 - wait until disks are fully synchronized
3877 - disconnect from the network
3878 - change disks into single-master mode
3879 - wait again until disks are fully synchronized
3882 instance = self.instance
3883 target_node = self.target_node
3884 source_node = self.source_node
3886 # check running on only one node
3887 self.feedback_fn("* checking where the instance actually runs"
3888 " (if this hangs, the hypervisor might be in"
3890 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3891 for node, result in ins_l.items():
3892 result.Raise("Can't contact node %s" % node)
3894 runningon_source = instance.name in ins_l[source_node].payload
3895 runningon_target = instance.name in ins_l[target_node].payload
3897 if runningon_source and runningon_target:
3898 raise errors.OpExecError("Instance seems to be running on two nodes,"
3899 " or the hypervisor is confused. You will have"
3900 " to ensure manually that it runs only on one"
3901 " and restart this operation.")
3903 if not (runningon_source or runningon_target):
3904 raise errors.OpExecError("Instance does not seem to be running at all."
3905 " In this case, it's safer to repair by"
3906 " running 'gnt-instance stop' to ensure disk"
3907 " shutdown, and then restarting it.")
3909 if runningon_target:
3910 # the migration has actually succeeded, we need to update the config
3911 self.feedback_fn("* instance running on secondary node (%s),"
3912 " updating config" % target_node)
3913 instance.primary_node = target_node
3914 self.cfg.Update(instance)
3915 demoted_node = source_node
3917 self.feedback_fn("* instance confirmed to be running on its"
3918 " primary node (%s)" % source_node)
3919 demoted_node = target_node
3921 self._EnsureSecondary(demoted_node)
3923 self._WaitUntilSync()
3924 except errors.OpExecError:
3925 # we ignore errors here, since if the device is standalone, it
3926 # won't be able to sync
3927 pass
3928 self._GoStandalone()
3929 self._GoReconnect(False)
3930 self._WaitUntilSync()
3932 self.feedback_fn("* done")
3934 def _RevertDiskStatus(self):
3935 """Try to revert the disk status after a failed migration.
3938 target_node = self.target_node
3939 try:
3940 self._EnsureSecondary(target_node)
3941 self._GoStandalone()
3942 self._GoReconnect(False)
3943 self._WaitUntilSync()
3944 except errors.OpExecError, err:
3945 self.LogWarning("Migration failed and I can't reconnect the"
3946 " drives: error '%s'\n"
3947 "Please look and recover the instance status" %
3950 def _AbortMigration(self):
3951 """Call the hypervisor code to abort a started migration.
3954 instance = self.instance
3955 target_node = self.target_node
3956 migration_info = self.migration_info
3958 abort_result = self.rpc.call_finalize_migration(target_node,
3959 instance,
3960 migration_info,
3961 False)
3962 abort_msg = abort_result.fail_msg
3963 if abort_msg:
3964 logging.error("Aborting migration failed on target node %s: %s" %
3965 (target_node, abort_msg))
3966 # Don't raise an exception here, as we still have to try to revert the
3967 # disk status, even if this step failed.
3969 def _ExecMigration(self):
3970 """Migrate an instance.
3972 The migration is done by:
3973 - change the disks into dual-master mode
3974 - wait until disks are fully synchronized again
3975 - migrate the instance
3976 - change disks on the new secondary node (the old primary) to secondary
3977 - wait until disks are fully synchronized
3978 - change disks into single-master mode
3980 """
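# Sketch of the node RPCs issued below, in order (not exhaustive):
#   call_migration_info(source)     -> fetch hypervisor migration data
#   call_accept_instance(target)    -> prepare the target node
#   call_instance_migrate(source)   -> run the actual live migration
#   call_finalize_migration(target) -> finalize on the new primary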
3981 instance = self.instance
3982 target_node = self.target_node
3983 source_node = self.source_node
3985 self.feedback_fn("* checking disk consistency between source and target")
3986 for dev in instance.disks:
3987 if not _CheckDiskConsistency(self, dev, target_node, False):
3988 raise errors.OpExecError("Disk %s is degraded or not fully"
3989 " synchronized on target node,"
3990 " aborting migrate." % dev.iv_name)
3992 # First get the migration information from the remote node
3993 result = self.rpc.call_migration_info(source_node, instance)
3994 msg = result.fail_msg
3995 if msg:
3996 log_err = ("Failed fetching source migration information from %s: %s" %
3997 (source_node, msg))
3998 logging.error(log_err)
3999 raise errors.OpExecError(log_err)
4001 self.migration_info = migration_info = result.payload
4003 # Then switch the disks to master/master mode
4004 self._EnsureSecondary(target_node)
4005 self._GoStandalone()
4006 self._GoReconnect(True)
4007 self._WaitUntilSync()
4009 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4010 result = self.rpc.call_accept_instance(target_node,
4011 instance,
4012 migration_info,
4013 self.nodes_ip[target_node])
4015 msg = result.fail_msg
4016 if msg:
4017 logging.error("Instance pre-migration failed, trying to revert"
4018 " disk status: %s", msg)
4019 self._AbortMigration()
4020 self._RevertDiskStatus()
4021 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4022 (instance.name, msg))
4024 self.feedback_fn("* migrating instance to %s" % target_node)
4026 result = self.rpc.call_instance_migrate(source_node, instance,
4027 self.nodes_ip[target_node],
4028 self.op.live)
4029 msg = result.fail_msg
4030 if msg:
4031 logging.error("Instance migration failed, trying to revert"
4032 " disk status: %s", msg)
4033 self._AbortMigration()
4034 self._RevertDiskStatus()
4035 raise errors.OpExecError("Could not migrate instance %s: %s" %
4036 (instance.name, msg))
4039 instance.primary_node = target_node
4040 # distribute new instance config to the other nodes
4041 self.cfg.Update(instance)
4043 result = self.rpc.call_finalize_migration(target_node,
4044 instance,
4045 migration_info,
4046 True)
4047 msg = result.fail_msg
4048 if msg:
4049 logging.error("Instance migration succeeded, but finalization failed:"
4050 " %s" % msg)
4051 raise errors.OpExecError("Could not finalize instance migration: %s" %
4052 msg)
4054 self._EnsureSecondary(source_node)
4055 self._WaitUntilSync()
4056 self._GoStandalone()
4057 self._GoReconnect(False)
4058 self._WaitUntilSync()
4060 self.feedback_fn("* done")
4062 def Exec(self, feedback_fn):
4063 """Perform the migration.
4066 self.feedback_fn = feedback_fn
4068 self.source_node = self.instance.primary_node
4069 self.target_node = self.instance.secondary_nodes[0]
4070 self.all_nodes = [self.source_node, self.target_node]
4071 self.nodes_ip = {
4072 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4073 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4074 }
4075 if self.op.cleanup:
4076 return self._ExecCleanup()
4077 else:
4078 return self._ExecMigration()
4081 def _CreateBlockDev(lu, node, instance, device, force_create,
4082 info, force_open):
4083 """Create a tree of block devices on a given node.
4085 If this device type has to be created on secondaries, create it and
4086 all its children.
4088 If not, just recurse to children keeping the same 'force' value.
4090 @param lu: the lu on whose behalf we execute
4091 @param node: the node on which to create the device
4092 @type instance: L{objects.Instance}
4093 @param instance: the instance which owns the device
4094 @type device: L{objects.Disk}
4095 @param device: the device to create
4096 @type force_create: boolean
4097 @param force_create: whether to force creation of this device; this
4098 will be changed to True whenever we find a device which has
4099 CreateOnSecondary() attribute
4100 @param info: the extra 'metadata' we should attach to the device
4101 (this will be represented as a LVM tag)
4102 @type force_open: boolean
4103 @param force_open: this parameter will be passed to the
4104 L{backend.BlockdevCreate} function where it specifies
4105 whether we run on primary or not, and it affects both
4106 the child assembly and the device's own Open() execution
4108 """
4109 if device.CreateOnSecondary():
4110 force_create = True
4112 if device.children:
4113 for child in device.children:
4114 _CreateBlockDev(lu, node, instance, child, force_create,
4115 info, force_open)
4117 if not force_create:
4118 return
4120 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
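# A minimal sketch of the recursion above for a DRBD8 disk (assumed
# layout: one DRBD device with two LV children). The DRBD device reports
# CreateOnSecondary() == True, so force_create is turned on for its
# children on every node:
#   _CreateBlockDev(lu, node, inst, drbd_dev, False, info, force_open)
#     -> _CreateBlockDev(lu, node, inst, lv_data, True, info, force_open)
#     -> _CreateBlockDev(lu, node, inst, lv_meta, True, info, force_open)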
4123 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4124 """Create a single block device on a given node.
4126 This will not recurse over children of the device, so they must be
4127 created in advance.
4129 @param lu: the lu on whose behalf we execute
4130 @param node: the node on which to create the device
4131 @type instance: L{objects.Instance}
4132 @param instance: the instance which owns the device
4133 @type device: L{objects.Disk}
4134 @param device: the device to create
4135 @param info: the extra 'metadata' we should attach to the device
4136 (this will be represented as a LVM tag)
4137 @type force_open: boolean
4138 @param force_open: this parameter will be passed to the
4139 L{backend.BlockdevCreate} function where it specifies
4140 whether we run on primary or not, and it affects both
4141 the child assembly and the device's own Open() execution
4143 """
4144 lu.cfg.SetDiskID(device, node)
4145 result = lu.rpc.call_blockdev_create(node, device, device.size,
4146 instance.name, force_open, info)
4147 result.Raise("Can't create block device %s on"
4148 " node %s for instance %s" % (device, node, instance.name))
4149 if device.physical_id is None:
4150 device.physical_id = result.payload
4153 def _GenerateUniqueNames(lu, exts):
4154 """Generate a suitable LV name.
4156 This will generate a logical volume name for the given instance.
4158 """
4159 results = []
4160 for val in exts:
4161 new_id = lu.cfg.GenerateUniqueID()
4162 results.append("%s%s" % (new_id, val))
4163 return results
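# Example with hypothetical generated IDs (a fresh ID per extension):
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"]) ->
#   ["3b2f1a7c-1234.disk0_data", "9c4e2d1b-5678.disk0_meta"]
# (GenerateUniqueID() supplies each unique prefix).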
4166 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4167 p_minor, s_minor):
4168 """Generate a drbd8 device complete with its children.
4171 port = lu.cfg.AllocatePort()
4172 vgname = lu.cfg.GetVGName()
4173 shared_secret = lu.cfg.GenerateDRBDSecret()
4174 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4175 logical_id=(vgname, names[0]))
4176 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4177 logical_id=(vgname, names[1]))
4178 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4179 logical_id=(primary, secondary, port,
4180 p_minor, s_minor,
4181 shared_secret),
4182 children=[dev_data, dev_meta],
4183 iv_name=iv_name)
4184 return drbd_dev
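# The DRBD8 logical_id built above is a 6-tuple; with hypothetical
# values it reads:
#   ("node1.example.com", "node2.example.com", 11000, 0, 1, "<secret>")
# i.e. (primary, secondary, port, p_minor, s_minor, shared_secret).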
4187 def _GenerateDiskTemplate(lu, template_name,
4188 instance_name, primary_node,
4189 secondary_nodes, disk_info,
4190 file_storage_dir, file_driver,
4191 base_index):
4192 """Generate the entire disk layout for a given template type.
4195 #TODO: compute space requirements
4197 vgname = lu.cfg.GetVGName()
4198 disk_count = len(disk_info)
4199 disks = []
4200 if template_name == constants.DT_DISKLESS:
4201 pass
4202 elif template_name == constants.DT_PLAIN:
4203 if len(secondary_nodes) != 0:
4204 raise errors.ProgrammerError("Wrong template configuration")
4206 names = _GenerateUniqueNames(lu, [".disk%d" % i
4207 for i in range(disk_count)])
4208 for idx, disk in enumerate(disk_info):
4209 disk_index = idx + base_index
4210 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4211 logical_id=(vgname, names[idx]),
4212 iv_name="disk/%d" % disk_index,
4213 mode=disk["mode"])
4214 disks.append(disk_dev)
4215 elif template_name == constants.DT_DRBD8:
4216 if len(secondary_nodes) != 1:
4217 raise errors.ProgrammerError("Wrong template configuration")
4218 remote_node = secondary_nodes[0]
4219 minors = lu.cfg.AllocateDRBDMinor(
4220 [primary_node, remote_node] * len(disk_info), instance_name)
4222 names = []
4223 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4224 for i in range(disk_count)]):
4225 names.append(lv_prefix + "_data")
4226 names.append(lv_prefix + "_meta")
4227 for idx, disk in enumerate(disk_info):
4228 disk_index = idx + base_index
4229 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4230 disk["size"], names[idx*2:idx*2+2],
4231 "disk/%d" % disk_index,
4232 minors[idx*2], minors[idx*2+1])
4233 disk_dev.mode = disk["mode"]
4234 disks.append(disk_dev)
4235 elif template_name == constants.DT_FILE:
4236 if len(secondary_nodes) != 0:
4237 raise errors.ProgrammerError("Wrong template configuration")
4239 for idx, disk in enumerate(disk_info):
4240 disk_index = idx + base_index
4241 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4242 iv_name="disk/%d" % disk_index,
4243 logical_id=(file_driver,
4244 "%s/disk%d" % (file_storage_dir,
4247 disks.append(disk_dev)
4249 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
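# A sketch of the result for DT_PLAIN with two disks (hypothetical VG
# and generated names):
#   [Disk(LD_LV, size=1024, logical_id=("xenvg", "<id>.disk0"),
#         iv_name="disk/0", mode="rw"),
#    Disk(LD_LV, size=512, logical_id=("xenvg", "<id>.disk1"),
#         iv_name="disk/1", mode="rw")]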
4253 def _GetInstanceInfoText(instance):
4254 """Compute that text that should be added to the disk's metadata.
4257 return "originstname+%s" % instance.name
4260 def _CreateDisks(lu, instance):
4261 """Create all disks for an instance.
4263 This abstracts away some work from AddInstance.
4265 @type lu: L{LogicalUnit}
4266 @param lu: the logical unit on whose behalf we execute
4267 @type instance: L{objects.Instance}
4268 @param instance: the instance whose disks we should create
4270 @return: the success of the creation
4272 """
4273 info = _GetInstanceInfoText(instance)
4274 pnode = instance.primary_node
4276 if instance.disk_template == constants.DT_FILE:
4277 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4278 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4280 result.Raise("Failed to create directory '%s' on"
4281 " node %s: %s" % (file_storage_dir, pnode))
4283 # Note: this needs to be kept in sync with adding of disks in
4284 # LUSetInstanceParams
4285 for device in instance.disks:
4286 logging.info("Creating volume %s for instance %s",
4287 device.iv_name, instance.name)
4289 for node in instance.all_nodes:
4290 f_create = node == pnode
4291 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4294 def _RemoveDisks(lu, instance):
4295 """Remove all disks for an instance.
4297 This abstracts away some work from `AddInstance()` and
4298 `RemoveInstance()`. Note that in case some of the devices couldn't
4299 be removed, the removal will continue with the other ones (compare
4300 with `_CreateDisks()`).
4302 @type lu: L{LogicalUnit}
4303 @param lu: the logical unit on whose behalf we execute
4304 @type instance: L{objects.Instance}
4305 @param instance: the instance whose disks we should remove
4307 @return: the success of the removal
4309 """
4310 logging.info("Removing block devices for instance %s", instance.name)
4312 all_result = True
4313 for device in instance.disks:
4314 for node, disk in device.ComputeNodeTree(instance.primary_node):
4315 lu.cfg.SetDiskID(disk, node)
4316 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4317 if msg:
4318 lu.LogWarning("Could not remove block device %s on node %s,"
4319 " continuing anyway: %s", device.iv_name, node, msg)
4320 all_result = False
4322 if instance.disk_template == constants.DT_FILE:
4323 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4324 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4325 file_storage_dir)
4326 msg = result.fail_msg
4327 if msg:
4328 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4329 file_storage_dir, instance.primary_node, msg)
4330 all_result = False
4332 return all_result
4335 def _ComputeDiskSize(disk_template, disks):
4336 """Compute disk size requirements in the volume group
4339 # Required free disk space as a function of disk and swap space
4341 constants.DT_DISKLESS: None,
4342 constants.DT_PLAIN: sum(d["size"] for d in disks),
4343 # 128 MB are added for drbd metadata for each disk
4344 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4345 constants.DT_FILE: None,
4346 }
4348 if disk_template not in req_size_dict:
4349 raise errors.ProgrammerError("Disk template '%s' size requirement"
4350 " is unknown" % disk_template)
4352 return req_size_dict[disk_template]
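# Worked example: two DRBD8 disks of 1024 MB and 256 MB require
# (1024 + 128) + (256 + 128) = 1536 MB free in the volume group, while
# the same disks as DT_PLAIN need only 1024 + 256 = 1280 MB.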
4355 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4356 """Hypervisor parameter validation.
4358 This function abstract the hypervisor parameter validation to be
4359 used in both instance create and instance modify.
4361 @type lu: L{LogicalUnit}
4362 @param lu: the logical unit for which we check
4363 @type nodenames: list
4364 @param nodenames: the list of nodes on which we should check
4365 @type hvname: string
4366 @param hvname: the name of the hypervisor we should use
4367 @type hvparams: dict
4368 @param hvparams: the parameters which we need to check
4369 @raise errors.OpPrereqError: if the parameters are not valid
4371 """
4372 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4373 hvname,
4374 hvparams)
4375 for node in nodenames:
4376 info = hvinfo[node]
4377 if info.offline:
4378 continue
4379 info.Raise("Hypervisor parameter validation failed on node %s" % node)
4382 class LUCreateInstance(LogicalUnit):
4383 """Create an instance.
4386 HPATH = "instance-add"
4387 HTYPE = constants.HTYPE_INSTANCE
4388 _OP_REQP = ["instance_name", "disks", "disk_template",
4389 "mode", "start",
4390 "wait_for_sync", "ip_check", "nics",
4391 "hvparams", "beparams"]
4392 REQ_BGL = False
4394 def _ExpandNode(self, node):
4395 """Expands and checks one node name.
4398 node_full = self.cfg.ExpandNodeName(node)
4399 if node_full is None:
4400 raise errors.OpPrereqError("Unknown node %s" % node)
4403 def ExpandNames(self):
4404 """ExpandNames for CreateInstance.
4406 Figure out the right locks for instance creation.
4408 """
4409 self.needed_locks = {}
4411 # set optional parameters to none if they don't exist
4412 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4413 if not hasattr(self.op, attr):
4414 setattr(self.op, attr, None)
4416 # cheap checks, mostly valid constants given
4418 # verify creation mode
4419 if self.op.mode not in (constants.INSTANCE_CREATE,
4420 constants.INSTANCE_IMPORT):
4421 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4424 # disk template and mirror node verification
4425 if self.op.disk_template not in constants.DISK_TEMPLATES:
4426 raise errors.OpPrereqError("Invalid disk template name")
4428 if self.op.hypervisor is None:
4429 self.op.hypervisor = self.cfg.GetHypervisorType()
4431 cluster = self.cfg.GetClusterInfo()
4432 enabled_hvs = cluster.enabled_hypervisors
4433 if self.op.hypervisor not in enabled_hvs:
4434 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4435 " cluster (%s)" % (self.op.hypervisor,
4436 ",".join(enabled_hvs)))
4438 # check hypervisor parameter syntax (locally)
4439 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4440 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4441 self.op.hvparams)
4442 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4443 hv_type.CheckParameterSyntax(filled_hvp)
4444 self.hv_full = filled_hvp
4446 # fill and remember the beparams dict
4447 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4448 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4449 self.op.beparams)
4451 #### instance parameters check
4453 # instance name verification
4454 hostname1 = utils.HostInfo(self.op.instance_name)
4455 self.op.instance_name = instance_name = hostname1.name
4457 # this is just a preventive check, but someone might still add this
4458 # instance in the meantime, and creation will fail at lock-add time
4459 if instance_name in self.cfg.GetInstanceList():
4460 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4463 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4467 for idx, nic in enumerate(self.op.nics):
4468 nic_mode_req = nic.get("mode", None)
4469 nic_mode = nic_mode_req
4470 if nic_mode is None:
4471 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4473 # in routed mode, for the first nic, the default ip is 'auto'
4474 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4475 default_ip_mode = constants.VALUE_AUTO
4477 default_ip_mode = constants.VALUE_NONE
4479 # ip validity checks
4480 ip = nic.get("ip", default_ip_mode)
4481 if ip is None or ip.lower() == constants.VALUE_NONE:
4482 nic_ip = None
4483 elif ip.lower() == constants.VALUE_AUTO:
4484 nic_ip = hostname1.ip
4485 else:
4486 if not utils.IsValidIP(ip):
4487 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4488 " like a valid IP" % ip)
4491 # TODO: check the ip for uniqueness !!
4492 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4493 raise errors.OpPrereqError("Routed nic mode requires an ip address")
4495 # MAC address verification
4496 mac = nic.get("mac", constants.VALUE_AUTO)
4497 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4498 if not utils.IsValidMac(mac.lower()):
4499 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4501 # bridge verification
4502 bridge = nic.get("bridge", None)
4503 link = nic.get("link", None)
4504 if bridge and link:
4505 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4506 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4507 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4508 elif bridge:
4509 link = bridge
4511 nicparams = {}
4512 if nic_mode_req:
4513 nicparams[constants.NIC_MODE] = nic_mode_req
4514 if link:
4515 nicparams[constants.NIC_LINK] = link
4517 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4518 nicparams)
4519 objects.NIC.CheckParameterSyntax(check_params)
4520 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
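# Example of an opcode-level NIC specification accepted by the loop
# above (hypothetical values):
#   nics=[{"mode": "bridged", "link": "xen-br0", "mac": "auto"}]
# which ends up as objects.NIC(mac=<generated>, ip=None,
# nicparams=<the mode/link dict validated above>).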
4522 # disk checks/pre-build
4523 self.disks = []
4524 for disk in self.op.disks:
4525 mode = disk.get("mode", constants.DISK_RDWR)
4526 if mode not in constants.DISK_ACCESS_SET:
4527 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4529 size = disk.get("size", None)
4531 raise errors.OpPrereqError("Missing disk size")
4535 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4536 self.disks.append({"size": size, "mode": mode})
4538 # used in CheckPrereq for ip ping check
4539 self.check_ip = hostname1.ip
4541 # file storage checks
4542 if (self.op.file_driver and
4543 not self.op.file_driver in constants.FILE_DRIVER):
4544 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4545 self.op.file_driver)
4547 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4548 raise errors.OpPrereqError("File storage directory path not absolute")
4550 ### Node/iallocator related checks
4551 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4552 raise errors.OpPrereqError("One and only one of iallocator and primary"
4553 " node must be given")
4555 if self.op.iallocator:
4556 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4557 else:
4558 self.op.pnode = self._ExpandNode(self.op.pnode)
4559 nodelist = [self.op.pnode]
4560 if self.op.snode is not None:
4561 self.op.snode = self._ExpandNode(self.op.snode)
4562 nodelist.append(self.op.snode)
4563 self.needed_locks[locking.LEVEL_NODE] = nodelist
4565 # in case of import lock the source node too
4566 if self.op.mode == constants.INSTANCE_IMPORT:
4567 src_node = getattr(self.op, "src_node", None)
4568 src_path = getattr(self.op, "src_path", None)
4570 if src_path is None:
4571 self.op.src_path = src_path = self.op.instance_name
4573 if src_node is None:
4574 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4575 self.op.src_node = None
4576 if os.path.isabs(src_path):
4577 raise errors.OpPrereqError("Importing an instance from an absolute"
4578 " path requires a source node option.")
4579 else:
4580 self.op.src_node = src_node = self._ExpandNode(src_node)
4581 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4582 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4583 if not os.path.isabs(src_path):
4584 self.op.src_path = src_path = \
4585 os.path.join(constants.EXPORT_DIR, src_path)
4587 else: # INSTANCE_CREATE
4588 if getattr(self.op, "os_type", None) is None:
4589 raise errors.OpPrereqError("No guest OS specified")
4591 def _RunAllocator(self):
4592 """Run the allocator based on input opcode.
4595 nics = [n.ToDict() for n in self.nics]
4596 ial = IAllocator(self,
4597 mode=constants.IALLOCATOR_MODE_ALLOC,
4598 name=self.op.instance_name,
4599 disk_template=self.op.disk_template,
4600 tags=[],
4601 os=self.op.os_type,
4602 vcpus=self.be_full[constants.BE_VCPUS],
4603 mem_size=self.be_full[constants.BE_MEMORY],
4604 disks=self.disks,
4605 nics=nics,
4606 hypervisor=self.op.hypervisor,
4607 )
4609 ial.Run(self.op.iallocator)
4610 if not ial.success:
4612 raise errors.OpPrereqError("Can't compute nodes using"
4613 " iallocator '%s': %s" % (self.op.iallocator,
4615 if len(ial.nodes) != ial.required_nodes:
4616 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4617 " of nodes (%s), required %s" %
4618 (self.op.iallocator, len(ial.nodes),
4619 ial.required_nodes))
4620 self.op.pnode = ial.nodes[0]
4621 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4622 self.op.instance_name, self.op.iallocator,
4623 ", ".join(ial.nodes))
4624 if ial.required_nodes == 2:
4625 self.op.snode = ial.nodes[1]
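# Sketch of a successful allocation (hypothetical node names):
#   ial.success == True
#   ial.nodes == ["node3.example.com", "node7.example.com"]
# nodes[0] becomes the primary; nodes[1] becomes the secondary when
# required_nodes == 2 (i.e. for DRBD-based instances).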
4627 def BuildHooksEnv(self):
4628 """Build hooks env.
4630 This runs on master, primary and secondary nodes of the instance.
4632 """
4633 env = {
4634 "ADD_MODE": self.op.mode,
4635 }
4636 if self.op.mode == constants.INSTANCE_IMPORT:
4637 env["SRC_NODE"] = self.op.src_node
4638 env["SRC_PATH"] = self.op.src_path
4639 env["SRC_IMAGES"] = self.src_images
4641 env.update(_BuildInstanceHookEnv(
4642 name=self.op.instance_name,
4643 primary_node=self.op.pnode,
4644 secondary_nodes=self.secondaries,
4645 status=self.op.start,
4646 os_type=self.op.os_type,
4647 memory=self.be_full[constants.BE_MEMORY],
4648 vcpus=self.be_full[constants.BE_VCPUS],
4649 nics=_PreBuildNICHooksList(self, self.nics),
4650 disk_template=self.op.disk_template,
4651 disks=[(d["size"], d["mode"]) for d in self.disks],
4654 hypervisor=self.op.hypervisor,
4655 ))
4657 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4658 self.secondaries)
4659 return env, nl, nl
4662 def CheckPrereq(self):
4663 """Check prerequisites.
4666 if (not self.cfg.GetVGName() and
4667 self.op.disk_template not in constants.DTS_NOT_LVM):
4668 raise errors.OpPrereqError("Cluster does not support lvm-based"
4671 if self.op.mode == constants.INSTANCE_IMPORT:
4672 src_node = self.op.src_node
4673 src_path = self.op.src_path
4675 if src_node is None:
4676 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4677 exp_list = self.rpc.call_export_list(locked_nodes)
4678 found = False
4679 for node in exp_list:
4680 if exp_list[node].fail_msg:
4681 continue
4682 if src_path in exp_list[node].payload:
4683 found = True
4684 self.op.src_node = src_node = node
4685 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4686 src_path)
4687 break
4688 if not found:
4689 raise errors.OpPrereqError("No export found for relative path %s" %
4690 src_path)
4692 _CheckNodeOnline(self, src_node)
4693 result = self.rpc.call_export_info(src_node, src_path)
4694 result.Raise("No export or invalid export found in dir %s" % src_path)
4696 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4697 if not export_info.has_section(constants.INISECT_EXP):
4698 raise errors.ProgrammerError("Corrupted export config")
4700 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4701 if (int(ei_version) != constants.EXPORT_VERSION):
4702 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4703 (ei_version, constants.EXPORT_VERSION))
4705 # Check that the new instance doesn't have less disks than the export
4706 instance_disks = len(self.disks)
4707 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4708 if instance_disks < export_disks:
4709 raise errors.OpPrereqError("Not enough disks to import."
4710 " (instance: %d, export: %d)" %
4711 (instance_disks, export_disks))
4713 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4714 disk_images = []
4715 for idx in range(export_disks):
4716 option = 'disk%d_dump' % idx
4717 if export_info.has_option(constants.INISECT_INS, option):
4718 # FIXME: are the old os-es, disk sizes, etc. useful?
4719 export_name = export_info.get(constants.INISECT_INS, option)
4720 image = os.path.join(src_path, export_name)
4721 disk_images.append(image)
4722 else:
4723 disk_images.append(False)
4725 self.src_images = disk_images
4727 old_name = export_info.get(constants.INISECT_INS, 'name')
4728 # FIXME: int() here could throw a ValueError on broken exports
4729 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4730 if self.op.instance_name == old_name:
4731 for idx, nic in enumerate(self.nics):
4732 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4733 nic_mac_ini = 'nic%d_mac' % idx
4734 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4736 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4737 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4738 if self.op.start and not self.op.ip_check:
4739 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4740 " adding an instance in start mode")
4742 if self.op.ip_check:
4743 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4744 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4745 (self.check_ip, self.op.instance_name))
4747 #### mac address generation
4748 # By generating here the mac address both the allocator and the hooks get
4749 # the real final mac address rather than the 'auto' or 'generate' value.
4750 # There is a race condition between the generation and the instance object
4751 # creation, which means that we know the mac is valid now, but we're not
4752 # sure it will be when we actually add the instance. If things go bad
4753 # adding the instance will abort because of a duplicate mac, and the
4754 # creation job will fail.
4755 for nic in self.nics:
4756 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4757 nic.mac = self.cfg.GenerateMAC()
4761 if self.op.iallocator is not None:
4762 self._RunAllocator()
4764 #### node related checks
4766 # check primary node
4767 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4768 assert self.pnode is not None, \
4769 "Cannot retrieve locked node %s" % self.op.pnode
4771 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4774 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4777 self.secondaries = []
4779 # mirror node verification
4780 if self.op.disk_template in constants.DTS_NET_MIRROR:
4781 if self.op.snode is None:
4782 raise errors.OpPrereqError("The networked disk templates need"
4784 if self.op.snode == pnode.name:
4785 raise errors.OpPrereqError("The secondary node cannot be"
4786 " the primary node.")
4787 _CheckNodeOnline(self, self.op.snode)
4788 _CheckNodeNotDrained(self, self.op.snode)
4789 self.secondaries.append(self.op.snode)
4791 nodenames = [pnode.name] + self.secondaries
4793 req_size = _ComputeDiskSize(self.op.disk_template,
4794 self.disks)
4796 # Check lv size requirements
4797 if req_size is not None:
4798 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4799 self.op.hypervisor)
4800 for node in nodenames:
4801 info = nodeinfo[node]
4802 info.Raise("Cannot get current information from node %s" % node)
4803 info = info.payload
4804 vg_free = info.get('vg_free', None)
4805 if not isinstance(vg_free, int):
4806 raise errors.OpPrereqError("Can't compute free disk space on"
4808 if req_size > vg_free:
4809 raise errors.OpPrereqError("Not enough disk space on target node %s."
4810 " %d MB available, %d MB required" %
4811 (node, vg_free, req_size))
4813 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4816 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4817 result.Raise("OS '%s' not in supported os list for primary node %s" %
4818 (self.op.os_type, pnode.name), prereq=True)
4820 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4822 # memory check on primary node
4823 if self.op.start:
4824 _CheckNodeFreeMemory(self, self.pnode.name,
4825 "creating instance %s" % self.op.instance_name,
4826 self.be_full[constants.BE_MEMORY],
4827 self.op.hypervisor)
4829 def Exec(self, feedback_fn):
4830 """Create and add the instance to the cluster.
4833 instance = self.op.instance_name
4834 pnode_name = self.pnode.name
4836 ht_kind = self.op.hypervisor
4837 if ht_kind in constants.HTS_REQ_PORT:
4838 network_port = self.cfg.AllocatePort()
4839 else:
4840 network_port = None
4842 ##if self.op.vnc_bind_address is None:
4843 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4845 # this is needed because os.path.join does not accept None arguments
4846 if self.op.file_storage_dir is None:
4847 string_file_storage_dir = ""
4849 string_file_storage_dir = self.op.file_storage_dir
4851 # build the full file storage dir path
4852 file_storage_dir = os.path.normpath(os.path.join(
4853 self.cfg.GetFileStorageDir(),
4854 string_file_storage_dir, instance))
4857 disks = _GenerateDiskTemplate(self,
4858 self.op.disk_template,
4859 instance, pnode_name,
4860 self.secondaries,
4861 self.disks,
4862 file_storage_dir,
4863 self.op.file_driver,
4864 0)
4866 iobj = objects.Instance(name=instance, os=self.op.os_type,
4867 primary_node=pnode_name,
4868 nics=self.nics, disks=disks,
4869 disk_template=self.op.disk_template,
4870 admin_up=False,
4871 network_port=network_port,
4872 beparams=self.op.beparams,
4873 hvparams=self.op.hvparams,
4874 hypervisor=self.op.hypervisor,
4875 )
4877 feedback_fn("* creating instance disks...")
4878 try:
4879 _CreateDisks(self, iobj)
4880 except errors.OpExecError:
4881 self.LogWarning("Device creation failed, reverting...")
4882 try:
4883 _RemoveDisks(self, iobj)
4884 finally:
4885 self.cfg.ReleaseDRBDMinors(instance)
4886 raise
4888 feedback_fn("adding instance %s to cluster config" % instance)
4890 self.cfg.AddInstance(iobj)
4891 # Declare that we don't want to remove the instance lock anymore, as we've
4892 # added the instance to the config
4893 del self.remove_locks[locking.LEVEL_INSTANCE]
4894 # Unlock all the nodes
4895 if self.op.mode == constants.INSTANCE_IMPORT:
4896 nodes_keep = [self.op.src_node]
4897 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4898 if node != self.op.src_node]
4899 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4900 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4901 else:
4902 self.context.glm.release(locking.LEVEL_NODE)
4903 del self.acquired_locks[locking.LEVEL_NODE]
4905 if self.op.wait_for_sync:
4906 disk_abort = not _WaitForSync(self, iobj)
4907 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4908 # make sure the disks are not degraded (still sync-ing is ok)
4909 time.sleep(15)
4910 feedback_fn("* checking mirrors status")
4911 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4912 else:
4913 disk_abort = False
4915 if disk_abort:
4916 _RemoveDisks(self, iobj)
4917 self.cfg.RemoveInstance(iobj.name)
4918 # Make sure the instance lock gets removed
4919 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4920 raise errors.OpExecError("There are some degraded disks for"
4923 feedback_fn("creating os for instance %s on node %s" %
4924 (instance, pnode_name))
4926 if iobj.disk_template != constants.DT_DISKLESS:
4927 if self.op.mode == constants.INSTANCE_CREATE:
4928 feedback_fn("* running the instance OS create scripts...")
4929 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4930 result.Raise("Could not add os for instance %s"
4931 " on node %s" % (instance, pnode_name))
4933 elif self.op.mode == constants.INSTANCE_IMPORT:
4934 feedback_fn("* running the instance OS import scripts...")
4935 src_node = self.op.src_node
4936 src_images = self.src_images
4937 cluster_name = self.cfg.GetClusterName()
4938 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4939 src_node, src_images,
4940 cluster_name)
4941 msg = import_result.fail_msg
4942 if msg:
4943 self.LogWarning("Error while importing the disk images for instance"
4944 " %s on node %s: %s" % (instance, pnode_name, msg))
4945 else:
4946 # also checked in the prereq part
4947 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4948 % self.op.mode)
4950 if self.op.start:
4951 iobj.admin_up = True
4952 self.cfg.Update(iobj)
4953 logging.info("Starting instance %s on node %s", instance, pnode_name)
4954 feedback_fn("* starting instance...")
4955 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4956 result.Raise("Could not start instance")
4959 class LUConnectConsole(NoHooksLU):
4960 """Connect to an instance's console.
4962 This is somewhat special in that it returns the command line that
4963 you need to run on the master node in order to connect to the
4964 console.
4966 """
4967 _OP_REQP = ["instance_name"]
4968 REQ_BGL = False
4970 def ExpandNames(self):
4971 self._ExpandAndLockInstance()
4973 def CheckPrereq(self):
4974 """Check prerequisites.
4976 This checks that the instance is in the cluster.
4978 """
4979 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4980 assert self.instance is not None, \
4981 "Cannot retrieve locked instance %s" % self.op.instance_name
4982 _CheckNodeOnline(self, self.instance.primary_node)
4984 def Exec(self, feedback_fn):
4985 """Connect to the console of an instance
4988 instance = self.instance
4989 node = instance.primary_node
4991 node_insts = self.rpc.call_instance_list([node],
4992 [instance.hypervisor])[node]
4993 node_insts.Raise("Can't get node information from %s" % node)
4995 if instance.name not in node_insts.payload:
4996 raise errors.OpExecError("Instance %s is not running." % instance.name)
4998 logging.debug("Connecting to console of %s on %s", instance.name, node)
5000 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5001 cluster = self.cfg.GetClusterInfo()
5002 # beparams and hvparams are passed separately, to avoid editing the
5003 # instance and then saving the defaults in the instance itself.
5004 hvparams = cluster.FillHV(instance)
5005 beparams = cluster.FillBE(instance)
5006 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5009 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
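# The returned value is a full SSH command line; a hypothetical Xen
# example would look roughly like:
#   ssh -t root@node1.example.com "xm console instance1.example.com"
# (BuildCmd also adds the cluster's standard SSH options).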
5012 class LUReplaceDisks(LogicalUnit):
5013 """Replace the disks of an instance.
5016 HPATH = "mirrors-replace"
5017 HTYPE = constants.HTYPE_INSTANCE
5018 _OP_REQP = ["instance_name", "mode", "disks"]
5019 REQ_BGL = False
5021 def CheckArguments(self):
5022 if not hasattr(self.op, "remote_node"):
5023 self.op.remote_node = None
5024 if not hasattr(self.op, "iallocator"):
5025 self.op.iallocator = None
5027 # check for valid parameter combination
5028 cnt = [self.op.remote_node, self.op.iallocator].count(None)
5029 if self.op.mode == constants.REPLACE_DISK_CHG:
5030 if cnt == 2:
5031 raise errors.OpPrereqError("When changing the secondary either an"
5032 " iallocator script must be used or the"
5033 " new node given")
5034 elif cnt == 0:
5035 raise errors.OpPrereqError("Give either the iallocator or the new"
5036 " secondary, not both")
5037 else: # not replacing the secondary
5038 if cnt != 2:
5039 raise errors.OpPrereqError("The iallocator and new node options can"
5040 " be used only when changing the"
5041 " secondary node")
5043 def ExpandNames(self):
5044 self._ExpandAndLockInstance()
5046 if self.op.iallocator is not None:
5047 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5048 elif self.op.remote_node is not None:
5049 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5050 if remote_node is None:
5051 raise errors.OpPrereqError("Node '%s' not known" %
5052 self.op.remote_node)
5053 self.op.remote_node = remote_node
5054 # Warning: do not remove the locking of the new secondary here
5055 # unless DRBD8.AddChildren is changed to work in parallel;
5056 # currently it doesn't since parallel invocations of
5057 # FindUnusedMinor will conflict
5058 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5059 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5060 else:
5061 self.needed_locks[locking.LEVEL_NODE] = []
5062 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5064 def DeclareLocks(self, level):
5065 # If we're not already locking all nodes in the set we have to declare the
5066 # instance's primary/secondary nodes.
5067 if (level == locking.LEVEL_NODE and
5068 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5069 self._LockInstancesNodes()
5071 def _RunAllocator(self):
5072 """Compute a new secondary node using an IAllocator.
5075 ial = IAllocator(self,
5076 mode=constants.IALLOCATOR_MODE_RELOC,
5077 name=self.op.instance_name,
5078 relocate_from=[self.sec_node])
5080 ial.Run(self.op.iallocator)
5082 if not ial.success:
5083 raise errors.OpPrereqError("Can't compute nodes using"
5084 " iallocator '%s': %s" % (self.op.iallocator,
5085 ial.info))
5086 if len(ial.nodes) != ial.required_nodes:
5087 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5088 " of nodes (%s), required %s" %
5089 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
5090 self.op.remote_node = ial.nodes[0]
5091 self.LogInfo("Selected new secondary for the instance: %s",
5092 self.op.remote_node)
5094 def BuildHooksEnv(self):
5095 """Build hooks env.
5097 This runs on the master, the primary and all the secondaries.
5099 """
5100 env = {
5101 "MODE": self.op.mode,
5102 "NEW_SECONDARY": self.op.remote_node,
5103 "OLD_SECONDARY": self.instance.secondary_nodes[0],
5104 }
5105 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5106 nl = [
5107 self.cfg.GetMasterNode(),
5108 self.instance.primary_node,
5109 ]
5110 if self.op.remote_node is not None:
5111 nl.append(self.op.remote_node)
5112 return env, nl, nl
5114 def CheckPrereq(self):
5115 """Check prerequisites.
5117 This checks that the instance is in the cluster.
5119 """
5120 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5121 assert instance is not None, \
5122 "Cannot retrieve locked instance %s" % self.op.instance_name
5123 self.instance = instance
5125 if instance.disk_template != constants.DT_DRBD8:
5126 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5129 if len(instance.secondary_nodes) != 1:
5130 raise errors.OpPrereqError("The instance has a strange layout,"
5131 " expected one secondary but found %d" %
5132 len(instance.secondary_nodes))
5134 self.sec_node = instance.secondary_nodes[0]
5136 if self.op.iallocator is not None:
5137 self._RunAllocator()
5139 remote_node = self.op.remote_node
5140 if remote_node is not None:
5141 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5142 assert self.remote_node_info is not None, \
5143 "Cannot retrieve locked node %s" % remote_node
5144 else:
5145 self.remote_node_info = None
5146 if remote_node == instance.primary_node:
5147 raise errors.OpPrereqError("The specified node is the primary node of"
5149 elif remote_node == self.sec_node:
5150 raise errors.OpPrereqError("The specified node is already the"
5151 " secondary node of the instance.")
5153 if self.op.mode == constants.REPLACE_DISK_PRI:
5154 n1 = self.tgt_node = instance.primary_node
5155 n2 = self.oth_node = self.sec_node
5156 elif self.op.mode == constants.REPLACE_DISK_SEC:
5157 n1 = self.tgt_node = self.sec_node
5158 n2 = self.oth_node = instance.primary_node
5159 elif self.op.mode == constants.REPLACE_DISK_CHG:
5160 n1 = self.new_node = remote_node
5161 n2 = self.oth_node = instance.primary_node
5162 self.tgt_node = self.sec_node
5163 _CheckNodeNotDrained(self, remote_node)
5165 raise errors.ProgrammerError("Unhandled disk replace mode")
5167 _CheckNodeOnline(self, n1)
5168 _CheckNodeOnline(self, n2)
5170 if not self.op.disks:
5171 self.op.disks = range(len(instance.disks))
5173 for disk_idx in self.op.disks:
5174 instance.FindDisk(disk_idx)
5176 def _ExecD8DiskOnly(self, feedback_fn):
5177 """Replace a disk on the primary or secondary for dbrd8.
5179 The algorithm for replace is quite complicated:
5181 1. for each disk to be replaced:
5183 1. create new LVs on the target node with unique names
5184 1. detach old LVs from the drbd device
5185 1. rename old LVs to name_replaced.<time_t>
5186 1. rename new LVs to old LVs
5187 1. attach the new LVs (with the old names now) to the drbd device
5189 1. wait for sync across all devices
5191 1. for each modified disk:
5193 1. remove old LVs (which have the name name_replaced.<time_t>)
5195 Failures are not very well handled.
5197 """
5198 steps_total = 6
5199 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5200 instance = self.instance
5201 iv_names = {}
5202 vgname = self.cfg.GetVGName()
5204 cfg = self.cfg
5205 tgt_node = self.tgt_node
5206 oth_node = self.oth_node
5208 # Step: check device activation
5209 self.proc.LogStep(1, steps_total, "check device existence")
5210 info("checking volume groups")
5211 my_vg = cfg.GetVGName()
5212 results = self.rpc.call_vg_list([oth_node, tgt_node])
5213 if not results:
5214 raise errors.OpExecError("Can't list volume groups on the nodes")
5215 for node in oth_node, tgt_node:
5216 res = results[node]
5217 res.Raise("Error checking node %s" % node)
5218 if my_vg not in res.payload:
5219 raise errors.OpExecError("Volume group '%s' not found on %s" %
5221 for idx, dev in enumerate(instance.disks):
5222 if idx not in self.op.disks:
5223 continue
5224 for node in tgt_node, oth_node:
5225 info("checking disk/%d on %s" % (idx, node))
5226 cfg.SetDiskID(dev, node)
5227 result = self.rpc.call_blockdev_find(node, dev)
5228 msg = result.fail_msg
5229 if not msg and not result.payload:
5230 msg = "disk not found"
5232 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5235 # Step: check other node consistency
5236 self.proc.LogStep(2, steps_total, "check peer consistency")
5237 for idx, dev in enumerate(instance.disks):
5238 if idx not in self.op.disks:
5239 continue
5240 info("checking disk/%d consistency on %s" % (idx, oth_node))
5241 if not _CheckDiskConsistency(self, dev, oth_node,
5242 oth_node==instance.primary_node):
5243 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5244 " to replace disks on this node (%s)" %
5245 (oth_node, tgt_node))
5247 # Step: create new storage
5248 self.proc.LogStep(3, steps_total, "allocate new storage")
5249 for idx, dev in enumerate(instance.disks):
5250 if idx not in self.op.disks:
5251 continue
5252 size = dev.size
5253 cfg.SetDiskID(dev, tgt_node)
5254 lv_names = [".disk%d_%s" % (idx, suf)
5255 for suf in ["data", "meta"]]
5256 names = _GenerateUniqueNames(self, lv_names)
5257 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5258 logical_id=(vgname, names[0]))
5259 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5260 logical_id=(vgname, names[1]))
5261 new_lvs = [lv_data, lv_meta]
5262 old_lvs = dev.children
5263 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5264 info("creating new local storage on %s for %s" %
5265 (tgt_node, dev.iv_name))
5266 # we pass force_create=True to force the LVM creation
5267 for new_lv in new_lvs:
5268 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5269 _GetInstanceInfoText(instance), False)
5271 # Step: for each lv, detach+rename*2+attach
5272 self.proc.LogStep(4, steps_total, "change drbd configuration")
5273 for dev, old_lvs, new_lvs in iv_names.itervalues():
5274 info("detaching %s drbd from local storage" % dev.iv_name)
5275 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5276 result.Raise("Can't detach drbd from local storage on node"
5277 " %s for device %s" % (tgt_node, dev.iv_name))
5279 #cfg.Update(instance)
5281 # ok, we created the new LVs, so now we know we have the needed
5282 # storage; as such, we proceed on the target node to rename
5283 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5284 # using the assumption that logical_id == physical_id (which in
5285 # turn is the unique_id on that node)
5287 # FIXME(iustin): use a better name for the replaced LVs
5288 temp_suffix = int(time.time())
5289 ren_fn = lambda d, suff: (d.physical_id[0],
5290 d.physical_id[1] + "_replaced-%s" % suff)
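# Example of the rename performed via ren_fn (hypothetical LV): the old
# ("xenvg", "3b2f.disk0_data") pair becomes
# ("xenvg", "3b2f.disk0_data_replaced-1257890123"), freeing the original
# name for the freshly created replacement LV.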
5291 # build the rename list based on what LVs exist on the node
5292 rlist = []
5293 for to_ren in old_lvs:
5294 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5295 if not result.fail_msg and result.payload:
5297 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5299 info("renaming the old LVs on the target node")
5300 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5301 result.Raise("Can't rename old LVs on node %s" % tgt_node)
5302 # now we rename the new LVs to the old LVs
5303 info("renaming the new LVs on the target node")
5304 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5305 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5306 result.Raise("Can't rename new LVs on node %s" % tgt_node)
5308 for old, new in zip(old_lvs, new_lvs):
5309 new.logical_id = old.logical_id
5310 cfg.SetDiskID(new, tgt_node)
5312 for disk in old_lvs:
5313 disk.logical_id = ren_fn(disk, temp_suffix)
5314 cfg.SetDiskID(disk, tgt_node)
5316 # now that the new lvs have the old name, we can add them to the device
5317 info("adding new mirror component on %s" % tgt_node)
5318 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5319 msg = result.fail_msg
5320 if msg:
5321 for new_lv in new_lvs:
5322 msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg
5323 if msg2:
5324 warning("Can't rollback device %s: %s", dev, msg2,
5325 hint="cleanup manually the unused logical volumes")
5326 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5328 dev.children = new_lvs
5329 cfg.Update(instance)
5331 # Step: wait for sync
5333 # this can fail as the old devices are degraded and _WaitForSync
5334 # does a combined result over all disks, so we don't check its
5335 # return value
5336 self.proc.LogStep(5, steps_total, "sync devices")
5337 _WaitForSync(self, instance, unlock=True)
5339 # so check manually all the devices
5340 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5341 cfg.SetDiskID(dev, instance.primary_node)
5342 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5343 msg = result.fail_msg
5344 if not msg and not result.payload:
5345 msg = "disk not found"
5347 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5349 if result.payload[5]:
5350 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5352 # Step: remove old storage
5353 self.proc.LogStep(6, steps_total, "removing old storage")
5354 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5355 info("remove logical volumes for %s" % name)
5357 cfg.SetDiskID(lv, tgt_node)
5358 msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg
5360 warning("Can't remove old LV: %s" % msg,
5361 hint="manually remove unused LVs")
5364 def _ExecD8Secondary(self, feedback_fn):
5365 """Replace the secondary node for drbd8.
5367 The algorithm for replace is quite complicated:
5368 - for all disks of the instance:
5369 - create new LVs on the new node with same names
5370 - shutdown the drbd device on the old secondary
5371 - disconnect the drbd network on the primary
5372 - create the drbd device on the new secondary
5373 - network attach the drbd on the primary, using an artifice:
5374 the drbd code for Attach() will connect to the network if it
5375 finds a device which is connected to the good local disks but
5376 not network enabled
5377 - wait for sync across all devices
5378 - remove all disks from the old secondary
5380 Failures are not very well handled.
5382 """
5383 steps_total = 6
5384 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5385 instance = self.instance
5386 iv_names = {}
5387 cfg = self.cfg
5389 old_node = self.tgt_node
5390 new_node = self.new_node
5391 pri_node = instance.primary_node
5392 nodes_ip = {
5393 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5394 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5395 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5396 }
5398 # Step: check device activation
5399 self.proc.LogStep(1, steps_total, "check device existence")
5400 info("checking volume groups")
5401 my_vg = cfg.GetVGName()
5402 results = self.rpc.call_vg_list([pri_node, new_node])
5403 for node in pri_node, new_node:
5404 res = results[node]
5405 res.Raise("Error checking node %s" % node)
5406 if my_vg not in res.payload:
5407 raise errors.OpExecError("Volume group '%s' not found on %s" %
5409 for idx, dev in enumerate(instance.disks):
5410 if idx not in self.op.disks:
5411 continue
5412 info("checking disk/%d on %s" % (idx, pri_node))
5413 cfg.SetDiskID(dev, pri_node)
5414 result = self.rpc.call_blockdev_find(pri_node, dev)
5415 msg = result.fail_msg
5416 if not msg and not result.payload:
5417 msg = "disk not found"
5419 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5420 (idx, pri_node, msg))
5422 # Step: check other node consistency
5423 self.proc.LogStep(2, steps_total, "check peer consistency")
5424 for idx, dev in enumerate(instance.disks):
5425 if idx not in self.op.disks:
5426 continue
5427 info("checking disk/%d consistency on %s" % (idx, pri_node))
5428 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5429 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5430 " unsafe to replace the secondary" %
5433 # Step: create new storage
5434 self.proc.LogStep(3, steps_total, "allocate new storage")
5435 for idx, dev in enumerate(instance.disks):
5436 info("adding new local storage on %s for disk/%d" %
5438 # we pass force_create=True to force LVM creation
5439 for new_lv in dev.children:
5440 _CreateBlockDev(self, new_node, instance, new_lv, True,
5441 _GetInstanceInfoText(instance), False)
5443 # Step 4: dbrd minors and drbd setups changes
5444 # after this, we must manually remove the drbd minors on both the
5445 # error and the success paths
5446 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5447 instance.name)
5448 logging.debug("Allocated minors %s" % (minors,))
5449 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5450 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5452 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5453 # create new devices on new_node; note that we create two IDs:
5454 # one without port, so the drbd will be activated without
5455 # networking information on the new node at this stage, and one
5456 # with network, for the latter activation in step 4
5457 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5458 if pri_node == o_node1:
5459 p_minor = o_minor1
5460 else:
5461 p_minor = o_minor2
5463 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5464 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
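# With hypothetical values the two IDs built above look like:
#   new_alone_id = ("node1", "node3", None, 1, 0, "<secret>")
#   new_net_id = ("node1", "node3", 11050, 1, 0, "<secret>")
# The port-less variant lets the new DRBD be activated standalone
# before it is network-attached in the next step.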
5466 iv_names[idx] = (dev, dev.children, new_net_id)
5467 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5468 new_net_id)
5469 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5470 logical_id=new_alone_id,
5471 children=dev.children,
5472 size=dev.size)
5473 try:
5474 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5475 _GetInstanceInfoText(instance), False)
5476 except errors.GenericError:
5477 self.cfg.ReleaseDRBDMinors(instance.name)
5478 raise
5480 for idx, dev in enumerate(instance.disks):
5481 # we have new devices, shutdown the drbd on the old secondary
5482 info("shutting down drbd for disk/%d on old node" % idx)
5483 cfg.SetDiskID(dev, old_node)
5484 msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg
5485 if msg:
5486 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5487 (idx, msg),
5488 hint="Please cleanup this device manually as soon as possible")
5490 info("detaching primary drbds from the network (=> standalone)")
5491 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5492 instance.disks)[pri_node]
5494 msg = result.fail_msg
5495 if msg:
5496 # detaches didn't succeed (unlikely)
5497 self.cfg.ReleaseDRBDMinors(instance.name)
5498 raise errors.OpExecError("Can't detach the disks from the network on"
5499 " old node: %s" % (msg,))
5501 # if we managed to detach at least one, we update all the disks of
5502 # the instance to point to the new secondary
5503 info("updating instance configuration")
5504 for dev, _, new_logical_id in iv_names.itervalues():
5505 dev.logical_id = new_logical_id
5506 cfg.SetDiskID(dev, pri_node)
5507 cfg.Update(instance)
5509 # and now perform the drbd attach
5510 info("attaching primary drbds to new secondary (standalone => connected)")
5511 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5512 instance.disks, instance.name,
5514 for to_node, to_result in result.items():
5515 msg = to_result.fail_msg
5517 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5518 hint="please do a gnt-instance info to see the"
5519 " status of disks")
5521 # this can fail as the old devices are degraded and _WaitForSync
5522 # does a combined result over all disks, so we don't check its
5523 # return value
5524 self.proc.LogStep(5, steps_total, "sync devices")
5525 _WaitForSync(self, instance, unlock=True)
5527 # so check manually all the devices
5528 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5529 cfg.SetDiskID(dev, pri_node)
5530 result = self.rpc.call_blockdev_find(pri_node, dev)
5531 msg = result.fail_msg
5532 if not msg and not result.payload:
5533 msg = "disk not found"
5535 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5537 if result.payload[5]:
5538 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5540 self.proc.LogStep(6, steps_total, "removing old storage")
5541 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5542 info("remove logical volumes for disk/%d" % idx)
5544 cfg.SetDiskID(lv, old_node)
5545 msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg
5547 warning("Can't remove LV on old secondary: %s", msg,
5548 hint="Cleanup stale volumes by hand")
5550 def Exec(self, feedback_fn):
5551 """Execute disk replacement.
5553 This dispatches the disk replacement to the appropriate handler.
5555 """
5556 instance = self.instance
5558 # Activate the instance disks if we're replacing them on a down instance
5559 if not instance.admin_up:
5560 _StartInstanceDisks(self, instance, True)
5562 if self.op.mode == constants.REPLACE_DISK_CHG:
5563 fn = self._ExecD8Secondary
5564 else:
5565 fn = self._ExecD8DiskOnly
5567 ret = fn(feedback_fn)
5569 # Deactivate the instance disks if we're replacing them on a down instance
5570 if not instance.admin_up:
5571 _SafeShutdownInstanceDisks(self, instance)
5573 return ret
5576 class LUGrowDisk(LogicalUnit):
5577 """Grow a disk of an instance.
5581 HTYPE = constants.HTYPE_INSTANCE
5582 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5585 def ExpandNames(self):
5586 self._ExpandAndLockInstance()
5587 self.needed_locks[locking.LEVEL_NODE] = []
5588 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5590 def DeclareLocks(self, level):
5591 if level == locking.LEVEL_NODE:
5592 self._LockInstancesNodes()
5594 def BuildHooksEnv(self):
5595 """Build hooks env.
5597 This runs on the master, the primary and all the secondaries.
5599 """
5600 env = {
5601 "DISK": self.op.disk,
5602 "AMOUNT": self.op.amount,
5603 }
5604 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5605 nl = [
5606 self.cfg.GetMasterNode(),
5607 self.instance.primary_node,
5608 ]
5609 return env, nl, nl
5611 def CheckPrereq(self):
5612 """Check prerequisites.
5614 This checks that the instance is in the cluster.
5616 """
5617 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5618 assert instance is not None, \
5619 "Cannot retrieve locked instance %s" % self.op.instance_name
5620 nodenames = list(instance.all_nodes)
5621 for node in nodenames:
5622 _CheckNodeOnline(self, node)
5625 self.instance = instance
5627 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5628 raise errors.OpPrereqError("Instance's disk layout does not support"
5631 self.disk = instance.FindDisk(self.op.disk)
5633 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5634 instance.hypervisor)
5635 for node in nodenames:
5636 info = nodeinfo[node]
5637 info.Raise("Cannot get current information from node %s" % node)
5638 vg_free = info.payload.get('vg_free', None)
5639 if not isinstance(vg_free, int):
5640 raise errors.OpPrereqError("Can't compute free disk space on"
5642 if self.op.amount > vg_free:
5643 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5644 " %d MiB available, %d MiB required" %
5645 (node, vg_free, self.op.amount))
5647 def Exec(self, feedback_fn):
5648 """Execute disk grow.
5651 instance = self.instance
5653 for node in instance.all_nodes:
5654 self.cfg.SetDiskID(disk, node)
5655 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5656 result.Raise("Grow request failed to node %s" % node)
5657 disk.RecordGrow(self.op.amount)
5658 self.cfg.Update(instance)
5659 if self.op.wait_for_sync:
5660 disk_abort = not _WaitForSync(self, instance)
5661 if disk_abort:
5662 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5663 " status.\nPlease check the instance.")
5666 class LUQueryInstanceData(NoHooksLU):
5667 """Query runtime instance data.
5670 _OP_REQP = ["instances", "static"]

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
      if dev_pstatus.offline:
        dev_pstatus = None
      else:
        dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
        dev_pstatus = dev_pstatus.payload
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
      if dev_sstatus.offline:
        dev_sstatus = None
      else:
        dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
        dev_sstatus = dev_sstatus.payload
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data
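
    # Illustrative shape of the returned mapping (made-up values), assuming
    # a plain LVM disk queried live; pstatus/sstatus hold the payload of
    # call_blockdev_find, or None when unavailable:
    #
    #   {"iv_name": "disk/0",
    #    "dev_type": "lvm",
    #    "logical_id": ("xenvg", "4e2f...-disk0"),
    #    "physical_id": ("xenvg", "4e2f...-disk0"),
    #    "pstatus": <blockdev_find payload>,
    #    "sstatus": None,
    #    "children": []}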

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise("Error checking node %s" % instance.primary_node)
        remote_info = remote_info.payload
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.admin_up:
        config_state = "up"
      else:
        config_state = "down"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result
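
    # Sketch of the result structure (hypothetical values): one entry per
    # queried instance, keyed by name, e.g.
    #
    #   {"inst1.example.com": {"name": "inst1.example.com",
    #                          "config_state": "up",
    #                          "run_state": "up",
    #                          "pnode": "node1.example.com",
    #                          ...}}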


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")

    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")
    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)

      nic_bridge = nic_dict.get('bridge', None)
      nic_link = nic_dict.get('link', None)
      if nic_bridge and nic_link:
        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
                                   " at the same time")
      elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
        nic_dict['bridge'] = None
      elif nic_link and nic_link.lower() == constants.VALUE_NONE:
        nic_dict['link'] = None

      if nic_op == constants.DDM_ADD:
        nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic")

    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")
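
    # Illustrative sketch of the accepted argument format (made-up values):
    # self.op.disks and self.op.nics are lists of (operation, settings)
    # pairs, where the operation is constants.DDM_ADD, constants.DDM_REMOVE
    # or an integer index of an existing device, e.g.
    #
    #   op.disks = [(constants.DDM_ADD,
    #                {"size": 1024, "mode": constants.DISK_RDWR})]
    #   op.nics = [(0, {"mac": "auto", "ip": "none"})]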

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export
    # disk information at all.
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        if idx in self.nic_pnew:
          nicparams = self.nic_pnew[idx]
        else:
          nicparams = objects.FillDict(c_nicparams, nic.nicparams)
        mode = nicparams[constants.NIC_MODE]
        link = nicparams[constants.NIC_LINK]
        args['nics'].append((ip, mac, mode, link))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        mac = nic_override[constants.DDM_ADD]['mac']
        nicparams = self.nic_pnew[constants.DDM_ADD]
        mode = nicparams[constants.NIC_MODE]
        link = nicparams[constants.NIC_LINK]
        args['nics'].append((ip, mac, mode, link))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def _GetUpdatedParams(self, old_params, update_dict,
                        default_values, parameter_types):
    """Return the new params dict for the given params.

    @type old_params: dict
    @param old_params: old parameters
    @type update_dict: dict
    @param update_dict: dict containing new parameter values,
        or constants.VALUE_DEFAULT to reset the
        parameter to its default value
    @type default_values: dict
    @param default_values: default values for the filled parameters
    @type parameter_types: dict
    @param parameter_types: dict mapping target dict keys to types
        in constants.ENFORCEABLE_TYPES
    @rtype: (dict, dict)
    @return: (new_parameters, filled_parameters)

    """
    params_copy = copy.deepcopy(old_params)
    for key, val in update_dict.iteritems():
      if val == constants.VALUE_DEFAULT:
        try:
          del params_copy[key]
        except KeyError:
          pass
      else:
        params_copy[key] = val
    utils.ForceDictType(params_copy, parameter_types)
    params_filled = objects.FillDict(default_values, params_copy)
    return (params_copy, params_filled)
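
    # Worked example (hypothetical values): starting from
    # old_params={"kernel_path": "/boot/vmlinuz"} and
    # update_dict={"kernel_path": constants.VALUE_DEFAULT,
    #              "root_path": "/dev/sda1"},
    # the key set to VALUE_DEFAULT is dropped from the instance-level dict
    # and the other one is stored, so with
    # default_values={"kernel_path": "/boot/vmlinuz-default"} the result is
    #
    #   new_parameters    = {"root_path": "/dev/sda1"}
    #   filled_parameters = {"kernel_path": "/boot/vmlinuz-default",
    #                        "root_path": "/dev/sda1"}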

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    force = self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    cluster = self.cluster = self.cfg.GetClusterInfo()
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict, hv_new = self._GetUpdatedParams(
        instance.hvparams, self.op.hvparams,
        cluster.hvparams[instance.hypervisor],
        constants.HVS_PARAMETER_TYPES)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict, be_new = self._GetUpdatedParams(
        instance.beparams, self.op.beparams,
        cluster.beparams[constants.PP_DEFAULT],
        constants.BES_PARAMETER_TYPES)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      pninfo = nodeinfo[pnode]
      msg = pninfo.fail_msg
      if msg:
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s: %s" %
                         (pnode, msg))
      elif not isinstance(pninfo.payload.get('memory_free', None), int):
        self.warn.append("Node data from primary node %s doesn't contain"
                         " free memory information" % pnode)
      elif instance_info.fail_msg:
        self.warn.append("Can't get instance runtime information: %s" %
                         instance_info.fail_msg)
      else:
        if instance_info.payload:
          current_mem = int(instance_info.payload['memory'])
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very
          # probable, and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    pninfo.payload['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" %
                                     miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.items():
          if node not in instance.secondary_nodes:
            continue
          msg = nres.fail_msg
          if msg:
            self.warn.append("Can't get info from secondary node %s: %s" %
                             (node, msg))
          elif not isinstance(nres.payload.get('memory_free', None), int):
            self.warn.append("Secondary node %s didn't return free"
                             " memory information" % node)
          elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)
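
      # Worked example of the feasibility check above (made-up numbers):
      # raising BE_MEMORY to 4096 MiB while the instance currently uses
      # 1024 MiB on a primary node with memory_free=2048 MiB gives
      # miss_mem = 4096 - 1024 - 2048 = 1024 > 0, so the change is refused
      # unless force is set.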

    # NIC processing
    self.nic_pnew = {}
    self.nic_pinst = {}
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
        old_nic_params = instance.nics[nic_op].nicparams
        old_nic_ip = instance.nics[nic_op].ip
      else:
        old_nic_params = {}
        old_nic_ip = None

      update_params_dict = dict([(key, nic_dict[key])
                                 for key in constants.NICS_PARAMETERS
                                 if key in nic_dict])

      if 'bridge' in nic_dict:
        update_params_dict[constants.NIC_LINK] = nic_dict['bridge']

      new_nic_params, new_filled_nic_params = \
        self._GetUpdatedParams(old_nic_params, update_params_dict,
                               cluster.nicparams[constants.PP_DEFAULT],
                               constants.NICS_PARAMETER_TYPES)
      objects.NIC.CheckParameterSyntax(new_filled_nic_params)
      self.nic_pinst[nic_op] = new_nic_params
      self.nic_pnew[nic_op] = new_filled_nic_params
      new_nic_mode = new_filled_nic_params[constants.NIC_MODE]

      if new_nic_mode == constants.NIC_MODE_BRIDGED:
        nic_bridge = new_filled_nic_params[constants.NIC_LINK]
        msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
        if msg:
          msg = "Error checking bridges on node %s: %s" % (pnode, msg)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)
      if new_nic_mode == constants.NIC_MODE_ROUTED:
        if 'ip' in nic_dict:
          nic_ip = nic_dict['ip']
        else:
          nic_ip = old_nic_ip
        if nic_ip is None:
          raise errors.OpPrereqError('Cannot set the nic ip to None'
                                     ' on a routed nic')
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None')
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC()
        else:
          # or validate/reserve the current one
          if self.cfg.IsMacInUse(nic_mac):
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        msg = ins_l.fail_msg
        if msg:
          raise errors.OpPrereqError("Can't contact node %s: %s" %
                                     (pnode, msg))
        if instance.name in ins_l.payload:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    cluster = self.cluster
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))

    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        ip = nic_dict.get('ip', None)
        nicparams = self.nic_pinst[constants.DDM_ADD]
        new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,mode=%s,link=%s" %
                       (new_nic.mac, new_nic.ip,
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
                        )))
      else:
        for key in 'mac', 'ip':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
        if nic_op in self.nic_pnew:
          instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
        for key, val in nic_dict.iteritems():
          result.append(("nic.%s/%d" % (key, nic_op), val))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result
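
    # Sketch of the returned change list (hypothetical entries): each item
    # is a (parameter, new value) pair that the caller turns into user
    # feedback, e.g. [("disk/1", "add:size=1024,mode=rw"),
    # ("be/memory", 512)].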


class LUQueryExports(NoHooksLU):
  """Query the exports list.

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].fail_msg:
        result[node] = False
      else:
        result[node] = rpcresult[node].payload

    return result
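
    # Example of the result shape (made-up names): nodes that failed to
    # answer map to False rather than to a list, so callers must check the
    # type, e.g.
    #
    #   {"node1.example.com": ["inst1.example.com"],
    #    "node2.example.com": False}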


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know
    # where the previous export might be, and in this LU we search for it
    # and remove it from its current node. In the future we could fix this
    # by:
    #  - making a tasklet to search (share-lock all), then create the new
    #    one, then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is a wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      result.Raise("Could not shutdown instance %s on"
                   " node %s" % (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    try:
      for idx, disk in enumerate(instance.disks):
        # result.payload will be a snapshot of an lvm leaf of the one we
        # passed
        result = self.rpc.call_blockdev_snapshot(src_node, disk)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Could not snapshot disk/%s on node %s: %s",
                          idx, src_node, msg)
          snap_disks.append(False)
        else:
          disk_id = (vgname, result.payload)
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=disk_id, physical_id=disk_id,
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.admin_up:
        result = self.rpc.call_instance_start(src_node, instance, None, None)
        msg = result.fail_msg
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Could not export disk/%s from node %s to"
                          " node %s: %s", idx, src_node, dst_node.name, msg)
        msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
        if msg:
          self.LogWarning("Could not remove snapshot for disk/%d from node"
                          " %s: %s", idx, src_node, msg)

    result = self.rpc.call_finalize_export(dst_node.name, instance,
                                           snap_disks)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Could not finalize export for instance %s"
                      " on node %s: %s", instance.name, dst_node.name, msg)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    iname = instance.name
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].fail_msg:
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname, node, msg)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but
    # we don't need to lock the instance itself, as nothing will happen to
    # it (and we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed
    # in. This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s",
                        node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
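
    # Illustrative result (made-up tags): searching for the pattern "^web"
    # could return
    #
    #   [("/instances/inst1.example.com", "webserver"),
    #    ("/nodes/node1.example.com", "web-zone")]
    #
    # i.e. (path, tag) pairs for every object whose tag matches the regex.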


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to
      # use this way in ExpandNames. Check LogicalUnit.ExpandNames docstring
      # for more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      for node, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" % node)


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]
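
  # Hedged usage sketch (hypothetical instance parameters): an LU would
  # typically build and run an allocator along these lines, passing exactly
  # the keyword arguments listed in _ALLO_KEYS for the allocate mode:
  #
  #   ial = IAllocator(lu, mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com", mem_size=512, disks=[...],
  #                    disk_template=constants.DT_DRBD8, os="debian-etch",
  #                    tags=[], nics=[...], vcpus=1, hypervisor=None)
  #   ial.Run("hail")
  #
  # After Run(), ial.success, ial.info and ial.nodes hold the script's
  # verdict.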

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor_name)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not ninfo.offline:
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload
        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
          cluster_info.nicparams[constants.PP_DEFAULT],
          nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                    }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _AllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request
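
    # Illustrative request payload (made-up values) as handed to the
    # external allocator script for an allocate run:
    #
    #   {"type": "allocate", "name": "inst1.example.com",
    #    "disk_template": "drbd", "vcpus": 1, "memory": 512,
    #    "disks": [{"size": 1024, "mode": "w"}], "disk_space_total": 1280,
    #    "nics": [...], "required_nodes": 2, ...}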

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _IAllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" %
                               str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " not a list")
    self.out_data = rdict
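
    # Example of a well-formed allocator reply (hypothetical): the script
    # must emit a serialized object with at least these three keys, e.g.
    #
    #   {"success": true,
    #    "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node2.example.com"]}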


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode of
    the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode"
                                     " input" % attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode"
                                   " input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor)
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from))

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result