4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
45 class LogicalUnit(object):
46 """Logical Unit base class.
48 Subclasses must follow these rules:
49 - implement ExpandNames
50 - implement CheckPrereq
52 - implement BuildHooksEnv
53 - redefine HPATH and HTYPE
54 - optionally redefine their run requirements:
55 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
57 Note that all commands require root permissions.
59 @ivar dry_run_result: the value (if any) that will be returned to the caller
60 in dry-run mode (signalled by opcode dry_run parameter)
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overridden in derived classes in order to check op
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
93 self.dry_run_result = None
95 for attr_name in self._OP_REQP:
96 attr_val = getattr(op, attr_name, None)
98 raise errors.OpPrereqError("Required parameter '%s' missing" %
100 self.CheckArguments()
103 """Returns the SshRunner object
107 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
110 ssh = property(fget=__GetSSH)
112 def CheckArguments(self):
113 """Check syntactic validity for the opcode arguments.
115 This method is for doing a simple syntactic check and ensuring the
116 validity of opcode parameters, without any cluster-related
117 checks. While the same can be accomplished in ExpandNames and/or
118 CheckPrereq, doing these separately is better because:
120 - ExpandNames is left purely as a lock-related function
121 - CheckPrereq is run after we have acquired locks (and possible
124 The function is allowed to change the self.op attribute so that
125 later methods need not worry about missing parameters.
130 def ExpandNames(self):
131 """Expand names for this LU.
133 This method is called before starting to execute the opcode, and it should
134 update all the parameters of the opcode to their canonical form (e.g. a
135 short node name must be fully expanded after this method has successfully
136 completed). This way locking, hooks, logging, etc. can work correctly.
138 LUs which implement this method must also populate the self.needed_locks
139 member, as a dict with lock levels as keys, and a list of needed lock names
142 - use an empty dict if you don't need any lock
143 - if you don't need any lock at a particular level omit that level
144 - don't put anything for the BGL level
145 - if you want all locks at a level use locking.ALL_SET as a value
147 If you need to share locks (rather than acquire them exclusively) at one
148 level you can modify self.share_locks, setting a true value (usually 1) for
149 that level. By default locks are not shared.
153 # Acquire all nodes and one instance
154 self.needed_locks = {
155 locking.LEVEL_NODE: locking.ALL_SET,
156 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
158 # Acquire just two nodes
159 self.needed_locks = {
160 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
163 self.needed_locks = {} # No, you can't leave it to the default value None
166 # The implementation of this method is mandatory only if the new LU is
167 # concurrent, so that old LUs don't need to be changed all at the same
170 self.needed_locks = {} # Exclusive LUs don't need locks.
172 raise NotImplementedError
174 def DeclareLocks(self, level):
175 """Declare LU locking needs for a level
177 While most LUs can just declare their locking needs at ExpandNames time,
178 sometimes there's the need to calculate some locks after having acquired
179 the ones before. This function is called just before acquiring locks at a
180 particular level, but after acquiring the ones at lower levels, and permits
181 such calculations. It can be used to modify self.needed_locks, and by
182 default it does nothing.
184 This function is only called if you have something already set in
185 self.needed_locks for the level.
187 @param level: Locking level which is going to be locked
188 @type level: member of ganeti.locking.LEVELS
192 def CheckPrereq(self):
193 """Check prerequisites for this LU.
195 This method should check that the prerequisites for the execution
196 of this LU are fulfilled. It can do internode communication, but
197 it should be idempotent - no cluster or system changes are
200 The method should raise errors.OpPrereqError in case something is
201 not fulfilled. Its return value is ignored.
203 This method should also update all the parameters of the opcode to
204 their canonical form if it hasn't been done by ExpandNames before.
207 raise NotImplementedError
209 def Exec(self, feedback_fn):
212 This method should implement the actual work. It should raise
213 errors.OpExecError for failures that are somewhat dealt with in
217 raise NotImplementedError
219 def BuildHooksEnv(self):
220 """Build hooks environment for this LU.
222 This method should return a three-element tuple consisting of: a dict
223 containing the environment that will be used for running the
224 specific hook for this LU, a list of node names on which the hook
225 should run before the execution, and a list of node names on which
226 the hook should run after the execution.
228 The keys of the dict must not be prefixed with 'GANETI_', as this will
229 be handled in the hooks runner. Also note additional keys will be
230 added by the hooks runner. If the LU doesn't define any
231 environment, an empty dict (and not None) should be returned.
233 If there are no nodes to return, an empty list (and not None) should be used.
235 Note that if the HPATH for a LU class is None, this function will
239 raise NotImplementedError
241 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
242 """Notify the LU about the results of its hooks.
244 This method is called every time a hooks phase is executed, and notifies
245 the Logical Unit about the hooks' result. The LU can then use it to alter
246 its result based on the hooks. By default the method does nothing and the
247 previous result is passed back unchanged, but any LU can override it if it
248 wants to use the local cluster hook-scripts somehow.
250 @param phase: one of L{constants.HOOKS_PHASE_POST} or
251 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
252 @param hook_results: the results of the multi-node hooks rpc call
253 @param feedback_fn: function used to send feedback back to the caller
254 @param lu_result: the previous Exec result this LU had, or None
256 @return: the new Exec result, based on the previous result
262 def _ExpandAndLockInstance(self):
263 """Helper function to expand and lock an instance.
265 Many LUs that work on an instance take its name in self.op.instance_name
266 and need to expand it and then declare the expanded name for locking. This
267 function does it, and then updates self.op.instance_name to the expanded
268 name. It also initializes needed_locks as a dict, if this hasn't been done
272 if self.needed_locks is None:
273 self.needed_locks = {}
275 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
276 "_ExpandAndLockInstance called with instance-level locks set"
277 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
278 if expanded_name is None:
279 raise errors.OpPrereqError("Instance '%s' not known" %
280 self.op.instance_name)
281 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
282 self.op.instance_name = expanded_name
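# Illustrative sketch (hypothetical LU, assumed usage): an instance-level LU
# would typically combine this helper with the node-lock recalculation
# machinery documented in _LockInstancesNodes below, e.g.:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()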
284 def _LockInstancesNodes(self, primary_only=False):
285 """Helper function to declare instances' nodes for locking.
287 This function should be called after locking one or more instances to lock
288 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
289 with all primary or secondary nodes for instances already locked and
290 present in self.needed_locks[locking.LEVEL_INSTANCE].
292 It should be called from DeclareLocks, and for safety only works if
293 self.recalculate_locks[locking.LEVEL_NODE] is set.
295 In the future it may grow parameters to lock only some instances' nodes, or
296 to lock only primary or secondary nodes, if needed.
298 It should be called in DeclareLocks in a way similar to::
300 if level == locking.LEVEL_NODE:
301 self._LockInstancesNodes()
303 @type primary_only: boolean
304 @param primary_only: only lock primary nodes of locked instances
307 assert locking.LEVEL_NODE in self.recalculate_locks, \
308 "_LockInstancesNodes helper function called with no nodes to recalculate"
310 # TODO: check if we have really been called with the instance locks held
312 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
313 # future we might want to have different behaviors depending on the value
314 # of self.recalculate_locks[locking.LEVEL_NODE]
316 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
317 instance = self.context.cfg.GetInstanceInfo(instance_name)
318 wanted_nodes.append(instance.primary_node)
320 wanted_nodes.extend(instance.secondary_nodes)
322 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
323 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
324 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
325 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
327 del self.recalculate_locks[locking.LEVEL_NODE]
330 class NoHooksLU(LogicalUnit):
331 """Simple LU which runs no hooks.
333 This LU is intended as a parent for other LogicalUnits which will
334 run no hooks, in order to reduce duplicate code.
341 def _GetWantedNodes(lu, nodes):
342 """Returns list of checked and expanded node names.
344 @type lu: L{LogicalUnit}
345 @param lu: the logical unit on whose behalf we execute
347 @param nodes: list of node names or None for all nodes
349 @return: the list of nodes, sorted
350 @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
353 if not isinstance(nodes, list):
354 raise errors.OpPrereqError("Invalid argument type 'nodes'")
357 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
358 " non-empty list of nodes whose name is to be expanded.")
362 node = lu.cfg.ExpandNodeName(name)
364 raise errors.OpPrereqError("No such node name '%s'" % name)
367 return utils.NiceSort(wanted)
370 def _GetWantedInstances(lu, instances):
371 """Returns list of checked and expanded instance names.
373 @type lu: L{LogicalUnit}
374 @param lu: the logical unit on whose behalf we execute
375 @type instances: list
376 @param instances: list of instance names or None for all instances
378 @return: the list of instances, sorted
379 @raise errors.OpPrereqError: if the instances parameter is wrong type
380 @raise errors.OpPrereqError: if any of the passed instances is not found
383 if not isinstance(instances, list):
384 raise errors.OpPrereqError("Invalid argument type 'instances'")
389 for name in instances:
390 instance = lu.cfg.ExpandInstanceName(name)
392 raise errors.OpPrereqError("No such instance name '%s'" % name)
393 wanted.append(instance)
396 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
400 def _CheckOutputFields(static, dynamic, selected):
401 """Checks whether all selected fields are valid.
403 @type static: L{utils.FieldSet}
404 @param static: static fields set
405 @type dynamic: L{utils.FieldSet}
406 @param dynamic: dynamic fields set
413 delta = f.NonMatching(selected)
415 raise errors.OpPrereqError("Unknown output fields selected: %s"
419 def _CheckBooleanOpField(op, name):
420 """Validates boolean opcode parameters.
422 This will ensure that an opcode parameter is either a boolean value,
423 or None (but that it always exists).
426 val = getattr(op, name, None)
427 if not (val is None or isinstance(val, bool)):
428 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
430 setattr(op, name, val)
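# Illustrative usage (hypothetical opcode field): an LU's CheckArguments could
# normalize an optional boolean flag with e.g.
#   _CheckBooleanOpField(self.op, "force")
# after which self.op.force is guaranteed to exist and to be either a boolean
# or None.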
433 def _CheckNodeOnline(lu, node):
434 """Ensure that a given node is online.
436 @param lu: the LU on behalf of which we make the check
437 @param node: the node to check
438 @raise errors.OpPrereqError: if the node is offline
441 if lu.cfg.GetNodeInfo(node).offline:
442 raise errors.OpPrereqError("Can't use offline node %s" % node)
445 def _CheckNodeNotDrained(lu, node):
446 """Ensure that a given node is not drained.
448 @param lu: the LU on behalf of which we make the check
449 @param node: the node to check
450 @raise errors.OpPrereqError: if the node is drained
453 if lu.cfg.GetNodeInfo(node).drained:
454 raise errors.OpPrereqError("Can't use drained node %s" % node)
457 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
458 memory, vcpus, nics, disk_template, disks,
459 bep, hvp, hypervisor_name):
460 """Builds instance related env variables for hooks
462 This builds the hook environment from individual variables.
465 @param name: the name of the instance
466 @type primary_node: string
467 @param primary_node: the name of the instance's primary node
468 @type secondary_nodes: list
469 @param secondary_nodes: list of secondary nodes as strings
470 @type os_type: string
471 @param os_type: the name of the instance's OS
472 @type status: boolean
473 @param status: the should_run status of the instance
475 @param memory: the memory size of the instance
477 @param vcpus: the count of VCPUs the instance has
479 @param nics: list of tuples (ip, mac, mode, link) representing
480 the NICs the instance has
481 @type disk_template: string
482 @param disk_template: the disk template of the instance
484 @param disks: the list of (size, mode) pairs
486 @param bep: the backend parameters for the instance
488 @param hvp: the hypervisor parameters for the instance
489 @type hypervisor_name: string
490 @param hypervisor_name: the hypervisor for the instance
492 @return: the hook environment for this instance
501 "INSTANCE_NAME": name,
502 "INSTANCE_PRIMARY": primary_node,
503 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
504 "INSTANCE_OS_TYPE": os_type,
505 "INSTANCE_STATUS": str_status,
506 "INSTANCE_MEMORY": memory,
507 "INSTANCE_VCPUS": vcpus,
508 "INSTANCE_DISK_TEMPLATE": disk_template,
509 "INSTANCE_HYPERVISOR": hypervisor_name,
513 nic_count = len(nics)
514 for idx, (ip, mac, mode, link) in enumerate(nics):
517 env["INSTANCE_NIC%d_IP" % idx] = ip
518 env["INSTANCE_NIC%d_MAC" % idx] = mac
519 env["INSTANCE_NIC%d_MODE" % idx] = mode
520 env["INSTANCE_NIC%d_LINK" % idx] = link
521 if mode == constants.NIC_MODE_BRIDGED:
522 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
526 env["INSTANCE_NIC_COUNT"] = nic_count
529 disk_count = len(disks)
530 for idx, (size, mode) in enumerate(disks):
531 env["INSTANCE_DISK%d_SIZE" % idx] = size
532 env["INSTANCE_DISK%d_MODE" % idx] = mode
536 env["INSTANCE_DISK_COUNT"] = disk_count
538 for source, kind in [(bep, "BE"), (hvp, "HV")]:
539 for key, value in source.items():
540 env["INSTANCE_%s_%s" % (kind, key)] = value
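# For illustration (example values only): a one-NIC, one-disk instance ends up
# with keys such as INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_NIC0_MAC,
# INSTANCE_NIC0_MODE, INSTANCE_DISK0_SIZE and per-parameter INSTANCE_BE_* /
# INSTANCE_HV_* entries; the hooks runner later adds the GANETI_ prefix
# (see LogicalUnit.BuildHooksEnv above).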
544 def _NICListToTuple(lu, nics):
545 """Build a list of nic information tuples.
547 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
548 value in LUQueryInstanceData.
550 @type lu: L{LogicalUnit}
551 @param lu: the logical unit on whose behalf we execute
552 @type nics: list of L{objects.NIC}
553 @param nics: list of nics to convert to hooks tuples
557 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
561 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
562 mode = filled_params[constants.NIC_MODE]
563 link = filled_params[constants.NIC_LINK]
564 hooks_nics.append((ip, mac, mode, link))
567 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
568 """Builds instance related env variables for hooks from an object.
570 @type lu: L{LogicalUnit}
571 @param lu: the logical unit on whose behalf we execute
572 @type instance: L{objects.Instance}
573 @param instance: the instance for which we should build the
576 @param override: dictionary with key/values that will override
579 @return: the hook environment dictionary
582 cluster = lu.cfg.GetClusterInfo()
583 bep = cluster.FillBE(instance)
584 hvp = cluster.FillHV(instance)
586 'name': instance.name,
587 'primary_node': instance.primary_node,
588 'secondary_nodes': instance.secondary_nodes,
589 'os_type': instance.os,
590 'status': instance.admin_up,
591 'memory': bep[constants.BE_MEMORY],
592 'vcpus': bep[constants.BE_VCPUS],
593 'nics': _NICListToTuple(lu, instance.nics),
594 'disk_template': instance.disk_template,
595 'disks': [(disk.size, disk.mode) for disk in instance.disks],
598 'hypervisor': instance.hypervisor,
601 args.update(override)
602 return _BuildInstanceHookEnv(**args)
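# Illustrative sketch (hypothetical LU, assumed usage): a hooks-enabled
# instance LU would typically build its hook environment along these lines:
#
#   def BuildHooksEnv(self):
#     env = _BuildInstanceHookEnvByObject(self, self.instance)
#     nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
#     return env, nl, nl
#
# where the two node lists are the pre- and post-execution hook targets
# described in LogicalUnit.BuildHooksEnv.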
605 def _AdjustCandidatePool(lu):
606 """Adjust the candidate pool after node operations.
609 mod_list = lu.cfg.MaintainCandidatePool()
611 lu.LogInfo("Promoted nodes to master candidate role: %s",
612 ", ".join(node.name for node in mod_list))
613 for name in mod_list:
614 lu.context.ReaddNode(name)
615 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
617 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
621 def _CheckNicsBridgesExist(lu, target_nics, target_node,
622 profile=constants.PP_DEFAULT):
623 """Check that the brigdes needed by a list of nics exist.
626 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
627 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
628 for nic in target_nics]
629 brlist = [params[constants.NIC_LINK] for params in paramslist
630 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
632 result = lu.rpc.call_bridges_exist(target_node, brlist)
633 result.Raise("Error checking bridges on destination node '%s'" %
634 target_node, prereq=True)
637 def _CheckInstanceBridgesExist(lu, instance, node=None):
638 """Check that the brigdes needed by an instance exist.
642 node = instance.primary_node
643 _CheckNicsBridgesExist(lu, instance.nics, node)
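# Illustrative usage (hypothetical caller): an LU that is about to start or
# migrate an instance could call
#   _CheckInstanceBridgesExist(self, self.instance)
# from its CheckPrereq, or pass an explicit target node via the node argument.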
646 class LUDestroyCluster(NoHooksLU):
647 """Logical unit for destroying the cluster.
652 def CheckPrereq(self):
653 """Check prerequisites.
655 This checks whether the cluster is empty.
657 Any errors are signaled by raising errors.OpPrereqError.
660 master = self.cfg.GetMasterNode()
662 nodelist = self.cfg.GetNodeList()
663 if len(nodelist) != 1 or nodelist[0] != master:
664 raise errors.OpPrereqError("There are still %d node(s) in"
665 " this cluster." % (len(nodelist) - 1))
666 instancelist = self.cfg.GetInstanceList()
668 raise errors.OpPrereqError("There are still %d instance(s) in"
669 " this cluster." % len(instancelist))
671 def Exec(self, feedback_fn):
672 """Destroys the cluster.
675 master = self.cfg.GetMasterNode()
676 result = self.rpc.call_node_stop_master(master, False)
677 result.Raise("Could not disable the master role")
678 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
679 utils.CreateBackup(priv_key)
680 utils.CreateBackup(pub_key)
684 class LUVerifyCluster(LogicalUnit):
685 """Verifies the cluster status.
688 HPATH = "cluster-verify"
689 HTYPE = constants.HTYPE_CLUSTER
690 _OP_REQP = ["skip_checks"]
693 def ExpandNames(self):
694 self.needed_locks = {
695 locking.LEVEL_NODE: locking.ALL_SET,
696 locking.LEVEL_INSTANCE: locking.ALL_SET,
698 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
700 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
701 node_result, feedback_fn, master_files,
703 """Run multiple tests against a node.
707 - compares ganeti version
708 - checks vg existence and size > 20G
709 - checks config file checksum
710 - checks ssh to other nodes
712 @type nodeinfo: L{objects.Node}
713 @param nodeinfo: the node to check
714 @param file_list: required list of files
715 @param local_cksum: dictionary of local files and their checksums
716 @param node_result: the results from the node
717 @param feedback_fn: function used to accumulate results
718 @param master_files: list of files that only masters should have
719 @param drbd_map: the used drbd minors for this node, in
720 the form of minor: (instance, must_exist), which correspond to instances
721 and their running status
722 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
727 # main result, node_result should be a non-empty dict
728 if not node_result or not isinstance(node_result, dict):
729 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
732 # compares ganeti version
733 local_version = constants.PROTOCOL_VERSION
734 remote_version = node_result.get('version', None)
735 if not (remote_version and isinstance(remote_version, (list, tuple)) and
736 len(remote_version) == 2):
737 feedback_fn(" - ERROR: connection to %s failed" % (node))
740 if local_version != remote_version[0]:
741 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
742 " node %s %s" % (local_version, node, remote_version[0]))
745 # node seems compatible, we can actually try to look into its results
749 # full package version
750 if constants.RELEASE_VERSION != remote_version[1]:
751 feedback_fn(" - WARNING: software version mismatch: master %s,"
753 (constants.RELEASE_VERSION, node, remote_version[1]))
755 # checks vg existence and size > 20G
756 if vg_name is not None:
757 vglist = node_result.get(constants.NV_VGLIST, None)
759 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
763 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
764 constants.MIN_VG_SIZE)
766 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
769 # checks config file checksum
771 remote_cksum = node_result.get(constants.NV_FILELIST, None)
772 if not isinstance(remote_cksum, dict):
774 feedback_fn(" - ERROR: node hasn't returned file checksum data")
776 for file_name in file_list:
777 node_is_mc = nodeinfo.master_candidate
778 must_have_file = file_name not in master_files
779 if file_name not in remote_cksum:
780 if node_is_mc or must_have_file:
782 feedback_fn(" - ERROR: file '%s' missing" % file_name)
783 elif remote_cksum[file_name] != local_cksum[file_name]:
784 if node_is_mc or must_have_file:
786 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
788 # not candidate and this is not a must-have file
790 feedback_fn(" - ERROR: file '%s' should not exist on non master"
791 " candidates (and the file is outdated)" % file_name)
793 # all good, except non-master/non-must have combination
794 if not node_is_mc and not must_have_file:
795 feedback_fn(" - ERROR: file '%s' should not exist on non master"
796 " candidates" % file_name)
800 if constants.NV_NODELIST not in node_result:
802 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
804 if node_result[constants.NV_NODELIST]:
806 for node in node_result[constants.NV_NODELIST]:
807 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
808 (node, node_result[constants.NV_NODELIST][node]))
810 if constants.NV_NODENETTEST not in node_result:
812 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
814 if node_result[constants.NV_NODENETTEST]:
816 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
818 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
819 (node, node_result[constants.NV_NODENETTEST][node]))
821 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
822 if isinstance(hyp_result, dict):
823 for hv_name, hv_result in hyp_result.iteritems():
824 if hv_result is not None:
825 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
826 (hv_name, hv_result))
828 # check used drbd list
829 if vg_name is not None:
830 used_minors = node_result.get(constants.NV_DRBDLIST, [])
831 if not isinstance(used_minors, (tuple, list)):
832 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
835 for minor, (iname, must_exist) in drbd_map.items():
836 if minor not in used_minors and must_exist:
837 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
838 " not active" % (minor, iname))
840 for minor in used_minors:
841 if minor not in drbd_map:
842 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
848 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
849 node_instance, feedback_fn, n_offline):
850 """Verify an instance.
852 This function checks to see if the required block devices are
853 available on the instance's node.
858 node_current = instanceconfig.primary_node
861 instanceconfig.MapLVsByNode(node_vol_should)
863 for node in node_vol_should:
864 if node in n_offline:
865 # ignore missing volumes on offline nodes
867 for volume in node_vol_should[node]:
868 if node not in node_vol_is or volume not in node_vol_is[node]:
869 feedback_fn(" - ERROR: volume %s missing on node %s" %
873 if instanceconfig.admin_up:
874 if ((node_current not in node_instance or
875 not instance in node_instance[node_current]) and
876 node_current not in n_offline):
877 feedback_fn(" - ERROR: instance %s not running on node %s" %
878 (instance, node_current))
881 for node in node_instance:
882 if (not node == node_current):
883 if instance in node_instance[node]:
884 feedback_fn(" - ERROR: instance %s should not run on node %s" %
890 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
891 """Verify if there are any unknown volumes in the cluster.
893 The .os, .swap and backup volumes are ignored. All other volumes are
899 for node in node_vol_is:
900 for volume in node_vol_is[node]:
901 if node not in node_vol_should or volume not in node_vol_should[node]:
902 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
907 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
908 """Verify the list of running instances.
910 This checks what instances are running but unknown to the cluster.
914 for node in node_instance:
915 for runninginstance in node_instance[node]:
916 if runninginstance not in instancelist:
917 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
918 (runninginstance, node))
922 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
923 """Verify N+1 Memory Resilience.
925 Check that if one single node dies we can still start all the instances it
931 for node, nodeinfo in node_info.iteritems():
932 # This code checks that every node which is now listed as secondary has
933 # enough memory to host all instances it is supposed to should a single
934 # other node in the cluster fail.
935 # FIXME: not ready for failover to an arbitrary node
936 # FIXME: does not support file-backed instances
937 # WARNING: we currently take into account down instances as well as up
938 # ones, considering that even if they're down someone might want to start
939 # them even in the event of a node failure.
940 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
942 for instance in instances:
943 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
944 if bep[constants.BE_AUTO_BALANCE]:
945 needed_mem += bep[constants.BE_MEMORY]
946 if nodeinfo['mfree'] < needed_mem:
947 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
948 " failovers should node %s fail" % (node, prinode))
952 def CheckPrereq(self):
953 """Check prerequisites.
955 Transform the list of checks we're going to skip into a set and check that
956 all its members are valid.
959 self.skip_set = frozenset(self.op.skip_checks)
960 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
961 raise errors.OpPrereqError("Invalid checks to be skipped specified")
963 def BuildHooksEnv(self):
966 Cluster-Verify hooks are run only in the post phase; their failure makes
967 their output be logged in the verify output and the verification fail.
970 all_nodes = self.cfg.GetNodeList()
972 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
974 for node in self.cfg.GetAllNodesInfo().values():
975 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
977 return env, [], all_nodes
979 def Exec(self, feedback_fn):
980 """Verify integrity of cluster, performing various test on nodes.
984 feedback_fn("* Verifying global settings")
985 for msg in self.cfg.VerifyConfig():
986 feedback_fn(" - ERROR: %s" % msg)
988 vg_name = self.cfg.GetVGName()
989 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
990 nodelist = utils.NiceSort(self.cfg.GetNodeList())
991 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
992 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
993 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
994 for iname in instancelist)
995 i_non_redundant = [] # Non redundant instances
996 i_non_a_balanced = [] # Non auto-balanced instances
997 n_offline = [] # List of offline nodes
998 n_drained = [] # List of nodes being drained
1004 # FIXME: verify OS list
1005 # do local checksums
1006 master_files = [constants.CLUSTER_CONF_FILE]
1008 file_names = ssconf.SimpleStore().GetFileList()
1009 file_names.append(constants.SSL_CERT_FILE)
1010 file_names.append(constants.RAPI_CERT_FILE)
1011 file_names.extend(master_files)
1013 local_checksums = utils.FingerprintFiles(file_names)
1015 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1016 node_verify_param = {
1017 constants.NV_FILELIST: file_names,
1018 constants.NV_NODELIST: [node.name for node in nodeinfo
1019 if not node.offline],
1020 constants.NV_HYPERVISOR: hypervisors,
1021 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1022 node.secondary_ip) for node in nodeinfo
1023 if not node.offline],
1024 constants.NV_INSTANCELIST: hypervisors,
1025 constants.NV_VERSION: None,
1026 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1028 if vg_name is not None:
1029 node_verify_param[constants.NV_VGLIST] = None
1030 node_verify_param[constants.NV_LVLIST] = vg_name
1031 node_verify_param[constants.NV_DRBDLIST] = None
1032 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1033 self.cfg.GetClusterName())
1035 cluster = self.cfg.GetClusterInfo()
1036 master_node = self.cfg.GetMasterNode()
1037 all_drbd_map = self.cfg.ComputeDRBDMap()
1039 for node_i in nodeinfo:
1043 feedback_fn("* Skipping offline node %s" % (node,))
1044 n_offline.append(node)
1047 if node == master_node:
1049 elif node_i.master_candidate:
1050 ntype = "master candidate"
1051 elif node_i.drained:
1053 n_drained.append(node)
1056 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1058 msg = all_nvinfo[node].fail_msg
1060 feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg))
1064 nresult = all_nvinfo[node].payload
1066 for minor, instance in all_drbd_map[node].items():
1067 if instance not in instanceinfo:
1068 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1070 # ghost instance should not be running, but otherwise we
1071 # don't give double warnings (both ghost instance and
1072 # unallocated minor in use)
1073 node_drbd[minor] = (instance, False)
1075 instance = instanceinfo[instance]
1076 node_drbd[minor] = (instance.name, instance.admin_up)
1077 result = self._VerifyNode(node_i, file_names, local_checksums,
1078 nresult, feedback_fn, master_files,
1082 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1084 node_volume[node] = {}
1085 elif isinstance(lvdata, basestring):
1086 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1087 (node, utils.SafeEncode(lvdata)))
1089 node_volume[node] = {}
1090 elif not isinstance(lvdata, dict):
1091 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1095 node_volume[node] = lvdata
1098 idata = nresult.get(constants.NV_INSTANCELIST, None)
1099 if not isinstance(idata, list):
1100 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1105 node_instance[node] = idata
1108 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1109 if not isinstance(nodeinfo, dict):
1110 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1116 "mfree": int(nodeinfo['memory_free']),
1119 # dictionary holding all instances this node is secondary for,
1120 # grouped by their primary node. Each key is a cluster node, and each
1121 # value is a list of instances which have the key as primary and the
1122 # current node as secondary. This is handy to calculate N+1 memory
1123 # availability if you can only failover from a primary to its
1125 "sinst-by-pnode": {},
1127 # FIXME: devise a free space model for file based instances as well
1128 if vg_name is not None:
1129 if (constants.NV_VGLIST not in nresult or
1130 vg_name not in nresult[constants.NV_VGLIST]):
1131 feedback_fn(" - ERROR: node %s didn't return data for the"
1132 " volume group '%s' - it is either missing or broken" %
1136 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1137 except (ValueError, KeyError):
1138 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1139 " from node %s" % (node,))
1143 node_vol_should = {}
1145 for instance in instancelist:
1146 feedback_fn("* Verifying instance %s" % instance)
1147 inst_config = instanceinfo[instance]
1148 result = self._VerifyInstance(instance, inst_config, node_volume,
1149 node_instance, feedback_fn, n_offline)
1151 inst_nodes_offline = []
1153 inst_config.MapLVsByNode(node_vol_should)
1155 instance_cfg[instance] = inst_config
1157 pnode = inst_config.primary_node
1158 if pnode in node_info:
1159 node_info[pnode]['pinst'].append(instance)
1160 elif pnode not in n_offline:
1161 feedback_fn(" - ERROR: instance %s, connection to primary node"
1162 " %s failed" % (instance, pnode))
1165 if pnode in n_offline:
1166 inst_nodes_offline.append(pnode)
1168 # If the instance is non-redundant we cannot survive losing its primary
1169 # node, so we are not N+1 compliant. On the other hand we have no disk
1170 # templates with more than one secondary so that situation is not well
1172 # FIXME: does not support file-backed instances
1173 if len(inst_config.secondary_nodes) == 0:
1174 i_non_redundant.append(instance)
1175 elif len(inst_config.secondary_nodes) > 1:
1176 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1179 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1180 i_non_a_balanced.append(instance)
1182 for snode in inst_config.secondary_nodes:
1183 if snode in node_info:
1184 node_info[snode]['sinst'].append(instance)
1185 if pnode not in node_info[snode]['sinst-by-pnode']:
1186 node_info[snode]['sinst-by-pnode'][pnode] = []
1187 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1188 elif snode not in n_offline:
1189 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1190 " %s failed" % (instance, snode))
1192 if snode in n_offline:
1193 inst_nodes_offline.append(snode)
1195 if inst_nodes_offline:
1196 # warn that the instance lives on offline nodes, and set bad=True
1197 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1198 ", ".join(inst_nodes_offline))
1201 feedback_fn("* Verifying orphan volumes")
1202 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1206 feedback_fn("* Verifying remaining instances")
1207 result = self._VerifyOrphanInstances(instancelist, node_instance,
1211 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1212 feedback_fn("* Verifying N+1 Memory redundancy")
1213 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1216 feedback_fn("* Other Notes")
1218 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1219 % len(i_non_redundant))
1221 if i_non_a_balanced:
1222 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1223 % len(i_non_a_balanced))
1226 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1229 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1233 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1234 """Analyze the post-hooks' result
1236 This method analyses the hook result, handles it, and sends some
1237 nicely-formatted feedback back to the user.
1239 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1240 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1241 @param hooks_results: the results of the multi-node hooks rpc call
1242 @param feedback_fn: function used to send feedback back to the caller
1243 @param lu_result: previous Exec result
1244 @return: the new Exec result, based on the previous result
1248 # We only really run POST phase hooks, and are only interested in
1250 if phase == constants.HOOKS_PHASE_POST:
1251 # Used to change hooks' output to proper indentation
1252 indent_re = re.compile('^', re.M)
1253 feedback_fn("* Hooks Results")
1254 if not hooks_results:
1255 feedback_fn(" - ERROR: general communication failure")
1258 for node_name in hooks_results:
1259 show_node_header = True
1260 res = hooks_results[node_name]
1264 # no need to warn or set fail return value
1266 feedback_fn(" Communication failure in hooks execution: %s" %
1270 for script, hkr, output in res.payload:
1271 if hkr == constants.HKR_FAIL:
1272 # The node header is only shown once, if there are
1273 # failing hooks on that node
1274 if show_node_header:
1275 feedback_fn(" Node %s:" % node_name)
1276 show_node_header = False
1277 feedback_fn(" ERROR: Script %s failed, output:" % script)
1278 output = indent_re.sub(' ', output)
1279 feedback_fn("%s" % output)
1285 class LUVerifyDisks(NoHooksLU):
1286 """Verifies the cluster disks status.
1292 def ExpandNames(self):
1293 self.needed_locks = {
1294 locking.LEVEL_NODE: locking.ALL_SET,
1295 locking.LEVEL_INSTANCE: locking.ALL_SET,
1297 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1299 def CheckPrereq(self):
1300 """Check prerequisites.
1302 This has no prerequisites.
1307 def Exec(self, feedback_fn):
1308 """Verify integrity of cluster disks.
1310 @rtype: tuple of three items
1311 @return: a tuple of (dict of node-to-node_error, list of instances
1312 which need activate-disks, dict of instance: (node, volume) for
1316 result = res_nodes, res_instances, res_missing = {}, [], {}
1318 vg_name = self.cfg.GetVGName()
1319 nodes = utils.NiceSort(self.cfg.GetNodeList())
1320 instances = [self.cfg.GetInstanceInfo(name)
1321 for name in self.cfg.GetInstanceList()]
1324 for inst in instances:
1326 if (not inst.admin_up or
1327 inst.disk_template not in constants.DTS_NET_MIRROR):
1329 inst.MapLVsByNode(inst_lvs)
1330 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1331 for node, vol_list in inst_lvs.iteritems():
1332 for vol in vol_list:
1333 nv_dict[(node, vol)] = inst
1338 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1342 node_res = node_lvs[node]
1343 if node_res.offline:
1345 msg = node_res.fail_msg
1347 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1348 res_nodes[node] = msg
1351 lvs = node_res.payload
1352 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1353 inst = nv_dict.pop((node, lv_name), None)
1354 if (not lv_online and inst is not None
1355 and inst.name not in res_instances):
1356 res_instances.append(inst.name)
1358 # any leftover items in nv_dict are missing LVs, let's arrange the
1360 for key, inst in nv_dict.iteritems():
1361 if inst.name not in res_missing:
1362 res_missing[inst.name] = []
1363 res_missing[inst.name].append(key)
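# Illustrative shape of the returned triple (example values only, see the
# docstring above):
#   ({"node3": "rpc error"}, ["instance4"], {"instance2": [("node1", "lv0")]})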
1368 class LURenameCluster(LogicalUnit):
1369 """Rename the cluster.
1372 HPATH = "cluster-rename"
1373 HTYPE = constants.HTYPE_CLUSTER
1376 def BuildHooksEnv(self):
1381 "OP_TARGET": self.cfg.GetClusterName(),
1382 "NEW_NAME": self.op.name,
1384 mn = self.cfg.GetMasterNode()
1385 return env, [mn], [mn]
1387 def CheckPrereq(self):
1388 """Verify that the passed name is a valid one.
1391 hostname = utils.HostInfo(self.op.name)
1393 new_name = hostname.name
1394 self.ip = new_ip = hostname.ip
1395 old_name = self.cfg.GetClusterName()
1396 old_ip = self.cfg.GetMasterIP()
1397 if new_name == old_name and new_ip == old_ip:
1398 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1399 " cluster has changed")
1400 if new_ip != old_ip:
1401 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1402 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1403 " reachable on the network. Aborting." %
1406 self.op.name = new_name
1408 def Exec(self, feedback_fn):
1409 """Rename the cluster.
1412 clustername = self.op.name
1415 # shutdown the master IP
1416 master = self.cfg.GetMasterNode()
1417 result = self.rpc.call_node_stop_master(master, False)
1418 result.Raise("Could not disable the master role")
1421 cluster = self.cfg.GetClusterInfo()
1422 cluster.cluster_name = clustername
1423 cluster.master_ip = ip
1424 self.cfg.Update(cluster)
1426 # update the known hosts file
1427 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1428 node_list = self.cfg.GetNodeList()
1430 node_list.remove(master)
1433 result = self.rpc.call_upload_file(node_list,
1434 constants.SSH_KNOWN_HOSTS_FILE)
1435 for to_node, to_result in result.iteritems():
1436 msg = to_result.fail_msg
1438 msg = ("Copy of file %s to node %s failed: %s" %
1439 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1440 self.proc.LogWarning(msg)
1443 result = self.rpc.call_node_start_master(master, False, False)
1444 msg = result.fail_msg
1446 self.LogWarning("Could not re-enable the master role on"
1447 " the master, please restart manually: %s", msg)
1450 def _RecursiveCheckIfLVMBased(disk):
1451 """Check if the given disk or its children are lvm-based.
1453 @type disk: L{objects.Disk}
1454 @param disk: the disk to check
1456 @return: boolean indicating whether a LD_LV dev_type was found or not
1460 for chdisk in disk.children:
1461 if _RecursiveCheckIfLVMBased(chdisk):
1463 return disk.dev_type == constants.LD_LV
1466 class LUSetClusterParams(LogicalUnit):
1467 """Change the parameters of the cluster.
1470 HPATH = "cluster-modify"
1471 HTYPE = constants.HTYPE_CLUSTER
1475 def CheckArguments(self):
1479 if not hasattr(self.op, "candidate_pool_size"):
1480 self.op.candidate_pool_size = None
1481 if self.op.candidate_pool_size is not None:
1483 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1484 except (ValueError, TypeError), err:
1485 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1487 if self.op.candidate_pool_size < 1:
1488 raise errors.OpPrereqError("At least one master candidate needed")
1490 def ExpandNames(self):
1491 # FIXME: in the future maybe other cluster params won't require checking on
1492 # all nodes in order to be modified.
1493 self.needed_locks = {
1494 locking.LEVEL_NODE: locking.ALL_SET,
1496 self.share_locks[locking.LEVEL_NODE] = 1
1498 def BuildHooksEnv(self):
1503 "OP_TARGET": self.cfg.GetClusterName(),
1504 "NEW_VG_NAME": self.op.vg_name,
1506 mn = self.cfg.GetMasterNode()
1507 return env, [mn], [mn]
1509 def CheckPrereq(self):
1510 """Check prerequisites.
1512 This checks whether the given params don't conflict and
1513 if the given volume group is valid.
1516 if self.op.vg_name is not None and not self.op.vg_name:
1517 instances = self.cfg.GetAllInstancesInfo().values()
1518 for inst in instances:
1519 for disk in inst.disks:
1520 if _RecursiveCheckIfLVMBased(disk):
1521 raise errors.OpPrereqError("Cannot disable lvm storage while"
1522 " lvm-based instances exist")
1524 node_list = self.acquired_locks[locking.LEVEL_NODE]
1526 # if vg_name not None, checks given volume group on all nodes
1528 vglist = self.rpc.call_vg_list(node_list)
1529 for node in node_list:
1530 msg = vglist[node].fail_msg
1532 # ignoring down node
1533 self.LogWarning("Error while gathering data on node %s"
1534 " (ignoring node): %s", node, msg)
1536 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1538 constants.MIN_VG_SIZE)
1540 raise errors.OpPrereqError("Error on node '%s': %s" %
1543 self.cluster = cluster = self.cfg.GetClusterInfo()
1544 # validate params changes
1545 if self.op.beparams:
1546 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1547 self.new_beparams = objects.FillDict(
1548 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1550 if self.op.nicparams:
1551 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1552 self.new_nicparams = objects.FillDict(
1553 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1554 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1556 # hypervisor list/parameters
1557 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1558 if self.op.hvparams:
1559 if not isinstance(self.op.hvparams, dict):
1560 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1561 for hv_name, hv_dict in self.op.hvparams.items():
1562 if hv_name not in self.new_hvparams:
1563 self.new_hvparams[hv_name] = hv_dict
1565 self.new_hvparams[hv_name].update(hv_dict)
1567 if self.op.enabled_hypervisors is not None:
1568 self.hv_list = self.op.enabled_hypervisors
1570 self.hv_list = cluster.enabled_hypervisors
1572 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1573 # either the enabled list has changed, or the parameters have, validate
1574 for hv_name, hv_params in self.new_hvparams.items():
1575 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1576 (self.op.enabled_hypervisors and
1577 hv_name in self.op.enabled_hypervisors)):
1578 # either this is a new hypervisor, or its parameters have changed
1579 hv_class = hypervisor.GetHypervisor(hv_name)
1580 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1581 hv_class.CheckParameterSyntax(hv_params)
1582 _CheckHVParams(self, node_list, hv_name, hv_params)
1584 def Exec(self, feedback_fn):
1585 """Change the parameters of the cluster.
1588 if self.op.vg_name is not None:
1589 new_volume = self.op.vg_name
1592 if new_volume != self.cfg.GetVGName():
1593 self.cfg.SetVGName(new_volume)
1595 feedback_fn("Cluster LVM configuration already in desired"
1596 " state, not changing")
1597 if self.op.hvparams:
1598 self.cluster.hvparams = self.new_hvparams
1599 if self.op.enabled_hypervisors is not None:
1600 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1601 if self.op.beparams:
1602 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1603 if self.op.nicparams:
1604 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1606 if self.op.candidate_pool_size is not None:
1607 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1608 # we need to update the pool size here, otherwise the save will fail
1609 _AdjustCandidatePool(self)
1611 self.cfg.Update(self.cluster)
1614 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1615 """Distribute additional files which are part of the cluster configuration.
1617 ConfigWriter takes care of distributing the config and ssconf files, but
1618 there are more files which should be distributed to all nodes. This function
1619 makes sure those are copied.
1621 @param lu: calling logical unit
1622 @param additional_nodes: list of nodes not in the config to distribute to
1625 # 1. Gather target nodes
1626 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1627 dist_nodes = lu.cfg.GetNodeList()
1628 if additional_nodes is not None:
1629 dist_nodes.extend(additional_nodes)
1630 if myself.name in dist_nodes:
1631 dist_nodes.remove(myself.name)
1632 # 2. Gather files to distribute
1633 dist_files = set([constants.ETC_HOSTS,
1634 constants.SSH_KNOWN_HOSTS_FILE,
1635 constants.RAPI_CERT_FILE,
1636 constants.RAPI_USERS_FILE,
1639 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1640 for hv_name in enabled_hypervisors:
1641 hv_class = hypervisor.GetHypervisor(hv_name)
1642 dist_files.update(hv_class.GetAncillaryFiles())
1644 # 3. Perform the files upload
1645 for fname in dist_files:
1646 if os.path.exists(fname):
1647 result = lu.rpc.call_upload_file(dist_nodes, fname)
1648 for to_node, to_result in result.items():
1649 msg = to_result.fail_msg
1651 msg = ("Copy of file %s to node %s failed: %s" %
1652 (fname, to_node, msg))
1653 lu.proc.LogWarning(msg)
1656 class LURedistributeConfig(NoHooksLU):
1657 """Force the redistribution of cluster configuration.
1659 This is a very simple LU.
1665 def ExpandNames(self):
1666 self.needed_locks = {
1667 locking.LEVEL_NODE: locking.ALL_SET,
1669 self.share_locks[locking.LEVEL_NODE] = 1
1671 def CheckPrereq(self):
1672 """Check prerequisites.
1676 def Exec(self, feedback_fn):
1677 """Redistribute the configuration.
1680 self.cfg.Update(self.cfg.GetClusterInfo())
1681 _RedistributeAncillaryFiles(self)
1684 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1685 """Sleep and poll for an instance's disk to sync.
1688 if not instance.disks:
1692 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1694 node = instance.primary_node
1696 for dev in instance.disks:
1697 lu.cfg.SetDiskID(dev, node)
1700 degr_retries = 10 # in seconds, as we sleep 1 second each time
1704 cumul_degraded = False
1705 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1706 msg = rstats.fail_msg
1708 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1711 raise errors.RemoteError("Can't contact node %s for mirror data,"
1712 " aborting." % node)
1715 rstats = rstats.payload
1717 for i, mstat in enumerate(rstats):
1719 lu.LogWarning("Can't compute data for node %s/%s",
1720 node, instance.disks[i].iv_name)
1722 # we ignore the ldisk parameter
1723 perc_done, est_time, is_degraded, _ = mstat
1724 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1725 if perc_done is not None:
1727 if est_time is not None:
1728 rem_time = "%d estimated seconds remaining" % est_time
1731 rem_time = "no time estimate"
1732 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1733 (instance.disks[i].iv_name, perc_done, rem_time))
1735 # if we're done but degraded, let's do a few small retries, to
1736 # make sure we see a stable and not transient situation; therefore
1737 # we force restart of the loop
1738 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1739 logging.info("Degraded disks found, %d retries left", degr_retries)
1747 time.sleep(min(60, max_time))
1750 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1751 return not cumul_degraded
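# Illustrative usage (hypothetical caller): after creating or attaching disks
# an LU would typically block until they are in sync, e.g.:
#
#   if not _WaitForSync(self, self.instance):
#     raise errors.OpExecError("disks of instance %s never came into sync" %
#                              self.instance.name)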
1754 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1755 """Check that mirrors are not degraded.
1757 The ldisk parameter, if True, will change the test from the
1758 is_degraded attribute (which represents overall non-ok status for
1759 the device(s)) to the ldisk (representing the local storage status).
1762 lu.cfg.SetDiskID(dev, node)
1769 if on_primary or dev.AssembleOnSecondary():
1770 rstats = lu.rpc.call_blockdev_find(node, dev)
1771 msg = rstats.fail_msg
1773 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1775 elif not rstats.payload:
1776 lu.LogWarning("Can't find disk on node %s", node)
1779 result = result and (not rstats.payload[idx])
1781 for child in dev.children:
1782 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1787 class LUDiagnoseOS(NoHooksLU):
1788 """Logical unit for OS diagnose/query.
1791 _OP_REQP = ["output_fields", "names"]
1793 _FIELDS_STATIC = utils.FieldSet()
1794 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1796 def ExpandNames(self):
1798 raise errors.OpPrereqError("Selective OS query not supported")
1800 _CheckOutputFields(static=self._FIELDS_STATIC,
1801 dynamic=self._FIELDS_DYNAMIC,
1802 selected=self.op.output_fields)
1804 # Lock all nodes, in shared mode
1805 # Temporary removal of locks, should be reverted later
1806 # TODO: reintroduce locks when they are lighter-weight
1807 self.needed_locks = {}
1808 #self.share_locks[locking.LEVEL_NODE] = 1
1809 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1811 def CheckPrereq(self):
1812 """Check prerequisites.
1817 def _DiagnoseByOS(node_list, rlist):
1818 """Remaps a per-node return list into an a per-os per-node dictionary
1820 @param node_list: a list with the names of all nodes
1821 @param rlist: a map with node names as keys and OS objects as values
1824 @return: a dictionary with osnames as keys and as value another map, with
1825 nodes as keys and tuples of (path, status, diagnose) as values, eg::
1827 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1828 (/srv/..., False, "invalid api")],
1829 "node2": [(/srv/..., True, "")]}
1834 # we build here the list of nodes that didn't fail the RPC (at RPC
1835 # level), so that nodes with a non-responding node daemon don't
1836 # make all OSes invalid
1837 good_nodes = [node_name for node_name in rlist
1838 if not rlist[node_name].fail_msg]
1839 for node_name, nr in rlist.items():
1840 if nr.fail_msg or not nr.payload:
1842 for name, path, status, diagnose in nr.payload:
1843 if name not in all_os:
1844 # build a list of nodes for this os containing empty lists
1845 # for each node in node_list
1847 for nname in good_nodes:
1848 all_os[name][nname] = []
1849 all_os[name][node_name].append((path, status, diagnose))
1852 def Exec(self, feedback_fn):
1853 """Compute the list of OSes.
1856 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1857 node_data = self.rpc.call_os_diagnose(valid_nodes)
1858 pol = self._DiagnoseByOS(valid_nodes, node_data)
1860 for os_name, os_data in pol.items():
1862 for field in self.op.output_fields:
1865 elif field == "valid":
1866 val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1867 elif field == "node_status":
1868 # this is just a copy of the dict
1870 for node_name, nos_list in os_data.items():
1871 val[node_name] = nos_list
1873 raise errors.ParameterError(field)
1880 class LURemoveNode(LogicalUnit):
1881 """Logical unit for removing a node.
1884 HPATH = "node-remove"
1885 HTYPE = constants.HTYPE_NODE
1886 _OP_REQP = ["node_name"]
1888 def BuildHooksEnv(self):
1891 This doesn't run on the target node in the pre phase as a failed
1892 node would then be impossible to remove.
1896 "OP_TARGET": self.op.node_name,
1897 "NODE_NAME": self.op.node_name,
1899 all_nodes = self.cfg.GetNodeList()
1900 all_nodes.remove(self.op.node_name)
1901 return env, all_nodes, all_nodes
1903 def CheckPrereq(self):
1904 """Check prerequisites.
1907 - the node exists in the configuration
1908 - it does not have primary or secondary instances
1909 - it's not the master
1911 Any errors are signaled by raising errors.OpPrereqError.
1914 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1916 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1918 instance_list = self.cfg.GetInstanceList()
1920 masternode = self.cfg.GetMasterNode()
1921 if node.name == masternode:
1922 raise errors.OpPrereqError("Node is the master node,"
1923 " you need to failover first.")
1925 for instance_name in instance_list:
1926 instance = self.cfg.GetInstanceInfo(instance_name)
1927 if node.name in instance.all_nodes:
1928 raise errors.OpPrereqError("Instance %s is still running on the node,"
1929 " please remove first." % instance_name)
1930 self.op.node_name = node.name
1933 def Exec(self, feedback_fn):
1934 """Removes the node from the cluster.
1938 logging.info("Stopping the node daemon and removing configs from node %s",
1941 self.context.RemoveNode(node.name)
1943 result = self.rpc.call_node_leave_cluster(node.name)
1944 msg = result.fail_msg
1946 self.LogWarning("Errors encountered on the remote node while leaving"
1947 " the cluster: %s", msg)
1949 # Promote nodes to master candidate as needed
1950 _AdjustCandidatePool(self)
1953 class LUQueryNodes(NoHooksLU):
1954 """Logical unit for querying nodes.
1957 _OP_REQP = ["output_fields", "names", "use_locking"]
1959 _FIELDS_DYNAMIC = utils.FieldSet(
1961 "mtotal", "mnode", "mfree",
1963 "ctotal", "cnodes", "csockets",
1966 _FIELDS_STATIC = utils.FieldSet(
1967 "name", "pinst_cnt", "sinst_cnt",
1968 "pinst_list", "sinst_list",
1969 "pip", "sip", "tags",
1978 def ExpandNames(self):
1979 _CheckOutputFields(static=self._FIELDS_STATIC,
1980 dynamic=self._FIELDS_DYNAMIC,
1981 selected=self.op.output_fields)
1983 self.needed_locks = {}
1984 self.share_locks[locking.LEVEL_NODE] = 1
1987 self.wanted = _GetWantedNodes(self, self.op.names)
1989 self.wanted = locking.ALL_SET
1991 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1992 self.do_locking = self.do_node_query and self.op.use_locking
1994 # if we don't request only static fields, we need to lock the nodes
1995 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1998 def CheckPrereq(self):
1999 """Check prerequisites.
2002     # The validation of the node list is done in _GetWantedNodes if the
2003     # list is not empty; if it is empty, there is no validation to do
2006 def Exec(self, feedback_fn):
2007 """Computes the list of nodes and their attributes.
2010 all_info = self.cfg.GetAllNodesInfo()
2012 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2013 elif self.wanted != locking.ALL_SET:
2014 nodenames = self.wanted
2015 missing = set(nodenames).difference(all_info.keys())
2017 raise errors.OpExecError(
2018 "Some nodes were removed before retrieving their data: %s" % missing)
2020 nodenames = all_info.keys()
2022 nodenames = utils.NiceSort(nodenames)
2023 nodelist = [all_info[name] for name in nodenames]
2025 # begin data gathering
2027 if self.do_node_query:
2029 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2030 self.cfg.GetHypervisorType())
2031 for name in nodenames:
2032 nodeinfo = node_data[name]
2033 if not nodeinfo.fail_msg and nodeinfo.payload:
2034 nodeinfo = nodeinfo.payload
2035 fn = utils.TryConvert
2037 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2038 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2039 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2040 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2041 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2042 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2043 "bootid": nodeinfo.get('bootid', None),
2044 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2045 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2048 live_data[name] = {}
2050 live_data = dict.fromkeys(nodenames, {})
2052 node_to_primary = dict([(name, set()) for name in nodenames])
2053 node_to_secondary = dict([(name, set()) for name in nodenames])
2055 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2056 "sinst_cnt", "sinst_list"))
2057 if inst_fields & frozenset(self.op.output_fields):
2058 instancelist = self.cfg.GetInstanceList()
2060 for instance_name in instancelist:
2061 inst = self.cfg.GetInstanceInfo(instance_name)
2062 if inst.primary_node in node_to_primary:
2063 node_to_primary[inst.primary_node].add(inst.name)
2064 for secnode in inst.secondary_nodes:
2065 if secnode in node_to_secondary:
2066 node_to_secondary[secnode].add(inst.name)
2068 master_node = self.cfg.GetMasterNode()
2070 # end data gathering
2073 for node in nodelist:
2075 for field in self.op.output_fields:
2078 elif field == "pinst_list":
2079 val = list(node_to_primary[node.name])
2080 elif field == "sinst_list":
2081 val = list(node_to_secondary[node.name])
2082 elif field == "pinst_cnt":
2083 val = len(node_to_primary[node.name])
2084 elif field == "sinst_cnt":
2085 val = len(node_to_secondary[node.name])
2086 elif field == "pip":
2087 val = node.primary_ip
2088 elif field == "sip":
2089 val = node.secondary_ip
2090 elif field == "tags":
2091 val = list(node.GetTags())
2092 elif field == "serial_no":
2093 val = node.serial_no
2094 elif field == "master_candidate":
2095 val = node.master_candidate
2096 elif field == "master":
2097 val = node.name == master_node
2098 elif field == "offline":
2100 elif field == "drained":
2102 elif self._FIELDS_DYNAMIC.Matches(field):
2103 val = live_data[node.name].get(field, None)
2104 elif field == "role":
2105 if node.name == master_node:
2107 elif node.master_candidate:
2116 raise errors.ParameterError(field)
2117 node_output.append(val)
2118 output.append(node_output)
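# Added illustration (not from the original code): for
# output_fields = ["name", "pinst_cnt", "pip"], Exec() builds one row per
# node with the values in the requested field order, e.g.
#   [["node1.example.com", 2, "192.0.2.11"],
#    ["node2.example.com", 0, "192.0.2.12"]]
# (node names and addresses here are made-up examples)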
2123 class LUQueryNodeVolumes(NoHooksLU):
2124 """Logical unit for getting volumes on node(s).
2127 _OP_REQP = ["nodes", "output_fields"]
2129 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2130 _FIELDS_STATIC = utils.FieldSet("node")
2132 def ExpandNames(self):
2133 _CheckOutputFields(static=self._FIELDS_STATIC,
2134 dynamic=self._FIELDS_DYNAMIC,
2135 selected=self.op.output_fields)
2137 self.needed_locks = {}
2138 self.share_locks[locking.LEVEL_NODE] = 1
2139 if not self.op.nodes:
2140 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2142 self.needed_locks[locking.LEVEL_NODE] = \
2143 _GetWantedNodes(self, self.op.nodes)
2145 def CheckPrereq(self):
2146 """Check prerequisites.
2148 This checks that the fields required are valid output fields.
2151 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2153 def Exec(self, feedback_fn):
2154 """Computes the list of nodes and their attributes.
2157 nodenames = self.nodes
2158 volumes = self.rpc.call_node_volumes(nodenames)
2160 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2161 in self.cfg.GetInstanceList()]
2163 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2166 for node in nodenames:
2167 nresult = volumes[node]
2170 msg = nresult.fail_msg
2172 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2175 node_vols = nresult.payload[:]
2176 node_vols.sort(key=lambda vol: vol['dev'])
2178 for vol in node_vols:
2180 for field in self.op.output_fields:
2183 elif field == "phys":
2187 elif field == "name":
2189 elif field == "size":
2190 val = int(float(vol['size']))
2191 elif field == "instance":
2193 if node not in lv_by_node[inst]:
2195 if vol['name'] in lv_by_node[inst][node]:
2201 raise errors.ParameterError(field)
2202 node_output.append(str(val))
2204 output.append(node_output)
2209 class LUAddNode(LogicalUnit):
2210 """Logical unit for adding node to the cluster.
2214 HTYPE = constants.HTYPE_NODE
2215 _OP_REQP = ["node_name"]
2217 def BuildHooksEnv(self):
2220 This will run on all nodes before, and on all nodes + the new node after.
2224 "OP_TARGET": self.op.node_name,
2225 "NODE_NAME": self.op.node_name,
2226 "NODE_PIP": self.op.primary_ip,
2227 "NODE_SIP": self.op.secondary_ip,
2229 nodes_0 = self.cfg.GetNodeList()
2230 nodes_1 = nodes_0 + [self.op.node_name, ]
2231 return env, nodes_0, nodes_1
2233 def CheckPrereq(self):
2234 """Check prerequisites.
2237 - the new node is not already in the config
2239      - its parameters (single/dual homed) match the cluster
2241 Any errors are signaled by raising errors.OpPrereqError.
2244 node_name = self.op.node_name
2247 dns_data = utils.HostInfo(node_name)
2249 node = dns_data.name
2250 primary_ip = self.op.primary_ip = dns_data.ip
2251 secondary_ip = getattr(self.op, "secondary_ip", None)
2252 if secondary_ip is None:
2253 secondary_ip = primary_ip
2254 if not utils.IsValidIP(secondary_ip):
2255 raise errors.OpPrereqError("Invalid secondary IP given")
2256 self.op.secondary_ip = secondary_ip
2258 node_list = cfg.GetNodeList()
2259 if not self.op.readd and node in node_list:
2260 raise errors.OpPrereqError("Node %s is already in the configuration" %
2262 elif self.op.readd and node not in node_list:
2263 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2265 for existing_node_name in node_list:
2266 existing_node = cfg.GetNodeInfo(existing_node_name)
2268 if self.op.readd and node == existing_node_name:
2269 if (existing_node.primary_ip != primary_ip or
2270 existing_node.secondary_ip != secondary_ip):
2271 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2272 " address configuration as before")
2275 if (existing_node.primary_ip == primary_ip or
2276 existing_node.secondary_ip == primary_ip or
2277 existing_node.primary_ip == secondary_ip or
2278 existing_node.secondary_ip == secondary_ip):
2279 raise errors.OpPrereqError("New node ip address(es) conflict with"
2280 " existing node %s" % existing_node.name)
2282 # check that the type of the node (single versus dual homed) is the
2283 # same as for the master
2284 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2285 master_singlehomed = myself.secondary_ip == myself.primary_ip
2286 newbie_singlehomed = secondary_ip == primary_ip
2287 if master_singlehomed != newbie_singlehomed:
2288 if master_singlehomed:
2289 raise errors.OpPrereqError("The master has no private ip but the"
2290 " new node has one")
2292 raise errors.OpPrereqError("The master has a private ip but the"
2293 " new node doesn't have one")
2295 # checks reachability
2296 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2297 raise errors.OpPrereqError("Node not reachable by ping")
2299 if not newbie_singlehomed:
2300 # check reachability from my secondary ip to newbie's secondary ip
2301 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2302 source=myself.secondary_ip):
2303 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2304 " based ping to noded port")
2306 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2311 mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2312     # the new node will increase mc_max by one, so:
2313 mc_max = min(mc_max + 1, cp_size)
2314 self.master_candidate = mc_now < mc_max
2317 self.new_node = self.cfg.GetNodeInfo(node)
2318 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2320 self.new_node = objects.Node(name=node,
2321 primary_ip=primary_ip,
2322 secondary_ip=secondary_ip,
2323 master_candidate=self.master_candidate,
2324 offline=False, drained=False)
2326 def Exec(self, feedback_fn):
2327 """Adds the new node to the cluster.
2330 new_node = self.new_node
2331 node = new_node.name
2333 # for re-adds, reset the offline/drained/master-candidate flags;
2334 # we need to reset here, otherwise offline would prevent RPC calls
2335 # later in the procedure; this also means that if the re-add
2336 # fails, we are left with a non-offlined, broken node
2338 new_node.drained = new_node.offline = False
2339 self.LogInfo("Readding a node, the offline/drained flags were reset")
2340 # if we demote the node, we do cleanup later in the procedure
2341 new_node.master_candidate = self.master_candidate
2343 # notify the user about any possible mc promotion
2344 if new_node.master_candidate:
2345 self.LogInfo("Node will be a master candidate")
2347 # check connectivity
2348 result = self.rpc.call_version([node])[node]
2349 result.Raise("Can't get version information from node %s" % node)
2350 if constants.PROTOCOL_VERSION == result.payload:
2351 logging.info("Communication to node %s fine, sw version %s match",
2352 node, result.payload)
2354 raise errors.OpExecError("Version mismatch master version %s,"
2355 " node version %s" %
2356 (constants.PROTOCOL_VERSION, result.payload))
2359 logging.info("Copy ssh key to node %s", node)
2360 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2362 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2363 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2369 keyarray.append(f.read())
2373 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2375 keyarray[3], keyarray[4], keyarray[5])
2376 result.Raise("Cannot transfer ssh keys to the new node")
2378 # Add node to our /etc/hosts, and add key to known_hosts
2379 if self.cfg.GetClusterInfo().modify_etc_hosts:
2380 utils.AddHostToEtcHosts(new_node.name)
2382 if new_node.secondary_ip != new_node.primary_ip:
2383 result = self.rpc.call_node_has_ip_address(new_node.name,
2384 new_node.secondary_ip)
2385 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2387 if not result.payload:
2388 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2389 " you gave (%s). Please fix and re-run this"
2390 " command." % new_node.secondary_ip)
2392 node_verify_list = [self.cfg.GetMasterNode()]
2393 node_verify_param = {
2395 # TODO: do a node-net-test as well?
2398 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2399 self.cfg.GetClusterName())
2400 for verifier in node_verify_list:
2401 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2402 nl_payload = result[verifier].payload['nodelist']
2404 for failed in nl_payload:
2405 feedback_fn("ssh/hostname verification failed %s -> %s" %
2406 (verifier, nl_payload[failed]))
2407 raise errors.OpExecError("ssh/hostname verification failed.")
2410 _RedistributeAncillaryFiles(self)
2411 self.context.ReaddNode(new_node)
2412 # make sure we redistribute the config
2413 self.cfg.Update(new_node)
2414 # and make sure the new node will not have old files around
2415 if not new_node.master_candidate:
2416 result = self.rpc.call_node_demote_from_mc(new_node.name)
2417 msg = result.RemoteFailMsg()
2419 self.LogWarning("Node failed to demote itself from master"
2420 " candidate status: %s" % msg)
2422 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2423 self.context.AddNode(new_node)
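# Added usage sketch (not part of the original module): a node is normally
# added by submitting the corresponding opcode, assumed here to be
# opcodes.OpAddNode, e.g.:
#   op = opcodes.OpAddNode(node_name="node4.example.com", readd=False)
# secondary_ip may be given for dual-homed clusters; when omitted it defaults
# to the primary IP (see CheckPrereq above).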
2426 class LUSetNodeParams(LogicalUnit):
2427 """Modifies the parameters of a node.
2430 HPATH = "node-modify"
2431 HTYPE = constants.HTYPE_NODE
2432 _OP_REQP = ["node_name"]
2435 def CheckArguments(self):
2436 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2437 if node_name is None:
2438 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2439 self.op.node_name = node_name
2440 _CheckBooleanOpField(self.op, 'master_candidate')
2441 _CheckBooleanOpField(self.op, 'offline')
2442 _CheckBooleanOpField(self.op, 'drained')
2443 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2444 if all_mods.count(None) == 3:
2445 raise errors.OpPrereqError("Please pass at least one modification")
2446 if all_mods.count(True) > 1:
2447 raise errors.OpPrereqError("Can't set the node into more than one"
2448 " state at the same time")
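# Added illustration of the checks above: an opcode with none of
# offline/master_candidate/drained set is rejected ("at least one
# modification"), and one with e.g. offline=True and drained=True is also
# rejected ("more than one state"); offline=True alone, or
# master_candidate=False alone, is accepted.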
2450 def ExpandNames(self):
2451 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2453 def BuildHooksEnv(self):
2456 This runs on the master node.
2460 "OP_TARGET": self.op.node_name,
2461 "MASTER_CANDIDATE": str(self.op.master_candidate),
2462 "OFFLINE": str(self.op.offline),
2463 "DRAINED": str(self.op.drained),
2465 nl = [self.cfg.GetMasterNode(),
2469 def CheckPrereq(self):
2470 """Check prerequisites.
2472 This only checks the instance list against the existing names.
2475 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2477 if ((self.op.master_candidate == False or self.op.offline == True or
2478 self.op.drained == True) and node.master_candidate):
2479 # we will demote the node from master_candidate
2480 if self.op.node_name == self.cfg.GetMasterNode():
2481 raise errors.OpPrereqError("The master node has to be a"
2482 " master candidate, online and not drained")
2483 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2484 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2485 if num_candidates <= cp_size:
2486 msg = ("Not enough master candidates (desired"
2487 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2489 self.LogWarning(msg)
2491 raise errors.OpPrereqError(msg)
2493 if (self.op.master_candidate == True and
2494 ((node.offline and not self.op.offline == False) or
2495 (node.drained and not self.op.drained == False))):
2496 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2497 " to master_candidate" % node.name)
2501 def Exec(self, feedback_fn):
2510 if self.op.offline is not None:
2511 node.offline = self.op.offline
2512 result.append(("offline", str(self.op.offline)))
2513 if self.op.offline == True:
2514 if node.master_candidate:
2515 node.master_candidate = False
2517 result.append(("master_candidate", "auto-demotion due to offline"))
2519 node.drained = False
2520 result.append(("drained", "clear drained status due to offline"))
2522 if self.op.master_candidate is not None:
2523 node.master_candidate = self.op.master_candidate
2525 result.append(("master_candidate", str(self.op.master_candidate)))
2526 if self.op.master_candidate == False:
2527 rrc = self.rpc.call_node_demote_from_mc(node.name)
2530 self.LogWarning("Node failed to demote itself: %s" % msg)
2532 if self.op.drained is not None:
2533 node.drained = self.op.drained
2534 result.append(("drained", str(self.op.drained)))
2535 if self.op.drained == True:
2536 if node.master_candidate:
2537 node.master_candidate = False
2539 result.append(("master_candidate", "auto-demotion due to drain"))
2540 rrc = self.rpc.call_node_demote_from_mc(node.name)
2541 msg = rrc.RemoteFailMsg()
2543 self.LogWarning("Node failed to demote itself: %s" % msg)
2545 node.offline = False
2546 result.append(("offline", "clear offline status due to drain"))
2548 # this will trigger configuration file update, if needed
2549 self.cfg.Update(node)
2550 # this will trigger job queue propagation or cleanup
2552 self.context.ReaddNode(node)
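# Added note: Exec() above accumulates a list of (parameter, new value) pairs
# describing the applied changes, presumably returned as the opcode result,
# e.g. [("offline", "True"),
#       ("master_candidate", "auto-demotion due to offline")]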
2557 class LUPowercycleNode(NoHooksLU):
2558 """Powercycles a node.
2561 _OP_REQP = ["node_name", "force"]
2564 def CheckArguments(self):
2565 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2566 if node_name is None:
2567 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2568 self.op.node_name = node_name
2569 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2570 raise errors.OpPrereqError("The node is the master and the force"
2571 " parameter was not set")
2573 def ExpandNames(self):
2574 """Locking for PowercycleNode.
2576     This is a last-resort option and shouldn't block on other
2577 jobs. Therefore, we grab no locks.
2580 self.needed_locks = {}
2582 def CheckPrereq(self):
2583 """Check prerequisites.
2585 This LU has no prereqs.
2590 def Exec(self, feedback_fn):
2594 result = self.rpc.call_node_powercycle(self.op.node_name,
2595 self.cfg.GetHypervisorType())
2596 result.Raise("Failed to schedule the reboot")
2597 return result.payload
2600 class LUQueryClusterInfo(NoHooksLU):
2601 """Query cluster configuration.
2607 def ExpandNames(self):
2608 self.needed_locks = {}
2610 def CheckPrereq(self):
2611     """No prerequisites needed for this LU.
2616 def Exec(self, feedback_fn):
2617 """Return cluster config.
2620 cluster = self.cfg.GetClusterInfo()
2622 "software_version": constants.RELEASE_VERSION,
2623 "protocol_version": constants.PROTOCOL_VERSION,
2624 "config_version": constants.CONFIG_VERSION,
2625 "os_api_version": max(constants.OS_API_VERSIONS),
2626 "export_version": constants.EXPORT_VERSION,
2627 "architecture": (platform.architecture()[0], platform.machine()),
2628 "name": cluster.cluster_name,
2629 "master": cluster.master_node,
2630 "default_hypervisor": cluster.default_hypervisor,
2631 "enabled_hypervisors": cluster.enabled_hypervisors,
2632 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2633 for hypervisor_name in cluster.enabled_hypervisors]),
2634 "beparams": cluster.beparams,
2635 "nicparams": cluster.nicparams,
2636 "candidate_pool_size": cluster.candidate_pool_size,
2637 "master_netdev": cluster.master_netdev,
2638 "volume_group_name": cluster.volume_group_name,
2639 "file_storage_dir": cluster.file_storage_dir,
2645 class LUQueryConfigValues(NoHooksLU):
2646 """Return configuration values.
2651 _FIELDS_DYNAMIC = utils.FieldSet()
2652 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2654 def ExpandNames(self):
2655 self.needed_locks = {}
2657 _CheckOutputFields(static=self._FIELDS_STATIC,
2658 dynamic=self._FIELDS_DYNAMIC,
2659 selected=self.op.output_fields)
2661 def CheckPrereq(self):
2662 """No prerequisites.
2667 def Exec(self, feedback_fn):
2668     """Collect and return the requested cluster configuration values.
2672 for field in self.op.output_fields:
2673 if field == "cluster_name":
2674 entry = self.cfg.GetClusterName()
2675 elif field == "master_node":
2676 entry = self.cfg.GetMasterNode()
2677 elif field == "drain_flag":
2678 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2680 raise errors.ParameterError(field)
2681 values.append(entry)
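# Added illustration: with output_fields = ["cluster_name", "drain_flag"], the
# values list is built in the same order as the requested fields, e.g.
#   ["cluster.example.com", False]
# (the cluster name here is a made-up example)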
2685 class LUActivateInstanceDisks(NoHooksLU):
2686 """Bring up an instance's disks.
2689 _OP_REQP = ["instance_name"]
2692 def ExpandNames(self):
2693 self._ExpandAndLockInstance()
2694 self.needed_locks[locking.LEVEL_NODE] = []
2695 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2697 def DeclareLocks(self, level):
2698 if level == locking.LEVEL_NODE:
2699 self._LockInstancesNodes()
2701 def CheckPrereq(self):
2702 """Check prerequisites.
2704 This checks that the instance is in the cluster.
2707 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2708 assert self.instance is not None, \
2709 "Cannot retrieve locked instance %s" % self.op.instance_name
2710 _CheckNodeOnline(self, self.instance.primary_node)
2712 def Exec(self, feedback_fn):
2713 """Activate the disks.
2716 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2718 raise errors.OpExecError("Cannot activate block devices")
2723 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2724 """Prepare the block devices for an instance.
2726 This sets up the block devices on all nodes.
2728 @type lu: L{LogicalUnit}
2729 @param lu: the logical unit on whose behalf we execute
2730 @type instance: L{objects.Instance}
2731 @param instance: the instance for whose disks we assemble
2732 @type ignore_secondaries: boolean
2733 @param ignore_secondaries: if true, errors on secondary nodes
2734 won't result in an error return from the function
2735   @return: a tuple of (disks_ok, device_info); disks_ok is False if the
2736       operation failed, and device_info is a list of
2737       (host, instance_visible_name, node_visible_name) tuples with the mapping from node devices to instance devices
2742 iname = instance.name
2743   # With the two-pass mechanism we try to reduce the window of
2744   # opportunity for the race condition of switching DRBD to primary
2745   # before the handshake has occurred, but we do not eliminate it
2747 # The proper fix would be to wait (with some limits) until the
2748 # connection has been made and drbd transitions from WFConnection
2749 # into any other network-connected state (Connected, SyncTarget,
2752 # 1st pass, assemble on all nodes in secondary mode
2753 for inst_disk in instance.disks:
2754 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2755 lu.cfg.SetDiskID(node_disk, node)
2756 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2757 msg = result.fail_msg
2759 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2760 " (is_primary=False, pass=1): %s",
2761 inst_disk.iv_name, node, msg)
2762 if not ignore_secondaries:
2765 # FIXME: race condition on drbd migration to primary
2767 # 2nd pass, do only the primary node
2768 for inst_disk in instance.disks:
2769 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2770 if node != instance.primary_node:
2772 lu.cfg.SetDiskID(node_disk, node)
2773 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2774 msg = result.fail_msg
2776 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2777 " (is_primary=True, pass=2): %s",
2778 inst_disk.iv_name, node, msg)
2780 device_info.append((instance.primary_node, inst_disk.iv_name,
2783 # leave the disks configured for the primary node
2784 # this is a workaround that would be fixed better by
2785 # improving the logical/physical id handling
2786 for disk in instance.disks:
2787 lu.cfg.SetDiskID(disk, instance.primary_node)
2789 return disks_ok, device_info
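# Added illustration of the return value described in the docstring above:
#   (True, [("node1.example.com", "disk/0", "/dev/drbd0")])
# where the third element is the node-visible device; the node name and device
# path are made-up examples.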
2792 def _StartInstanceDisks(lu, instance, force):
2793 """Start the disks of an instance.
2796 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2797 ignore_secondaries=force)
2799 _ShutdownInstanceDisks(lu, instance)
2800 if force is not None and not force:
2801 lu.proc.LogWarning("", hint="If the message above refers to a"
2803 " you can retry the operation using '--force'.")
2804 raise errors.OpExecError("Disk consistency error")
2807 class LUDeactivateInstanceDisks(NoHooksLU):
2808 """Shutdown an instance's disks.
2811 _OP_REQP = ["instance_name"]
2814 def ExpandNames(self):
2815 self._ExpandAndLockInstance()
2816 self.needed_locks[locking.LEVEL_NODE] = []
2817 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2819 def DeclareLocks(self, level):
2820 if level == locking.LEVEL_NODE:
2821 self._LockInstancesNodes()
2823 def CheckPrereq(self):
2824 """Check prerequisites.
2826 This checks that the instance is in the cluster.
2829 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2830 assert self.instance is not None, \
2831 "Cannot retrieve locked instance %s" % self.op.instance_name
2833 def Exec(self, feedback_fn):
2834 """Deactivate the disks
2837 instance = self.instance
2838 _SafeShutdownInstanceDisks(self, instance)
2841 def _SafeShutdownInstanceDisks(lu, instance):
2842 """Shutdown block devices of an instance.
2844 This function checks if an instance is running, before calling
2845 _ShutdownInstanceDisks.
2848 pnode = instance.primary_node
2849 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2850 ins_l.Raise("Can't contact node %s" % pnode)
2852 if instance.name in ins_l.payload:
2853 raise errors.OpExecError("Instance is running, can't shutdown"
2856 _ShutdownInstanceDisks(lu, instance)
2859 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2860 """Shutdown block devices of an instance.
2862 This does the shutdown on all nodes of the instance.
2864   If ignore_primary is true, errors on the primary node are ignored.
2869 for disk in instance.disks:
2870 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2871 lu.cfg.SetDiskID(top_disk, node)
2872 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2873 msg = result.fail_msg
2875 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2876 disk.iv_name, node, msg)
2877 if not ignore_primary or node != instance.primary_node:
2882 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2883 """Checks if a node has enough free memory.
2885   This function checks if a given node has the needed amount of free
2886   memory. In case the node has less memory or we cannot get the
2887   information from the node, this function raises an OpPrereqError
2890 @type lu: C{LogicalUnit}
2891 @param lu: a logical unit from which we get configuration data
2893 @param node: the node to check
2894 @type reason: C{str}
2895 @param reason: string to use in the error message
2896 @type requested: C{int}
2897 @param requested: the amount of memory in MiB to check for
2898 @type hypervisor_name: C{str}
2899 @param hypervisor_name: the hypervisor to ask for memory stats
2900 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2901 we cannot check the node
2904 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2905 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2906 free_mem = nodeinfo[node].payload.get('memory_free', None)
2907 if not isinstance(free_mem, int):
2908 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2909 " was '%s'" % (node, free_mem))
2910 if requested > free_mem:
2911 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2912 " needed %s MiB, available %s MiB" %
2913 (node, reason, requested, free_mem))
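# Typical call site (added illustration, mirroring the usage in
# LUStartupInstance below):
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)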
2916 class LUStartupInstance(LogicalUnit):
2917 """Starts an instance.
2920 HPATH = "instance-start"
2921 HTYPE = constants.HTYPE_INSTANCE
2922 _OP_REQP = ["instance_name", "force"]
2925 def ExpandNames(self):
2926 self._ExpandAndLockInstance()
2928 def BuildHooksEnv(self):
2931 This runs on master, primary and secondary nodes of the instance.
2935 "FORCE": self.op.force,
2937 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2938 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2941 def CheckPrereq(self):
2942 """Check prerequisites.
2944 This checks that the instance is in the cluster.
2947 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2948 assert self.instance is not None, \
2949 "Cannot retrieve locked instance %s" % self.op.instance_name
2952 self.beparams = getattr(self.op, "beparams", {})
2954 if not isinstance(self.beparams, dict):
2955 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2956 " dict" % (type(self.beparams), ))
2957 # fill the beparams dict
2958 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2959 self.op.beparams = self.beparams
2962 self.hvparams = getattr(self.op, "hvparams", {})
2964 if not isinstance(self.hvparams, dict):
2965 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2966 " dict" % (type(self.hvparams), ))
2968 # check hypervisor parameter syntax (locally)
2969 cluster = self.cfg.GetClusterInfo()
2970 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2971 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2973 filled_hvp.update(self.hvparams)
2974 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2975 hv_type.CheckParameterSyntax(filled_hvp)
2976 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2977 self.op.hvparams = self.hvparams
2979 _CheckNodeOnline(self, instance.primary_node)
2981 bep = self.cfg.GetClusterInfo().FillBE(instance)
2982 # check bridges existence
2983 _CheckInstanceBridgesExist(self, instance)
2985 remote_info = self.rpc.call_instance_info(instance.primary_node,
2987 instance.hypervisor)
2988 remote_info.Raise("Error checking node %s" % instance.primary_node,
2990 if not remote_info.payload: # not running already
2991 _CheckNodeFreeMemory(self, instance.primary_node,
2992 "starting instance %s" % instance.name,
2993 bep[constants.BE_MEMORY], instance.hypervisor)
2995 def Exec(self, feedback_fn):
2996 """Start the instance.
2999 instance = self.instance
3000 force = self.op.force
3002 self.cfg.MarkInstanceUp(instance.name)
3004 node_current = instance.primary_node
3006 _StartInstanceDisks(self, instance, force)
3008 result = self.rpc.call_instance_start(node_current, instance,
3009 self.hvparams, self.beparams)
3010 msg = result.fail_msg
3012 _ShutdownInstanceDisks(self, instance)
3013 raise errors.OpExecError("Could not start instance: %s" % msg)
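# Added note: the hvparams/beparams accepted by this LU (validated in
# CheckPrereq above) are passed straight to the instance_start RPC and are not
# saved via cfg.Update, so e.g. beparams={"memory": 512} (assuming "memory" is
# constants.BE_MEMORY) appears to override the memory size for this start only.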
3016 class LURebootInstance(LogicalUnit):
3017 """Reboot an instance.
3020 HPATH = "instance-reboot"
3021 HTYPE = constants.HTYPE_INSTANCE
3022 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3025 def ExpandNames(self):
3026 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3027 constants.INSTANCE_REBOOT_HARD,
3028 constants.INSTANCE_REBOOT_FULL]:
3029 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3030 (constants.INSTANCE_REBOOT_SOFT,
3031 constants.INSTANCE_REBOOT_HARD,
3032 constants.INSTANCE_REBOOT_FULL))
3033 self._ExpandAndLockInstance()
3035 def BuildHooksEnv(self):
3038 This runs on master, primary and secondary nodes of the instance.
3042 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3043 "REBOOT_TYPE": self.op.reboot_type,
3045 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3046 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3049 def CheckPrereq(self):
3050 """Check prerequisites.
3052 This checks that the instance is in the cluster.
3055 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3056 assert self.instance is not None, \
3057 "Cannot retrieve locked instance %s" % self.op.instance_name
3059 _CheckNodeOnline(self, instance.primary_node)
3061 # check bridges existence
3062 _CheckInstanceBridgesExist(self, instance)
3064 def Exec(self, feedback_fn):
3065 """Reboot the instance.
3068 instance = self.instance
3069 ignore_secondaries = self.op.ignore_secondaries
3070 reboot_type = self.op.reboot_type
3072 node_current = instance.primary_node
3074 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3075 constants.INSTANCE_REBOOT_HARD]:
3076 for disk in instance.disks:
3077 self.cfg.SetDiskID(disk, node_current)
3078 result = self.rpc.call_instance_reboot(node_current, instance,
3080 result.Raise("Could not reboot instance")
3082 result = self.rpc.call_instance_shutdown(node_current, instance)
3083 result.Raise("Could not shutdown instance for full reboot")
3084 _ShutdownInstanceDisks(self, instance)
3085 _StartInstanceDisks(self, instance, ignore_secondaries)
3086 result = self.rpc.call_instance_start(node_current, instance, None, None)
3087 msg = result.fail_msg
3089 _ShutdownInstanceDisks(self, instance)
3090 raise errors.OpExecError("Could not start instance for"
3091 " full reboot: %s" % msg)
3093 self.cfg.MarkInstanceUp(instance.name)
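# Added summary of the behaviour above: INSTANCE_REBOOT_SOFT and
# INSTANCE_REBOOT_HARD are handled by a single call_instance_reboot RPC, while
# INSTANCE_REBOOT_FULL is emulated as shutdown + disk cycle + start.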
3096 class LUShutdownInstance(LogicalUnit):
3097 """Shutdown an instance.
3100 HPATH = "instance-stop"
3101 HTYPE = constants.HTYPE_INSTANCE
3102 _OP_REQP = ["instance_name"]
3105 def ExpandNames(self):
3106 self._ExpandAndLockInstance()
3108 def BuildHooksEnv(self):
3111 This runs on master, primary and secondary nodes of the instance.
3114 env = _BuildInstanceHookEnvByObject(self, self.instance)
3115 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3118 def CheckPrereq(self):
3119 """Check prerequisites.
3121 This checks that the instance is in the cluster.
3124 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3125 assert self.instance is not None, \
3126 "Cannot retrieve locked instance %s" % self.op.instance_name
3127 _CheckNodeOnline(self, self.instance.primary_node)
3129 def Exec(self, feedback_fn):
3130 """Shutdown the instance.
3133 instance = self.instance
3134 node_current = instance.primary_node
3135 self.cfg.MarkInstanceDown(instance.name)
3136 result = self.rpc.call_instance_shutdown(node_current, instance)
3137 msg = result.fail_msg
3139 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3141 _ShutdownInstanceDisks(self, instance)
3144 class LUReinstallInstance(LogicalUnit):
3145 """Reinstall an instance.
3148 HPATH = "instance-reinstall"
3149 HTYPE = constants.HTYPE_INSTANCE
3150 _OP_REQP = ["instance_name"]
3153 def ExpandNames(self):
3154 self._ExpandAndLockInstance()
3156 def BuildHooksEnv(self):
3159 This runs on master, primary and secondary nodes of the instance.
3162 env = _BuildInstanceHookEnvByObject(self, self.instance)
3163 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3166 def CheckPrereq(self):
3167 """Check prerequisites.
3169 This checks that the instance is in the cluster and is not running.
3172 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3173 assert instance is not None, \
3174 "Cannot retrieve locked instance %s" % self.op.instance_name
3175 _CheckNodeOnline(self, instance.primary_node)
3177 if instance.disk_template == constants.DT_DISKLESS:
3178 raise errors.OpPrereqError("Instance '%s' has no disks" %
3179 self.op.instance_name)
3180 if instance.admin_up:
3181 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3182 self.op.instance_name)
3183 remote_info = self.rpc.call_instance_info(instance.primary_node,
3185 instance.hypervisor)
3186 remote_info.Raise("Error checking node %s" % instance.primary_node,
3188 if remote_info.payload:
3189 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3190 (self.op.instance_name,
3191 instance.primary_node))
3193 self.op.os_type = getattr(self.op, "os_type", None)
3194 if self.op.os_type is not None:
3196 pnode = self.cfg.GetNodeInfo(
3197 self.cfg.ExpandNodeName(instance.primary_node))
3199 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3201 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3202 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3203 (self.op.os_type, pnode.name), prereq=True)
3205 self.instance = instance
3207 def Exec(self, feedback_fn):
3208 """Reinstall the instance.
3211 inst = self.instance
3213 if self.op.os_type is not None:
3214 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3215 inst.os = self.op.os_type
3216 self.cfg.Update(inst)
3218 _StartInstanceDisks(self, inst, None)
3220 feedback_fn("Running the instance OS create scripts...")
3221 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3222 result.Raise("Could not install OS for instance %s on node %s" %
3223 (inst.name, inst.primary_node))
3225 _ShutdownInstanceDisks(self, inst)
3228 class LURenameInstance(LogicalUnit):
3229 """Rename an instance.
3232 HPATH = "instance-rename"
3233 HTYPE = constants.HTYPE_INSTANCE
3234 _OP_REQP = ["instance_name", "new_name"]
3236 def BuildHooksEnv(self):
3239 This runs on master, primary and secondary nodes of the instance.
3242 env = _BuildInstanceHookEnvByObject(self, self.instance)
3243 env["INSTANCE_NEW_NAME"] = self.op.new_name
3244 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3247 def CheckPrereq(self):
3248 """Check prerequisites.
3250 This checks that the instance is in the cluster and is not running.
3253 instance = self.cfg.GetInstanceInfo(
3254 self.cfg.ExpandInstanceName(self.op.instance_name))
3255 if instance is None:
3256 raise errors.OpPrereqError("Instance '%s' not known" %
3257 self.op.instance_name)
3258 _CheckNodeOnline(self, instance.primary_node)
3260 if instance.admin_up:
3261 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3262 self.op.instance_name)
3263 remote_info = self.rpc.call_instance_info(instance.primary_node,
3265 instance.hypervisor)
3266 remote_info.Raise("Error checking node %s" % instance.primary_node,
3268 if remote_info.payload:
3269 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3270 (self.op.instance_name,
3271 instance.primary_node))
3272 self.instance = instance
3274 # new name verification
3275 name_info = utils.HostInfo(self.op.new_name)
3277 self.op.new_name = new_name = name_info.name
3278 instance_list = self.cfg.GetInstanceList()
3279 if new_name in instance_list:
3280 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3283 if not getattr(self.op, "ignore_ip", False):
3284 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3285 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3286 (name_info.ip, new_name))
3289 def Exec(self, feedback_fn):
3290 """Reinstall the instance.
3293 inst = self.instance
3294 old_name = inst.name
3296 if inst.disk_template == constants.DT_FILE:
3297 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3299 self.cfg.RenameInstance(inst.name, self.op.new_name)
3300 # Change the instance lock. This is definitely safe while we hold the BGL
3301 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3302 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3304 # re-read the instance from the configuration after rename
3305 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3307 if inst.disk_template == constants.DT_FILE:
3308 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3309 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3310 old_file_storage_dir,
3311 new_file_storage_dir)
3312 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3313 " (but the instance has been renamed in Ganeti)" %
3314 (inst.primary_node, old_file_storage_dir,
3315 new_file_storage_dir))
3317 _StartInstanceDisks(self, inst, None)
3319 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3321 msg = result.fail_msg
3323 msg = ("Could not run OS rename script for instance %s on node %s"
3324 " (but the instance has been renamed in Ganeti): %s" %
3325 (inst.name, inst.primary_node, msg))
3326 self.proc.LogWarning(msg)
3328 _ShutdownInstanceDisks(self, inst)
3331 class LURemoveInstance(LogicalUnit):
3332 """Remove an instance.
3335 HPATH = "instance-remove"
3336 HTYPE = constants.HTYPE_INSTANCE
3337 _OP_REQP = ["instance_name", "ignore_failures"]
3340 def ExpandNames(self):
3341 self._ExpandAndLockInstance()
3342 self.needed_locks[locking.LEVEL_NODE] = []
3343 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3345 def DeclareLocks(self, level):
3346 if level == locking.LEVEL_NODE:
3347 self._LockInstancesNodes()
3349 def BuildHooksEnv(self):
3352 This runs on master, primary and secondary nodes of the instance.
3355 env = _BuildInstanceHookEnvByObject(self, self.instance)
3356 nl = [self.cfg.GetMasterNode()]
3359 def CheckPrereq(self):
3360 """Check prerequisites.
3362 This checks that the instance is in the cluster.
3365 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3366 assert self.instance is not None, \
3367 "Cannot retrieve locked instance %s" % self.op.instance_name
3369 def Exec(self, feedback_fn):
3370 """Remove the instance.
3373 instance = self.instance
3374 logging.info("Shutting down instance %s on node %s",
3375 instance.name, instance.primary_node)
3377 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3378 msg = result.fail_msg
3380 if self.op.ignore_failures:
3381 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3383 raise errors.OpExecError("Could not shutdown instance %s on"
3385 (instance.name, instance.primary_node, msg))
3387 logging.info("Removing block devices for instance %s", instance.name)
3389 if not _RemoveDisks(self, instance):
3390 if self.op.ignore_failures:
3391 feedback_fn("Warning: can't remove instance's disks")
3393 raise errors.OpExecError("Can't remove instance's disks")
3395 logging.info("Removing instance %s out of cluster config", instance.name)
3397 self.cfg.RemoveInstance(instance.name)
3398 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3401 class LUQueryInstances(NoHooksLU):
3402 """Logical unit for querying instances.
3405 _OP_REQP = ["output_fields", "names", "use_locking"]
3407 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3409 "disk_template", "ip", "mac", "bridge",
3410 "nic_mode", "nic_link",
3411 "sda_size", "sdb_size", "vcpus", "tags",
3412 "network_port", "beparams",
3413 r"(disk)\.(size)/([0-9]+)",
3414 r"(disk)\.(sizes)", "disk_usage",
3415 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3416 r"(nic)\.(bridge)/([0-9]+)",
3417 r"(nic)\.(macs|ips|modes|links|bridges)",
3418 r"(disk|nic)\.(count)",
3419 "serial_no", "hypervisor", "hvparams",] +
3421 for name in constants.HVS_PARAMETERS] +
3423 for name in constants.BES_PARAMETERS])
3424 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
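# Added note on the field syntax above: besides plain names, output_fields may
# use the parameterized forms matched by the regular expressions, e.g.
# "disk.size/0" (size of the first disk) or "nic.mac/1" (MAC of the second
# NIC), as well as the aggregate forms "disk.sizes", "nic.macs", "disk.count".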
3427 def ExpandNames(self):
3428 _CheckOutputFields(static=self._FIELDS_STATIC,
3429 dynamic=self._FIELDS_DYNAMIC,
3430 selected=self.op.output_fields)
3432 self.needed_locks = {}
3433 self.share_locks[locking.LEVEL_INSTANCE] = 1
3434 self.share_locks[locking.LEVEL_NODE] = 1
3437 self.wanted = _GetWantedInstances(self, self.op.names)
3439 self.wanted = locking.ALL_SET
3441 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3442 self.do_locking = self.do_node_query and self.op.use_locking
3444 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3445 self.needed_locks[locking.LEVEL_NODE] = []
3446 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3448 def DeclareLocks(self, level):
3449 if level == locking.LEVEL_NODE and self.do_locking:
3450 self._LockInstancesNodes()
3452 def CheckPrereq(self):
3453 """Check prerequisites.
3458 def Exec(self, feedback_fn):
3459 """Computes the list of nodes and their attributes.
3462 all_info = self.cfg.GetAllInstancesInfo()
3463 if self.wanted == locking.ALL_SET:
3464 # caller didn't specify instance names, so ordering is not important
3466 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3468 instance_names = all_info.keys()
3469 instance_names = utils.NiceSort(instance_names)
3471 # caller did specify names, so we must keep the ordering
3473 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3475 tgt_set = all_info.keys()
3476 missing = set(self.wanted).difference(tgt_set)
3478 raise errors.OpExecError("Some instances were removed before"
3479 " retrieving their data: %s" % missing)
3480 instance_names = self.wanted
3482 instance_list = [all_info[iname] for iname in instance_names]
3484 # begin data gathering
3486 nodes = frozenset([inst.primary_node for inst in instance_list])
3487 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3491 if self.do_node_query:
3493 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3495 result = node_data[name]
3497 # offline nodes will be in both lists
3498 off_nodes.append(name)
3499 if result.failed or result.fail_msg:
3500 bad_nodes.append(name)
3503 live_data.update(result.payload)
3504 # else no instance is alive
3506 live_data = dict([(name, {}) for name in instance_names])
3508 # end data gathering
3513 cluster = self.cfg.GetClusterInfo()
3514 for instance in instance_list:
3516 i_hv = cluster.FillHV(instance)
3517 i_be = cluster.FillBE(instance)
3518 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
3519 nic.nicparams) for nic in instance.nics]
3520 for field in self.op.output_fields:
3521 st_match = self._FIELDS_STATIC.Matches(field)
3526 elif field == "pnode":
3527 val = instance.primary_node
3528 elif field == "snodes":
3529 val = list(instance.secondary_nodes)
3530 elif field == "admin_state":
3531 val = instance.admin_up
3532 elif field == "oper_state":
3533 if instance.primary_node in bad_nodes:
3536 val = bool(live_data.get(instance.name))
3537 elif field == "status":
3538 if instance.primary_node in off_nodes:
3539 val = "ERROR_nodeoffline"
3540 elif instance.primary_node in bad_nodes:
3541 val = "ERROR_nodedown"
3543 running = bool(live_data.get(instance.name))
3545 if instance.admin_up:
3550 if instance.admin_up:
3554 elif field == "oper_ram":
3555 if instance.primary_node in bad_nodes:
3557 elif instance.name in live_data:
3558 val = live_data[instance.name].get("memory", "?")
3561 elif field == "vcpus":
3562 val = i_be[constants.BE_VCPUS]
3563 elif field == "disk_template":
3564 val = instance.disk_template
3567 val = instance.nics[0].ip
3570 elif field == "nic_mode":
3572 val = i_nicp[0][constants.NIC_MODE]
3575 elif field == "nic_link":
3577 val = i_nicp[0][constants.NIC_LINK]
3580 elif field == "bridge":
3581 if (instance.nics and
3582 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
3583 val = i_nicp[0][constants.NIC_LINK]
3586 elif field == "mac":
3588 val = instance.nics[0].mac
3591 elif field == "sda_size" or field == "sdb_size":
3592 idx = ord(field[2]) - ord('a')
3594 val = instance.FindDisk(idx).size
3595 except errors.OpPrereqError:
3597 elif field == "disk_usage": # total disk usage per node
3598 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3599 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3600 elif field == "tags":
3601 val = list(instance.GetTags())
3602 elif field == "serial_no":
3603 val = instance.serial_no
3604 elif field == "network_port":
3605 val = instance.network_port
3606 elif field == "hypervisor":
3607 val = instance.hypervisor
3608 elif field == "hvparams":
3610 elif (field.startswith(HVPREFIX) and
3611 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3612 val = i_hv.get(field[len(HVPREFIX):], None)
3613 elif field == "beparams":
3615 elif (field.startswith(BEPREFIX) and
3616 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3617 val = i_be.get(field[len(BEPREFIX):], None)
3618 elif st_match and st_match.groups():
3619 # matches a variable list
3620 st_groups = st_match.groups()
3621 if st_groups and st_groups[0] == "disk":
3622 if st_groups[1] == "count":
3623 val = len(instance.disks)
3624 elif st_groups[1] == "sizes":
3625 val = [disk.size for disk in instance.disks]
3626 elif st_groups[1] == "size":
3628 val = instance.FindDisk(st_groups[2]).size
3629 except errors.OpPrereqError:
3632 assert False, "Unhandled disk parameter"
3633 elif st_groups[0] == "nic":
3634 if st_groups[1] == "count":
3635 val = len(instance.nics)
3636 elif st_groups[1] == "macs":
3637 val = [nic.mac for nic in instance.nics]
3638 elif st_groups[1] == "ips":
3639 val = [nic.ip for nic in instance.nics]
3640 elif st_groups[1] == "modes":
3641 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
3642 elif st_groups[1] == "links":
3643 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
3644 elif st_groups[1] == "bridges":
3647 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3648 val.append(nicp[constants.NIC_LINK])
3653 nic_idx = int(st_groups[2])
3654 if nic_idx >= len(instance.nics):
3657 if st_groups[1] == "mac":
3658 val = instance.nics[nic_idx].mac
3659 elif st_groups[1] == "ip":
3660 val = instance.nics[nic_idx].ip
3661 elif st_groups[1] == "mode":
3662 val = i_nicp[nic_idx][constants.NIC_MODE]
3663 elif st_groups[1] == "link":
3664 val = i_nicp[nic_idx][constants.NIC_LINK]
3665 elif st_groups[1] == "bridge":
3666 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
3667 if nic_mode == constants.NIC_MODE_BRIDGED:
3668 val = i_nicp[nic_idx][constants.NIC_LINK]
3672 assert False, "Unhandled NIC parameter"
3674 assert False, ("Declared but unhandled variable parameter '%s'" %
3677 assert False, "Declared but unhandled parameter '%s'" % field
3684 class LUFailoverInstance(LogicalUnit):
3685 """Failover an instance.
3688 HPATH = "instance-failover"
3689 HTYPE = constants.HTYPE_INSTANCE
3690 _OP_REQP = ["instance_name", "ignore_consistency"]
3693 def ExpandNames(self):
3694 self._ExpandAndLockInstance()
3695 self.needed_locks[locking.LEVEL_NODE] = []
3696 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3698 def DeclareLocks(self, level):
3699 if level == locking.LEVEL_NODE:
3700 self._LockInstancesNodes()
3702 def BuildHooksEnv(self):
3705 This runs on master, primary and secondary nodes of the instance.
3709 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3711 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3712 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3715 def CheckPrereq(self):
3716 """Check prerequisites.
3718 This checks that the instance is in the cluster.
3721 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3722 assert self.instance is not None, \
3723 "Cannot retrieve locked instance %s" % self.op.instance_name
3725 bep = self.cfg.GetClusterInfo().FillBE(instance)
3726 if instance.disk_template not in constants.DTS_NET_MIRROR:
3727 raise errors.OpPrereqError("Instance's disk layout is not"
3728 " network mirrored, cannot failover.")
3730 secondary_nodes = instance.secondary_nodes
3731 if not secondary_nodes:
3732 raise errors.ProgrammerError("no secondary node but using "
3733 "a mirrored disk template")
3735 target_node = secondary_nodes[0]
3736 _CheckNodeOnline(self, target_node)
3737 _CheckNodeNotDrained(self, target_node)
3738 if instance.admin_up:
3739 # check memory requirements on the secondary node
3740 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3741 instance.name, bep[constants.BE_MEMORY],
3742 instance.hypervisor)
3744 self.LogInfo("Not checking memory on the secondary node as"
3745 " instance will not be started")
3747     # check bridge existence
3748 _CheckInstanceBridgesExist(self, instance, node=target_node)
3750 def Exec(self, feedback_fn):
3751 """Failover an instance.
3753 The failover is done by shutting it down on its present node and
3754 starting it on the secondary.
3757 instance = self.instance
3759 source_node = instance.primary_node
3760 target_node = instance.secondary_nodes[0]
3762 feedback_fn("* checking disk consistency between source and target")
3763 for dev in instance.disks:
3764 # for drbd, these are drbd over lvm
3765 if not _CheckDiskConsistency(self, dev, target_node, False):
3766 if instance.admin_up and not self.op.ignore_consistency:
3767 raise errors.OpExecError("Disk %s is degraded on target node,"
3768 " aborting failover." % dev.iv_name)
3770 feedback_fn("* shutting down instance on source node")
3771 logging.info("Shutting down instance %s on node %s",
3772 instance.name, source_node)
3774 result = self.rpc.call_instance_shutdown(source_node, instance)
3775 msg = result.fail_msg
3777 if self.op.ignore_consistency:
3778 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3779 " Proceeding anyway. Please make sure node"
3780 " %s is down. Error details: %s",
3781 instance.name, source_node, source_node, msg)
3783 raise errors.OpExecError("Could not shutdown instance %s on"
3785 (instance.name, source_node, msg))
3787 feedback_fn("* deactivating the instance's disks on source node")
3788 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3789 raise errors.OpExecError("Can't shut down the instance's disks.")
3791 instance.primary_node = target_node
3792 # distribute new instance config to the other nodes
3793 self.cfg.Update(instance)
3795 # Only start the instance if it's marked as up
3796 if instance.admin_up:
3797 feedback_fn("* activating the instance's disks on target node")
3798 logging.info("Starting instance %s on node %s",
3799 instance.name, target_node)
3801 disks_ok, _ = _AssembleInstanceDisks(self, instance,
3802 ignore_secondaries=True)
3804 _ShutdownInstanceDisks(self, instance)
3805 raise errors.OpExecError("Can't activate the instance's disks")
3807 feedback_fn("* starting the instance on the target node")
3808 result = self.rpc.call_instance_start(target_node, instance, None, None)
3809 msg = result.fail_msg
3811 _ShutdownInstanceDisks(self, instance)
3812 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3813 (instance.name, target_node, msg))
3816 class LUMigrateInstance(LogicalUnit):
3817 """Migrate an instance.
3819   This is migration without shutting down the instance, as opposed to
3820   failover, which requires a shutdown.
3823 HPATH = "instance-migrate"
3824 HTYPE = constants.HTYPE_INSTANCE
3825 _OP_REQP = ["instance_name", "live", "cleanup"]
3829 def ExpandNames(self):
3830 self._ExpandAndLockInstance()
3831 self.needed_locks[locking.LEVEL_NODE] = []
3832 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3834 def DeclareLocks(self, level):
3835 if level == locking.LEVEL_NODE:
3836 self._LockInstancesNodes()
3838 def BuildHooksEnv(self):
3841 This runs on master, primary and secondary nodes of the instance.
3844 env = _BuildInstanceHookEnvByObject(self, self.instance)
3845 env["MIGRATE_LIVE"] = self.op.live
3846 env["MIGRATE_CLEANUP"] = self.op.cleanup
3847 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3850 def CheckPrereq(self):
3851 """Check prerequisites.
3853 This checks that the instance is in the cluster.
3856 instance = self.cfg.GetInstanceInfo(
3857 self.cfg.ExpandInstanceName(self.op.instance_name))
3858 if instance is None:
3859 raise errors.OpPrereqError("Instance '%s' not known" %
3860 self.op.instance_name)
3862 if instance.disk_template != constants.DT_DRBD8:
3863 raise errors.OpPrereqError("Instance's disk layout is not"
3864 " drbd8, cannot migrate.")
3866 secondary_nodes = instance.secondary_nodes
3867 if not secondary_nodes:
3868 raise errors.ConfigurationError("No secondary node but using"
3869 " drbd8 disk template")
3871 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3873 target_node = secondary_nodes[0]
3874 # check memory requirements on the secondary node
3875 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3876 instance.name, i_be[constants.BE_MEMORY],
3877 instance.hypervisor)
3879     # check bridge existence
3880 _CheckInstanceBridgesExist(self, instance, node=target_node)
3882 if not self.op.cleanup:
3883 _CheckNodeNotDrained(self, target_node)
3884 result = self.rpc.call_instance_migratable(instance.primary_node,
3886 result.Raise("Can't migrate, please use failover", prereq=True)
3888 self.instance = instance
3890 def _WaitUntilSync(self):
3891 """Poll with custom rpc for disk sync.
3893 This uses our own step-based rpc call.
3896 self.feedback_fn("* wait until resync is done")
3900 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3902 self.instance.disks)
3904 for node, nres in result.items():
3905 nres.Raise("Cannot resync disks on node %s" % node)
3906 node_done, node_percent = nres.payload
3907 all_done = all_done and node_done
3908 if node_percent is not None:
3909 min_percent = min(min_percent, node_percent)
3911 if min_percent < 100:
3912 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3915 def _EnsureSecondary(self, node):
3916 """Demote a node to secondary.
3919 self.feedback_fn("* switching node %s to secondary mode" % node)
3921 for dev in self.instance.disks:
3922 self.cfg.SetDiskID(dev, node)
3924 result = self.rpc.call_blockdev_close(node, self.instance.name,
3925 self.instance.disks)
3926 result.Raise("Cannot change disk to secondary on node %s" % node)
3928 def _GoStandalone(self):
3929 """Disconnect from the network.
3932 self.feedback_fn("* changing into standalone mode")
3933 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3934 self.instance.disks)
3935 for node, nres in result.items():
3936 nres.Raise("Cannot disconnect disks node %s" % node)
3938 def _GoReconnect(self, multimaster):
3939 """Reconnect to the network.
3941 """
3942 if multimaster:
3943 msg = "dual-master"
3944 else:
3945 msg = "single-master"
3946 self.feedback_fn("* changing disks into %s mode" % msg)
3947 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3948 self.instance.disks,
3949 self.instance.name, multimaster)
3950 for node, nres in result.items():
3951 nres.Raise("Cannot change disks config on node %s" % node)
3953 def _ExecCleanup(self):
3954 """Try to cleanup after a failed migration.
3956 The cleanup is done by:
3957 - check that the instance is running only on one node
3958 (and update the config if needed)
3959 - change disks on its secondary node to secondary
3960 - wait until disks are fully synchronized
3961 - disconnect from the network
3962 - change disks into single-master mode
3963 - wait again until disks are fully synchronized
3966 instance = self.instance
3967 target_node = self.target_node
3968 source_node = self.source_node
3970 # check running on only one node
3971 self.feedback_fn("* checking where the instance actually runs"
3972 " (if this hangs, the hypervisor might be in"
3974 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3975 for node, result in ins_l.items():
3976 result.Raise("Can't contact node %s" % node)
3978 runningon_source = instance.name in ins_l[source_node].payload
3979 runningon_target = instance.name in ins_l[target_node].payload
3981 if runningon_source and runningon_target:
3982 raise errors.OpExecError("Instance seems to be running on two nodes,"
3983 " or the hypervisor is confused. You will have"
3984 " to ensure manually that it runs only on one"
3985 " and restart this operation.")
3987 if not (runningon_source or runningon_target):
3988 raise errors.OpExecError("Instance does not seem to be running at all."
3989 " In this case, it's safer to repair by"
3990 " running 'gnt-instance stop' to ensure disk"
3991 " shutdown, and then restarting it.")
3993 if runningon_target:
3994 # the migration has actually succeeded, we need to update the config
3995 self.feedback_fn("* instance running on secondary node (%s),"
3996 " updating config" % target_node)
3997 instance.primary_node = target_node
3998 self.cfg.Update(instance)
3999 demoted_node = source_node
4000 else:
4001 self.feedback_fn("* instance confirmed to be running on its"
4002 " primary node (%s)" % source_node)
4003 demoted_node = target_node
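# whichever node is not (or no longer) the primary is demoted below, so the disks
# end up active on exactly one node before reconnecting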
4005 self._EnsureSecondary(demoted_node)
4006 try:
4007 self._WaitUntilSync()
4008 except errors.OpExecError:
4009 # we ignore errors here, since if the device is standalone, it
4010 # won't be able to sync
4011 pass
4012 self._GoStandalone()
4013 self._GoReconnect(False)
4014 self._WaitUntilSync()
4016 self.feedback_fn("* done")
4018 def _RevertDiskStatus(self):
4019 """Try to revert the disk status after a failed migration.
4021 """
4022 target_node = self.target_node
4023 try:
4024 self._EnsureSecondary(target_node)
4025 self._GoStandalone()
4026 self._GoReconnect(False)
4027 self._WaitUntilSync()
4028 except errors.OpExecError, err:
4029 self.LogWarning("Migration failed and I can't reconnect the"
4030 " drives: error '%s'\n"
4031 "Please look and recover the instance status" %
4032 str(err))
4034 def _AbortMigration(self):
4035 """Call the hypervisor code to abort a started migration.
4038 instance = self.instance
4039 target_node = self.target_node
4040 migration_info = self.migration_info
4042 abort_result = self.rpc.call_finalize_migration(target_node,
4046 abort_msg = abort_result.fail_msg
4048 logging.error("Aborting migration failed on target node %s: %s" %
4049 (target_node, abort_msg))
4050 # Don't raise an exception here, as we still have to try to revert the
4051 # disk status, even if this step failed.
4053 def _ExecMigration(self):
4054 """Migrate an instance.
4056 The migration is done by:
4057 - change the disks into dual-master mode
4058 - wait until disks are fully synchronized again
4059 - migrate the instance
4060 - change disks on the new secondary node (the old primary) to secondary
4061 - wait until disks are fully synchronized
4062 - change disks into single-master mode
4065 instance = self.instance
4066 target_node = self.target_node
4067 source_node = self.source_node
4069 self.feedback_fn("* checking disk consistency between source and target")
4070 for dev in instance.disks:
4071 if not _CheckDiskConsistency(self, dev, target_node, False):
4072 raise errors.OpExecError("Disk %s is degraded or not fully"
4073 " synchronized on target node,"
4074 " aborting migrate." % dev.iv_name)
4076 # First get the migration information from the remote node
4077 result = self.rpc.call_migration_info(source_node, instance)
4078 msg = result.fail_msg
4079 if msg:
4080 log_err = ("Failed fetching source migration information from %s: %s" %
4081 (source_node, msg))
4082 logging.error(log_err)
4083 raise errors.OpExecError(log_err)
4085 self.migration_info = migration_info = result.payload
4087 # Then switch the disks to master/master mode
4088 self._EnsureSecondary(target_node)
4089 self._GoStandalone()
4090 self._GoReconnect(True)
4091 self._WaitUntilSync()
4093 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4094 result = self.rpc.call_accept_instance(target_node,
4097 self.nodes_ip[target_node])
4099 msg = result.fail_msg
4101 logging.error("Instance pre-migration failed, trying to revert"
4102 " disk status: %s", msg)
4103 self._AbortMigration()
4104 self._RevertDiskStatus()
4105 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4106 (instance.name, msg))
4108 self.feedback_fn("* migrating instance to %s" % target_node)
4110 result = self.rpc.call_instance_migrate(source_node, instance,
4111 self.nodes_ip[target_node],
4113 msg = result.fail_msg
4115 logging.error("Instance migration failed, trying to revert"
4116 " disk status: %s", msg)
4117 self._AbortMigration()
4118 self._RevertDiskStatus()
4119 raise errors.OpExecError("Could not migrate instance %s: %s" %
4120 (instance.name, msg))
4123 instance.primary_node = target_node
4124 # distribute new instance config to the other nodes
4125 self.cfg.Update(instance)
4127 result = self.rpc.call_finalize_migration(target_node,
4131 msg = result.fail_msg
4133 logging.error("Instance migration succeeded, but finalization failed:"
4135 raise errors.OpExecError("Could not finalize instance migration: %s" %
4138 self._EnsureSecondary(source_node)
4139 self._WaitUntilSync()
4140 self._GoStandalone()
4141 self._GoReconnect(False)
4142 self._WaitUntilSync()
4144 self.feedback_fn("* done")
4146 def Exec(self, feedback_fn):
4147 """Perform the migration.
4150 self.feedback_fn = feedback_fn
4152 self.source_node = self.instance.primary_node
4153 self.target_node = self.instance.secondary_nodes[0]
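# nodes_ip (built below) maps each node name to its secondary (replication) IP,
# e.g. {"node1.example.com": "192.0.2.10", ...} -- illustrative values only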
4154 self.all_nodes = [self.source_node, self.target_node]
4155 self.nodes_ip = {
4156 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4157 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4158 }
4159 if self.op.cleanup:
4160 return self._ExecCleanup()
4161 else:
4162 return self._ExecMigration()
4165 def _CreateBlockDev(lu, node, instance, device, force_create,
4166 info, force_open):
4167 """Create a tree of block devices on a given node.
4169 If this device type has to be created on secondaries, create it and
4172 If not, just recurse to children keeping the same 'force' value.
4174 @param lu: the lu on whose behalf we execute
4175 @param node: the node on which to create the device
4176 @type instance: L{objects.Instance}
4177 @param instance: the instance which owns the device
4178 @type device: L{objects.Disk}
4179 @param device: the device to create
4180 @type force_create: boolean
4181 @param force_create: whether to force creation of this device; this
4182 will be changed to True whenever we find a device which has the
4183 CreateOnSecondary() attribute
4184 @param info: the extra 'metadata' we should attach to the device
4185 (this will be represented as a LVM tag)
4186 @type force_open: boolean
4187 @param force_open: this parameter will be passed to the
4188 L{backend.BlockdevCreate} function where it specifies
4189 whether we run on primary or not, and it affects both
4190 the child assembly and the device's own Open() execution
4193 if device.CreateOnSecondary():
4194 force_create = True
4196 if device.children:
4197 for child in device.children:
4198 _CreateBlockDev(lu, node, instance, child, force_create,
4199 info, force_open)
4201 if not force_create:
4202 return
4204 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4207 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4208 """Create a single block device on a given node.
4210 This will not recurse over children of the device, so they must be
4213 @param lu: the lu on whose behalf we execute
4214 @param node: the node on which to create the device
4215 @type instance: L{objects.Instance}
4216 @param instance: the instance which owns the device
4217 @type device: L{objects.Disk}
4218 @param device: the device to create
4219 @param info: the extra 'metadata' we should attach to the device
4220 (this will be represented as a LVM tag)
4221 @type force_open: boolean
4222 @param force_open: this parameter will be passed to the
4223 L{backend.BlockdevCreate} function where it specifies
4224 whether we run on primary or not, and it affects both
4225 the child assembly and the device's own Open() execution
4228 lu.cfg.SetDiskID(device, node)
4229 result = lu.rpc.call_blockdev_create(node, device, device.size,
4230 instance.name, force_open, info)
4231 result.Raise("Can't create block device %s on"
4232 " node %s for instance %s" % (device, node, instance.name))
4233 if device.physical_id is None:
4234 device.physical_id = result.payload
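# Illustrative example for the helper below: exts of [".disk0_data", ".disk0_meta"]
# produce names like "<unique-id>.disk0_data" and "<unique-id>.disk0_meta"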
4237 def _GenerateUniqueNames(lu, exts):
4238 """Generate a suitable LV name.
4240 This will generate a logical volume name for the given instance.
4242 """
4243 results = []
4244 for val in exts:
4245 new_id = lu.cfg.GenerateUniqueID()
4246 results.append("%s%s" % (new_id, val))
4247 return results
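# The DRBD8 logical_id built by the helper below is the tuple
# (primary node, secondary node, port, primary minor, secondary minor, shared secret),
# the same layout that is unpacked when replacing a secondary node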
4250 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4251 p_minor, s_minor):
4252 """Generate a drbd8 device complete with its children.
4254 """
4255 port = lu.cfg.AllocatePort()
4256 vgname = lu.cfg.GetVGName()
4257 shared_secret = lu.cfg.GenerateDRBDSecret()
4258 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4259 logical_id=(vgname, names[0]))
4260 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4261 logical_id=(vgname, names[1]))
4262 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4263 logical_id=(primary, secondary, port,
4264 p_minor, s_minor,
4265 shared_secret),
4266 children=[dev_data, dev_meta],
4267 iv_name=iv_name)
4268 return drbd_dev
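# The function below returns a list of top-level Disk objects (one per requested
# disk), each carrying an iv_name of the form "disk/N"; the children depend on the
# chosen template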
4271 def _GenerateDiskTemplate(lu, template_name,
4272 instance_name, primary_node,
4273 secondary_nodes, disk_info,
4274 file_storage_dir, file_driver,
4275 base_index):
4276 """Generate the entire disk layout for a given template type.
4278 """
4279 #TODO: compute space requirements
4281 vgname = lu.cfg.GetVGName()
4282 disk_count = len(disk_info)
4283 disks = []
4284 if template_name == constants.DT_DISKLESS:
4285 pass
4286 elif template_name == constants.DT_PLAIN:
4287 if len(secondary_nodes) != 0:
4288 raise errors.ProgrammerError("Wrong template configuration")
4290 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4291 for i in range(disk_count)])
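# each disk_info entry is a dict coming from the opcode, e.g.
# {"size": 1024, "mode": "rw"} (illustrative values)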
4292 for idx, disk in enumerate(disk_info):
4293 disk_index = idx + base_index
4294 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4295 logical_id=(vgname, names[idx]),
4296 iv_name="disk/%d" % disk_index,
4298 disks.append(disk_dev)
4299 elif template_name == constants.DT_DRBD8:
4300 if len(secondary_nodes) != 1:
4301 raise errors.ProgrammerError("Wrong template configuration")
4302 remote_node = secondary_nodes[0]
4303 minors = lu.cfg.AllocateDRBDMinor(
4304 [primary_node, remote_node] * len(disk_info), instance_name)
4306 names = []
4307 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4308 for i in range(disk_count)]):
4309 names.append(lv_prefix + "_data")
4310 names.append(lv_prefix + "_meta")
4311 for idx, disk in enumerate(disk_info):
4312 disk_index = idx + base_index
4313 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4314 disk["size"], names[idx*2:idx*2+2],
4315 "disk/%d" % disk_index,
4316 minors[idx*2], minors[idx*2+1])
4317 disk_dev.mode = disk["mode"]
4318 disks.append(disk_dev)
4319 elif template_name == constants.DT_FILE:
4320 if len(secondary_nodes) != 0:
4321 raise errors.ProgrammerError("Wrong template configuration")
4323 for idx, disk in enumerate(disk_info):
4324 disk_index = idx + base_index
4325 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4326 iv_name="disk/%d" % disk_index,
4327 logical_id=(file_driver,
4328 "%s/disk%d" % (file_storage_dir,
4331 disks.append(disk_dev)
4332 else:
4333 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4335 return disks
4337 def _GetInstanceInfoText(instance):
4338 """Compute the text that should be added to the disk's metadata.
4340 """
4341 return "originstname+%s" % instance.name
4344 def _CreateDisks(lu, instance):
4345 """Create all disks for an instance.
4347 This abstracts away some work from AddInstance.
4349 @type lu: L{LogicalUnit}
4350 @param lu: the logical unit on whose behalf we execute
4351 @type instance: L{objects.Instance}
4352 @param instance: the instance whose disks we should create
4354 @return: the success of the creation
4357 info = _GetInstanceInfoText(instance)
4358 pnode = instance.primary_node
4360 if instance.disk_template == constants.DT_FILE:
4361 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4362 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4364 result.Raise("Failed to create directory '%s' on"
4365 " node %s" % (file_storage_dir, pnode))
4367 # Note: this needs to be kept in sync with adding of disks in
4368 # LUSetInstanceParams
4369 for device in instance.disks:
4370 logging.info("Creating volume %s for instance %s",
4371 device.iv_name, instance.name)
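# the device tree is created on every node of the instance, but the force flags are
# only set on the primary; secondaries only get the pieces that must exist there
# (see _CreateBlockDev above)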
4373 for node in instance.all_nodes:
4374 f_create = node == pnode
4375 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4378 def _RemoveDisks(lu, instance):
4379 """Remove all disks for an instance.
4381 This abstracts away some work from `AddInstance()` and
4382 `RemoveInstance()`. Note that in case some of the devices couldn't
4383 be removed, the removal will continue with the other ones (compare
4384 with `_CreateDisks()`).
4386 @type lu: L{LogicalUnit}
4387 @param lu: the logical unit on whose behalf we execute
4388 @type instance: L{objects.Instance}
4389 @param instance: the instance whose disks we should remove
4391 @return: the success of the removal
4394 logging.info("Removing block devices for instance %s", instance.name)
4397 for device in instance.disks:
4398 for node, disk in device.ComputeNodeTree(instance.primary_node):
4399 lu.cfg.SetDiskID(disk, node)
4400 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4402 lu.LogWarning("Could not remove block device %s on node %s,"
4403 " continuing anyway: %s", device.iv_name, node, msg)
4406 if instance.disk_template == constants.DT_FILE:
4407 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4408 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4410 msg = result.fail_msg
4412 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4413 file_storage_dir, instance.primary_node, msg)
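# Illustrative sizing example for the helper below: two DRBD8 disks of 1024 MB and
# 2048 MB need (1024 + 128) + (2048 + 128) = 3328 MB of free space in the volume group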
4419 def _ComputeDiskSize(disk_template, disks):
4420 """Compute disk size requirements in the volume group.
4422 """
4423 # Required free disk space as a function of disk and swap space
4424 req_size_dict = {
4425 constants.DT_DISKLESS: None,
4426 constants.DT_PLAIN: sum(d["size"] for d in disks),
4427 # 128 MB are added for drbd metadata for each disk
4428 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4429 constants.DT_FILE: None,
4430 }
4432 if disk_template not in req_size_dict:
4433 raise errors.ProgrammerError("Disk template '%s' size requirement"
4434 " is unknown" % disk_template)
4436 return req_size_dict[disk_template]
4439 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4440 """Hypervisor parameter validation.
4442 This function abstracts the hypervisor parameter validation to be
4443 used in both instance create and instance modify.
4445 @type lu: L{LogicalUnit}
4446 @param lu: the logical unit for which we check
4447 @type nodenames: list
4448 @param nodenames: the list of nodes on which we should check
4449 @type hvname: string
4450 @param hvname: the name of the hypervisor we should use
4451 @type hvparams: dict
4452 @param hvparams: the parameters which we need to check
4453 @raise errors.OpPrereqError: if the parameters are not valid
4456 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4459 for node in nodenames:
4463 info.Raise("Hypervisor parameter validation failed on node %s" % node)
4466 class LUCreateInstance(LogicalUnit):
4467 """Create an instance.
4470 HPATH = "instance-add"
4471 HTYPE = constants.HTYPE_INSTANCE
4472 _OP_REQP = ["instance_name", "disks", "disk_template",
4474 "wait_for_sync", "ip_check", "nics",
4475 "hvparams", "beparams"]
4478 def _ExpandNode(self, node):
4479 """Expands and checks one node name.
4482 node_full = self.cfg.ExpandNodeName(node)
4483 if node_full is None:
4484 raise errors.OpPrereqError("Unknown node %s" % node)
4485 return node_full
4487 def ExpandNames(self):
4488 """ExpandNames for CreateInstance.
4490 Figure out the right locks for instance creation.
4493 self.needed_locks = {}
4495 # set optional parameters to none if they don't exist
4496 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4497 if not hasattr(self.op, attr):
4498 setattr(self.op, attr, None)
4500 # cheap checks, mostly valid constants given
4502 # verify creation mode
4503 if self.op.mode not in (constants.INSTANCE_CREATE,
4504 constants.INSTANCE_IMPORT):
4505 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4508 # disk template and mirror node verification
4509 if self.op.disk_template not in constants.DISK_TEMPLATES:
4510 raise errors.OpPrereqError("Invalid disk template name")
4512 if self.op.hypervisor is None:
4513 self.op.hypervisor = self.cfg.GetHypervisorType()
4515 cluster = self.cfg.GetClusterInfo()
4516 enabled_hvs = cluster.enabled_hypervisors
4517 if self.op.hypervisor not in enabled_hvs:
4518 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4519 " cluster (%s)" % (self.op.hypervisor,
4520 ",".join(enabled_hvs)))
4522 # check hypervisor parameter syntax (locally)
4523 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4524 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4526 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4527 hv_type.CheckParameterSyntax(filled_hvp)
4528 self.hv_full = filled_hvp
4530 # fill and remember the beparams dict
4531 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4532 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4535 #### instance parameters check
4537 # instance name verification
4538 hostname1 = utils.HostInfo(self.op.instance_name)
4539 self.op.instance_name = instance_name = hostname1.name
4541 # this is just a preventive check, but someone might still add this
4542 # instance in the meantime, and creation will fail at lock-add time
4543 if instance_name in self.cfg.GetInstanceList():
4544 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4547 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4550 self.nics = []
4551 for idx, nic in enumerate(self.op.nics):
4552 nic_mode_req = nic.get("mode", None)
4553 nic_mode = nic_mode_req
4554 if nic_mode is None:
4555 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4557 # in routed mode, for the first nic, the default ip is 'auto'
4558 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4559 default_ip_mode = constants.VALUE_AUTO
4561 default_ip_mode = constants.VALUE_NONE
4563 # ip validity checks
4564 ip = nic.get("ip", default_ip_mode)
4565 if ip is None or ip.lower() == constants.VALUE_NONE:
4566 nic_ip = None
4567 elif ip.lower() == constants.VALUE_AUTO:
4568 nic_ip = hostname1.ip
4569 else:
4570 if not utils.IsValidIP(ip):
4571 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4572 " like a valid IP" % ip)
4573 nic_ip = ip
4575 # TODO: check the ip for uniqueness !!
4576 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4577 raise errors.OpPrereqError("Routed nic mode requires an ip address")
4579 # MAC address verification
4580 mac = nic.get("mac", constants.VALUE_AUTO)
4581 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4582 if not utils.IsValidMac(mac.lower()):
4583 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4585 # bridge verification
4586 bridge = nic.get("bridge", None)
4587 link = nic.get("link", None)
4588 if bridge and link:
4589 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
4590 " at the same time")
4591 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4592 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4598 nicparams[constants.NIC_MODE] = nic_mode_req
4600 nicparams[constants.NIC_LINK] = link
4602 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4604 objects.NIC.CheckParameterSyntax(check_params)
4605 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
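# nicparams holds only the explicitly requested values; the cluster defaults are
# merged in (via FillDict) just for the syntax check, while the NIC object keeps
# the sparse dict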
4607 # disk checks/pre-build
4608 self.disks = []
4609 for disk in self.op.disks:
4610 mode = disk.get("mode", constants.DISK_RDWR)
4611 if mode not in constants.DISK_ACCESS_SET:
4612 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4613 mode)
4614 size = disk.get("size", None)
4615 if size is None:
4616 raise errors.OpPrereqError("Missing disk size")
4617 try:
4618 size = int(size)
4619 except ValueError:
4620 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4621 self.disks.append({"size": size, "mode": mode})
4623 # used in CheckPrereq for ip ping check
4624 self.check_ip = hostname1.ip
4626 # file storage checks
4627 if (self.op.file_driver and
4628 not self.op.file_driver in constants.FILE_DRIVER):
4629 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4630 self.op.file_driver)
4632 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4633 raise errors.OpPrereqError("File storage directory path must not be absolute")
4635 ### Node/iallocator related checks
4636 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4637 raise errors.OpPrereqError("One and only one of iallocator and primary"
4638 " node must be given")
4640 if self.op.iallocator:
4641 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4643 self.op.pnode = self._ExpandNode(self.op.pnode)
4644 nodelist = [self.op.pnode]
4645 if self.op.snode is not None:
4646 self.op.snode = self._ExpandNode(self.op.snode)
4647 nodelist.append(self.op.snode)
4648 self.needed_locks[locking.LEVEL_NODE] = nodelist
4650 # in case of import lock the source node too
4651 if self.op.mode == constants.INSTANCE_IMPORT:
4652 src_node = getattr(self.op, "src_node", None)
4653 src_path = getattr(self.op, "src_path", None)
4655 if src_path is None:
4656 self.op.src_path = src_path = self.op.instance_name
4658 if src_node is None:
4659 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4660 self.op.src_node = None
4661 if os.path.isabs(src_path):
4662 raise errors.OpPrereqError("Importing an instance from an absolute"
4663 " path requires a source node option.")
4664 else:
4665 self.op.src_node = src_node = self._ExpandNode(src_node)
4666 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4667 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4668 if not os.path.isabs(src_path):
4669 self.op.src_path = src_path = \
4670 os.path.join(constants.EXPORT_DIR, src_path)
4672 else: # INSTANCE_CREATE
4673 if getattr(self.op, "os_type", None) is None:
4674 raise errors.OpPrereqError("No guest OS specified")
4676 def _RunAllocator(self):
4677 """Run the allocator based on input opcode.
4680 nics = [n.ToDict() for n in self.nics]
4681 ial = IAllocator(self,
4682 mode=constants.IALLOCATOR_MODE_ALLOC,
4683 name=self.op.instance_name,
4684 disk_template=self.op.disk_template,
4687 vcpus=self.be_full[constants.BE_VCPUS],
4688 mem_size=self.be_full[constants.BE_MEMORY],
4691 hypervisor=self.op.hypervisor,
4694 ial.Run(self.op.iallocator)
4697 raise errors.OpPrereqError("Can't compute nodes using"
4698 " iallocator '%s': %s" % (self.op.iallocator,
4700 if len(ial.nodes) != ial.required_nodes:
4701 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4702 " of nodes (%s), required %s" %
4703 (self.op.iallocator, len(ial.nodes),
4704 ial.required_nodes))
4705 self.op.pnode = ial.nodes[0]
4706 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4707 self.op.instance_name, self.op.iallocator,
4708 ", ".join(ial.nodes))
4709 if ial.required_nodes == 2:
4710 self.op.snode = ial.nodes[1]
4712 def BuildHooksEnv(self):
4715 This runs on master, primary and secondary nodes of the instance.
4717 """
4718 env = {
4719 "ADD_MODE": self.op.mode,
4720 }
4721 if self.op.mode == constants.INSTANCE_IMPORT:
4722 env["SRC_NODE"] = self.op.src_node
4723 env["SRC_PATH"] = self.op.src_path
4724 env["SRC_IMAGES"] = self.src_images
4726 env.update(_BuildInstanceHookEnv(
4727 name=self.op.instance_name,
4728 primary_node=self.op.pnode,
4729 secondary_nodes=self.secondaries,
4730 status=self.op.start,
4731 os_type=self.op.os_type,
4732 memory=self.be_full[constants.BE_MEMORY],
4733 vcpus=self.be_full[constants.BE_VCPUS],
4734 nics=_NICListToTuple(self, self.nics),
4735 disk_template=self.op.disk_template,
4736 disks=[(d["size"], d["mode"]) for d in self.disks],
4739 hypervisor=self.op.hypervisor,
4742 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4747 def CheckPrereq(self):
4748 """Check prerequisites.
4751 if (not self.cfg.GetVGName() and
4752 self.op.disk_template not in constants.DTS_NOT_LVM):
4753 raise errors.OpPrereqError("Cluster does not support lvm-based"
4756 if self.op.mode == constants.INSTANCE_IMPORT:
4757 src_node = self.op.src_node
4758 src_path = self.op.src_path
4760 if src_node is None:
4761 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4762 exp_list = self.rpc.call_export_list(locked_nodes)
4764 for node in exp_list:
4765 if exp_list[node].fail_msg:
4767 if src_path in exp_list[node].payload:
4769 self.op.src_node = src_node = node
4770 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4774 raise errors.OpPrereqError("No export found for relative path %s" %
4777 _CheckNodeOnline(self, src_node)
4778 result = self.rpc.call_export_info(src_node, src_path)
4779 result.Raise("No export or invalid export found in dir %s" % src_path)
4781 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4782 if not export_info.has_section(constants.INISECT_EXP):
4783 raise errors.ProgrammerError("Corrupted export config")
4785 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4786 if (int(ei_version) != constants.EXPORT_VERSION):
4787 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4788 (ei_version, constants.EXPORT_VERSION))
4790 # Check that the new instance doesn't have less disks than the export
4791 instance_disks = len(self.disks)
4792 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4793 if instance_disks < export_disks:
4794 raise errors.OpPrereqError("Not enough disks to import."
4795 " (instance: %d, export: %d)" %
4796 (instance_disks, export_disks))
4798 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4800 for idx in range(export_disks):
4801 option = 'disk%d_dump' % idx
4802 if export_info.has_option(constants.INISECT_INS, option):
4803 # FIXME: are the old os-es, disk sizes, etc. useful?
4804 export_name = export_info.get(constants.INISECT_INS, option)
4805 image = os.path.join(src_path, export_name)
4806 disk_images.append(image)
4808 disk_images.append(False)
4810 self.src_images = disk_images
4812 old_name = export_info.get(constants.INISECT_INS, 'name')
4813 # FIXME: int() here could throw a ValueError on broken exports
4814 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4815 if self.op.instance_name == old_name:
4816 for idx, nic in enumerate(self.nics):
4817 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4818 nic_mac_ini = 'nic%d_mac' % idx
4819 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4821 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4822 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4823 if self.op.start and not self.op.ip_check:
4824 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4825 " adding an instance in start mode")
4827 if self.op.ip_check:
4828 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4829 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4830 (self.check_ip, self.op.instance_name))
4832 #### mac address generation
4833 # By generating here the mac address both the allocator and the hooks get
4834 # the real final mac address rather than the 'auto' or 'generate' value.
4835 # There is a race condition between the generation and the instance object
4836 # creation, which means that we know the mac is valid now, but we're not
4837 # sure it will be when we actually add the instance. If things go bad
4838 # adding the instance will abort because of a duplicate mac, and the
4839 # creation job will fail.
4840 for nic in self.nics:
4841 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4842 nic.mac = self.cfg.GenerateMAC()
4846 if self.op.iallocator is not None:
4847 self._RunAllocator()
4849 #### node related checks
4851 # check primary node
4852 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4853 assert self.pnode is not None, \
4854 "Cannot retrieve locked node %s" % self.op.pnode
4855 if pnode.offline:
4856 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4857 pnode.name)
4858 if pnode.drained:
4859 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4860 pnode.name)
4862 self.secondaries = []
4864 # mirror node verification
4865 if self.op.disk_template in constants.DTS_NET_MIRROR:
4866 if self.op.snode is None:
4867 raise errors.OpPrereqError("The networked disk templates need"
4869 if self.op.snode == pnode.name:
4870 raise errors.OpPrereqError("The secondary node cannot be"
4871 " the primary node.")
4872 _CheckNodeOnline(self, self.op.snode)
4873 _CheckNodeNotDrained(self, self.op.snode)
4874 self.secondaries.append(self.op.snode)
4876 nodenames = [pnode.name] + self.secondaries
4878 req_size = _ComputeDiskSize(self.op.disk_template,
4879 self.disks)
4881 # Check lv size requirements
4882 if req_size is not None:
4883 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4885 for node in nodenames:
4886 info = nodeinfo[node]
4887 info.Raise("Cannot get current information from node %s" % node)
4889 vg_free = info.get('vg_free', None)
4890 if not isinstance(vg_free, int):
4891 raise errors.OpPrereqError("Can't compute free disk space on"
4893 if req_size > vg_free:
4894 raise errors.OpPrereqError("Not enough disk space on target node %s."
4895 " %d MB available, %d MB required" %
4896 (node, vg_free, req_size))
4898 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4901 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4902 result.Raise("OS '%s' not in supported os list for primary node %s" %
4903 (self.op.os_type, pnode.name), prereq=True)
4905 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4907 # memory check on primary node
4909 _CheckNodeFreeMemory(self, self.pnode.name,
4910 "creating instance %s" % self.op.instance_name,
4911 self.be_full[constants.BE_MEMORY],
4914 self.dry_run_result = list(nodenames)
4916 def Exec(self, feedback_fn):
4917 """Create and add the instance to the cluster.
4920 instance = self.op.instance_name
4921 pnode_name = self.pnode.name
4923 ht_kind = self.op.hypervisor
4924 if ht_kind in constants.HTS_REQ_PORT:
4925 network_port = self.cfg.AllocatePort()
4926 else:
4927 network_port = None
4929 ##if self.op.vnc_bind_address is None:
4930 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4932 # this is needed because os.path.join does not accept None arguments
4933 if self.op.file_storage_dir is None:
4934 string_file_storage_dir = ""
4936 string_file_storage_dir = self.op.file_storage_dir
4938 # build the full file storage dir path
4939 file_storage_dir = os.path.normpath(os.path.join(
4940 self.cfg.GetFileStorageDir(),
4941 string_file_storage_dir, instance))
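# e.g. with a cluster file storage dir of /srv/ganeti/file-storage and no
# per-instance dir given, this becomes /srv/ganeti/file-storage/<instance name>
# (illustrative path)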
4944 disks = _GenerateDiskTemplate(self,
4945 self.op.disk_template,
4946 instance, pnode_name,
4947 self.secondaries,
4948 self.disks,
4949 file_storage_dir,
4950 self.op.file_driver,
4951 0)
4953 iobj = objects.Instance(name=instance, os=self.op.os_type,
4954 primary_node=pnode_name,
4955 nics=self.nics, disks=disks,
4956 disk_template=self.op.disk_template,
4958 network_port=network_port,
4959 beparams=self.op.beparams,
4960 hvparams=self.op.hvparams,
4961 hypervisor=self.op.hypervisor,
4962 )
4964 feedback_fn("* creating instance disks...")
4965 try:
4966 _CreateDisks(self, iobj)
4967 except errors.OpExecError:
4968 self.LogWarning("Device creation failed, reverting...")
4969 try:
4970 _RemoveDisks(self, iobj)
4971 finally:
4972 self.cfg.ReleaseDRBDMinors(instance)
4973 raise
4975 feedback_fn("adding instance %s to cluster config" % instance)
4977 self.cfg.AddInstance(iobj)
4978 # Declare that we don't want to remove the instance lock anymore, as we've
4979 # added the instance to the config
4980 del self.remove_locks[locking.LEVEL_INSTANCE]
4981 # Unlock all the nodes
4982 if self.op.mode == constants.INSTANCE_IMPORT:
4983 nodes_keep = [self.op.src_node]
4984 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4985 if node != self.op.src_node]
4986 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4987 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4988 else:
4989 self.context.glm.release(locking.LEVEL_NODE)
4990 del self.acquired_locks[locking.LEVEL_NODE]
4992 if self.op.wait_for_sync:
4993 disk_abort = not _WaitForSync(self, iobj)
4994 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4995 # make sure the disks are not degraded (still sync-ing is ok)
4997 feedback_fn("* checking mirrors status")
4998 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4999 else:
5000 disk_abort = False
5002 if disk_abort:
5003 _RemoveDisks(self, iobj)
5004 self.cfg.RemoveInstance(iobj.name)
5005 # Make sure the instance lock gets removed
5006 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5007 raise errors.OpExecError("There are some degraded disks for"
5010 feedback_fn("creating os for instance %s on node %s" %
5011 (instance, pnode_name))
5013 if iobj.disk_template != constants.DT_DISKLESS:
5014 if self.op.mode == constants.INSTANCE_CREATE:
5015 feedback_fn("* running the instance OS create scripts...")
5016 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5017 result.Raise("Could not add os for instance %s"
5018 " on node %s" % (instance, pnode_name))
5020 elif self.op.mode == constants.INSTANCE_IMPORT:
5021 feedback_fn("* running the instance OS import scripts...")
5022 src_node = self.op.src_node
5023 src_images = self.src_images
5024 cluster_name = self.cfg.GetClusterName()
5025 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5026 src_node, src_images,
5027 cluster_name)
5028 msg = import_result.fail_msg
5029 if msg:
5030 self.LogWarning("Error while importing the disk images for instance"
5031 " %s on node %s: %s" % (instance, pnode_name, msg))
5032 else:
5033 # also checked in the prereq part
5034 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5035 % self.op.mode)
5037 if self.op.start:
5038 iobj.admin_up = True
5039 self.cfg.Update(iobj)
5040 logging.info("Starting instance %s on node %s", instance, pnode_name)
5041 feedback_fn("* starting instance...")
5042 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5043 result.Raise("Could not start instance")
5045 return list(iobj.all_nodes)
5048 class LUConnectConsole(NoHooksLU):
5049 """Connect to an instance's console.
5051 This is somewhat special in that it returns the command line that
5052 you need to run on the master node in order to connect to the
5056 _OP_REQP = ["instance_name"]
5059 def ExpandNames(self):
5060 self._ExpandAndLockInstance()
5062 def CheckPrereq(self):
5063 """Check prerequisites.
5065 This checks that the instance is in the cluster.
5068 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5069 assert self.instance is not None, \
5070 "Cannot retrieve locked instance %s" % self.op.instance_name
5071 _CheckNodeOnline(self, self.instance.primary_node)
5073 def Exec(self, feedback_fn):
5074 """Connect to the console of an instance
5077 instance = self.instance
5078 node = instance.primary_node
5080 node_insts = self.rpc.call_instance_list([node],
5081 [instance.hypervisor])[node]
5082 node_insts.Raise("Can't get node information from %s" % node)
5084 if instance.name not in node_insts.payload:
5085 raise errors.OpExecError("Instance %s is not running." % instance.name)
5087 logging.debug("Connecting to console of %s on %s", instance.name, node)
5089 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5090 cluster = self.cfg.GetClusterInfo()
5091 # beparams and hvparams are passed separately, to avoid editing the
5092 # instance and then saving the defaults in the instance itself.
5093 hvparams = cluster.FillHV(instance)
5094 beparams = cluster.FillBE(instance)
5095 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
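# the command line below is returned to (and executed by) the client, roughly of
# the form "ssh -t root@<primary node> '<hypervisor console command>'" (illustrative)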
5098 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5101 class LUReplaceDisks(LogicalUnit):
5102 """Replace the disks of an instance.
5105 HPATH = "mirrors-replace"
5106 HTYPE = constants.HTYPE_INSTANCE
5107 _OP_REQP = ["instance_name", "mode", "disks"]
5110 def CheckArguments(self):
5111 if not hasattr(self.op, "remote_node"):
5112 self.op.remote_node = None
5113 if not hasattr(self.op, "iallocator"):
5114 self.op.iallocator = None
5116 # check for valid parameter combination
5117 cnt = [self.op.remote_node, self.op.iallocator].count(None)
5118 if self.op.mode == constants.REPLACE_DISK_CHG:
5119 if cnt == 2:
5120 raise errors.OpPrereqError("When changing the secondary either an"
5121 " iallocator script must be used or the"
5122 " new node given")
5123 elif cnt == 0:
5124 raise errors.OpPrereqError("Give either the iallocator or the new"
5125 " secondary, not both")
5126 else: # not replacing the secondary
5127 if cnt != 2:
5128 raise errors.OpPrereqError("The iallocator and new node options can"
5129 " be used only when changing the"
5130 " secondary node")
5132 def ExpandNames(self):
5133 self._ExpandAndLockInstance()
5135 if self.op.iallocator is not None:
5136 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5137 elif self.op.remote_node is not None:
5138 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5139 if remote_node is None:
5140 raise errors.OpPrereqError("Node '%s' not known" %
5141 self.op.remote_node)
5142 self.op.remote_node = remote_node
5143 # Warning: do not remove the locking of the new secondary here
5144 # unless DRBD8.AddChildren is changed to work in parallel;
5145 # currently it doesn't since parallel invocations of
5146 # FindUnusedMinor will conflict
5147 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5148 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5149 else:
5150 self.needed_locks[locking.LEVEL_NODE] = []
5151 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5153 def DeclareLocks(self, level):
5154 # If we're not already locking all nodes in the set we have to declare the
5155 # instance's primary/secondary nodes.
5156 if (level == locking.LEVEL_NODE and
5157 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5158 self._LockInstancesNodes()
5160 def _RunAllocator(self):
5161 """Compute a new secondary node using an IAllocator.
5164 ial = IAllocator(self,
5165 mode=constants.IALLOCATOR_MODE_RELOC,
5166 name=self.op.instance_name,
5167 relocate_from=[self.sec_node])
5169 ial.Run(self.op.iallocator)
5171 if not ial.success:
5172 raise errors.OpPrereqError("Can't compute nodes using"
5173 " iallocator '%s': %s" % (self.op.iallocator,
5174 ial.info))
5175 if len(ial.nodes) != ial.required_nodes:
5176 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5177 " of nodes (%s), required %s" %
5178 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
5179 self.op.remote_node = ial.nodes[0]
5180 self.LogInfo("Selected new secondary for the instance: %s",
5181 self.op.remote_node)
5183 def BuildHooksEnv(self):
5186 This runs on the master, the primary and all the secondaries.
5190 "MODE": self.op.mode,
5191 "NEW_SECONDARY": self.op.remote_node,
5192 "OLD_SECONDARY": self.instance.secondary_nodes[0],
5194 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5196 self.cfg.GetMasterNode(),
5197 self.instance.primary_node,
5199 if self.op.remote_node is not None:
5200 nl.append(self.op.remote_node)
5203 def CheckPrereq(self):
5204 """Check prerequisites.
5206 This checks that the instance is in the cluster.
5209 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5210 assert instance is not None, \
5211 "Cannot retrieve locked instance %s" % self.op.instance_name
5212 self.instance = instance
5214 if instance.disk_template != constants.DT_DRBD8:
5215 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5218 if len(instance.secondary_nodes) != 1:
5219 raise errors.OpPrereqError("The instance has a strange layout,"
5220 " expected one secondary but found %d" %
5221 len(instance.secondary_nodes))
5223 self.sec_node = instance.secondary_nodes[0]
5225 if self.op.iallocator is not None:
5226 self._RunAllocator()
5228 remote_node = self.op.remote_node
5229 if remote_node is not None:
5230 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5231 assert self.remote_node_info is not None, \
5232 "Cannot retrieve locked node %s" % remote_node
5233 else:
5234 self.remote_node_info = None
5235 if remote_node == instance.primary_node:
5236 raise errors.OpPrereqError("The specified node is the primary node of"
5238 elif remote_node == self.sec_node:
5239 raise errors.OpPrereqError("The specified node is already the"
5240 " secondary node of the instance.")
5242 if self.op.mode == constants.REPLACE_DISK_PRI:
5243 n1 = self.tgt_node = instance.primary_node
5244 n2 = self.oth_node = self.sec_node
5245 elif self.op.mode == constants.REPLACE_DISK_SEC:
5246 n1 = self.tgt_node = self.sec_node
5247 n2 = self.oth_node = instance.primary_node
5248 elif self.op.mode == constants.REPLACE_DISK_CHG:
5249 n1 = self.new_node = remote_node
5250 n2 = self.oth_node = instance.primary_node
5251 self.tgt_node = self.sec_node
5252 _CheckNodeNotDrained(self, remote_node)
5253 else:
5254 raise errors.ProgrammerError("Unhandled disk replace mode")
5256 _CheckNodeOnline(self, n1)
5257 _CheckNodeOnline(self, n2)
5259 if not self.op.disks:
5260 self.op.disks = range(len(instance.disks))
5262 for disk_idx in self.op.disks:
5263 instance.FindDisk(disk_idx)
5265 def _ExecD8DiskOnly(self, feedback_fn):
5266 """Replace a disk on the primary or secondary for drbd8.
5268 The algorithm for replace is quite complicated:
5270 1. for each disk to be replaced:
5272 1. create new LVs on the target node with unique names
5273 1. detach old LVs from the drbd device
5274 1. rename old LVs to name_replaced.<time_t>
5275 1. rename new LVs to old LVs
5276 1. attach the new LVs (with the old names now) to the drbd device
5278 1. wait for sync across all devices
5280 1. for each modified disk:
5282 1. remove old LVs (which have the name name_replaces.<time_t>)
5284 Failures are not very well handled.
5286 """
5287 steps_total = 6
5288 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5289 instance = self.instance
5290 iv_names = {}
5291 vgname = self.cfg.GetVGName()
5293 cfg = self.cfg
5294 tgt_node = self.tgt_node
5295 oth_node = self.oth_node
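# tgt_node is the node whose LVs get replaced; oth_node is the peer that must stay
# consistent while we work (see CheckPrereq for how they are chosen per mode)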
5297 # Step: check device activation
5298 self.proc.LogStep(1, steps_total, "check device existence")
5299 info("checking volume groups")
5300 my_vg = cfg.GetVGName()
5301 results = self.rpc.call_vg_list([oth_node, tgt_node])
5302 if not results:
5303 raise errors.OpExecError("Can't list volume groups on the nodes")
5304 for node in oth_node, tgt_node:
5305 res = results[node]
5306 res.Raise("Error checking node %s" % node)
5307 if my_vg not in res.payload:
5308 raise errors.OpExecError("Volume group '%s' not found on %s" %
5309 (my_vg, node))
5310 for idx, dev in enumerate(instance.disks):
5311 if idx not in self.op.disks:
5312 continue
5313 for node in tgt_node, oth_node:
5314 info("checking disk/%d on %s" % (idx, node))
5315 cfg.SetDiskID(dev, node)
5316 result = self.rpc.call_blockdev_find(node, dev)
5317 msg = result.fail_msg
5318 if not msg and not result.payload:
5319 msg = "disk not found"
5320 if msg:
5321 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5322 (idx, node, msg))
5324 # Step: check other node consistency
5325 self.proc.LogStep(2, steps_total, "check peer consistency")
5326 for idx, dev in enumerate(instance.disks):
5327 if idx not in self.op.disks:
5328 continue
5329 info("checking disk/%d consistency on %s" % (idx, oth_node))
5330 if not _CheckDiskConsistency(self, dev, oth_node,
5331 oth_node==instance.primary_node):
5332 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5333 " to replace disks on this node (%s)" %
5334 (oth_node, tgt_node))
5336 # Step: create new storage
5337 self.proc.LogStep(3, steps_total, "allocate new storage")
5338 for idx, dev in enumerate(instance.disks):
5339 if idx not in self.op.disks:
5340 continue
5341 size = dev.size
5342 cfg.SetDiskID(dev, tgt_node)
5343 lv_names = [".disk%d_%s" % (idx, suf)
5344 for suf in ["data", "meta"]]
5345 names = _GenerateUniqueNames(self, lv_names)
5346 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5347 logical_id=(vgname, names[0]))
5348 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5349 logical_id=(vgname, names[1]))
5350 new_lvs = [lv_data, lv_meta]
5351 old_lvs = dev.children
5352 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5353 info("creating new local storage on %s for %s" %
5354 (tgt_node, dev.iv_name))
5355 # we pass force_create=True to force the LVM creation
5356 for new_lv in new_lvs:
5357 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5358 _GetInstanceInfoText(instance), False)
5360 # Step: for each lv, detach+rename*2+attach
5361 self.proc.LogStep(4, steps_total, "change drbd configuration")
5362 for dev, old_lvs, new_lvs in iv_names.itervalues():
5363 info("detaching %s drbd from local storage" % dev.iv_name)
5364 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5365 result.Raise("Can't detach drbd from local storage on node"
5366 " %s for device %s" % (tgt_node, dev.iv_name))
5368 #cfg.Update(instance)
5370 # ok, we created the new LVs, so now we know we have the needed
5371 # storage; as such, we proceed on the target node to rename
5372 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5373 # using the assumption that logical_id == physical_id (which in
5374 # turn is the unique_id on that node)
5376 # FIXME(iustin): use a better name for the replaced LVs
5377 temp_suffix = int(time.time())
5378 ren_fn = lambda d, suff: (d.physical_id[0],
5379 d.physical_id[1] + "_replaced-%s" % suff)
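# e.g. an LV named xyz.disk0_data is renamed to xyz.disk0_data_replaced-<timestamp>
# (illustrative name), freeing the original name for the new LV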
5380 # build the rename list based on what LVs exist on the node
5381 rlist = []
5382 for to_ren in old_lvs:
5383 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5384 if not result.fail_msg and result.payload:
5386 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5388 info("renaming the old LVs on the target node")
5389 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5390 result.Raise("Can't rename old LVs on node %s" % tgt_node)
5391 # now we rename the new LVs to the old LVs
5392 info("renaming the new LVs on the target node")
5393 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5394 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5395 result.Raise("Can't rename new LVs on node %s" % tgt_node)
5397 for old, new in zip(old_lvs, new_lvs):
5398 new.logical_id = old.logical_id
5399 cfg.SetDiskID(new, tgt_node)
5401 for disk in old_lvs:
5402 disk.logical_id = ren_fn(disk, temp_suffix)
5403 cfg.SetDiskID(disk, tgt_node)
5405 # now that the new lvs have the old name, we can add them to the device
5406 info("adding new mirror component on %s" % tgt_node)
5407 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5408 msg = result.fail_msg
5409 if msg:
5410 for new_lv in new_lvs:
5411 msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg
5412 if msg2:
5413 warning("Can't rollback device %s: %s", dev, msg2,
5414 hint="cleanup manually the unused logical volumes")
5415 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5417 dev.children = new_lvs
5418 cfg.Update(instance)
5420 # Step: wait for sync
5422 # this can fail as the old devices are degraded and _WaitForSync
5423 # does a combined result over all disks, so we don't check its
5424 # return value
5425 self.proc.LogStep(5, steps_total, "sync devices")
5426 _WaitForSync(self, instance, unlock=True)
5428 # so check manually all the devices
5429 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5430 cfg.SetDiskID(dev, instance.primary_node)
5431 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5432 msg = result.fail_msg
5433 if not msg and not result.payload:
5434 msg = "disk not found"
5435 if msg:
5436 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5437 (name, msg))
5438 if result.payload[5]:
5439 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5441 # Step: remove old storage
5442 self.proc.LogStep(6, steps_total, "removing old storage")
5443 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5444 info("remove logical volumes for %s" % name)
5445 for lv in old_lvs:
5446 cfg.SetDiskID(lv, tgt_node)
5447 msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg
5448 if msg:
5449 warning("Can't remove old LV: %s" % msg,
5450 hint="manually remove unused LVs")
5453 def _ExecD8Secondary(self, feedback_fn):
5454 """Replace the secondary node for drbd8.
5456 The algorithm for replace is quite complicated:
5457 - for all disks of the instance:
5458 - create new LVs on the new node with same names
5459 - shutdown the drbd device on the old secondary
5460 - disconnect the drbd network on the primary
5461 - create the drbd device on the new secondary
5462 - network attach the drbd on the primary, using an artifice:
5463 the drbd code for Attach() will connect to the network if it
5464 finds a device which is connected to the good local disks but
5466 - wait for sync across all devices
5467 - remove all disks from the old secondary
5469 Failures are not very well handled.
5473 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5474 instance = self.instance
5478 old_node = self.tgt_node
5479 new_node = self.new_node
5480 pri_node = instance.primary_node
5482 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5483 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5484 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5487 # Step: check device activation
5488 self.proc.LogStep(1, steps_total, "check device existence")
5489 info("checking volume groups")
5490 my_vg = cfg.GetVGName()
5491 results = self.rpc.call_vg_list([pri_node, new_node])
5492 for node in pri_node, new_node:
5494 res.Raise("Error checking node %s" % node)
5495 if my_vg not in res.payload:
5496 raise errors.OpExecError("Volume group '%s' not found on %s" %
5498 for idx, dev in enumerate(instance.disks):
5499 if idx not in self.op.disks:
5500 continue
5501 info("checking disk/%d on %s" % (idx, pri_node))
5502 cfg.SetDiskID(dev, pri_node)
5503 result = self.rpc.call_blockdev_find(pri_node, dev)
5504 msg = result.fail_msg
5505 if not msg and not result.payload:
5506 msg = "disk not found"
5508 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5509 (idx, pri_node, msg))
5511 # Step: check other node consistency
5512 self.proc.LogStep(2, steps_total, "check peer consistency")
5513 for idx, dev in enumerate(instance.disks):
5514 if idx not in self.op.disks:
5515 continue
5516 info("checking disk/%d consistency on %s" % (idx, pri_node))
5517 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5518 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5519 " unsafe to replace the secondary" %
5522 # Step: create new storage
5523 self.proc.LogStep(3, steps_total, "allocate new storage")
5524 for idx, dev in enumerate(instance.disks):
5525 info("adding new local storage on %s for disk/%d" %
5527 # we pass force_create=True to force LVM creation
5528 for new_lv in dev.children:
5529 _CreateBlockDev(self, new_node, instance, new_lv, True,
5530 _GetInstanceInfoText(instance), False)
5532 # Step 4: dbrd minors and drbd setups changes
5533 # after this, we must manually remove the drbd minors on both the
5534 # error and the success paths
5535 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5536 instance.name)
5537 logging.debug("Allocated minors %s" % (minors,))
5538 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5539 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5540 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5541 # create new devices on new_node; note that we create two IDs:
5542 # one without port, so the drbd will be activated without
5543 # networking information on the new node at this stage, and one
5544 # with network, for the latter activation in step 4
5545 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5546 if pri_node == o_node1:
5547 p_minor = o_minor1
5548 else:
5549 p_minor = o_minor2
5551 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5552 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5554 iv_names[idx] = (dev, dev.children, new_net_id)
5555 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5557 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5558 logical_id=new_alone_id,
5559 children=dev.children,
5560 size=dev.size)
5561 try:
5562 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5563 _GetInstanceInfoText(instance), False)
5564 except errors.GenericError:
5565 self.cfg.ReleaseDRBDMinors(instance.name)
5566 raise
5568 for idx, dev in enumerate(instance.disks):
5569 # we have new devices, shutdown the drbd on the old secondary
5570 info("shutting down drbd for disk/%d on old node" % idx)
5571 cfg.SetDiskID(dev, old_node)
5572 msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg
5574 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5576 hint="Please cleanup this device manually as soon as possible")
5578 info("detaching primary drbds from the network (=> standalone)")
5579 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5580 instance.disks)[pri_node]
5582 msg = result.fail_msg
5584 # detaches didn't succeed (unlikely)
5585 self.cfg.ReleaseDRBDMinors(instance.name)
5586 raise errors.OpExecError("Can't detach the disks from the network on"
5587 " old node: %s" % (msg,))
5589 # if we managed to detach at least one, we update all the disks of
5590 # the instance to point to the new secondary
5591 info("updating instance configuration")
5592 for dev, _, new_logical_id in iv_names.itervalues():
5593 dev.logical_id = new_logical_id
5594 cfg.SetDiskID(dev, pri_node)
5595 cfg.Update(instance)
5597 # and now perform the drbd attach
5598 info("attaching primary drbds to new secondary (standalone => connected)")
5599 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5600 instance.disks, instance.name,
5602 for to_node, to_result in result.items():
5603 msg = to_result.fail_msg
5605 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5606 hint="please do a gnt-instance info to see the"
5609 # this can fail as the old devices are degraded and _WaitForSync
5610 # does a combined result over all disks, so we don't check its
5611 # return value
5612 self.proc.LogStep(5, steps_total, "sync devices")
5613 _WaitForSync(self, instance, unlock=True)
5615 # so check manually all the devices
5616 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5617 cfg.SetDiskID(dev, pri_node)
5618 result = self.rpc.call_blockdev_find(pri_node, dev)
5619 msg = result.fail_msg
5620 if not msg and not result.payload:
5621 msg = "disk not found"
5623 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5625 if result.payload[5]:
5626 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5628 self.proc.LogStep(6, steps_total, "removing old storage")
5629 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5630 info("remove logical volumes for disk/%d" % idx)
5631 for lv in old_lvs:
5632 cfg.SetDiskID(lv, old_node)
5633 msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg
5634 if msg:
5635 warning("Can't remove LV on old secondary: %s", msg,
5636 hint="Cleanup stale volumes by hand")
5638 def Exec(self, feedback_fn):
5639 """Execute disk replacement.
5641 This dispatches the disk replacement to the appropriate handler.
5644 instance = self.instance
5646 # Activate the instance disks if we're replacing them on a down instance
5647 if not instance.admin_up:
5648 _StartInstanceDisks(self, instance, True)
5650 if self.op.mode == constants.REPLACE_DISK_CHG:
5651 fn = self._ExecD8Secondary
5652 else:
5653 fn = self._ExecD8DiskOnly
5655 ret = fn(feedback_fn)
5657 # Deactivate the instance disks if we're replacing them on a down instance
5658 if not instance.admin_up:
5659 _SafeShutdownInstanceDisks(self, instance)
5661 return ret
5664 class LUGrowDisk(LogicalUnit):
5665 """Grow a disk of an instance.
5669 HTYPE = constants.HTYPE_INSTANCE
5670 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5673 def ExpandNames(self):
5674 self._ExpandAndLockInstance()
5675 self.needed_locks[locking.LEVEL_NODE] = []
5676 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
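# Descriptive note: the node lock list is intentionally left empty here; it is
# filled in via DeclareLocks/_LockInstancesNodes below, once the instance lock
# has been acquired and we know which nodes actually hold the disk.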
5678 def DeclareLocks(self, level):
5679 if level == locking.LEVEL_NODE:
5680 self._LockInstancesNodes()
5682 def BuildHooksEnv(self):
5685 This runs on the master, the primary and all the secondaries.
5689 "DISK": self.op.disk,
5690 "AMOUNT": self.op.amount,
5692 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5694 self.cfg.GetMasterNode(),
5695 self.instance.primary_node,
5699 def CheckPrereq(self):
5700 """Check prerequisites.
5702 This checks that the instance is in the cluster.
5705 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5706 assert instance is not None, \
5707 "Cannot retrieve locked instance %s" % self.op.instance_name
5708 nodenames = list(instance.all_nodes)
5709 for node in nodenames:
5710 _CheckNodeOnline(self, node)
5713 self.instance = instance
5715 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5716 raise errors.OpPrereqError("Instance's disk layout does not support"
5719 self.disk = instance.FindDisk(self.op.disk)
5721 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5722 instance.hypervisor)
5723 for node in nodenames:
5724 info = nodeinfo[node]
5725 info.Raise("Cannot get current information from node %s" % node)
5726 vg_free = info.payload.get('vg_free', None)
5727 if not isinstance(vg_free, int):
5728 raise errors.OpPrereqError("Can't compute free disk space on"
5730 if self.op.amount > vg_free:
5731 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5732 " %d MiB available, %d MiB required" %
5733 (node, vg_free, self.op.amount))
5735 def Exec(self, feedback_fn):
5736 """Execute disk grow.
5739 instance = self.instance
5741 for node in instance.all_nodes:
5742 self.cfg.SetDiskID(disk, node)
5743 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5744 result.Raise("Grow request failed to node %s" % node)
5745 disk.RecordGrow(self.op.amount)
5746 self.cfg.Update(instance)
5747 if self.op.wait_for_sync:
5748 disk_abort = not _WaitForSync(self, instance)
5750 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5751 " status.\nPlease check the instance.")
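# Illustrative flow (opcode values made up): a request to grow disk 0 of an
# instance by 1024 MiB reaches this Exec with self.op.disk == 0 and
# self.op.amount == 1024; the grow RPC is issued on every node holding the
# disk before RecordGrow/Update persist the new size, and the sync is only
# awaited when wait_for_sync was requested.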
5754 class LUQueryInstanceData(NoHooksLU):
5755 """Query runtime instance data.
5758 _OP_REQP = ["instances", "static"]
5761 def ExpandNames(self):
5762 self.needed_locks = {}
5763 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5765 if not isinstance(self.op.instances, list):
5766 raise errors.OpPrereqError("Invalid argument type 'instances'")
5768 if self.op.instances:
5769 self.wanted_names = []
5770 for name in self.op.instances:
5771 full_name = self.cfg.ExpandInstanceName(name)
5772 if full_name is None:
5773 raise errors.OpPrereqError("Instance '%s' not known" % name)
5774 self.wanted_names.append(full_name)
5775 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5777 self.wanted_names = None
5778 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5780 self.needed_locks[locking.LEVEL_NODE] = []
5781 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5783 def DeclareLocks(self, level):
5784 if level == locking.LEVEL_NODE:
5785 self._LockInstancesNodes()
5787 def CheckPrereq(self):
5788 """Check prerequisites.
5790 This only checks the optional instance list against the existing names.
5793 if self.wanted_names is None:
5794 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5796 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5797 in self.wanted_names]
5800 def _ComputeDiskStatus(self, instance, snode, dev):
5801 """Compute block device status.
5804 static = self.op.static
5806 self.cfg.SetDiskID(dev, instance.primary_node)
5807 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5808 if dev_pstatus.offline:
5811 dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
5812 dev_pstatus = dev_pstatus.payload
5816 if dev.dev_type in constants.LDS_DRBD:
5817 # we change the snode then (otherwise we use the one passed in)
5818 if dev.logical_id[0] == instance.primary_node:
5819 snode = dev.logical_id[1]
5821 snode = dev.logical_id[0]
5823 if snode and not static:
5824 self.cfg.SetDiskID(dev, snode)
5825 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5826 if dev_sstatus.offline:
5829 dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
5830 dev_sstatus = dev_sstatus.payload
5835 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5836 for child in dev.children]
5841 "iv_name": dev.iv_name,
5842 "dev_type": dev.dev_type,
5843 "logical_id": dev.logical_id,
5844 "physical_id": dev.physical_id,
5845 "pstatus": dev_pstatus,
5846 "sstatus": dev_sstatus,
5847 "children": dev_children,
5854 def Exec(self, feedback_fn):
5855 """Gather and return data"""
5858 cluster = self.cfg.GetClusterInfo()
5860 for instance in self.wanted_instances:
5861 if not self.op.static:
5862 remote_info = self.rpc.call_instance_info(instance.primary_node,
5864 instance.hypervisor)
5865 remote_info.Raise("Error checking node %s" % instance.primary_node)
5866 remote_info = remote_info.payload
5867 if remote_info and "state" in remote_info:
5870 remote_state = "down"
5873 if instance.admin_up:
5876 config_state = "down"
5878 disks = [self._ComputeDiskStatus(instance, None, device)
5879 for device in instance.disks]
5882 "name": instance.name,
5883 "config_state": config_state,
5884 "run_state": remote_state,
5885 "pnode": instance.primary_node,
5886 "snodes": instance.secondary_nodes,
5888 # this happens to be the same format used for hooks
5889 "nics": _NICListToTuple(self, instance.nics),
5891 "hypervisor": instance.hypervisor,
5892 "network_port": instance.network_port,
5893 "hv_instance": instance.hvparams,
5894 "hv_actual": cluster.FillHV(instance),
5895 "be_instance": instance.beparams,
5896 "be_actual": cluster.FillBE(instance),
5899 result[instance.name] = idict
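# Illustrative shape of one entry of the returned mapping (values made up,
# trimmed to the keys built above):
#   result["inst1.example.com"] = {
#     "name": "inst1.example.com", "config_state": "up", "run_state": "down",
#     "pnode": "node1.example.com", "snodes": [], "disks": [...], ...}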
5904 class LUSetInstanceParams(LogicalUnit):
5905 """Modifies an instance's parameters.
5908 HPATH = "instance-modify"
5909 HTYPE = constants.HTYPE_INSTANCE
5910 _OP_REQP = ["instance_name"]
5913 def CheckArguments(self):
5914 if not hasattr(self.op, 'nics'):
5916 if not hasattr(self.op, 'disks'):
5918 if not hasattr(self.op, 'beparams'):
5919 self.op.beparams = {}
5920 if not hasattr(self.op, 'hvparams'):
5921 self.op.hvparams = {}
5922 self.op.force = getattr(self.op, "force", False)
5923 if not (self.op.nics or self.op.disks or
5924 self.op.hvparams or self.op.beparams):
5925 raise errors.OpPrereqError("No changes submitted")
5929 for disk_op, disk_dict in self.op.disks:
5930 if disk_op == constants.DDM_REMOVE:
5933 elif disk_op == constants.DDM_ADD:
5936 if not isinstance(disk_op, int):
5937 raise errors.OpPrereqError("Invalid disk index")
5938 if not isinstance(disk_dict, dict):
5939 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
5940 raise errors.OpPrereqError(msg)
5942 if disk_op == constants.DDM_ADD:
5943 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5944 if mode not in constants.DISK_ACCESS_SET:
5945 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5946 size = disk_dict.get('size', None)
5948 raise errors.OpPrereqError("Required disk parameter size missing")
5951 except ValueError, err:
5952 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5954 disk_dict['size'] = size
5956 # modification of disk
5957 if 'size' in disk_dict:
5958 raise errors.OpPrereqError("Disk size change not possible, use"
5961 if disk_addremove > 1:
5962 raise errors.OpPrereqError("Only one disk add or remove operation"
5963 " supported at a time")
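# Illustrative opcode payload (hypothetical values): self.op.disks is a list
# of (operation, parameters) pairs, e.g.
#   [(constants.DDM_ADD, {'size': 1024, 'mode': constants.DISK_RDWR})]
# to add a disk, [(constants.DDM_REMOVE, {})] to drop the last one, or
#   [(0, {'mode': constants.DISK_RDWR})]
# to change the access mode of disk 0; only one add/remove per request.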
5967 for nic_op, nic_dict in self.op.nics:
5968 if nic_op == constants.DDM_REMOVE:
5971 elif nic_op == constants.DDM_ADD:
5974 if not isinstance(nic_op, int):
5975 raise errors.OpPrereqError("Invalid nic index")
5976 if not isinstance(nic_dict, dict):
5977 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
5978 raise errors.OpPrereqError(msg)
5980 # nic_dict should be a dict
5981 nic_ip = nic_dict.get('ip', None)
5982 if nic_ip is not None:
5983 if nic_ip.lower() == constants.VALUE_NONE:
5984 nic_dict['ip'] = None
5986 if not utils.IsValidIP(nic_ip):
5987 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5989 nic_bridge = nic_dict.get('bridge', None)
5990 nic_link = nic_dict.get('link', None)
5991 if nic_bridge and nic_link:
5992 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5993 " at the same time")
5994 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5995 nic_dict['bridge'] = None
5996 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5997 nic_dict['link'] = None
5999 if nic_op == constants.DDM_ADD:
6000 nic_mac = nic_dict.get('mac', None)
6002 nic_dict['mac'] = constants.VALUE_AUTO
6004 if 'mac' in nic_dict:
6005 nic_mac = nic_dict['mac']
6006 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6007 if not utils.IsValidMac(nic_mac):
6008 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6009 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6010 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6011 " modifying an existing nic")
6013 if nic_addremove > 1:
6014 raise errors.OpPrereqError("Only one NIC add or remove operation"
6015 " supported at a time")
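# Illustrative opcode payload (hypothetical values): self.op.nics follows the
# same (operation, parameters) convention, e.g.
#   [(constants.DDM_ADD, {'mac': constants.VALUE_AUTO, 'ip': '192.0.2.10'})]
# or [(0, {'link': 'br0'})] to modify NIC 0; 'bridge' and 'link' are mutually
# exclusive, and only one NIC add/remove is accepted per request.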
6017 def ExpandNames(self):
6018 self._ExpandAndLockInstance()
6019 self.needed_locks[locking.LEVEL_NODE] = []
6020 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6022 def DeclareLocks(self, level):
6023 if level == locking.LEVEL_NODE:
6024 self._LockInstancesNodes()
6026 def BuildHooksEnv(self):
6029 This runs on the master, primary and secondaries.
6033 if constants.BE_MEMORY in self.be_new:
6034 args['memory'] = self.be_new[constants.BE_MEMORY]
6035 if constants.BE_VCPUS in self.be_new:
6036 args['vcpus'] = self.be_new[constants.BE_VCPUS]
6037 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6038 # information at all.
6041 nic_override = dict(self.op.nics)
6042 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6043 for idx, nic in enumerate(self.instance.nics):
6044 if idx in nic_override:
6045 this_nic_override = nic_override[idx]
6047 this_nic_override = {}
6048 if 'ip' in this_nic_override:
6049 ip = this_nic_override['ip']
6052 if 'mac' in this_nic_override:
6053 mac = this_nic_override['mac']
6056 if idx in self.nic_pnew:
6057 nicparams = self.nic_pnew[idx]
6059 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6060 mode = nicparams[constants.NIC_MODE]
6061 link = nicparams[constants.NIC_LINK]
6062 args['nics'].append((ip, mac, mode, link))
6063 if constants.DDM_ADD in nic_override:
6064 ip = nic_override[constants.DDM_ADD].get('ip', None)
6065 mac = nic_override[constants.DDM_ADD]['mac']
6066 nicparams = self.nic_pnew[constants.DDM_ADD]
6067 mode = nicparams[constants.NIC_MODE]
6068 link = nicparams[constants.NIC_LINK]
6069 args['nics'].append((ip, mac, mode, link))
6070 elif constants.DDM_REMOVE in nic_override:
6071 del args['nics'][-1]
6073 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6074 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6077 def _GetUpdatedParams(self, old_params, update_dict,
6078 default_values, parameter_types):
6079 """Return the new params dict for the given params.
6081 @type old_params: dict
6082 @param old_params: old parameters
6083 @type update_dict: dict
6084 @param update_dict: dict containing new parameter values,
6085 or constants.VALUE_DEFAULT to reset the
6086 parameter to its default value
6087 @type default_values: dict
6088 @param default_values: default values for the filled parameters
6089 @type parameter_types: dict
6090 @param parameter_types: dict mapping target dict keys to types
6091 in constants.ENFORCEABLE_TYPES
6092 @rtype: (dict, dict)
6093 @return: (new_parameters, filled_parameters)
6096 params_copy = copy.deepcopy(old_params)
6097 for key, val in update_dict.iteritems():
6098 if val == constants.VALUE_DEFAULT:
6100 del params_copy[key]
6104 params_copy[key] = val
6105 utils.ForceDictType(params_copy, parameter_types)
6106 params_filled = objects.FillDict(default_values, params_copy)
6107 return (params_copy, params_filled)
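# Minimal usage sketch (parameter names and the type map are illustrative):
#   new, filled = self._GetUpdatedParams(
#       {'a': 1, 'b': 2},                 # current per-instance values
#       {'a': constants.VALUE_DEFAULT},   # reset 'a' to its default
#       {'a': 10, 'b': 20}, types)        # cluster defaults / type map
#   # new == {'b': 2}: the explicit value survives and 'a' is dropped;
#   # filled == {'a': 10, 'b': 2}: defaults merged back in via FillDict.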
6109 def CheckPrereq(self):
6110 """Check prerequisites.
6112 This only checks the instance list against the existing names.
6115 self.force = self.op.force
6117 # checking the new params on the primary/secondary nodes
6119 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6120 cluster = self.cluster = self.cfg.GetClusterInfo()
6121 assert self.instance is not None, \
6122 "Cannot retrieve locked instance %s" % self.op.instance_name
6123 pnode = instance.primary_node
6124 nodelist = list(instance.all_nodes)
6126 # hvparams processing
6127 if self.op.hvparams:
6128 i_hvdict, hv_new = self._GetUpdatedParams(
6129 instance.hvparams, self.op.hvparams,
6130 cluster.hvparams[instance.hypervisor],
6131 constants.HVS_PARAMETER_TYPES)
6133 hypervisor.GetHypervisor(
6134 instance.hypervisor).CheckParameterSyntax(hv_new)
6135 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6136 self.hv_new = hv_new # the new actual values
6137 self.hv_inst = i_hvdict # the new dict (without defaults)
6139 self.hv_new = self.hv_inst = {}
6141 # beparams processing
6142 if self.op.beparams:
6143 i_bedict, be_new = self._GetUpdatedParams(
6144 instance.beparams, self.op.beparams,
6145 cluster.beparams[constants.PP_DEFAULT],
6146 constants.BES_PARAMETER_TYPES)
6147 self.be_new = be_new # the new actual values
6148 self.be_inst = i_bedict # the new dict (without defaults)
6150 self.be_new = self.be_inst = {}
6154 if constants.BE_MEMORY in self.op.beparams and not self.force:
6155 mem_check_list = [pnode]
6156 if be_new[constants.BE_AUTO_BALANCE]:
6157 # either we changed auto_balance to yes, or it was already set before
6158 mem_check_list.extend(instance.secondary_nodes)
6159 instance_info = self.rpc.call_instance_info(pnode, instance.name,
6160 instance.hypervisor)
6161 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6162 instance.hypervisor)
6163 pninfo = nodeinfo[pnode]
6164 msg = pninfo.fail_msg
6166 # Assume the primary node is unreachable and go ahead
6167 self.warn.append("Can't get info from primary node %s: %s" %
6169 elif not isinstance(pninfo.payload.get('memory_free', None), int):
6170 self.warn.append("Node data from primary node %s doesn't contain"
6171 " free memory information" % pnode)
6172 elif instance_info.fail_msg:
6173 self.warn.append("Can't get instance runtime information: %s" %
6174 instance_info.fail_msg)
6176 if instance_info.payload:
6177 current_mem = int(instance_info.payload['memory'])
6179 # Assume instance not running
6180 # (there is a slight race condition here, but it's not very probable,
6181 # and we have no other way to check)
6183 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6184 pninfo.payload['memory_free'])
6186 raise errors.OpPrereqError("This change will prevent the instance"
6187 " from starting, due to %d MB of memory"
6188 " missing on its primary node" % miss_mem)
6190 if be_new[constants.BE_AUTO_BALANCE]:
6191 for node, nres in nodeinfo.items():
6192 if node not in instance.secondary_nodes:
6196 self.warn.append("Can't get info from secondary node %s: %s" %
6198 elif not isinstance(nres.payload.get('memory_free', None), int):
6199 self.warn.append("Secondary node %s didn't return free"
6200 " memory information" % node)
6201 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6202 self.warn.append("Not enough memory to failover instance to"
6203 " secondary node %s" % node)
6208 for nic_op, nic_dict in self.op.nics:
6209 if nic_op == constants.DDM_REMOVE:
6210 if not instance.nics:
6211 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6213 if nic_op != constants.DDM_ADD:
6215 if nic_op < 0 or nic_op >= len(instance.nics):
6216 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6218 (nic_op, len(instance.nics)))
6219 old_nic_params = instance.nics[nic_op].nicparams
6220 old_nic_ip = instance.nics[nic_op].ip
6225 update_params_dict = dict([(key, nic_dict[key])
6226 for key in constants.NICS_PARAMETERS
6227 if key in nic_dict])
6229 if 'bridge' in nic_dict:
6230 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6232 new_nic_params, new_filled_nic_params = \
6233 self._GetUpdatedParams(old_nic_params, update_params_dict,
6234 cluster.nicparams[constants.PP_DEFAULT],
6235 constants.NICS_PARAMETER_TYPES)
6236 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6237 self.nic_pinst[nic_op] = new_nic_params
6238 self.nic_pnew[nic_op] = new_filled_nic_params
6239 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6241 if new_nic_mode == constants.NIC_MODE_BRIDGED:
6242 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6243 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6245 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6247 self.warn.append(msg)
6249 raise errors.OpPrereqError(msg)
6250 if new_nic_mode == constants.NIC_MODE_ROUTED:
6251 if 'ip' in nic_dict:
6252 nic_ip = nic_dict['ip']
6256 raise errors.OpPrereqError('Cannot set the nic ip to None'
6258 if 'mac' in nic_dict:
6259 nic_mac = nic_dict['mac']
6261 raise errors.OpPrereqError('Cannot set the nic mac to None')
6262 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6263 # otherwise generate the mac
6264 nic_dict['mac'] = self.cfg.GenerateMAC()
6266 # or validate/reserve the current one
6267 if self.cfg.IsMacInUse(nic_mac):
6268 raise errors.OpPrereqError("MAC address %s already in use"
6269 " in cluster" % nic_mac)
6272 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6273 raise errors.OpPrereqError("Disk operations not supported for"
6274 " diskless instances")
6275 for disk_op, disk_dict in self.op.disks:
6276 if disk_op == constants.DDM_REMOVE:
6277 if len(instance.disks) == 1:
6278 raise errors.OpPrereqError("Cannot remove the last disk of"
6280 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6281 ins_l = ins_l[pnode]
6282 msg = ins_l.fail_msg
6284 raise errors.OpPrereqError("Can't contact node %s: %s" %
6286 if instance.name in ins_l.payload:
6287 raise errors.OpPrereqError("Instance is running, can't remove"
6290 if (disk_op == constants.DDM_ADD and
6291 len(instance.disks) >= constants.MAX_DISKS):
6292 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6293 " add more" % constants.MAX_DISKS)
6294 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6296 if disk_op < 0 or disk_op >= len(instance.disks):
6297 raise errors.OpPrereqError("Invalid disk index %s, valid values"
6299 (disk_op, len(instance.disks)))
6303 def Exec(self, feedback_fn):
6304 """Modifies an instance.
6306 All parameters take effect only at the next restart of the instance.
6309 # Process the warnings from CheckPrereq here, as we don't have a
6310 # feedback_fn there.
6311 for warn in self.warn:
6312 feedback_fn("WARNING: %s" % warn)
6315 instance = self.instance
6316 cluster = self.cluster
6318 for disk_op, disk_dict in self.op.disks:
6319 if disk_op == constants.DDM_REMOVE:
6320 # remove the last disk
6321 device = instance.disks.pop()
6322 device_idx = len(instance.disks)
6323 for node, disk in device.ComputeNodeTree(instance.primary_node):
6324 self.cfg.SetDiskID(disk, node)
6325 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6327 self.LogWarning("Could not remove disk/%d on node %s: %s,"
6328 " continuing anyway", device_idx, node, msg)
6329 result.append(("disk/%d" % device_idx, "remove"))
6330 elif disk_op == constants.DDM_ADD:
6332 if instance.disk_template == constants.DT_FILE:
6333 file_driver, file_path = instance.disks[0].logical_id
6334 file_path = os.path.dirname(file_path)
6336 file_driver = file_path = None
6337 disk_idx_base = len(instance.disks)
6338 new_disk = _GenerateDiskTemplate(self,
6339 instance.disk_template,
6340 instance.name, instance.primary_node,
6341 instance.secondary_nodes,
6346 instance.disks.append(new_disk)
6347 info = _GetInstanceInfoText(instance)
6349 logging.info("Creating volume %s for instance %s",
6350 new_disk.iv_name, instance.name)
6351 # Note: this needs to be kept in sync with _CreateDisks
6353 for node in instance.all_nodes:
6354 f_create = node == instance.primary_node
6356 _CreateBlockDev(self, node, instance, new_disk,
6357 f_create, info, f_create)
6358 except errors.OpExecError, err:
6359 self.LogWarning("Failed to create volume %s (%s) on"
6361 new_disk.iv_name, new_disk, node, err)
6362 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6363 (new_disk.size, new_disk.mode)))
6365 # change a given disk
6366 instance.disks[disk_op].mode = disk_dict['mode']
6367 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6369 for nic_op, nic_dict in self.op.nics:
6370 if nic_op == constants.DDM_REMOVE:
6371 # remove the last nic
6372 del instance.nics[-1]
6373 result.append(("nic.%d" % len(instance.nics), "remove"))
6374 elif nic_op == constants.DDM_ADD:
6375 # mac and bridge should be set by now
6376 mac = nic_dict['mac']
6377 ip = nic_dict.get('ip', None)
6378 nicparams = self.nic_pinst[constants.DDM_ADD]
6379 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6380 instance.nics.append(new_nic)
6381 result.append(("nic.%d" % (len(instance.nics) - 1),
6382 "add:mac=%s,ip=%s,mode=%s,link=%s" %
6383 (new_nic.mac, new_nic.ip,
6384 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6385 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6388 for key in 'mac', 'ip':
6390 setattr(instance.nics[nic_op], key, nic_dict[key])
6391 if nic_op in self.nic_pnew:
6392 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6393 for key, val in nic_dict.iteritems():
6394 result.append(("nic.%s/%d" % (key, nic_op), val))
6397 if self.op.hvparams:
6398 instance.hvparams = self.hv_inst
6399 for key, val in self.op.hvparams.iteritems():
6400 result.append(("hv/%s" % key, val))
6403 if self.op.beparams:
6404 instance.beparams = self.be_inst
6405 for key, val in self.op.beparams.iteritems():
6406 result.append(("be/%s" % key, val))
6408 self.cfg.Update(instance)
6413 class LUQueryExports(NoHooksLU):
6414 """Query the exports list
6417 _OP_REQP = ['nodes']
6420 def ExpandNames(self):
6421 self.needed_locks = {}
6422 self.share_locks[locking.LEVEL_NODE] = 1
6423 if not self.op.nodes:
6424 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6426 self.needed_locks[locking.LEVEL_NODE] = \
6427 _GetWantedNodes(self, self.op.nodes)
6429 def CheckPrereq(self):
6430 """Check prerequisites.
6433 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6435 def Exec(self, feedback_fn):
6436 """Compute the list of all the exported system images.
6439 @return: a dictionary with the structure node->(export-list)
6440 where export-list is a list of the instances exported on
6444 rpcresult = self.rpc.call_export_list(self.nodes)
6446 for node in rpcresult:
6447 if rpcresult[node].fail_msg:
6448 result[node] = False
6450 result[node] = rpcresult[node].payload
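# Illustrative return value (node and instance names made up):
#   {'node1.example.com': ['inst1.example.com'], 'node2.example.com': False}
# where False marks nodes whose export list could not be retrieved.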
6455 class LUExportInstance(LogicalUnit):
6456 """Export an instance to an image in the cluster.
6459 HPATH = "instance-export"
6460 HTYPE = constants.HTYPE_INSTANCE
6461 _OP_REQP = ["instance_name", "target_node", "shutdown"]
6464 def ExpandNames(self):
6465 self._ExpandAndLockInstance()
6466 # FIXME: lock only instance primary and destination node
6468 # Sad but true, for now we have to lock all nodes, as we don't know where
6469 # the previous export might be, and in this LU we search for it and
6470 # remove it from its current node. In the future we could fix this by:
6471 # - making a tasklet to search (share-lock all), then create the new one,
6472 # then one to remove, after
6473 # - removing the removal operation altogether
6474 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6476 def DeclareLocks(self, level):
6477 """Last minute lock declaration."""
6478 # All nodes are locked anyway, so nothing to do here.
6480 def BuildHooksEnv(self):
6483 This will run on the master, primary node and target node.
6487 "EXPORT_NODE": self.op.target_node,
6488 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6490 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6491 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6492 self.op.target_node]
6495 def CheckPrereq(self):
6496 """Check prerequisites.
6498 This checks that the instance and node names are valid.
6501 instance_name = self.op.instance_name
6502 self.instance = self.cfg.GetInstanceInfo(instance_name)
6503 assert self.instance is not None, \
6504 "Cannot retrieve locked instance %s" % self.op.instance_name
6505 _CheckNodeOnline(self, self.instance.primary_node)
6507 self.dst_node = self.cfg.GetNodeInfo(
6508 self.cfg.ExpandNodeName(self.op.target_node))
6510 if self.dst_node is None:
6511 # This is a wrong node name, not a non-locked node
6512 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6513 _CheckNodeOnline(self, self.dst_node.name)
6514 _CheckNodeNotDrained(self, self.dst_node.name)
6516 # instance disk type verification
6517 for disk in self.instance.disks:
6518 if disk.dev_type == constants.LD_FILE:
6519 raise errors.OpPrereqError("Export not supported for instances with"
6520 " file-based disks")
6522 def Exec(self, feedback_fn):
6523 """Export an instance to an image in the cluster.
6526 instance = self.instance
6527 dst_node = self.dst_node
6528 src_node = instance.primary_node
6529 if self.op.shutdown:
6530 # shutdown the instance, but not the disks
6531 result = self.rpc.call_instance_shutdown(src_node, instance)
6532 result.Raise("Could not shutdown instance %s on"
6533 " node %s" % (instance.name, src_node))
6535 vgname = self.cfg.GetVGName()
6539 # set the disks' IDs correctly since call_instance_start needs the
6540 # correct drbd minor to create the symlinks
6541 for disk in instance.disks:
6542 self.cfg.SetDiskID(disk, src_node)
6545 for idx, disk in enumerate(instance.disks):
6546 # result.payload will be a snapshot of an LVM leaf of the disk we passed
6547 result = self.rpc.call_blockdev_snapshot(src_node, disk)
6548 msg = result.fail_msg
6550 self.LogWarning("Could not snapshot disk/%s on node %s: %s",
6552 snap_disks.append(False)
6554 disk_id = (vgname, result.payload)
6555 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6556 logical_id=disk_id, physical_id=disk_id,
6557 iv_name=disk.iv_name)
6558 snap_disks.append(new_dev)
6561 if self.op.shutdown and instance.admin_up:
6562 result = self.rpc.call_instance_start(src_node, instance, None, None)
6563 msg = result.fail_msg
6565 _ShutdownInstanceDisks(self, instance)
6566 raise errors.OpExecError("Could not start instance: %s" % msg)
6568 # TODO: check for size
6570 cluster_name = self.cfg.GetClusterName()
6571 for idx, dev in enumerate(snap_disks):
6573 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6574 instance, cluster_name, idx)
6575 msg = result.fail_msg
6577 self.LogWarning("Could not export disk/%s from node %s to"
6578 " node %s: %s", idx, src_node, dst_node.name, msg)
6579 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
6581 self.LogWarning("Could not remove snapshot for disk/%d from node"
6582 " %s: %s", idx, src_node, msg)
6584 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6585 msg = result.fail_msg
6587 self.LogWarning("Could not finalize export for instance %s"
6588 " on node %s: %s", instance.name, dst_node.name, msg)
6590 nodelist = self.cfg.GetNodeList()
6591 nodelist.remove(dst_node.name)
6593 # on one-node clusters nodelist will be empty after the removal
6594 # if we proceed, the backup would be removed because OpQueryExports
6595 # substitutes an empty list with the full cluster node list.
6596 iname = instance.name
6598 exportlist = self.rpc.call_export_list(nodelist)
6599 for node in exportlist:
6600 if exportlist[node].fail_msg:
6602 if iname in exportlist[node].payload:
6603 msg = self.rpc.call_export_remove(node, iname).fail_msg
6605 self.LogWarning("Could not remove older export for instance %s"
6606 " on node %s: %s", iname, node, msg)
6609 class LURemoveExport(NoHooksLU):
6610 """Remove exports related to the named instance.
6613 _OP_REQP = ["instance_name"]
6616 def ExpandNames(self):
6617 self.needed_locks = {}
6618 # We need all nodes to be locked in order for RemoveExport to work, but we
6619 # don't need to lock the instance itself, as nothing will happen to it (and
6620 # we can remove exports even for a removed instance)
6621 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6623 def CheckPrereq(self):
6624 """Check prerequisites.
6628 def Exec(self, feedback_fn):
6629 """Remove any export.
6632 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6633 # If the instance was not found we'll try with the name that was passed in.
6634 # This will only work if it was an FQDN, though.
6636 if not instance_name:
6638 instance_name = self.op.instance_name
6640 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6641 exportlist = self.rpc.call_export_list(locked_nodes)
6643 for node in exportlist:
6644 msg = exportlist[node].fail_msg
6646 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6648 if instance_name in exportlist[node].payload:
6650 result = self.rpc.call_export_remove(node, instance_name)
6651 msg = result.fail_msg
6653 logging.error("Could not remove export for instance %s"
6654 " on node %s: %s", instance_name, node, msg)
6656 if fqdn_warn and not found:
6657 feedback_fn("Export not found. If trying to remove an export belonging"
6658 " to a deleted instance please use its Fully Qualified"
6662 class TagsLU(NoHooksLU):
6665 This is an abstract class which is the parent of all the other tags LUs.
6669 def ExpandNames(self):
6670 self.needed_locks = {}
6671 if self.op.kind == constants.TAG_NODE:
6672 name = self.cfg.ExpandNodeName(self.op.name)
6674 raise errors.OpPrereqError("Invalid node name (%s)" %
6677 self.needed_locks[locking.LEVEL_NODE] = name
6678 elif self.op.kind == constants.TAG_INSTANCE:
6679 name = self.cfg.ExpandInstanceName(self.op.name)
6681 raise errors.OpPrereqError("Invalid instance name (%s)" %
6684 self.needed_locks[locking.LEVEL_INSTANCE] = name
6686 def CheckPrereq(self):
6687 """Check prerequisites.
6690 if self.op.kind == constants.TAG_CLUSTER:
6691 self.target = self.cfg.GetClusterInfo()
6692 elif self.op.kind == constants.TAG_NODE:
6693 self.target = self.cfg.GetNodeInfo(self.op.name)
6694 elif self.op.kind == constants.TAG_INSTANCE:
6695 self.target = self.cfg.GetInstanceInfo(self.op.name)
6697 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6701 class LUGetTags(TagsLU):
6702 """Returns the tags of a given object.
6705 _OP_REQP = ["kind", "name"]
6708 def Exec(self, feedback_fn):
6709 """Returns the tag list.
6712 return list(self.target.GetTags())
6715 class LUSearchTags(NoHooksLU):
6716 """Searches the tags for a given pattern.
6719 _OP_REQP = ["pattern"]
6722 def ExpandNames(self):
6723 self.needed_locks = {}
6725 def CheckPrereq(self):
6726 """Check prerequisites.
6728 This checks the pattern passed for validity by compiling it.
6732 self.re = re.compile(self.op.pattern)
6733 except re.error, err:
6734 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6735 (self.op.pattern, err))
6737 def Exec(self, feedback_fn):
6738 """Returns the tag list.
6742 tgts = [("/cluster", cfg.GetClusterInfo())]
6743 ilist = cfg.GetAllInstancesInfo().values()
6744 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6745 nlist = cfg.GetAllNodesInfo().values()
6746 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6748 for path, target in tgts:
6749 for tag in target.GetTags():
6750 if self.re.search(tag):
6751 results.append((path, tag))
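# Illustrative result (tags made up): a pattern such as "^web" could yield
#   [("/cluster", "webfarm"), ("/instances/inst1.example.com", "webserver")]
# i.e. (path, tag) pairs for every matching cluster, instance and node tag.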
6755 class LUAddTags(TagsLU):
6756 """Sets a tag on a given object.
6759 _OP_REQP = ["kind", "name", "tags"]
6762 def CheckPrereq(self):
6763 """Check prerequisites.
6765 This checks the type and length of the tag name and value.
6768 TagsLU.CheckPrereq(self)
6769 for tag in self.op.tags:
6770 objects.TaggableObject.ValidateTag(tag)
6772 def Exec(self, feedback_fn):
6777 for tag in self.op.tags:
6778 self.target.AddTag(tag)
6779 except errors.TagError, err:
6780 raise errors.OpExecError("Error while setting tag: %s" % str(err))
6782 self.cfg.Update(self.target)
6783 except errors.ConfigurationError:
6784 raise errors.OpRetryError("There has been a modification to the"
6785 " config file and the operation has been"
6786 " aborted. Please retry.")
6789 class LUDelTags(TagsLU):
6790 """Delete a list of tags from a given object.
6793 _OP_REQP = ["kind", "name", "tags"]
6796 def CheckPrereq(self):
6797 """Check prerequisites.
6799 This checks that we have the given tag.
6802 TagsLU.CheckPrereq(self)
6803 for tag in self.op.tags:
6804 objects.TaggableObject.ValidateTag(tag)
6805 del_tags = frozenset(self.op.tags)
6806 cur_tags = self.target.GetTags()
6807 if not del_tags <= cur_tags:
6808 diff_tags = del_tags - cur_tags
6809 diff_names = ["'%s'" % tag for tag in diff_tags]
6811 raise errors.OpPrereqError("Tag(s) %s not found" %
6812 (",".join(diff_names)))
6814 def Exec(self, feedback_fn):
6815 """Remove the tag from the object.
6818 for tag in self.op.tags:
6819 self.target.RemoveTag(tag)
6821 self.cfg.Update(self.target)
6822 except errors.ConfigurationError:
6823 raise errors.OpRetryError("There has been a modification to the"
6824 " config file and the operation has been"
6825 " aborted. Please retry.")
6828 class LUTestDelay(NoHooksLU):
6829 """Sleep for a specified amount of time.
6831 This LU sleeps on the master and/or nodes for a specified amount of
6835 _OP_REQP = ["duration", "on_master", "on_nodes"]
6838 def ExpandNames(self):
6839 """Expand names and set required locks.
6841 This expands the node list, if any.
6844 self.needed_locks = {}
6845 if self.op.on_nodes:
6846 # _GetWantedNodes can be used here, but is not always appropriate to use
6847 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for details.
6849 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6850 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6852 def CheckPrereq(self):
6853 """Check prerequisites.
6857 def Exec(self, feedback_fn):
6858 """Do the actual sleep.
6861 if self.op.on_master:
6862 if not utils.TestDelay(self.op.duration):
6863 raise errors.OpExecError("Error during master delay test")
6864 if self.op.on_nodes:
6865 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6866 for node, node_result in result.items():
6867 node_result.Raise("Failure during rpc call to node %s" % node)
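# Descriptive note: the same duration (presumably seconds) is passed unchanged
# to utils.TestDelay on the master and to the test_delay RPC on each requested
# node; any per-node failure is surfaced through node_result.Raise above.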
6870 class IAllocator(object):
6871 """IAllocator framework.
6873 An IAllocator instance has four sets of attributes:
6874 - cfg that is needed to query the cluster
6875 - input data (all members of the _KEYS class attribute are required)
6876 - four buffer attributes (in/out data/text) that represent the
6877 input (to the external script) in text and data structure format,
6878 and the output from it, again in two formats
6879 - the result variables from the script (success, info, nodes) for
6884 "mem_size", "disks", "disk_template",
6885 "os", "tags", "nics", "vcpus", "hypervisor",
6891 def __init__(self, lu, mode, name, **kwargs):
6893 # init buffer variables
6894 self.in_text = self.out_text = self.in_data = self.out_data = None
6895 # init all input fields so that pylint is happy
6898 self.mem_size = self.disks = self.disk_template = None
6899 self.os = self.tags = self.nics = self.vcpus = None
6900 self.hypervisor = None
6901 self.relocate_from = None
6903 self.required_nodes = None
6904 # init result fields
6905 self.success = self.info = self.nodes = None
6906 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6907 keyset = self._ALLO_KEYS
6908 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6909 keyset = self._RELO_KEYS
6911 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6912 " IAllocator" % self.mode)
6914 if key not in keyset:
6915 raise errors.ProgrammerError("Invalid input parameter '%s' to"
6916 " IAllocator" % key)
6917 setattr(self, key, kwargs[key])
6919 if key not in kwargs:
6920 raise errors.ProgrammerError("Missing input parameter '%s' to"
6921 " IAllocator" % key)
6922 self._BuildInputData()
6924 def _ComputeClusterData(self):
6925 """Compute the generic allocator input data.
6927 This is the data that is independent of the actual operation.
6931 cluster_info = cfg.GetClusterInfo()
6934 "version": constants.IALLOCATOR_VERSION,
6935 "cluster_name": cfg.GetClusterName(),
6936 "cluster_tags": list(cluster_info.GetTags()),
6937 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6938 # we don't have job IDs
6940 iinfo = cfg.GetAllInstancesInfo().values()
6941 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6945 node_list = cfg.GetNodeList()
6947 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6948 hypervisor_name = self.hypervisor
6949 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6950 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6952 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6954 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6955 cluster_info.enabled_hypervisors)
6956 for nname, nresult in node_data.items():
6957 # first fill in static (config-based) values
6958 ninfo = cfg.GetNodeInfo(nname)
6960 "tags": list(ninfo.GetTags()),
6961 "primary_ip": ninfo.primary_ip,
6962 "secondary_ip": ninfo.secondary_ip,
6963 "offline": ninfo.offline,
6964 "drained": ninfo.drained,
6965 "master_candidate": ninfo.master_candidate,
6968 if not ninfo.offline:
6969 nresult.Raise("Can't get data for node %s" % nname)
6970 node_iinfo[nname].Raise("Can't get node instance info from node %s" %
6972 remote_info = nresult.payload
6973 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6974 'vg_size', 'vg_free', 'cpu_total']:
6975 if attr not in remote_info:
6976 raise errors.OpExecError("Node '%s' didn't return attribute"
6977 " '%s'" % (nname, attr))
6978 if not isinstance(remote_info[attr], int):
6979 raise errors.OpExecError("Node '%s' returned invalid value"
6981 (nname, attr, remote_info[attr]))
6982 # compute memory used by primary instances
6983 i_p_mem = i_p_up_mem = 0
6984 for iinfo, beinfo in i_list:
6985 if iinfo.primary_node == nname:
6986 i_p_mem += beinfo[constants.BE_MEMORY]
6987 if iinfo.name not in node_iinfo[nname].payload:
6990 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6991 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6992 remote_info['memory_free'] -= max(0, i_mem_diff)
6995 i_p_up_mem += beinfo[constants.BE_MEMORY]
6997 # compute memory used by instances
6999 "total_memory": remote_info['memory_total'],
7000 "reserved_memory": remote_info['memory_dom0'],
7001 "free_memory": remote_info['memory_free'],
7002 "total_disk": remote_info['vg_size'],
7003 "free_disk": remote_info['vg_free'],
7004 "total_cpus": remote_info['cpu_total'],
7005 "i_pri_memory": i_p_mem,
7006 "i_pri_up_memory": i_p_up_mem,
7010 node_results[nname] = pnr
7011 data["nodes"] = node_results
7015 for iinfo, beinfo in i_list:
7017 for nic in iinfo.nics:
7018 filled_params = objects.FillDict(
7019 cluster_info.nicparams[constants.PP_DEFAULT],
7021 nic_dict = {"mac": nic.mac,
7023 "mode": filled_params[constants.NIC_MODE],
7024 "link": filled_params[constants.NIC_LINK],
7026 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7027 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7028 nic_data.append(nic_dict)
7030 "tags": list(iinfo.GetTags()),
7031 "admin_up": iinfo.admin_up,
7032 "vcpus": beinfo[constants.BE_VCPUS],
7033 "memory": beinfo[constants.BE_MEMORY],
7035 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7037 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7038 "disk_template": iinfo.disk_template,
7039 "hypervisor": iinfo.hypervisor,
7041 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7043 instance_data[iinfo.name] = pir
7045 data["instances"] = instance_data
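# Rough shape of the data built above (illustrative, heavily trimmed):
#   {"version": ..., "cluster_name": "...", "cluster_tags": [...],
#    "enabled_hypervisors": [...], "nodes": {...}, "instances": {...}}
# The "request" section is appended later by _AddNewInstance or
# _AddRelocateInstance.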
7049 def _AddNewInstance(self):
7050 """Add new instance data to allocator structure.
7052 This in combination with _AllocatorGetClusterData will create the
7053 correct structure needed as input for the allocator.
7055 The checks for the completeness of the opcode must have already been
7061 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7063 if self.disk_template in constants.DTS_NET_MIRROR:
7064 self.required_nodes = 2
7066 self.required_nodes = 1
7070 "disk_template": self.disk_template,
7073 "vcpus": self.vcpus,
7074 "memory": self.mem_size,
7075 "disks": self.disks,
7076 "disk_space_total": disk_space,
7078 "required_nodes": self.required_nodes,
7080 data["request"] = request
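# Illustrative allocation request (values made up, limited to the keys set
# above):
#   {"disk_template": "drbd", "memory": 512, "vcpus": 1,
#    "disks": [{"size": 1024, "mode": "w"}], "disk_space_total": ...,
#    "required_nodes": 2}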
7082 def _AddRelocateInstance(self):
7083 """Add relocate instance data to allocator structure.
7085 This in combination with _IAllocatorGetClusterData will create the
7086 correct structure needed as input for the allocator.
7088 The checks for the completeness of the opcode must have already been
7092 instance = self.lu.cfg.GetInstanceInfo(self.name)
7093 if instance is None:
7094 raise errors.ProgrammerError("Unknown instance '%s' passed to"
7095 " IAllocator" % self.name)
7097 if instance.disk_template not in constants.DTS_NET_MIRROR:
7098 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7100 if len(instance.secondary_nodes) != 1:
7101 raise errors.OpPrereqError("Instance does not have exactly one secondary node")
7103 self.required_nodes = 1
7104 disk_sizes = [{'size': disk.size} for disk in instance.disks]
7105 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7110 "disk_space_total": disk_space,
7111 "required_nodes": self.required_nodes,
7112 "relocate_from": self.relocate_from,
7114 self.in_data["request"] = request
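# Illustrative relocation request (node name made up):
#   {"disk_space_total": ..., "required_nodes": 1,
#    "relocate_from": ["node2.example.com"]}
# relocate_from names the node(s) the instance should be moved away from.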
7116 def _BuildInputData(self):
7117 """Build input data structures.
7120 self._ComputeClusterData()
7122 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7123 self._AddNewInstance()
7125 self._AddRelocateInstance()
7127 self.in_text = serializer.Dump(self.in_data)
7129 def Run(self, name, validate=True, call_fn=None):
7130 """Run an instance allocator and return the results.
7134 call_fn = self.lu.rpc.call_iallocator_runner
7136 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7137 result.Raise("Failure while running the iallocator script")
7139 self.out_text = result.payload
7141 self._ValidateResult()
7143 def _ValidateResult(self):
7144 """Process the allocator results.
7146 This will process the results and, if successful, save them in
7147 self.out_data and the other parameters.
7151 rdict = serializer.Load(self.out_text)
7152 except Exception, err:
7153 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7155 if not isinstance(rdict, dict):
7156 raise errors.OpExecError("Can't parse iallocator results: not a dict")
7158 for key in "success", "info", "nodes":
7159 if key not in rdict:
7160 raise errors.OpExecError("Can't parse iallocator results:"
7161 " missing key '%s'" % key)
7162 setattr(self, key, rdict[key])
7164 if not isinstance(rdict["nodes"], list):
7165 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7167 self.out_data = rdict
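# Illustrative reply after serializer.Load (node name made up):
#   {"success": True, "info": "allocation successful",
#    "nodes": ["node3.example.com"]}
# 'success', 'info' and 'nodes' are mandatory above, and 'nodes' must be a list.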
7170 class LUTestAllocator(NoHooksLU):
7171 """Run allocator tests.
7173 This LU runs the allocator tests
7176 _OP_REQP = ["direction", "mode", "name"]
7178 def CheckPrereq(self):
7179 """Check prerequisites.
7181 This checks the opcode parameters depending on the direction and mode of the test.
7184 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7185 for attr in ["name", "mem_size", "disks", "disk_template",
7186 "os", "tags", "nics", "vcpus"]:
7187 if not hasattr(self.op, attr):
7188 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7190 iname = self.cfg.ExpandInstanceName(self.op.name)
7191 if iname is not None:
7192 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7194 if not isinstance(self.op.nics, list):
7195 raise errors.OpPrereqError("Invalid parameter 'nics'")
7196 for row in self.op.nics:
7197 if (not isinstance(row, dict) or
7200 "bridge" not in row):
7201 raise errors.OpPrereqError("Invalid contents of the"
7202 " 'nics' parameter")
7203 if not isinstance(self.op.disks, list):
7204 raise errors.OpPrereqError("Invalid parameter 'disks'")
7205 for row in self.op.disks:
7206 if (not isinstance(row, dict) or
7207 "size" not in row or
7208 not isinstance(row["size"], int) or
7209 "mode" not in row or
7210 row["mode"] not in ['r', 'w']):
7211 raise errors.OpPrereqError("Invalid contents of the"
7212 " 'disks' parameter")
7213 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7214 self.op.hypervisor = self.cfg.GetHypervisorType()
7215 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7216 if not hasattr(self.op, "name"):
7217 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7218 fname = self.cfg.ExpandInstanceName(self.op.name)
7220 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7222 self.op.name = fname
7223 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7225 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7228 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7229 if not hasattr(self.op, "allocator") or self.op.allocator is None:
7230 raise errors.OpPrereqError("Missing allocator name")
7231 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7232 raise errors.OpPrereqError("Wrong allocator test '%s'" %
7235 def Exec(self, feedback_fn):
7236 """Run the allocator test.
7239 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7240 ial = IAllocator(self,
7243 mem_size=self.op.mem_size,
7244 disks=self.op.disks,
7245 disk_template=self.op.disk_template,
7249 vcpus=self.op.vcpus,
7250 hypervisor=self.op.hypervisor,
7253 ial = IAllocator(self,
7256 relocate_from=list(self.relocate_from),
7259 if self.op.direction == constants.IALLOCATOR_DIR_IN:
7260 result = ial.in_text
7262 ial.Run(self.op.allocator, validate=False)
7263 result = ial.out_text