4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
45 class LogicalUnit(object):
46 """Logical Unit base class.
48 Subclasses must follow these rules:
49 - implement ExpandNames
50 - implement CheckPrereq
51 - implement Exec
52 - implement BuildHooksEnv
53 - redefine HPATH and HTYPE
54 - optionally redefine their run requirements:
55 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
57 Note that all commands require root permissions.
59 @ivar dry_run_result: the value (if any) that will be returned to the caller
60 in dry-run mode (signalled by opcode dry_run parameter)
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overridden in derived classes in order to check op validity.
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
93 self.dry_run_result = None
95 for attr_name in self._OP_REQP:
96 attr_val = getattr(op, attr_name, None)
98 raise errors.OpPrereqError("Required parameter '%s' missing" %
100 self.CheckArguments()
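# Illustrative sketch of how a derived LU uses the check above (the class
# name is hypothetical): declaring
#
#   class LUExampleOperation(LogicalUnit):
#     _OP_REQP = ["instance_name"]
#
# is enough for the base constructor to raise OpPrereqError whenever the
# submitted opcode lacks an instance_name attribute.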
103 """Returns the SshRunner object
107 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
110 ssh = property(fget=__GetSSH)
112 def CheckArguments(self):
113 """Check syntactic validity for the opcode arguments.
115 This method is for doing a simple syntactic check and ensuring the
116 validity of opcode parameters, without any cluster-related
117 checks. While the same can be accomplished in ExpandNames and/or
118 CheckPrereq, doing these separately is better because:
120 - ExpandNames is left purely as a lock-related function
121 - CheckPrereq is run after we have acquired locks (and possibly waited for them)
124 The function is allowed to change the self.op attribute so that
125 later methods need not worry about missing parameters.
130 def ExpandNames(self):
131 """Expand names for this LU.
133 This method is called before starting to execute the opcode, and it should
134 update all the parameters of the opcode to their canonical form (e.g. a
135 short node name must be fully expanded after this method has successfully
136 completed). This way locking, hooks, logging, etc. can work correctly.
138 LUs which implement this method must also populate the self.needed_locks
139 member, as a dict with lock levels as keys, and a list of needed lock names
142 - use an empty dict if you don't need any lock
143 - if you don't need any lock at a particular level omit that level
144 - don't put anything for the BGL level
145 - if you want all locks at a level use locking.ALL_SET as a value
147 If you need to share locks (rather than acquire them exclusively) at one
148 level you can modify self.share_locks, setting a true value (usually 1) for
149 that level. By default locks are not shared.
153 # Acquire all nodes and one instance
154 self.needed_locks = {
155 locking.LEVEL_NODE: locking.ALL_SET,
156 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
158 # Acquire just two nodes
159 self.needed_locks = {
160 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
163 self.needed_locks = {} # No, you can't leave it to the default value None
166 # The implementation of this method is mandatory only if the new LU is
167 # concurrent, so that old LUs don't need to be changed all at the same time.
170 self.needed_locks = {} # Exclusive LUs don't need locks.
172 raise NotImplementedError
174 def DeclareLocks(self, level):
175 """Declare LU locking needs for a level
177 While most LUs can just declare their locking needs at ExpandNames time,
178 sometimes there's the need to calculate some locks after having acquired
179 the ones before. This function is called just before acquiring locks at a
180 particular level, but after acquiring the ones at lower levels, and permits
181 such calculations. It can be used to modify self.needed_locks, and by
182 default it does nothing.
184 This function is only called if you have something already set in
185 self.needed_locks for the level.
187 @param level: Locking level which is going to be locked
188 @type level: member of ganeti.locking.LEVELS
192 def CheckPrereq(self):
193 """Check prerequisites for this LU.
195 This method should check that the prerequisites for the execution
196 of this LU are fulfilled. It can do internode communication, but
197 it should be idempotent - no cluster or system changes are allowed.
200 The method should raise errors.OpPrereqError in case something is
201 not fulfilled. Its return value is ignored.
203 This method should also update all the parameters of the opcode to
204 their canonical form if it hasn't been done by ExpandNames before.
207 raise NotImplementedError
209 def Exec(self, feedback_fn):
212 This method should implement the actual work. It should raise
213 errors.OpExecError for failures that are somewhat dealt with in code, or expected.
217 raise NotImplementedError
219 def BuildHooksEnv(self):
220 """Build hooks environment for this LU.
222 This method should return a three-element tuple consisting of: a dict
223 containing the environment that will be used for running the
224 specific hook for this LU, a list of node names on which the hook
225 should run before the execution, and a list of node names on which
226 the hook should run after the execution.
228 The keys of the dict must not be prefixed with 'GANETI_', as that prefix
229 is added by the hooks runner. Also note that additional keys will be
230 added by the hooks runner. If the LU doesn't define any
231 environment, an empty dict (and not None) should be returned.
233 No nodes should be returned as an empty list (and not None).
235 Note that if the HPATH for a LU class is None, this function will not be called.
239 raise NotImplementedError
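# Illustrative sketch of the three-element return value described above,
# for a hypothetical single-node operation (names are examples only):
#
#   def BuildHooksEnv(self):
#     env = {"NODE_NAME": self.op.node_name}
#     mn = self.cfg.GetMasterNode()
#     return env, [mn], [mn, self.op.node_name]
#
# The hooks runner itself adds the "GANETI_" prefix and its extra keys.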
241 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
242 """Notify the LU about the results of its hooks.
244 This method is called every time a hooks phase is executed, and notifies
245 the Logical Unit about the hooks' result. The LU can then use it to alter
246 its result based on the hooks. By default the method does nothing and the
247 previous result is passed back unchanged but any LU can define it if it
248 wants to use the local cluster hook-scripts somehow.
250 @param phase: one of L{constants.HOOKS_PHASE_POST} or
251 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
252 @param hook_results: the results of the multi-node hooks rpc call
253 @param feedback_fn: function used to send feedback back to the caller
254 @param lu_result: the previous Exec result this LU had, or None
256 @return: the new Exec result, based on the previous result
262 def _ExpandAndLockInstance(self):
263 """Helper function to expand and lock an instance.
265 Many LUs that work on an instance take its name in self.op.instance_name
266 and need to expand it and then declare the expanded name for locking. This
267 function does it, and then updates self.op.instance_name to the expanded
268 name. It also initializes needed_locks as a dict, if this hasn't been done before.
272 if self.needed_locks is None:
273 self.needed_locks = {}
275 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
276 "_ExpandAndLockInstance called with instance-level locks set"
277 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
278 if expanded_name is None:
279 raise errors.OpPrereqError("Instance '%s' not known" %
280 self.op.instance_name)
281 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
282 self.op.instance_name = expanded_name
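# Illustrative sketch (assumed typical usage, not taken verbatim from any
# LU): an instance LU would call this helper from ExpandNames and defer the
# node locks to DeclareLocks:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE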
284 def _LockInstancesNodes(self, primary_only=False):
285 """Helper function to declare instances' nodes for locking.
287 This function should be called after locking one or more instances to lock
288 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
289 with all primary or secondary nodes for instances already locked and
290 present in self.needed_locks[locking.LEVEL_INSTANCE].
292 It should be called from DeclareLocks, and for safety only works if
293 self.recalculate_locks[locking.LEVEL_NODE] is set.
295 In the future it may grow parameters to just lock some instance's nodes, or
296 to just lock primaries or secondary nodes, if needed.
298 It should be called from DeclareLocks in a way similar to::
300 if level == locking.LEVEL_NODE:
301 self._LockInstancesNodes()
303 @type primary_only: boolean
304 @param primary_only: only lock primary nodes of locked instances
307 assert locking.LEVEL_NODE in self.recalculate_locks, \
308 "_LockInstancesNodes helper function called with no nodes to recalculate"
310 # TODO: check whether we've really been called with the instance locks held
312 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
313 # future we might want to have different behaviors depending on the value
314 # of self.recalculate_locks[locking.LEVEL_NODE]
316 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
317 instance = self.context.cfg.GetInstanceInfo(instance_name)
318 wanted_nodes.append(instance.primary_node)
320 wanted_nodes.extend(instance.secondary_nodes)
322 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
323 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
324 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
325 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
327 del self.recalculate_locks[locking.LEVEL_NODE]
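# Illustrative sketch of the matching DeclareLocks for the pattern above,
# following the example given in the _LockInstancesNodes docstring:
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()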
330 class NoHooksLU(LogicalUnit):
331 """Simple LU which runs no hooks.
333 This LU is intended as a parent for other LogicalUnits which will
334 run no hooks, in order to reduce duplicate code.
341 def _GetWantedNodes(lu, nodes):
342 """Returns list of checked and expanded node names.
344 @type lu: L{LogicalUnit}
345 @param lu: the logical unit on whose behalf we execute
347 @param nodes: list of node names or None for all nodes
349 @return: the list of nodes, sorted
350 @raise errors.OpPrereqError: if the nodes parameter is wrong type
353 if not isinstance(nodes, list):
354 raise errors.OpPrereqError("Invalid argument type 'nodes'")
357 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
358 " non-empty list of nodes whose name is to be expanded.")
362 node = lu.cfg.ExpandNodeName(name)
364 raise errors.OpPrereqError("No such node name '%s'" % name)
367 return utils.NiceSort(wanted)
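# Illustrative usage sketch: an LU that accepts an optional node list in its
# opcode would typically expand it like this (see LUQueryNodes below):
#
#   self.wanted = _GetWantedNodes(self, self.op.names)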
370 def _GetWantedInstances(lu, instances):
371 """Returns list of checked and expanded instance names.
373 @type lu: L{LogicalUnit}
374 @param lu: the logical unit on whose behalf we execute
375 @type instances: list
376 @param instances: list of instance names or None for all instances
378 @return: the list of instances, sorted
379 @raise errors.OpPrereqError: if the instances parameter is wrong type
380 @raise errors.OpPrereqError: if any of the passed instances is not found
383 if not isinstance(instances, list):
384 raise errors.OpPrereqError("Invalid argument type 'instances'")
389 for name in instances:
390 instance = lu.cfg.ExpandInstanceName(name)
392 raise errors.OpPrereqError("No such instance name '%s'" % name)
393 wanted.append(instance)
396 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
400 def _CheckOutputFields(static, dynamic, selected):
401 """Checks whether all selected fields are valid.
403 @type static: L{utils.FieldSet}
404 @param static: static fields set
405 @type dynamic: L{utils.FieldSet}
406 @param dynamic: dynamic fields set
413 delta = f.NonMatching(selected)
415 raise errors.OpPrereqError("Unknown output fields selected: %s"
419 def _CheckBooleanOpField(op, name):
420 """Validates boolean opcode parameters.
422 This will ensure that an opcode parameter is either a boolean value,
423 or None (but that it always exists).
426 val = getattr(op, name, None)
427 if not (val is None or isinstance(val, bool)):
428 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
430 setattr(op, name, val)
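# Illustrative usage sketch, with a hypothetical boolean opcode field:
#
#   _CheckBooleanOpField(self.op, "ignore_failures")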
433 def _CheckNodeOnline(lu, node):
434 """Ensure that a given node is online.
436 @param lu: the LU on behalf of which we make the check
437 @param node: the node to check
438 @raise errors.OpPrereqError: if the node is offline
441 if lu.cfg.GetNodeInfo(node).offline:
442 raise errors.OpPrereqError("Can't use offline node %s" % node)
445 def _CheckNodeNotDrained(lu, node):
446 """Ensure that a given node is not drained.
448 @param lu: the LU on behalf of which we make the check
449 @param node: the node to check
450 @raise errors.OpPrereqError: if the node is drained
453 if lu.cfg.GetNodeInfo(node).drained:
454 raise errors.OpPrereqError("Can't use drained node %s" % node)
457 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
458 memory, vcpus, nics, disk_template, disks,
459 bep, hvp, hypervisor_name):
460 """Builds instance related env variables for hooks
462 This builds the hook environment from individual variables.
465 @param name: the name of the instance
466 @type primary_node: string
467 @param primary_node: the name of the instance's primary node
468 @type secondary_nodes: list
469 @param secondary_nodes: list of secondary nodes as strings
470 @type os_type: string
471 @param os_type: the name of the instance's OS
472 @type status: boolean
473 @param status: the should_run status of the instance
475 @param memory: the memory size of the instance
477 @param vcpus: the count of VCPUs the instance has
479 @param nics: list of tuples (ip, mac, mode, link) representing
480 the NICs the instance has
481 @type disk_template: string
482 @param disk_template: the disk template of the instance
484 @param disks: the list of (size, mode) pairs
486 @param bep: the backend parameters for the instance
488 @param hvp: the hypervisor parameters for the instance
489 @type hypervisor_name: string
490 @param hypervisor_name: the hypervisor for the instance
492 @return: the hook environment for this instance
501 "INSTANCE_NAME": name,
502 "INSTANCE_PRIMARY": primary_node,
503 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
504 "INSTANCE_OS_TYPE": os_type,
505 "INSTANCE_STATUS": str_status,
506 "INSTANCE_MEMORY": memory,
507 "INSTANCE_VCPUS": vcpus,
508 "INSTANCE_DISK_TEMPLATE": disk_template,
509 "INSTANCE_HYPERVISOR": hypervisor_name,
513 nic_count = len(nics)
514 for idx, (ip, mac, mode, link) in enumerate(nics):
517 env["INSTANCE_NIC%d_IP" % idx] = ip
518 env["INSTANCE_NIC%d_MAC" % idx] = mac
519 env["INSTANCE_NIC%d_MODE" % idx] = mode
520 env["INSTANCE_NIC%d_LINK" % idx] = link
521 if mode == constants.NIC_MODE_BRIDGED:
522 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
526 env["INSTANCE_NIC_COUNT"] = nic_count
529 disk_count = len(disks)
530 for idx, (size, mode) in enumerate(disks):
531 env["INSTANCE_DISK%d_SIZE" % idx] = size
532 env["INSTANCE_DISK%d_MODE" % idx] = mode
536 env["INSTANCE_DISK_COUNT"] = disk_count
538 for source, kind in [(bep, "BE"), (hvp, "HV")]:
539 for key, value in source.items():
540 env["INSTANCE_%s_%s" % (kind, key)] = value
544 def _NICListToTuple(lu, nics):
545 """Build a list of nic information tuples.
547 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
548 value in LUQueryInstanceData.
550 @type lu: L{LogicalUnit}
551 @param lu: the logical unit on whose behalf we execute
552 @type nics: list of L{objects.NIC}
553 @param nics: list of nics to convert to hooks tuples
557 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
561 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
562 mode = filled_params[constants.NIC_MODE]
563 link = filled_params[constants.NIC_LINK]
564 hooks_nics.append((ip, mac, mode, link))
567 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
568 """Builds instance related env variables for hooks from an object.
570 @type lu: L{LogicalUnit}
571 @param lu: the logical unit on whose behalf we execute
572 @type instance: L{objects.Instance}
573 @param instance: the instance for which we should build the
576 @param override: dictionary with key/values that will override
579 @return: the hook environment dictionary
582 cluster = lu.cfg.GetClusterInfo()
583 bep = cluster.FillBE(instance)
584 hvp = cluster.FillHV(instance)
586 'name': instance.name,
587 'primary_node': instance.primary_node,
588 'secondary_nodes': instance.secondary_nodes,
589 'os_type': instance.os,
590 'status': instance.admin_up,
591 'memory': bep[constants.BE_MEMORY],
592 'vcpus': bep[constants.BE_VCPUS],
593 'nics': _NICListToTuple(lu, instance.nics),
594 'disk_template': instance.disk_template,
595 'disks': [(disk.size, disk.mode) for disk in instance.disks],
598 'hypervisor_name': instance.hypervisor,
601 args.update(override)
602 return _BuildInstanceHookEnv(**args)
605 def _AdjustCandidatePool(lu):
606 """Adjust the candidate pool after node operations.
609 mod_list = lu.cfg.MaintainCandidatePool()
611 lu.LogInfo("Promoted nodes to master candidate role: %s",
612 ", ".join(node.name for node in mod_list))
613 for name in mod_list:
614 lu.context.ReaddNode(name)
615 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
617 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
621 def _CheckNicsBridgesExist(lu, target_nics, target_node,
622 profile=constants.PP_DEFAULT):
623 """Check that the brigdes needed by a list of nics exist.
626 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
627 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
628 for nic in target_nics]
629 brlist = [params[constants.NIC_LINK] for params in paramslist
630 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
632 result = lu.rpc.call_bridges_exist(target_node, brlist)
633 result.Raise("Error checking bridges on destination node '%s'" %
634 target_node, prereq=True)
637 def _CheckInstanceBridgesExist(lu, instance, node=None):
638 """Check that the brigdes needed by an instance exist.
642 node = instance.primary_node
643 _CheckNicsBridgesExist(lu, instance.nics, node)
646 class LUDestroyCluster(NoHooksLU):
647 """Logical unit for destroying the cluster.
652 def CheckPrereq(self):
653 """Check prerequisites.
655 This checks whether the cluster is empty.
657 Any errors are signaled by raising errors.OpPrereqError.
660 master = self.cfg.GetMasterNode()
662 nodelist = self.cfg.GetNodeList()
663 if len(nodelist) != 1 or nodelist[0] != master:
664 raise errors.OpPrereqError("There are still %d node(s) in"
665 " this cluster." % (len(nodelist) - 1))
666 instancelist = self.cfg.GetInstanceList()
668 raise errors.OpPrereqError("There are still %d instance(s) in"
669 " this cluster." % len(instancelist))
671 def Exec(self, feedback_fn):
672 """Destroys the cluster.
675 master = self.cfg.GetMasterNode()
676 result = self.rpc.call_node_stop_master(master, False)
677 result.Raise("Could not disable the master role")
678 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
679 utils.CreateBackup(priv_key)
680 utils.CreateBackup(pub_key)
684 class LUVerifyCluster(LogicalUnit):
685 """Verifies the cluster status.
688 HPATH = "cluster-verify"
689 HTYPE = constants.HTYPE_CLUSTER
690 _OP_REQP = ["skip_checks"]
693 def ExpandNames(self):
694 self.needed_locks = {
695 locking.LEVEL_NODE: locking.ALL_SET,
696 locking.LEVEL_INSTANCE: locking.ALL_SET,
698 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
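# All node and instance locks are taken in shared mode here: verification
# only reads cluster state, so other jobs are not blocked while it runs.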
700 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
701 node_result, feedback_fn, master_files,
703 """Run multiple tests against a node.
707 - compares ganeti version
708 - checks vg existence and size > 20G
709 - checks config file checksum
710 - checks ssh to other nodes
712 @type nodeinfo: L{objects.Node}
713 @param nodeinfo: the node to check
714 @param file_list: required list of files
715 @param local_cksum: dictionary of local files and their checksums
716 @param node_result: the results from the node
717 @param feedback_fn: function used to accumulate results
718 @param master_files: list of files that only masters should have
719 @param drbd_map: the used DRBD minors for this node, in
720 form of minor: (instance, must_exist) which correspond to instances
721 and their running status
722 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
727 # main result, node_result should be a non-empty dict
728 if not node_result or not isinstance(node_result, dict):
729 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
732 # compares ganeti version
733 local_version = constants.PROTOCOL_VERSION
734 remote_version = node_result.get('version', None)
735 if not (remote_version and isinstance(remote_version, (list, tuple)) and
736 len(remote_version) == 2):
737 feedback_fn(" - ERROR: connection to %s failed" % (node))
740 if local_version != remote_version[0]:
741 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
742 " node %s %s" % (local_version, node, remote_version[0]))
745 # node seems compatible, we can actually try to look into its results
749 # full package version
750 if constants.RELEASE_VERSION != remote_version[1]:
751 feedback_fn(" - WARNING: software version mismatch: master %s,"
753 (constants.RELEASE_VERSION, node, remote_version[1]))
755 # checks vg existence and size > 20G
756 if vg_name is not None:
757 vglist = node_result.get(constants.NV_VGLIST, None)
759 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
763 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
764 constants.MIN_VG_SIZE)
766 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
769 # checks config file checksum
771 remote_cksum = node_result.get(constants.NV_FILELIST, None)
772 if not isinstance(remote_cksum, dict):
774 feedback_fn(" - ERROR: node hasn't returned file checksum data")
776 for file_name in file_list:
777 node_is_mc = nodeinfo.master_candidate
778 must_have_file = file_name not in master_files
779 if file_name not in remote_cksum:
780 if node_is_mc or must_have_file:
782 feedback_fn(" - ERROR: file '%s' missing" % file_name)
783 elif remote_cksum[file_name] != local_cksum[file_name]:
784 if node_is_mc or must_have_file:
786 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
788 # not candidate and this is not a must-have file
790 feedback_fn(" - ERROR: file '%s' should not exist on non master"
791 " candidates (and the file is outdated)" % file_name)
793 # all good, except non-master/non-must have combination
794 if not node_is_mc and not must_have_file:
795 feedback_fn(" - ERROR: file '%s' should not exist on non master"
796 " candidates" % file_name)
800 if constants.NV_NODELIST not in node_result:
802 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
804 if node_result[constants.NV_NODELIST]:
806 for node in node_result[constants.NV_NODELIST]:
807 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
808 (node, node_result[constants.NV_NODELIST][node]))
810 if constants.NV_NODENETTEST not in node_result:
812 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
814 if node_result[constants.NV_NODENETTEST]:
816 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
818 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
819 (node, node_result[constants.NV_NODENETTEST][node]))
821 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
822 if isinstance(hyp_result, dict):
823 for hv_name, hv_result in hyp_result.iteritems():
824 if hv_result is not None:
825 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
826 (hv_name, hv_result))
828 # check used drbd list
829 if vg_name is not None:
830 used_minors = node_result.get(constants.NV_DRBDLIST, [])
831 if not isinstance(used_minors, (tuple, list)):
832 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
835 for minor, (iname, must_exist) in drbd_map.items():
836 if minor not in used_minors and must_exist:
837 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
838 " not active" % (minor, iname))
840 for minor in used_minors:
841 if minor not in drbd_map:
842 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
848 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
849 node_instance, feedback_fn, n_offline):
850 """Verify an instance.
852 This function checks to see if the required block devices are
853 available on the instance's node.
858 node_current = instanceconfig.primary_node
861 instanceconfig.MapLVsByNode(node_vol_should)
863 for node in node_vol_should:
864 if node in n_offline:
865 # ignore missing volumes on offline nodes
867 for volume in node_vol_should[node]:
868 if node not in node_vol_is or volume not in node_vol_is[node]:
869 feedback_fn(" - ERROR: volume %s missing on node %s" %
873 if instanceconfig.admin_up:
874 if ((node_current not in node_instance or
875 not instance in node_instance[node_current]) and
876 node_current not in n_offline):
877 feedback_fn(" - ERROR: instance %s not running on node %s" %
878 (instance, node_current))
881 for node in node_instance:
882 if node != node_current:
883 if instance in node_instance[node]:
884 feedback_fn(" - ERROR: instance %s should not run on node %s" %
890 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
891 """Verify if there are any unknown volumes in the cluster.
893 The .os, .swap and backup volumes are ignored. All other volumes are reported as unknown.
899 for node in node_vol_is:
900 for volume in node_vol_is[node]:
901 if node not in node_vol_should or volume not in node_vol_should[node]:
902 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
907 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
908 """Verify the list of running instances.
910 This checks what instances are running but unknown to the cluster.
914 for node in node_instance:
915 for runninginstance in node_instance[node]:
916 if runninginstance not in instancelist:
917 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
918 (runninginstance, node))
922 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
923 """Verify N+1 Memory Resilience.
925 Check that if one single node dies we can still start all the instances it
931 for node, nodeinfo in node_info.iteritems():
932 # This code checks that every node which is now listed as secondary has
933 # enough memory to host all the instances it would have to take over should a single
934 # other node in the cluster fail.
935 # FIXME: not ready for failover to an arbitrary node
936 # FIXME: does not support file-backed instances
937 # WARNING: we currently take into account down instances as well as up
938 # ones, considering that even if they're down someone might want to start
939 # them even in the event of a node failure.
940 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
942 for instance in instances:
943 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
944 if bep[constants.BE_AUTO_BALANCE]:
945 needed_mem += bep[constants.BE_MEMORY]
946 if nodeinfo['mfree'] < needed_mem:
947 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
948 " failovers should node %s fail" % (node, prinode))
952 def CheckPrereq(self):
953 """Check prerequisites.
955 Transform the list of checks we're going to skip into a set and check that
956 all its members are valid.
959 self.skip_set = frozenset(self.op.skip_checks)
960 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
961 raise errors.OpPrereqError("Invalid checks to be skipped specified")
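# For example, passing skip_checks=[constants.VERIFY_NPLUSONE_MEM] in the
# opcode skips the N+1 memory redundancy check performed later in Exec.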
963 def BuildHooksEnv(self):
966 Cluster-Verify hooks are run only in the post phase; a hook failure is
967 logged in the verify output and makes the verification fail.
970 all_nodes = self.cfg.GetNodeList()
972 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
974 for node in self.cfg.GetAllNodesInfo().values():
975 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
977 return env, [], all_nodes
979 def Exec(self, feedback_fn):
980 """Verify integrity of cluster, performing various test on nodes.
984 feedback_fn("* Verifying global settings")
985 for msg in self.cfg.VerifyConfig():
986 feedback_fn(" - ERROR: %s" % msg)
988 vg_name = self.cfg.GetVGName()
989 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
990 nodelist = utils.NiceSort(self.cfg.GetNodeList())
991 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
992 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
993 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
994 for iname in instancelist)
995 i_non_redundant = [] # Non redundant instances
996 i_non_a_balanced = [] # Non auto-balanced instances
997 n_offline = [] # List of offline nodes
998 n_drained = [] # List of nodes being drained
1004 # FIXME: verify OS list
1005 # do local checksums
1006 master_files = [constants.CLUSTER_CONF_FILE]
1008 file_names = ssconf.SimpleStore().GetFileList()
1009 file_names.append(constants.SSL_CERT_FILE)
1010 file_names.append(constants.RAPI_CERT_FILE)
1011 file_names.extend(master_files)
1013 local_checksums = utils.FingerprintFiles(file_names)
1015 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1016 node_verify_param = {
1017 constants.NV_FILELIST: file_names,
1018 constants.NV_NODELIST: [node.name for node in nodeinfo
1019 if not node.offline],
1020 constants.NV_HYPERVISOR: hypervisors,
1021 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1022 node.secondary_ip) for node in nodeinfo
1023 if not node.offline],
1024 constants.NV_INSTANCELIST: hypervisors,
1025 constants.NV_VERSION: None,
1026 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1028 if vg_name is not None:
1029 node_verify_param[constants.NV_VGLIST] = None
1030 node_verify_param[constants.NV_LVLIST] = vg_name
1031 node_verify_param[constants.NV_DRBDLIST] = None
1032 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1033 self.cfg.GetClusterName())
1035 cluster = self.cfg.GetClusterInfo()
1036 master_node = self.cfg.GetMasterNode()
1037 all_drbd_map = self.cfg.ComputeDRBDMap()
1039 for node_i in nodeinfo:
1043 feedback_fn("* Skipping offline node %s" % (node,))
1044 n_offline.append(node)
1047 if node == master_node:
1049 elif node_i.master_candidate:
1050 ntype = "master candidate"
1051 elif node_i.drained:
1053 n_drained.append(node)
1056 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1058 msg = all_nvinfo[node].fail_msg
1060 feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg))
1064 nresult = all_nvinfo[node].payload
1066 for minor, instance in all_drbd_map[node].items():
1067 if instance not in instanceinfo:
1068 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1070 # ghost instance should not be running, but otherwise we
1071 # don't give double warnings (both ghost instance and
1072 # unallocated minor in use)
1073 node_drbd[minor] = (instance, False)
1075 instance = instanceinfo[instance]
1076 node_drbd[minor] = (instance.name, instance.admin_up)
1077 result = self._VerifyNode(node_i, file_names, local_checksums,
1078 nresult, feedback_fn, master_files,
1082 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1084 node_volume[node] = {}
1085 elif isinstance(lvdata, basestring):
1086 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1087 (node, utils.SafeEncode(lvdata)))
1089 node_volume[node] = {}
1090 elif not isinstance(lvdata, dict):
1091 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1095 node_volume[node] = lvdata
1098 idata = nresult.get(constants.NV_INSTANCELIST, None)
1099 if not isinstance(idata, list):
1100 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1105 node_instance[node] = idata
1108 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1109 if not isinstance(nodeinfo, dict):
1110 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1116 "mfree": int(nodeinfo['memory_free']),
1119 # dictionary holding all instances this node is secondary for,
1120 # grouped by their primary node. Each key is a cluster node, and each
1121 # value is a list of instances which have the key as primary and the
1122 # current node as secondary. this is handy to calculate N+1 memory
1123 # availability if you can only failover from a primary to its
1125 "sinst-by-pnode": {},
1127 # FIXME: devise a free space model for file based instances as well
1128 if vg_name is not None:
1129 if (constants.NV_VGLIST not in nresult or
1130 vg_name not in nresult[constants.NV_VGLIST]):
1131 feedback_fn(" - ERROR: node %s didn't return data for the"
1132 " volume group '%s' - it is either missing or broken" %
1136 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1137 except (ValueError, KeyError):
1138 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1139 " from node %s" % (node,))
1143 node_vol_should = {}
1145 for instance in instancelist:
1146 feedback_fn("* Verifying instance %s" % instance)
1147 inst_config = instanceinfo[instance]
1148 result = self._VerifyInstance(instance, inst_config, node_volume,
1149 node_instance, feedback_fn, n_offline)
1151 inst_nodes_offline = []
1153 inst_config.MapLVsByNode(node_vol_should)
1155 instance_cfg[instance] = inst_config
1157 pnode = inst_config.primary_node
1158 if pnode in node_info:
1159 node_info[pnode]['pinst'].append(instance)
1160 elif pnode not in n_offline:
1161 feedback_fn(" - ERROR: instance %s, connection to primary node"
1162 " %s failed" % (instance, pnode))
1165 if pnode in n_offline:
1166 inst_nodes_offline.append(pnode)
1168 # If the instance is non-redundant we cannot survive losing its primary
1169 # node, so we are not N+1 compliant. On the other hand we have no disk
1170 # templates with more than one secondary so that situation is not well supported either.
1172 # FIXME: does not support file-backed instances
1173 if len(inst_config.secondary_nodes) == 0:
1174 i_non_redundant.append(instance)
1175 elif len(inst_config.secondary_nodes) > 1:
1176 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1179 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1180 i_non_a_balanced.append(instance)
1182 for snode in inst_config.secondary_nodes:
1183 if snode in node_info:
1184 node_info[snode]['sinst'].append(instance)
1185 if pnode not in node_info[snode]['sinst-by-pnode']:
1186 node_info[snode]['sinst-by-pnode'][pnode] = []
1187 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1188 elif snode not in n_offline:
1189 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1190 " %s failed" % (instance, snode))
1192 if snode in n_offline:
1193 inst_nodes_offline.append(snode)
1195 if inst_nodes_offline:
1196 # warn that the instance lives on offline nodes, and set bad=True
1197 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1198 ", ".join(inst_nodes_offline))
1201 feedback_fn("* Verifying orphan volumes")
1202 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1206 feedback_fn("* Verifying remaining instances")
1207 result = self._VerifyOrphanInstances(instancelist, node_instance,
1211 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1212 feedback_fn("* Verifying N+1 Memory redundancy")
1213 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1216 feedback_fn("* Other Notes")
1218 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1219 % len(i_non_redundant))
1221 if i_non_a_balanced:
1222 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1223 % len(i_non_a_balanced))
1226 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1229 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1233 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1234 """Analyze the post-hooks' result
1236 This method analyses the hook result, handles it, and sends some
1237 nicely-formatted feedback back to the user.
1239 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1240 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1241 @param hooks_results: the results of the multi-node hooks rpc call
1242 @param feedback_fn: function used to send feedback back to the caller
1243 @param lu_result: previous Exec result
1244 @return: the new Exec result, based on the previous result
1248 # We only really run POST phase hooks, and are only interested in their results.
1250 if phase == constants.HOOKS_PHASE_POST:
1251 # Used to change hooks' output to proper indentation
1252 indent_re = re.compile('^', re.M)
1253 feedback_fn("* Hooks Results")
1254 if not hooks_results:
1255 feedback_fn(" - ERROR: general communication failure")
1258 for node_name in hooks_results:
1259 show_node_header = True
1260 res = hooks_results[node_name]
1264 # no need to warn or set fail return value
1266 feedback_fn(" Communication failure in hooks execution: %s" %
1270 for script, hkr, output in res.payload:
1271 if hkr == constants.HKR_FAIL:
1272 # The node header is only shown once, if there are
1273 # failing hooks on that node
1274 if show_node_header:
1275 feedback_fn(" Node %s:" % node_name)
1276 show_node_header = False
1277 feedback_fn(" ERROR: Script %s failed, output:" % script)
1278 output = indent_re.sub(' ', output)
1279 feedback_fn("%s" % output)
1285 class LUVerifyDisks(NoHooksLU):
1286 """Verifies the cluster disks status.
1292 def ExpandNames(self):
1293 self.needed_locks = {
1294 locking.LEVEL_NODE: locking.ALL_SET,
1295 locking.LEVEL_INSTANCE: locking.ALL_SET,
1297 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1299 def CheckPrereq(self):
1300 """Check prerequisites.
1302 This has no prerequisites.
1307 def Exec(self, feedback_fn):
1308 """Verify integrity of cluster disks.
1310 @rtype: tuple of three items
1311 @return: a tuple of (dict of node-to-node_error, list of instances
1312 which need activate-disks, dict of instance: (node, volume) for missing volumes)
1316 result = res_nodes, res_instances, res_missing = {}, [], {}
1318 vg_name = self.cfg.GetVGName()
1319 nodes = utils.NiceSort(self.cfg.GetNodeList())
1320 instances = [self.cfg.GetInstanceInfo(name)
1321 for name in self.cfg.GetInstanceList()]
1324 for inst in instances:
1326 if (not inst.admin_up or
1327 inst.disk_template not in constants.DTS_NET_MIRROR):
1329 inst.MapLVsByNode(inst_lvs)
1330 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1331 for node, vol_list in inst_lvs.iteritems():
1332 for vol in vol_list:
1333 nv_dict[(node, vol)] = inst
1338 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1342 node_res = node_lvs[node]
1343 if node_res.offline:
1345 msg = node_res.fail_msg
1347 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1348 res_nodes[node] = msg
1351 lvs = node_res.payload
1352 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1353 inst = nv_dict.pop((node, lv_name), None)
1354 if (not lv_online and inst is not None
1355 and inst.name not in res_instances):
1356 res_instances.append(inst.name)
1358 # any leftover items in nv_dict are missing LVs, let's arrange the data better.
1360 for key, inst in nv_dict.iteritems():
1361 if inst.name not in res_missing:
1362 res_missing[inst.name] = []
1363 res_missing[inst.name].append(key)
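# Illustrative sketch of the returned structure, with hypothetical node,
# instance and volume names:
#
#   ({"node2": "rpc error"},                     # nodes with errors
#    ["instance3"],                              # need activate-disks
#    {"instance4": [("node1", "xenvg/disk0")]})  # missing volumes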
1368 class LURenameCluster(LogicalUnit):
1369 """Rename the cluster.
1372 HPATH = "cluster-rename"
1373 HTYPE = constants.HTYPE_CLUSTER
1376 def BuildHooksEnv(self):
1381 "OP_TARGET": self.cfg.GetClusterName(),
1382 "NEW_NAME": self.op.name,
1384 mn = self.cfg.GetMasterNode()
1385 return env, [mn], [mn]
1387 def CheckPrereq(self):
1388 """Verify that the passed name is a valid one.
1391 hostname = utils.HostInfo(self.op.name)
1393 new_name = hostname.name
1394 self.ip = new_ip = hostname.ip
1395 old_name = self.cfg.GetClusterName()
1396 old_ip = self.cfg.GetMasterIP()
1397 if new_name == old_name and new_ip == old_ip:
1398 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1399 " cluster has changed")
1400 if new_ip != old_ip:
1401 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1402 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1403 " reachable on the network. Aborting." %
1406 self.op.name = new_name
1408 def Exec(self, feedback_fn):
1409 """Rename the cluster.
1412 clustername = self.op.name
1415 # shutdown the master IP
1416 master = self.cfg.GetMasterNode()
1417 result = self.rpc.call_node_stop_master(master, False)
1418 result.Raise("Could not disable the master role")
1421 cluster = self.cfg.GetClusterInfo()
1422 cluster.cluster_name = clustername
1423 cluster.master_ip = ip
1424 self.cfg.Update(cluster)
1426 # update the known hosts file
1427 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1428 node_list = self.cfg.GetNodeList()
1430 node_list.remove(master)
1433 result = self.rpc.call_upload_file(node_list,
1434 constants.SSH_KNOWN_HOSTS_FILE)
1435 for to_node, to_result in result.iteritems():
1436 msg = to_result.fail_msg
1438 msg = ("Copy of file %s to node %s failed: %s" %
1439 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1440 self.proc.LogWarning(msg)
1443 result = self.rpc.call_node_start_master(master, False, False)
1444 msg = result.fail_msg
1446 self.LogWarning("Could not re-enable the master role on"
1447 " the master, please restart manually: %s", msg)
1450 def _RecursiveCheckIfLVMBased(disk):
1451 """Check if the given disk or its children are lvm-based.
1453 @type disk: L{objects.Disk}
1454 @param disk: the disk to check
1456 @return: boolean indicating whether a LD_LV dev_type was found or not
1460 for chdisk in disk.children:
1461 if _RecursiveCheckIfLVMBased(chdisk):
1463 return disk.dev_type == constants.LD_LV
1466 class LUSetClusterParams(LogicalUnit):
1467 """Change the parameters of the cluster.
1470 HPATH = "cluster-modify"
1471 HTYPE = constants.HTYPE_CLUSTER
1475 def CheckArguments(self):
1479 if not hasattr(self.op, "candidate_pool_size"):
1480 self.op.candidate_pool_size = None
1481 if self.op.candidate_pool_size is not None:
1483 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1484 except (ValueError, TypeError), err:
1485 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1487 if self.op.candidate_pool_size < 1:
1488 raise errors.OpPrereqError("At least one master candidate needed")
1490 def ExpandNames(self):
1491 # FIXME: in the future maybe other cluster params won't require checking on
1492 # all nodes to be modified.
1493 self.needed_locks = {
1494 locking.LEVEL_NODE: locking.ALL_SET,
1496 self.share_locks[locking.LEVEL_NODE] = 1
1498 def BuildHooksEnv(self):
1503 "OP_TARGET": self.cfg.GetClusterName(),
1504 "NEW_VG_NAME": self.op.vg_name,
1506 mn = self.cfg.GetMasterNode()
1507 return env, [mn], [mn]
1509 def CheckPrereq(self):
1510 """Check prerequisites.
1512 This checks whether the given params don't conflict and
1513 if the given volume group is valid.
1516 if self.op.vg_name is not None and not self.op.vg_name:
1517 instances = self.cfg.GetAllInstancesInfo().values()
1518 for inst in instances:
1519 for disk in inst.disks:
1520 if _RecursiveCheckIfLVMBased(disk):
1521 raise errors.OpPrereqError("Cannot disable lvm storage while"
1522 " lvm-based instances exist")
1524 node_list = self.acquired_locks[locking.LEVEL_NODE]
1526 # if vg_name not None, checks given volume group on all nodes
1528 vglist = self.rpc.call_vg_list(node_list)
1529 for node in node_list:
1530 msg = vglist[node].fail_msg
1532 # ignoring down node
1533 self.LogWarning("Error while gathering data on node %s"
1534 " (ignoring node): %s", node, msg)
1536 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1538 constants.MIN_VG_SIZE)
1540 raise errors.OpPrereqError("Error on node '%s': %s" %
1543 self.cluster = cluster = self.cfg.GetClusterInfo()
1544 # validate params changes
1545 if self.op.beparams:
1546 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1547 self.new_beparams = objects.FillDict(
1548 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1550 if self.op.nicparams:
1551 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1552 self.new_nicparams = objects.FillDict(
1553 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1554 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1556 # hypervisor list/parameters
1557 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1558 if self.op.hvparams:
1559 if not isinstance(self.op.hvparams, dict):
1560 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1561 for hv_name, hv_dict in self.op.hvparams.items():
1562 if hv_name not in self.new_hvparams:
1563 self.new_hvparams[hv_name] = hv_dict
1565 self.new_hvparams[hv_name].update(hv_dict)
1567 if self.op.enabled_hypervisors is not None:
1568 self.hv_list = self.op.enabled_hypervisors
1569 if not self.hv_list:
1570 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1571 " least one member")
1572 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1574 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1575 " entries: %s" % invalid_hvs)
1577 self.hv_list = cluster.enabled_hypervisors
1579 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1580 # either the enabled list has changed, or the parameters have, validate
1581 for hv_name, hv_params in self.new_hvparams.items():
1582 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1583 (self.op.enabled_hypervisors and
1584 hv_name in self.op.enabled_hypervisors)):
1585 # either this is a new hypervisor, or its parameters have changed
1586 hv_class = hypervisor.GetHypervisor(hv_name)
1587 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1588 hv_class.CheckParameterSyntax(hv_params)
1589 _CheckHVParams(self, node_list, hv_name, hv_params)
1591 def Exec(self, feedback_fn):
1592 """Change the parameters of the cluster.
1595 if self.op.vg_name is not None:
1596 new_volume = self.op.vg_name
1599 if new_volume != self.cfg.GetVGName():
1600 self.cfg.SetVGName(new_volume)
1602 feedback_fn("Cluster LVM configuration already in desired"
1603 " state, not changing")
1604 if self.op.hvparams:
1605 self.cluster.hvparams = self.new_hvparams
1606 if self.op.enabled_hypervisors is not None:
1607 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1608 if self.op.beparams:
1609 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1610 if self.op.nicparams:
1611 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1613 if self.op.candidate_pool_size is not None:
1614 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1615 # we need to update the pool size here, otherwise the save will fail
1616 _AdjustCandidatePool(self)
1618 self.cfg.Update(self.cluster)
1621 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1622 """Distribute additional files which are part of the cluster configuration.
1624 ConfigWriter takes care of distributing the config and ssconf files, but
1625 there are more files which should be distributed to all nodes. This function
1626 makes sure those are copied.
1628 @param lu: calling logical unit
1629 @param additional_nodes: list of nodes not in the config to distribute to
1632 # 1. Gather target nodes
1633 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1634 dist_nodes = lu.cfg.GetNodeList()
1635 if additional_nodes is not None:
1636 dist_nodes.extend(additional_nodes)
1637 if myself.name in dist_nodes:
1638 dist_nodes.remove(myself.name)
1639 # 2. Gather files to distribute
1640 dist_files = set([constants.ETC_HOSTS,
1641 constants.SSH_KNOWN_HOSTS_FILE,
1642 constants.RAPI_CERT_FILE,
1643 constants.RAPI_USERS_FILE,
1646 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1647 for hv_name in enabled_hypervisors:
1648 hv_class = hypervisor.GetHypervisor(hv_name)
1649 dist_files.update(hv_class.GetAncillaryFiles())
1651 # 3. Perform the files upload
1652 for fname in dist_files:
1653 if os.path.exists(fname):
1654 result = lu.rpc.call_upload_file(dist_nodes, fname)
1655 for to_node, to_result in result.items():
1656 msg = to_result.fail_msg
1658 msg = ("Copy of file %s to node %s failed: %s" %
1659 (fname, to_node, msg))
1660 lu.proc.LogWarning(msg)
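# This helper is invoked from LURedistributeConfig.Exec below; callers that
# add new nodes can pass them via additional_nodes so the files also reach
# nodes not yet present in the configuration.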
1663 class LURedistributeConfig(NoHooksLU):
1664 """Force the redistribution of cluster configuration.
1666 This is a very simple LU.
1672 def ExpandNames(self):
1673 self.needed_locks = {
1674 locking.LEVEL_NODE: locking.ALL_SET,
1676 self.share_locks[locking.LEVEL_NODE] = 1
1678 def CheckPrereq(self):
1679 """Check prerequisites.
1683 def Exec(self, feedback_fn):
1684 """Redistribute the configuration.
1687 self.cfg.Update(self.cfg.GetClusterInfo())
1688 _RedistributeAncillaryFiles(self)
1691 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1692 """Sleep and poll for an instance's disk to sync.
1695 if not instance.disks:
1699 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1701 node = instance.primary_node
1703 for dev in instance.disks:
1704 lu.cfg.SetDiskID(dev, node)
1707 degr_retries = 10 # in seconds, as we sleep 1 second each time
1711 cumul_degraded = False
1712 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1713 msg = rstats.fail_msg
1715 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1718 raise errors.RemoteError("Can't contact node %s for mirror data,"
1719 " aborting." % node)
1722 rstats = rstats.payload
1724 for i, mstat in enumerate(rstats):
1726 lu.LogWarning("Can't compute data for node %s/%s",
1727 node, instance.disks[i].iv_name)
1729 # we ignore the ldisk parameter
1730 perc_done, est_time, is_degraded, _ = mstat
1731 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1732 if perc_done is not None:
1734 if est_time is not None:
1735 rem_time = "%d estimated seconds remaining" % est_time
1738 rem_time = "no time estimate"
1739 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1740 (instance.disks[i].iv_name, perc_done, rem_time))
1742 # if we're done but degraded, let's do a few small retries, to
1743 # make sure we see a stable and not transient situation; therefore
1744 # we force restart of the loop
1745 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1746 logging.info("Degraded disks found, %d retries left", degr_retries)
1754 time.sleep(min(60, max_time))
1757 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1758 return not cumul_degraded
1761 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1762 """Check that mirrors are not degraded.
1764 The ldisk parameter, if True, will change the test from the
1765 is_degraded attribute (which represents overall non-ok status for
1766 the device(s)) to the ldisk (representing the local storage status).
1769 lu.cfg.SetDiskID(dev, node)
1776 if on_primary or dev.AssembleOnSecondary():
1777 rstats = lu.rpc.call_blockdev_find(node, dev)
1778 msg = rstats.fail_msg
1780 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1782 elif not rstats.payload:
1783 lu.LogWarning("Can't find disk on node %s", node)
1786 result = result and (not rstats.payload[idx])
1788 for child in dev.children:
1789 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1794 class LUDiagnoseOS(NoHooksLU):
1795 """Logical unit for OS diagnose/query.
1798 _OP_REQP = ["output_fields", "names"]
1800 _FIELDS_STATIC = utils.FieldSet()
1801 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1803 def ExpandNames(self):
1805 raise errors.OpPrereqError("Selective OS query not supported")
1807 _CheckOutputFields(static=self._FIELDS_STATIC,
1808 dynamic=self._FIELDS_DYNAMIC,
1809 selected=self.op.output_fields)
1811 # Lock all nodes, in shared mode
1812 # Temporary removal of locks, should be reverted later
1813 # TODO: reintroduce locks when they are lighter-weight
1814 self.needed_locks = {}
1815 #self.share_locks[locking.LEVEL_NODE] = 1
1816 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1818 def CheckPrereq(self):
1819 """Check prerequisites.
1824 def _DiagnoseByOS(node_list, rlist):
1825 """Remaps a per-node return list into an a per-os per-node dictionary
1827 @param node_list: a list with the names of all nodes
1828 @param rlist: a map with node names as keys and OS objects as values
1831 @return: a dictionary with osnames as keys and as value another map, with
1832 nodes as keys and tuples of (path, status, diagnose) as values, eg::
1834 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1835 (/srv/..., False, "invalid api")],
1836 "node2": [(/srv/..., True, "")]}
1841 # we build here the list of nodes that didn't fail the RPC (at RPC
1842 # level), so that nodes with a non-responding node daemon don't
1843 # make all OSes invalid
1844 good_nodes = [node_name for node_name in rlist
1845 if not rlist[node_name].fail_msg]
1846 for node_name, nr in rlist.items():
1847 if nr.fail_msg or not nr.payload:
1849 for name, path, status, diagnose in nr.payload:
1850 if name not in all_os:
1851 # build a list of nodes for this os containing empty lists
1852 # for each node in node_list
1854 for nname in good_nodes:
1855 all_os[name][nname] = []
1856 all_os[name][node_name].append((path, status, diagnose))
1859 def Exec(self, feedback_fn):
1860 """Compute the list of OSes.
1863 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1864 node_data = self.rpc.call_os_diagnose(valid_nodes)
1865 pol = self._DiagnoseByOS(valid_nodes, node_data)
1867 for os_name, os_data in pol.items():
1869 for field in self.op.output_fields:
1872 elif field == "valid":
1873 val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1874 elif field == "node_status":
1875 # this is just a copy of the dict
1877 for node_name, nos_list in os_data.items():
1878 val[node_name] = nos_list
1880 raise errors.ParameterError(field)
1887 class LURemoveNode(LogicalUnit):
1888 """Logical unit for removing a node.
1891 HPATH = "node-remove"
1892 HTYPE = constants.HTYPE_NODE
1893 _OP_REQP = ["node_name"]
1895 def BuildHooksEnv(self):
1898 This doesn't run on the target node in the pre phase as a failed
1899 node would then be impossible to remove.
1903 "OP_TARGET": self.op.node_name,
1904 "NODE_NAME": self.op.node_name,
1906 all_nodes = self.cfg.GetNodeList()
1907 all_nodes.remove(self.op.node_name)
1908 return env, all_nodes, all_nodes
1910 def CheckPrereq(self):
1911 """Check prerequisites.
1914 - the node exists in the configuration
1915 - it does not have primary or secondary instances
1916 - it's not the master
1918 Any errors are signaled by raising errors.OpPrereqError.
1921 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1923 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1925 instance_list = self.cfg.GetInstanceList()
1927 masternode = self.cfg.GetMasterNode()
1928 if node.name == masternode:
1929 raise errors.OpPrereqError("Node is the master node,"
1930 " you need to failover first.")
1932 for instance_name in instance_list:
1933 instance = self.cfg.GetInstanceInfo(instance_name)
1934 if node.name in instance.all_nodes:
1935 raise errors.OpPrereqError("Instance %s is still running on the node,"
1936 " please remove first." % instance_name)
1937 self.op.node_name = node.name
1940 def Exec(self, feedback_fn):
1941 """Removes the node from the cluster.
1945 logging.info("Stopping the node daemon and removing configs from node %s",
1948 self.context.RemoveNode(node.name)
1950 result = self.rpc.call_node_leave_cluster(node.name)
1951 msg = result.fail_msg
1953 self.LogWarning("Errors encountered on the remote node while leaving"
1954 " the cluster: %s", msg)
1956 # Promote nodes to master candidate as needed
1957 _AdjustCandidatePool(self)
1960 class LUQueryNodes(NoHooksLU):
1961 """Logical unit for querying nodes.
1964 _OP_REQP = ["output_fields", "names", "use_locking"]
1966 _FIELDS_DYNAMIC = utils.FieldSet(
1968 "mtotal", "mnode", "mfree",
1970 "ctotal", "cnodes", "csockets",
1973 _FIELDS_STATIC = utils.FieldSet(
1974 "name", "pinst_cnt", "sinst_cnt",
1975 "pinst_list", "sinst_list",
1976 "pip", "sip", "tags",
1985 def ExpandNames(self):
1986 _CheckOutputFields(static=self._FIELDS_STATIC,
1987 dynamic=self._FIELDS_DYNAMIC,
1988 selected=self.op.output_fields)
1990 self.needed_locks = {}
1991 self.share_locks[locking.LEVEL_NODE] = 1
1994 self.wanted = _GetWantedNodes(self, self.op.names)
1996 self.wanted = locking.ALL_SET
1998 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1999 self.do_locking = self.do_node_query and self.op.use_locking
2001 # if we don't request only static fields, we need to lock the nodes
2002 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2005 def CheckPrereq(self):
2006 """Check prerequisites.
2009 # The validation of the node list is done in _GetWantedNodes, if the
2010 # list is non-empty; if it is empty, there is no validation to do
2013 def Exec(self, feedback_fn):
2014 """Computes the list of nodes and their attributes.
2017 all_info = self.cfg.GetAllNodesInfo()
2019 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2020 elif self.wanted != locking.ALL_SET:
2021 nodenames = self.wanted
2022 missing = set(nodenames).difference(all_info.keys())
2024 raise errors.OpExecError(
2025 "Some nodes were removed before retrieving their data: %s" % missing)
2027 nodenames = all_info.keys()
2029 nodenames = utils.NiceSort(nodenames)
2030 nodelist = [all_info[name] for name in nodenames]
2032 # begin data gathering
2034 if self.do_node_query:
2036 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2037 self.cfg.GetHypervisorType())
2038 for name in nodenames:
2039 nodeinfo = node_data[name]
2040 if not nodeinfo.fail_msg and nodeinfo.payload:
2041 nodeinfo = nodeinfo.payload
2042 fn = utils.TryConvert
2044 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2045 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2046 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2047 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2048 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2049 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2050 "bootid": nodeinfo.get('bootid', None),
2051 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2052 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2055 live_data[name] = {}
2057 live_data = dict.fromkeys(nodenames, {})
2059 node_to_primary = dict([(name, set()) for name in nodenames])
2060 node_to_secondary = dict([(name, set()) for name in nodenames])
2062 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2063 "sinst_cnt", "sinst_list"))
2064 if inst_fields & frozenset(self.op.output_fields):
2065 instancelist = self.cfg.GetInstanceList()
2067 for instance_name in instancelist:
2068 inst = self.cfg.GetInstanceInfo(instance_name)
2069 if inst.primary_node in node_to_primary:
2070 node_to_primary[inst.primary_node].add(inst.name)
2071 for secnode in inst.secondary_nodes:
2072 if secnode in node_to_secondary:
2073 node_to_secondary[secnode].add(inst.name)
2075 master_node = self.cfg.GetMasterNode()
2077 # end data gathering
2080 for node in nodelist:
2082 for field in self.op.output_fields:
2085 elif field == "pinst_list":
2086 val = list(node_to_primary[node.name])
2087 elif field == "sinst_list":
2088 val = list(node_to_secondary[node.name])
2089 elif field == "pinst_cnt":
2090 val = len(node_to_primary[node.name])
2091 elif field == "sinst_cnt":
2092 val = len(node_to_secondary[node.name])
2093 elif field == "pip":
2094 val = node.primary_ip
2095 elif field == "sip":
2096 val = node.secondary_ip
2097 elif field == "tags":
2098 val = list(node.GetTags())
2099 elif field == "serial_no":
2100 val = node.serial_no
2101 elif field == "master_candidate":
2102 val = node.master_candidate
2103 elif field == "master":
2104 val = node.name == master_node
2105 elif field == "offline":
2107 elif field == "drained":
2109 elif self._FIELDS_DYNAMIC.Matches(field):
2110 val = live_data[node.name].get(field, None)
2111 elif field == "role":
2112 if node.name == master_node:
2114 elif node.master_candidate:
2123 raise errors.ParameterError(field)
2124 node_output.append(val)
2125 output.append(node_output)
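# Illustrative sketch (hypothetical helper): as in LUQueryNodes.ExpandNames
# above, live ("dynamic") data and therefore node locking are only needed when
# at least one requested output field falls outside the static set.
def _ExampleNeedsLiveQuery(output_fields, static_fields):
  """Return True if any requested field is not a purely static one."""
  return bool([f for f in output_fields if f not in static_fields])

# _ExampleNeedsLiveQuery(["name", "mfree"], ["name", "pinst_cnt"]) -> True
# _ExampleNeedsLiveQuery(["name"], ["name", "pinst_cnt"])          -> False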
2130 class LUQueryNodeVolumes(NoHooksLU):
2131 """Logical unit for getting volumes on node(s).
2134 _OP_REQP = ["nodes", "output_fields"]
2136 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2137 _FIELDS_STATIC = utils.FieldSet("node")
2139 def ExpandNames(self):
2140 _CheckOutputFields(static=self._FIELDS_STATIC,
2141 dynamic=self._FIELDS_DYNAMIC,
2142 selected=self.op.output_fields)
2144 self.needed_locks = {}
2145 self.share_locks[locking.LEVEL_NODE] = 1
2146 if not self.op.nodes:
2147 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2149 self.needed_locks[locking.LEVEL_NODE] = \
2150 _GetWantedNodes(self, self.op.nodes)
2152 def CheckPrereq(self):
2153 """Check prerequisites.
2155 This checks that the fields required are valid output fields.
2158 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2160 def Exec(self, feedback_fn):
2161 """Computes the list of nodes and their attributes.
2164 nodenames = self.nodes
2165 volumes = self.rpc.call_node_volumes(nodenames)
2167 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2168 in self.cfg.GetInstanceList()]
2170 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2173 for node in nodenames:
2174 nresult = volumes[node]
2177 msg = nresult.fail_msg
2179 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2182 node_vols = nresult.payload[:]
2183 node_vols.sort(key=lambda vol: vol['dev'])
2185 for vol in node_vols:
2187 for field in self.op.output_fields:
2190 elif field == "phys":
2194 elif field == "name":
2196 elif field == "size":
2197 val = int(float(vol['size']))
2198 elif field == "instance":
2200 if node not in lv_by_node[inst]:
2202 if vol['name'] in lv_by_node[inst][node]:
2208 raise errors.ParameterError(field)
2209 node_output.append(str(val))
2211 output.append(node_output)
2216 class LUAddNode(LogicalUnit):
2217 """Logical unit for adding node to the cluster.
2221 HTYPE = constants.HTYPE_NODE
2222 _OP_REQP = ["node_name"]
2224 def BuildHooksEnv(self):
2227 This will run on all nodes before, and on all nodes + the new node after.
2231 "OP_TARGET": self.op.node_name,
2232 "NODE_NAME": self.op.node_name,
2233 "NODE_PIP": self.op.primary_ip,
2234 "NODE_SIP": self.op.secondary_ip,
2236 nodes_0 = self.cfg.GetNodeList()
2237 nodes_1 = nodes_0 + [self.op.node_name, ]
2238 return env, nodes_0, nodes_1
2240 def CheckPrereq(self):
2241 """Check prerequisites.
2244 - the new node is not already in the config
2246 - its parameters (single/dual homed) matches the cluster
2248 Any errors are signaled by raising errors.OpPrereqError.
2251 node_name = self.op.node_name
2254 dns_data = utils.HostInfo(node_name)
2256 node = dns_data.name
2257 primary_ip = self.op.primary_ip = dns_data.ip
2258 secondary_ip = getattr(self.op, "secondary_ip", None)
2259 if secondary_ip is None:
2260 secondary_ip = primary_ip
2261 if not utils.IsValidIP(secondary_ip):
2262 raise errors.OpPrereqError("Invalid secondary IP given")
2263 self.op.secondary_ip = secondary_ip
2265 node_list = cfg.GetNodeList()
2266 if not self.op.readd and node in node_list:
2267 raise errors.OpPrereqError("Node %s is already in the configuration" %
2269 elif self.op.readd and node not in node_list:
2270 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2272 for existing_node_name in node_list:
2273 existing_node = cfg.GetNodeInfo(existing_node_name)
2275 if self.op.readd and node == existing_node_name:
2276 if (existing_node.primary_ip != primary_ip or
2277 existing_node.secondary_ip != secondary_ip):
2278 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2279 " address configuration as before")
2282 if (existing_node.primary_ip == primary_ip or
2283 existing_node.secondary_ip == primary_ip or
2284 existing_node.primary_ip == secondary_ip or
2285 existing_node.secondary_ip == secondary_ip):
2286 raise errors.OpPrereqError("New node ip address(es) conflict with"
2287 " existing node %s" % existing_node.name)
2289 # check that the type of the node (single versus dual homed) is the
2290 # same as for the master
2291 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2292 master_singlehomed = myself.secondary_ip == myself.primary_ip
2293 newbie_singlehomed = secondary_ip == primary_ip
2294 if master_singlehomed != newbie_singlehomed:
2295 if master_singlehomed:
2296 raise errors.OpPrereqError("The master has no private ip but the"
2297 " new node has one")
2299 raise errors.OpPrereqError("The master has a private ip but the"
2300 " new node doesn't have one")
2302 # checks reachability
2303 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2304 raise errors.OpPrereqError("Node not reachable by ping")
2306 if not newbie_singlehomed:
2307 # check reachability from my secondary ip to newbie's secondary ip
2308 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2309 source=myself.secondary_ip):
2310 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2311 " based ping to noded port")
2313 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2318 mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2319 # the new node will increase mc_max by one, so:
2320 mc_max = min(mc_max + 1, cp_size)
2321 self.master_candidate = mc_now < mc_max
2324 self.new_node = self.cfg.GetNodeInfo(node)
2325 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2327 self.new_node = objects.Node(name=node,
2328 primary_ip=primary_ip,
2329 secondary_ip=secondary_ip,
2330 master_candidate=self.master_candidate,
2331 offline=False, drained=False)
2333 def Exec(self, feedback_fn):
2334 """Adds the new node to the cluster.
2337 new_node = self.new_node
2338 node = new_node.name
2340 # for re-adds, reset the offline/drained/master-candidate flags;
2341 # we need to reset here, otherwise offline would prevent RPC calls
2342 # later in the procedure; this also means that if the re-add
2343 # fails, we are left with a non-offlined, broken node
2345 new_node.drained = new_node.offline = False
2346 self.LogInfo("Readding a node, the offline/drained flags were reset")
2347 # if we demote the node, we do cleanup later in the procedure
2348 new_node.master_candidate = self.master_candidate
2350 # notify the user about any possible mc promotion
2351 if new_node.master_candidate:
2352 self.LogInfo("Node will be a master candidate")
2354 # check connectivity
2355 result = self.rpc.call_version([node])[node]
2356 result.Raise("Can't get version information from node %s" % node)
2357 if constants.PROTOCOL_VERSION == result.payload:
2358 logging.info("Communication to node %s fine, sw version %s match",
2359 node, result.payload)
2361 raise errors.OpExecError("Version mismatch master version %s,"
2362 " node version %s" %
2363 (constants.PROTOCOL_VERSION, result.payload))
2366 logging.info("Copy ssh key to node %s", node)
2367 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2369 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2370 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2376 keyarray.append(f.read())
2380 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2382 keyarray[3], keyarray[4], keyarray[5])
2383 result.Raise("Cannot transfer ssh keys to the new node")
2385 # Add node to our /etc/hosts, and add key to known_hosts
2386 if self.cfg.GetClusterInfo().modify_etc_hosts:
2387 utils.AddHostToEtcHosts(new_node.name)
2389 if new_node.secondary_ip != new_node.primary_ip:
2390 result = self.rpc.call_node_has_ip_address(new_node.name,
2391 new_node.secondary_ip)
2392 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2394 if not result.payload:
2395 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2396 " you gave (%s). Please fix and re-run this"
2397 " command." % new_node.secondary_ip)
2399 node_verify_list = [self.cfg.GetMasterNode()]
2400 node_verify_param = {
2402 # TODO: do a node-net-test as well?
2405 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2406 self.cfg.GetClusterName())
2407 for verifier in node_verify_list:
2408 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2409 nl_payload = result[verifier].payload['nodelist']
2411 for failed in nl_payload:
2412 feedback_fn("ssh/hostname verification failed %s -> %s" %
2413 (verifier, nl_payload[failed]))
2414 raise errors.OpExecError("ssh/hostname verification failed.")
2417 _RedistributeAncillaryFiles(self)
2418 self.context.ReaddNode(new_node)
2419 # make sure we redistribute the config
2420 self.cfg.Update(new_node)
2421 # and make sure the new node will not have old files around
2422 if not new_node.master_candidate:
2423 result = self.rpc.call_node_demote_from_mc(new_node.name)
2424 msg = result.RemoteFailMsg()
2426 self.LogWarning("Node failed to demote itself from master"
2427 " candidate status: %s" % msg)
2429 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2430 self.context.AddNode(new_node)
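# Illustrative sketch (hypothetical helper) of the homing rule checked in
# LUAddNode.CheckPrereq above: a node is single-homed when its secondary IP
# equals its primary IP, and a new node must have the same layout as the
# master.  The addresses in the comment are examples only.
def _ExampleHomingMatchesMaster(master_pip, master_sip, new_pip, new_sip):
  master_singlehomed = master_sip == master_pip
  newbie_singlehomed = new_sip == new_pip
  return master_singlehomed == newbie_singlehomed

# _ExampleHomingMatchesMaster("192.0.2.1", "192.0.2.1",
#                             "192.0.2.5", "198.51.100.5") -> False
# (the master is single-homed while the new node is dual-homed)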
2433 class LUSetNodeParams(LogicalUnit):
2434 """Modifies the parameters of a node.
2437 HPATH = "node-modify"
2438 HTYPE = constants.HTYPE_NODE
2439 _OP_REQP = ["node_name"]
2442 def CheckArguments(self):
2443 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2444 if node_name is None:
2445 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2446 self.op.node_name = node_name
2447 _CheckBooleanOpField(self.op, 'master_candidate')
2448 _CheckBooleanOpField(self.op, 'offline')
2449 _CheckBooleanOpField(self.op, 'drained')
2450 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2451 if all_mods.count(None) == 3:
2452 raise errors.OpPrereqError("Please pass at least one modification")
2453 if all_mods.count(True) > 1:
2454 raise errors.OpPrereqError("Can't set the node into more than one"
2455 " state at the same time")
2457 def ExpandNames(self):
2458 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2460 def BuildHooksEnv(self):
2463 This runs on the master node.
2467 "OP_TARGET": self.op.node_name,
2468 "MASTER_CANDIDATE": str(self.op.master_candidate),
2469 "OFFLINE": str(self.op.offline),
2470 "DRAINED": str(self.op.drained),
2472 nl = [self.cfg.GetMasterNode(),
2476 def CheckPrereq(self):
2477 """Check prerequisites.
2479 This only checks the instance list against the existing names.
2482 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2484 if ((self.op.master_candidate == False or self.op.offline == True or
2485 self.op.drained == True) and node.master_candidate):
2486 # we will demote the node from master_candidate
2487 if self.op.node_name == self.cfg.GetMasterNode():
2488 raise errors.OpPrereqError("The master node has to be a"
2489 " master candidate, online and not drained")
2490 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2491 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2492 if num_candidates <= cp_size:
2493 msg = ("Not enough master candidates (desired"
2494 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2496 self.LogWarning(msg)
2498 raise errors.OpPrereqError(msg)
2500 if (self.op.master_candidate == True and
2501 ((node.offline and not self.op.offline == False) or
2502 (node.drained and not self.op.drained == False))):
2503 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2504 " to master_candidate" % node.name)
2508 def Exec(self, feedback_fn):
2517 if self.op.offline is not None:
2518 node.offline = self.op.offline
2519 result.append(("offline", str(self.op.offline)))
2520 if self.op.offline == True:
2521 if node.master_candidate:
2522 node.master_candidate = False
2524 result.append(("master_candidate", "auto-demotion due to offline"))
2526 node.drained = False
2527 result.append(("drained", "clear drained status due to offline"))
2529 if self.op.master_candidate is not None:
2530 node.master_candidate = self.op.master_candidate
2532 result.append(("master_candidate", str(self.op.master_candidate)))
2533 if self.op.master_candidate == False:
2534 rrc = self.rpc.call_node_demote_from_mc(node.name)
2537 self.LogWarning("Node failed to demote itself: %s" % msg)
2539 if self.op.drained is not None:
2540 node.drained = self.op.drained
2541 result.append(("drained", str(self.op.drained)))
2542 if self.op.drained == True:
2543 if node.master_candidate:
2544 node.master_candidate = False
2546 result.append(("master_candidate", "auto-demotion due to drain"))
2547 rrc = self.rpc.call_node_demote_from_mc(node.name)
2548 msg = rrc.RemoteFailMsg()
2550 self.LogWarning("Node failed to demote itself: %s" % msg)
2552 node.offline = False
2553 result.append(("offline", "clear offline status due to drain"))
2555 # this will trigger configuration file update, if needed
2556 self.cfg.Update(node)
2557 # this will trigger job queue propagation or cleanup
2559 self.context.ReaddNode(node)
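# Illustrative sketch (hypothetical helper) of the argument rule enforced in
# LUSetNodeParams.CheckArguments above: at least one of the offline /
# master_candidate / drained flags must be given, and at most one of them may
# be set to True in the same operation.
def _ExampleValidateNodeMods(offline, master_candidate, drained):
  mods = [offline, master_candidate, drained]
  if mods.count(None) == 3:
    return "need at least one modification"
  if mods.count(True) > 1:
    return "at most one state can be set at a time"
  return "ok"

# _ExampleValidateNodeMods(None, None, None) -> "need at least one modification"
# _ExampleValidateNodeMods(True, True, None) -> "at most one state can be set at a time"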
2564 class LUPowercycleNode(NoHooksLU):
2565 """Powercycles a node.
2568 _OP_REQP = ["node_name", "force"]
2571 def CheckArguments(self):
2572 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2573 if node_name is None:
2574 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2575 self.op.node_name = node_name
2576 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2577 raise errors.OpPrereqError("The node is the master and the force"
2578 " parameter was not set")
2580 def ExpandNames(self):
2581 """Locking for PowercycleNode.
2583 This is a last-resort option and shouldn't block on other
2584 jobs. Therefore, we grab no locks.
2587 self.needed_locks = {}
2589 def CheckPrereq(self):
2590 """Check prerequisites.
2592 This LU has no prereqs.
2597 def Exec(self, feedback_fn):
2601 result = self.rpc.call_node_powercycle(self.op.node_name,
2602 self.cfg.GetHypervisorType())
2603 result.Raise("Failed to schedule the reboot")
2604 return result.payload
2607 class LUQueryClusterInfo(NoHooksLU):
2608 """Query cluster configuration.
2614 def ExpandNames(self):
2615 self.needed_locks = {}
2617 def CheckPrereq(self):
2618 """No prerequsites needed for this LU.
2623 def Exec(self, feedback_fn):
2624 """Return cluster config.
2627 cluster = self.cfg.GetClusterInfo()
2629 "software_version": constants.RELEASE_VERSION,
2630 "protocol_version": constants.PROTOCOL_VERSION,
2631 "config_version": constants.CONFIG_VERSION,
2632 "os_api_version": max(constants.OS_API_VERSIONS),
2633 "export_version": constants.EXPORT_VERSION,
2634 "architecture": (platform.architecture()[0], platform.machine()),
2635 "name": cluster.cluster_name,
2636 "master": cluster.master_node,
2637 "default_hypervisor": cluster.enabled_hypervisors[0],
2638 "enabled_hypervisors": cluster.enabled_hypervisors,
2639 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2640 for hypervisor_name in cluster.enabled_hypervisors]),
2641 "beparams": cluster.beparams,
2642 "nicparams": cluster.nicparams,
2643 "candidate_pool_size": cluster.candidate_pool_size,
2644 "master_netdev": cluster.master_netdev,
2645 "volume_group_name": cluster.volume_group_name,
2646 "file_storage_dir": cluster.file_storage_dir,
2652 class LUQueryConfigValues(NoHooksLU):
2653 """Return configuration values.
2658 _FIELDS_DYNAMIC = utils.FieldSet()
2659 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2661 def ExpandNames(self):
2662 self.needed_locks = {}
2664 _CheckOutputFields(static=self._FIELDS_STATIC,
2665 dynamic=self._FIELDS_DYNAMIC,
2666 selected=self.op.output_fields)
2668 def CheckPrereq(self):
2669 """No prerequisites.
2674 def Exec(self, feedback_fn):
2675 """Dump a representation of the cluster config to the standard output.
2679 for field in self.op.output_fields:
2680 if field == "cluster_name":
2681 entry = self.cfg.GetClusterName()
2682 elif field == "master_node":
2683 entry = self.cfg.GetMasterNode()
2684 elif field == "drain_flag":
2685 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2687 raise errors.ParameterError(field)
2688 values.append(entry)
2692 class LUActivateInstanceDisks(NoHooksLU):
2693 """Bring up an instance's disks.
2696 _OP_REQP = ["instance_name"]
2699 def ExpandNames(self):
2700 self._ExpandAndLockInstance()
2701 self.needed_locks[locking.LEVEL_NODE] = []
2702 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2704 def DeclareLocks(self, level):
2705 if level == locking.LEVEL_NODE:
2706 self._LockInstancesNodes()
2708 def CheckPrereq(self):
2709 """Check prerequisites.
2711 This checks that the instance is in the cluster.
2714 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2715 assert self.instance is not None, \
2716 "Cannot retrieve locked instance %s" % self.op.instance_name
2717 _CheckNodeOnline(self, self.instance.primary_node)
2719 def Exec(self, feedback_fn):
2720 """Activate the disks.
2723 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2725 raise errors.OpExecError("Cannot activate block devices")
2730 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2731 """Prepare the block devices for an instance.
2733 This sets up the block devices on all nodes.
2735 @type lu: L{LogicalUnit}
2736 @param lu: the logical unit on whose behalf we execute
2737 @type instance: L{objects.Instance}
2738 @param instance: the instance for whose disks we assemble
2739 @type ignore_secondaries: boolean
2740 @param ignore_secondaries: if true, errors on secondary nodes
2741 won't result in an error return from the function
2742 @return: False if the operation failed, otherwise a list of
2743 (host, instance_visible_name, node_visible_name)
2744 with the mapping from node devices to instance devices
2749 iname = instance.name
2750 # With the two-pass mechanism we try to reduce the window of
2751 # opportunity for the race condition of switching DRBD to primary
2752 # before handshaking occurred, but we do not eliminate it
2754 # The proper fix would be to wait (with some limits) until the
2755 # connection has been made and drbd transitions from WFConnection
2756 # into any other network-connected state (Connected, SyncTarget,
2759 # 1st pass, assemble on all nodes in secondary mode
2760 for inst_disk in instance.disks:
2761 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2762 lu.cfg.SetDiskID(node_disk, node)
2763 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2764 msg = result.fail_msg
2766 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2767 " (is_primary=False, pass=1): %s",
2768 inst_disk.iv_name, node, msg)
2769 if not ignore_secondaries:
2772 # FIXME: race condition on drbd migration to primary
2774 # 2nd pass, do only the primary node
2775 for inst_disk in instance.disks:
2776 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2777 if node != instance.primary_node:
2779 lu.cfg.SetDiskID(node_disk, node)
2780 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2781 msg = result.fail_msg
2783 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2784 " (is_primary=True, pass=2): %s",
2785 inst_disk.iv_name, node, msg)
2787 device_info.append((instance.primary_node, inst_disk.iv_name,
2790 # leave the disks configured for the primary node
2791 # this is a workaround that would be fixed better by
2792 # improving the logical/physical id handling
2793 for disk in instance.disks:
2794 lu.cfg.SetDiskID(disk, instance.primary_node)
2796 return disks_ok, device_info
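# Illustrative sketch (hypothetical helper) of the two-pass order used by
# _AssembleInstanceDisks above: every disk is first assembled on all of its
# nodes in secondary mode and only afterwards re-assembled on the primary node
# in primary mode, which narrows (but does not close) the DRBD handshake race.
# The assemble_fn callable is an assumption of this sketch.
def _ExampleTwoPassAssemble(disk_nodes, primary_node, assemble_fn):
  # 1st pass: all nodes, secondary mode
  for node in disk_nodes:
    assemble_fn(node, False)
  # 2nd pass: only the primary node, primary mode
  for node in disk_nodes:
    if node == primary_node:
      assemble_fn(node, True)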
2799 def _StartInstanceDisks(lu, instance, force):
2800 """Start the disks of an instance.
2803 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2804 ignore_secondaries=force)
2806 _ShutdownInstanceDisks(lu, instance)
2807 if force is not None and not force:
2808 lu.proc.LogWarning("", hint="If the message above refers to a"
2810 " you can retry the operation using '--force'.")
2811 raise errors.OpExecError("Disk consistency error")
2814 class LUDeactivateInstanceDisks(NoHooksLU):
2815 """Shutdown an instance's disks.
2818 _OP_REQP = ["instance_name"]
2821 def ExpandNames(self):
2822 self._ExpandAndLockInstance()
2823 self.needed_locks[locking.LEVEL_NODE] = []
2824 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2826 def DeclareLocks(self, level):
2827 if level == locking.LEVEL_NODE:
2828 self._LockInstancesNodes()
2830 def CheckPrereq(self):
2831 """Check prerequisites.
2833 This checks that the instance is in the cluster.
2836 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2837 assert self.instance is not None, \
2838 "Cannot retrieve locked instance %s" % self.op.instance_name
2840 def Exec(self, feedback_fn):
2841 """Deactivate the disks
2844 instance = self.instance
2845 _SafeShutdownInstanceDisks(self, instance)
2848 def _SafeShutdownInstanceDisks(lu, instance):
2849 """Shutdown block devices of an instance.
2851 This function checks if an instance is running, before calling
2852 _ShutdownInstanceDisks.
2855 pnode = instance.primary_node
2856 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2857 ins_l.Raise("Can't contact node %s" % pnode)
2859 if instance.name in ins_l.payload:
2860 raise errors.OpExecError("Instance is running, can't shutdown"
2863 _ShutdownInstanceDisks(lu, instance)
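# Illustrative sketch (hypothetical helper): _SafeShutdownInstanceDisks above
# only proceeds when the instance is absent from the primary node's list of
# running instances; the running_instances argument stands in for the RPC
# payload returned by the instance list call.
def _ExampleSafeToShutdownDisks(instance_name, running_instances):
  return instance_name not in running_instances

# _ExampleSafeToShutdownDisks("inst1", ["inst1", "inst2"]) -> False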
2866 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2867 """Shutdown block devices of an instance.
2869 This does the shutdown on all nodes of the instance.
2871 If the ignore_primary is false, errors on the primary node are
2876 for disk in instance.disks:
2877 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2878 lu.cfg.SetDiskID(top_disk, node)
2879 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2880 msg = result.fail_msg
2882 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2883 disk.iv_name, node, msg)
2884 if not ignore_primary or node != instance.primary_node:
2889 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2890 """Checks if a node has enough free memory.
2892 This function checks if a given node has the needed amount of free
2893 memory. In case the node has less memory or we cannot get the
2894 information from the node, this function raises an OpPrereqError
2897 @type lu: C{LogicalUnit}
2898 @param lu: a logical unit from which we get configuration data
2900 @param node: the node to check
2901 @type reason: C{str}
2902 @param reason: string to use in the error message
2903 @type requested: C{int}
2904 @param requested: the amount of memory in MiB to check for
2905 @type hypervisor_name: C{str}
2906 @param hypervisor_name: the hypervisor to ask for memory stats
2907 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2908 we cannot check the node
2911 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2912 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2913 free_mem = nodeinfo[node].payload.get('memory_free', None)
2914 if not isinstance(free_mem, int):
2915 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2916 " was '%s'" % (node, free_mem))
2917 if requested > free_mem:
2918 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2919 " needed %s MiB, available %s MiB" %
2920 (node, reason, requested, free_mem))
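# Illustrative sketch (hypothetical helper) of the rule documented for
# _CheckNodeFreeMemory above: the check fails when the reported free memory is
# unusable or when the requested amount (in MiB) exceeds it.
def _ExampleHasEnoughMemory(free_mem, requested):
  if not isinstance(free_mem, int):
    raise ValueError("can't compute free memory, result was %r" % (free_mem,))
  return requested <= free_mem

# _ExampleHasEnoughMemory(2048, 512) -> True
# _ExampleHasEnoughMemory(256, 512)  -> False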
2923 class LUStartupInstance(LogicalUnit):
2924 """Starts an instance.
2927 HPATH = "instance-start"
2928 HTYPE = constants.HTYPE_INSTANCE
2929 _OP_REQP = ["instance_name", "force"]
2932 def ExpandNames(self):
2933 self._ExpandAndLockInstance()
2935 def BuildHooksEnv(self):
2938 This runs on master, primary and secondary nodes of the instance.
2942 "FORCE": self.op.force,
2944 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2945 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2948 def CheckPrereq(self):
2949 """Check prerequisites.
2951 This checks that the instance is in the cluster.
2954 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2955 assert self.instance is not None, \
2956 "Cannot retrieve locked instance %s" % self.op.instance_name
2959 self.beparams = getattr(self.op, "beparams", {})
2961 if not isinstance(self.beparams, dict):
2962 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2963 " dict" % (type(self.beparams), ))
2964 # fill the beparams dict
2965 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2966 self.op.beparams = self.beparams
2969 self.hvparams = getattr(self.op, "hvparams", {})
2971 if not isinstance(self.hvparams, dict):
2972 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2973 " dict" % (type(self.hvparams), ))
2975 # check hypervisor parameter syntax (locally)
2976 cluster = self.cfg.GetClusterInfo()
2977 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2978 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2980 filled_hvp.update(self.hvparams)
2981 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2982 hv_type.CheckParameterSyntax(filled_hvp)
2983 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2984 self.op.hvparams = self.hvparams
2986 _CheckNodeOnline(self, instance.primary_node)
2988 bep = self.cfg.GetClusterInfo().FillBE(instance)
2989 # check bridges existence
2990 _CheckInstanceBridgesExist(self, instance)
2992 remote_info = self.rpc.call_instance_info(instance.primary_node,
2994 instance.hypervisor)
2995 remote_info.Raise("Error checking node %s" % instance.primary_node,
2997 if not remote_info.payload: # not running already
2998 _CheckNodeFreeMemory(self, instance.primary_node,
2999 "starting instance %s" % instance.name,
3000 bep[constants.BE_MEMORY], instance.hypervisor)
3002 def Exec(self, feedback_fn):
3003 """Start the instance.
3006 instance = self.instance
3007 force = self.op.force
3009 self.cfg.MarkInstanceUp(instance.name)
3011 node_current = instance.primary_node
3013 _StartInstanceDisks(self, instance, force)
3015 result = self.rpc.call_instance_start(node_current, instance,
3016 self.hvparams, self.beparams)
3017 msg = result.fail_msg
3019 _ShutdownInstanceDisks(self, instance)
3020 raise errors.OpExecError("Could not start instance: %s" % msg)
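# Illustrative sketch (hypothetical helper) of the parameter filling done in
# LUStartupInstance.CheckPrereq above: the cluster-level defaults form the
# base and the per-operation overrides are layered on top, in the same spirit
# as the FillDict call for the hypervisor parameters.
def _ExampleFillParams(defaults, overrides):
  filled = dict(defaults)
  filled.update(overrides)
  return filled

# _ExampleFillParams({"memory": 512, "vcpus": 1}, {"memory": 1024})
# -> {"memory": 1024, "vcpus": 1}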
3023 class LURebootInstance(LogicalUnit):
3024 """Reboot an instance.
3027 HPATH = "instance-reboot"
3028 HTYPE = constants.HTYPE_INSTANCE
3029 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3032 def ExpandNames(self):
3033 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3034 constants.INSTANCE_REBOOT_HARD,
3035 constants.INSTANCE_REBOOT_FULL]:
3036 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3037 (constants.INSTANCE_REBOOT_SOFT,
3038 constants.INSTANCE_REBOOT_HARD,
3039 constants.INSTANCE_REBOOT_FULL))
3040 self._ExpandAndLockInstance()
3042 def BuildHooksEnv(self):
3045 This runs on master, primary and secondary nodes of the instance.
3049 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3050 "REBOOT_TYPE": self.op.reboot_type,
3052 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3053 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3056 def CheckPrereq(self):
3057 """Check prerequisites.
3059 This checks that the instance is in the cluster.
3062 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3063 assert self.instance is not None, \
3064 "Cannot retrieve locked instance %s" % self.op.instance_name
3066 _CheckNodeOnline(self, instance.primary_node)
3068 # check bridges existence
3069 _CheckInstanceBridgesExist(self, instance)
3071 def Exec(self, feedback_fn):
3072 """Reboot the instance.
3075 instance = self.instance
3076 ignore_secondaries = self.op.ignore_secondaries
3077 reboot_type = self.op.reboot_type
3079 node_current = instance.primary_node
3081 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3082 constants.INSTANCE_REBOOT_HARD]:
3083 for disk in instance.disks:
3084 self.cfg.SetDiskID(disk, node_current)
3085 result = self.rpc.call_instance_reboot(node_current, instance,
3087 result.Raise("Could not reboot instance")
3089 result = self.rpc.call_instance_shutdown(node_current, instance)
3090 result.Raise("Could not shutdown instance for full reboot")
3091 _ShutdownInstanceDisks(self, instance)
3092 _StartInstanceDisks(self, instance, ignore_secondaries)
3093 result = self.rpc.call_instance_start(node_current, instance, None, None)
3094 msg = result.fail_msg
3096 _ShutdownInstanceDisks(self, instance)
3097 raise errors.OpExecError("Could not start instance for"
3098 " full reboot: %s" % msg)
3100 self.cfg.MarkInstanceUp(instance.name)
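# Illustrative sketch (hypothetical helper) of the dispatch performed in
# LURebootInstance.Exec above: soft and hard reboots are delegated to the
# hypervisor, while a full reboot is a shutdown, a disk cycle and a fresh
# start.  The plain strings stand in for the INSTANCE_REBOOT_* constants.
def _ExampleRebootPlan(reboot_type):
  if reboot_type in ("soft", "hard"):
    return ["reboot via hypervisor"]
  if reboot_type == "full":
    return ["shutdown instance", "shutdown disks",
            "start disks", "start instance"]
  raise ValueError("unknown reboot type %r" % (reboot_type,))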
3103 class LUShutdownInstance(LogicalUnit):
3104 """Shutdown an instance.
3107 HPATH = "instance-stop"
3108 HTYPE = constants.HTYPE_INSTANCE
3109 _OP_REQP = ["instance_name"]
3112 def ExpandNames(self):
3113 self._ExpandAndLockInstance()
3115 def BuildHooksEnv(self):
3118 This runs on master, primary and secondary nodes of the instance.
3121 env = _BuildInstanceHookEnvByObject(self, self.instance)
3122 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3125 def CheckPrereq(self):
3126 """Check prerequisites.
3128 This checks that the instance is in the cluster.
3131 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3132 assert self.instance is not None, \
3133 "Cannot retrieve locked instance %s" % self.op.instance_name
3134 _CheckNodeOnline(self, self.instance.primary_node)
3136 def Exec(self, feedback_fn):
3137 """Shutdown the instance.
3140 instance = self.instance
3141 node_current = instance.primary_node
3142 self.cfg.MarkInstanceDown(instance.name)
3143 result = self.rpc.call_instance_shutdown(node_current, instance)
3144 msg = result.fail_msg
3146 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3148 _ShutdownInstanceDisks(self, instance)
3151 class LUReinstallInstance(LogicalUnit):
3152 """Reinstall an instance.
3155 HPATH = "instance-reinstall"
3156 HTYPE = constants.HTYPE_INSTANCE
3157 _OP_REQP = ["instance_name"]
3160 def ExpandNames(self):
3161 self._ExpandAndLockInstance()
3163 def BuildHooksEnv(self):
3166 This runs on master, primary and secondary nodes of the instance.
3169 env = _BuildInstanceHookEnvByObject(self, self.instance)
3170 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3173 def CheckPrereq(self):
3174 """Check prerequisites.
3176 This checks that the instance is in the cluster and is not running.
3179 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3180 assert instance is not None, \
3181 "Cannot retrieve locked instance %s" % self.op.instance_name
3182 _CheckNodeOnline(self, instance.primary_node)
3184 if instance.disk_template == constants.DT_DISKLESS:
3185 raise errors.OpPrereqError("Instance '%s' has no disks" %
3186 self.op.instance_name)
3187 if instance.admin_up:
3188 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3189 self.op.instance_name)
3190 remote_info = self.rpc.call_instance_info(instance.primary_node,
3192 instance.hypervisor)
3193 remote_info.Raise("Error checking node %s" % instance.primary_node,
3195 if remote_info.payload:
3196 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3197 (self.op.instance_name,
3198 instance.primary_node))
3200 self.op.os_type = getattr(self.op, "os_type", None)
3201 if self.op.os_type is not None:
3203 pnode = self.cfg.GetNodeInfo(
3204 self.cfg.ExpandNodeName(instance.primary_node))
3206 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3208 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3209 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3210 (self.op.os_type, pnode.name), prereq=True)
3212 self.instance = instance
3214 def Exec(self, feedback_fn):
3215 """Reinstall the instance.
3218 inst = self.instance
3220 if self.op.os_type is not None:
3221 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3222 inst.os = self.op.os_type
3223 self.cfg.Update(inst)
3225 _StartInstanceDisks(self, inst, None)
3227 feedback_fn("Running the instance OS create scripts...")
3228 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3229 result.Raise("Could not install OS for instance %s on node %s" %
3230 (inst.name, inst.primary_node))
3232 _ShutdownInstanceDisks(self, inst)
3235 class LURenameInstance(LogicalUnit):
3236 """Rename an instance.
3239 HPATH = "instance-rename"
3240 HTYPE = constants.HTYPE_INSTANCE
3241 _OP_REQP = ["instance_name", "new_name"]
3243 def BuildHooksEnv(self):
3246 This runs on master, primary and secondary nodes of the instance.
3249 env = _BuildInstanceHookEnvByObject(self, self.instance)
3250 env["INSTANCE_NEW_NAME"] = self.op.new_name
3251 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3254 def CheckPrereq(self):
3255 """Check prerequisites.
3257 This checks that the instance is in the cluster and is not running.
3260 instance = self.cfg.GetInstanceInfo(
3261 self.cfg.ExpandInstanceName(self.op.instance_name))
3262 if instance is None:
3263 raise errors.OpPrereqError("Instance '%s' not known" %
3264 self.op.instance_name)
3265 _CheckNodeOnline(self, instance.primary_node)
3267 if instance.admin_up:
3268 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3269 self.op.instance_name)
3270 remote_info = self.rpc.call_instance_info(instance.primary_node,
3272 instance.hypervisor)
3273 remote_info.Raise("Error checking node %s" % instance.primary_node,
3275 if remote_info.payload:
3276 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3277 (self.op.instance_name,
3278 instance.primary_node))
3279 self.instance = instance
3281 # new name verification
3282 name_info = utils.HostInfo(self.op.new_name)
3284 self.op.new_name = new_name = name_info.name
3285 instance_list = self.cfg.GetInstanceList()
3286 if new_name in instance_list:
3287 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3290 if not getattr(self.op, "ignore_ip", False):
3291 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3292 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3293 (name_info.ip, new_name))
3296 def Exec(self, feedback_fn):
3297 """Reinstall the instance.
3300 inst = self.instance
3301 old_name = inst.name
3303 if inst.disk_template == constants.DT_FILE:
3304 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3306 self.cfg.RenameInstance(inst.name, self.op.new_name)
3307 # Change the instance lock. This is definitely safe while we hold the BGL
3308 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3309 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3311 # re-read the instance from the configuration after rename
3312 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3314 if inst.disk_template == constants.DT_FILE:
3315 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3316 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3317 old_file_storage_dir,
3318 new_file_storage_dir)
3319 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3320 " (but the instance has been renamed in Ganeti)" %
3321 (inst.primary_node, old_file_storage_dir,
3322 new_file_storage_dir))
3324 _StartInstanceDisks(self, inst, None)
3326 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3328 msg = result.fail_msg
3330 msg = ("Could not run OS rename script for instance %s on node %s"
3331 " (but the instance has been renamed in Ganeti): %s" %
3332 (inst.name, inst.primary_node, msg))
3333 self.proc.LogWarning(msg)
3335 _ShutdownInstanceDisks(self, inst)
3338 class LURemoveInstance(LogicalUnit):
3339 """Remove an instance.
3342 HPATH = "instance-remove"
3343 HTYPE = constants.HTYPE_INSTANCE
3344 _OP_REQP = ["instance_name", "ignore_failures"]
3347 def ExpandNames(self):
3348 self._ExpandAndLockInstance()
3349 self.needed_locks[locking.LEVEL_NODE] = []
3350 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3352 def DeclareLocks(self, level):
3353 if level == locking.LEVEL_NODE:
3354 self._LockInstancesNodes()
3356 def BuildHooksEnv(self):
3359 This runs on master, primary and secondary nodes of the instance.
3362 env = _BuildInstanceHookEnvByObject(self, self.instance)
3363 nl = [self.cfg.GetMasterNode()]
3366 def CheckPrereq(self):
3367 """Check prerequisites.
3369 This checks that the instance is in the cluster.
3372 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3373 assert self.instance is not None, \
3374 "Cannot retrieve locked instance %s" % self.op.instance_name
3376 def Exec(self, feedback_fn):
3377 """Remove the instance.
3380 instance = self.instance
3381 logging.info("Shutting down instance %s on node %s",
3382 instance.name, instance.primary_node)
3384 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3385 msg = result.fail_msg
3387 if self.op.ignore_failures:
3388 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3390 raise errors.OpExecError("Could not shutdown instance %s on"
3392 (instance.name, instance.primary_node, msg))
3394 logging.info("Removing block devices for instance %s", instance.name)
3396 if not _RemoveDisks(self, instance):
3397 if self.op.ignore_failures:
3398 feedback_fn("Warning: can't remove instance's disks")
3400 raise errors.OpExecError("Can't remove instance's disks")
3402 logging.info("Removing instance %s out of cluster config", instance.name)
3404 self.cfg.RemoveInstance(instance.name)
3405 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3408 class LUQueryInstances(NoHooksLU):
3409 """Logical unit for querying instances.
3412 _OP_REQP = ["output_fields", "names", "use_locking"]
3414 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3416 "disk_template", "ip", "mac", "bridge",
3417 "nic_mode", "nic_link",
3418 "sda_size", "sdb_size", "vcpus", "tags",
3419 "network_port", "beparams",
3420 r"(disk)\.(size)/([0-9]+)",
3421 r"(disk)\.(sizes)", "disk_usage",
3422 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3423 r"(nic)\.(bridge)/([0-9]+)",
3424 r"(nic)\.(macs|ips|modes|links|bridges)",
3425 r"(disk|nic)\.(count)",
3426 "serial_no", "hypervisor", "hvparams",] +
3428 for name in constants.HVS_PARAMETERS] +
3430 for name in constants.BES_PARAMETERS])
3431 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3434 def ExpandNames(self):
3435 _CheckOutputFields(static=self._FIELDS_STATIC,
3436 dynamic=self._FIELDS_DYNAMIC,
3437 selected=self.op.output_fields)
3439 self.needed_locks = {}
3440 self.share_locks[locking.LEVEL_INSTANCE] = 1
3441 self.share_locks[locking.LEVEL_NODE] = 1
3444 self.wanted = _GetWantedInstances(self, self.op.names)
3446 self.wanted = locking.ALL_SET
3448 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3449 self.do_locking = self.do_node_query and self.op.use_locking
3451 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3452 self.needed_locks[locking.LEVEL_NODE] = []
3453 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3455 def DeclareLocks(self, level):
3456 if level == locking.LEVEL_NODE and self.do_locking:
3457 self._LockInstancesNodes()
3459 def CheckPrereq(self):
3460 """Check prerequisites.
3465 def Exec(self, feedback_fn):
3466 """Computes the list of nodes and their attributes.
3469 all_info = self.cfg.GetAllInstancesInfo()
3470 if self.wanted == locking.ALL_SET:
3471 # caller didn't specify instance names, so ordering is not important
3473 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3475 instance_names = all_info.keys()
3476 instance_names = utils.NiceSort(instance_names)
3478 # caller did specify names, so we must keep the ordering
3480 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3482 tgt_set = all_info.keys()
3483 missing = set(self.wanted).difference(tgt_set)
3485 raise errors.OpExecError("Some instances were removed before"
3486 " retrieving their data: %s" % missing)
3487 instance_names = self.wanted
3489 instance_list = [all_info[iname] for iname in instance_names]
3491 # begin data gathering
3493 nodes = frozenset([inst.primary_node for inst in instance_list])
3494 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3498 if self.do_node_query:
3500 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3502 result = node_data[name]
3504 # offline nodes will be in both lists
3505 off_nodes.append(name)
3506 if result.failed or result.fail_msg:
3507 bad_nodes.append(name)
3510 live_data.update(result.payload)
3511 # else no instance is alive
3513 live_data = dict([(name, {}) for name in instance_names])
3515 # end data gathering
3520 cluster = self.cfg.GetClusterInfo()
3521 for instance in instance_list:
3523 i_hv = cluster.FillHV(instance)
3524 i_be = cluster.FillBE(instance)
3525 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
3526 nic.nicparams) for nic in instance.nics]
3527 for field in self.op.output_fields:
3528 st_match = self._FIELDS_STATIC.Matches(field)
3533 elif field == "pnode":
3534 val = instance.primary_node
3535 elif field == "snodes":
3536 val = list(instance.secondary_nodes)
3537 elif field == "admin_state":
3538 val = instance.admin_up
3539 elif field == "oper_state":
3540 if instance.primary_node in bad_nodes:
3543 val = bool(live_data.get(instance.name))
3544 elif field == "status":
3545 if instance.primary_node in off_nodes:
3546 val = "ERROR_nodeoffline"
3547 elif instance.primary_node in bad_nodes:
3548 val = "ERROR_nodedown"
3550 running = bool(live_data.get(instance.name))
3552 if instance.admin_up:
3557 if instance.admin_up:
3561 elif field == "oper_ram":
3562 if instance.primary_node in bad_nodes:
3564 elif instance.name in live_data:
3565 val = live_data[instance.name].get("memory", "?")
3568 elif field == "vcpus":
3569 val = i_be[constants.BE_VCPUS]
3570 elif field == "disk_template":
3571 val = instance.disk_template
3574 val = instance.nics[0].ip
3577 elif field == "nic_mode":
3579 val = i_nicp[0][constants.NIC_MODE]
3582 elif field == "nic_link":
3584 val = i_nicp[0][constants.NIC_LINK]
3587 elif field == "bridge":
3588 if (instance.nics and
3589 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
3590 val = i_nicp[0][constants.NIC_LINK]
3593 elif field == "mac":
3595 val = instance.nics[0].mac
3598 elif field == "sda_size" or field == "sdb_size":
3599 idx = ord(field[2]) - ord('a')
3601 val = instance.FindDisk(idx).size
3602 except errors.OpPrereqError:
3604 elif field == "disk_usage": # total disk usage per node
3605 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3606 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3607 elif field == "tags":
3608 val = list(instance.GetTags())
3609 elif field == "serial_no":
3610 val = instance.serial_no
3611 elif field == "network_port":
3612 val = instance.network_port
3613 elif field == "hypervisor":
3614 val = instance.hypervisor
3615 elif field == "hvparams":
3617 elif (field.startswith(HVPREFIX) and
3618 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3619 val = i_hv.get(field[len(HVPREFIX):], None)
3620 elif field == "beparams":
3622 elif (field.startswith(BEPREFIX) and
3623 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3624 val = i_be.get(field[len(BEPREFIX):], None)
3625 elif st_match and st_match.groups():
3626 # matches a variable list
3627 st_groups = st_match.groups()
3628 if st_groups and st_groups[0] == "disk":
3629 if st_groups[1] == "count":
3630 val = len(instance.disks)
3631 elif st_groups[1] == "sizes":
3632 val = [disk.size for disk in instance.disks]
3633 elif st_groups[1] == "size":
3635 val = instance.FindDisk(st_groups[2]).size
3636 except errors.OpPrereqError:
3639 assert False, "Unhandled disk parameter"
3640 elif st_groups[0] == "nic":
3641 if st_groups[1] == "count":
3642 val = len(instance.nics)
3643 elif st_groups[1] == "macs":
3644 val = [nic.mac for nic in instance.nics]
3645 elif st_groups[1] == "ips":
3646 val = [nic.ip for nic in instance.nics]
3647 elif st_groups[1] == "modes":
3648 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
3649 elif st_groups[1] == "links":
3650 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
3651 elif st_groups[1] == "bridges":
3654 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3655 val.append(nicp[constants.NIC_LINK])
3660 nic_idx = int(st_groups[2])
3661 if nic_idx >= len(instance.nics):
3664 if st_groups[1] == "mac":
3665 val = instance.nics[nic_idx].mac
3666 elif st_groups[1] == "ip":
3667 val = instance.nics[nic_idx].ip
3668 elif st_groups[1] == "mode":
3669 val = i_nicp[nic_idx][constants.NIC_MODE]
3670 elif st_groups[1] == "link":
3671 val = i_nicp[nic_idx][constants.NIC_LINK]
3672 elif st_groups[1] == "bridge":
3673 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
3674 if nic_mode == constants.NIC_MODE_BRIDGED:
3675 val = i_nicp[nic_idx][constants.NIC_LINK]
3679 assert False, "Unhandled NIC parameter"
3681 assert False, ("Declared but unhandled variable parameter '%s'" %
3684 assert False, "Declared but unhandled parameter '%s'" % field
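# Illustrative sketch (hypothetical helper) of how the parametrized query
# fields declared for LUQueryInstances above, e.g. r"(disk)\.(size)/([0-9]+)"
# or r"(nic)\.(macs|ips|modes|links|bridges)", can be matched and split into
# the groups that Exec dispatches on.  The regexp here is a simplification.
def _ExampleMatchField(field):
  import re
  m = re.match(r"^(disk|nic)\.(\w+)(?:/([0-9]+))?$", field)
  if m:
    return m.groups()
  return None

# _ExampleMatchField("disk.size/0") -> ("disk", "size", "0")
# _ExampleMatchField("nic.macs")    -> ("nic", "macs", None)
# _ExampleMatchField("name")        -> None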
3691 class LUFailoverInstance(LogicalUnit):
3692 """Failover an instance.
3695 HPATH = "instance-failover"
3696 HTYPE = constants.HTYPE_INSTANCE
3697 _OP_REQP = ["instance_name", "ignore_consistency"]
3700 def ExpandNames(self):
3701 self._ExpandAndLockInstance()
3702 self.needed_locks[locking.LEVEL_NODE] = []
3703 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3705 def DeclareLocks(self, level):
3706 if level == locking.LEVEL_NODE:
3707 self._LockInstancesNodes()
3709 def BuildHooksEnv(self):
3712 This runs on master, primary and secondary nodes of the instance.
3716 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3718 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3719 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3722 def CheckPrereq(self):
3723 """Check prerequisites.
3725 This checks that the instance is in the cluster.
3728 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3729 assert self.instance is not None, \
3730 "Cannot retrieve locked instance %s" % self.op.instance_name
3732 bep = self.cfg.GetClusterInfo().FillBE(instance)
3733 if instance.disk_template not in constants.DTS_NET_MIRROR:
3734 raise errors.OpPrereqError("Instance's disk layout is not"
3735 " network mirrored, cannot failover.")
3737 secondary_nodes = instance.secondary_nodes
3738 if not secondary_nodes:
3739 raise errors.ProgrammerError("no secondary node but using "
3740 "a mirrored disk template")
3742 target_node = secondary_nodes[0]
3743 _CheckNodeOnline(self, target_node)
3744 _CheckNodeNotDrained(self, target_node)
3745 if instance.admin_up:
3746 # check memory requirements on the secondary node
3747 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3748 instance.name, bep[constants.BE_MEMORY],
3749 instance.hypervisor)
3751 self.LogInfo("Not checking memory on the secondary node as"
3752 " instance will not be started")
3754 # check bridge existence
3755 _CheckInstanceBridgesExist(self, instance, node=target_node)
3757 def Exec(self, feedback_fn):
3758 """Failover an instance.
3760 The failover is done by shutting it down on its present node and
3761 starting it on the secondary.
3764 instance = self.instance
3766 source_node = instance.primary_node
3767 target_node = instance.secondary_nodes[0]
3769 feedback_fn("* checking disk consistency between source and target")
3770 for dev in instance.disks:
3771 # for drbd, these are drbd over lvm
3772 if not _CheckDiskConsistency(self, dev, target_node, False):
3773 if instance.admin_up and not self.op.ignore_consistency:
3774 raise errors.OpExecError("Disk %s is degraded on target node,"
3775 " aborting failover." % dev.iv_name)
3777 feedback_fn("* shutting down instance on source node")
3778 logging.info("Shutting down instance %s on node %s",
3779 instance.name, source_node)
3781 result = self.rpc.call_instance_shutdown(source_node, instance)
3782 msg = result.fail_msg
3784 if self.op.ignore_consistency:
3785 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3786 " Proceeding anyway. Please make sure node"
3787 " %s is down. Error details: %s",
3788 instance.name, source_node, source_node, msg)
3790 raise errors.OpExecError("Could not shutdown instance %s on"
3792 (instance.name, source_node, msg))
3794 feedback_fn("* deactivating the instance's disks on source node")
3795 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3796 raise errors.OpExecError("Can't shut down the instance's disks.")
3798 instance.primary_node = target_node
3799 # distribute new instance config to the other nodes
3800 self.cfg.Update(instance)
3802 # Only start the instance if it's marked as up
3803 if instance.admin_up:
3804 feedback_fn("* activating the instance's disks on target node")
3805 logging.info("Starting instance %s on node %s",
3806 instance.name, target_node)
3808 disks_ok, _ = _AssembleInstanceDisks(self, instance,
3809 ignore_secondaries=True)
3811 _ShutdownInstanceDisks(self, instance)
3812 raise errors.OpExecError("Can't activate the instance's disks")
3814 feedback_fn("* starting the instance on the target node")
3815 result = self.rpc.call_instance_start(target_node, instance, None, None)
3816 msg = result.fail_msg
3818 _ShutdownInstanceDisks(self, instance)
3819 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3820 (instance.name, target_node, msg))
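# Illustrative sketch (hypothetical helper) of the abort rule used in
# LUFailoverInstance.Exec above: a degraded disk aborts the failover only when
# the instance is marked up and ignore_consistency was not requested.
def _ExampleMustAbortFailover(disk_consistent, admin_up, ignore_consistency):
  return (not disk_consistent) and admin_up and not ignore_consistency

# _ExampleMustAbortFailover(False, True, False) -> True  (abort)
# _ExampleMustAbortFailover(False, True, True)  -> False (proceed anyway)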
3823 class LUMigrateInstance(LogicalUnit):
3824 """Migrate an instance.
3826 This is migration without shutting down the instance, as opposed to
3827 failover, which is done with a shutdown.
3830 HPATH = "instance-migrate"
3831 HTYPE = constants.HTYPE_INSTANCE
3832 _OP_REQP = ["instance_name", "live", "cleanup"]
3836 def ExpandNames(self):
3837 self._ExpandAndLockInstance()
3838 self.needed_locks[locking.LEVEL_NODE] = []
3839 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3841 def DeclareLocks(self, level):
3842 if level == locking.LEVEL_NODE:
3843 self._LockInstancesNodes()
3845 def BuildHooksEnv(self):
3848 This runs on master, primary and secondary nodes of the instance.
3851 env = _BuildInstanceHookEnvByObject(self, self.instance)
3852 env["MIGRATE_LIVE"] = self.op.live
3853 env["MIGRATE_CLEANUP"] = self.op.cleanup
3854 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3857 def CheckPrereq(self):
3858 """Check prerequisites.
3860 This checks that the instance is in the cluster.
3863 instance = self.cfg.GetInstanceInfo(
3864 self.cfg.ExpandInstanceName(self.op.instance_name))
3865 if instance is None:
3866 raise errors.OpPrereqError("Instance '%s' not known" %
3867 self.op.instance_name)
3869 if instance.disk_template != constants.DT_DRBD8:
3870 raise errors.OpPrereqError("Instance's disk layout is not"
3871 " drbd8, cannot migrate.")
3873 secondary_nodes = instance.secondary_nodes
3874 if not secondary_nodes:
3875 raise errors.ConfigurationError("No secondary node but using"
3876 " drbd8 disk template")
3878 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3880 target_node = secondary_nodes[0]
3881 # check memory requirements on the secondary node
3882 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3883 instance.name, i_be[constants.BE_MEMORY],
3884 instance.hypervisor)
3886 # check bridge existence
3887 _CheckInstanceBridgesExist(self, instance, node=target_node)
3889 if not self.op.cleanup:
3890 _CheckNodeNotDrained(self, target_node)
3891 result = self.rpc.call_instance_migratable(instance.primary_node,
3893 result.Raise("Can't migrate, please use failover", prereq=True)
3895 self.instance = instance
3897 def _WaitUntilSync(self):
3898 """Poll with custom rpc for disk sync.
3900 This uses our own step-based rpc call.
3903 self.feedback_fn("* wait until resync is done")
3907 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3909 self.instance.disks)
3911 for node, nres in result.items():
3912 nres.Raise("Cannot resync disks on node %s" % node)
3913 node_done, node_percent = nres.payload
3914 all_done = all_done and node_done
3915 if node_percent is not None:
3916 min_percent = min(min_percent, node_percent)
3918 if min_percent < 100:
3919 self.feedback_fn(" - progress: %.1f%%" % min_percent)
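  # Illustrative sketch (comments only, hypothetical values): _WaitUntilSync
  # above reduces the per-node results to a single (all_done, min_percent)
  # pair; every node must report done, and the progress shown is the minimum
  # percentage across nodes.  With results such as
  #   {"node1": (False, 80.0), "node2": (True, None)}
  # the reduction is:
  #   all_done = True; min_percent = 100
  #   for done, percent in results.values():
  #     all_done = all_done and done
  #     if percent is not None:
  #       min_percent = min(min_percent, percent)
  # which leaves all_done False and min_percent 80.0, so the loop keeps
  # waiting and reports 80.0% progress.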
3922 def _EnsureSecondary(self, node):
3923 """Demote a node to secondary.
3926 self.feedback_fn("* switching node %s to secondary mode" % node)
3928 for dev in self.instance.disks:
3929 self.cfg.SetDiskID(dev, node)
3931 result = self.rpc.call_blockdev_close(node, self.instance.name,
3932 self.instance.disks)
3933 result.Raise("Cannot change disk to secondary on node %s" % node)
3935 def _GoStandalone(self):
3936 """Disconnect from the network.
3939 self.feedback_fn("* changing into standalone mode")
3940 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3941 self.instance.disks)
3942 for node, nres in result.items():
3943 nres.Raise("Cannot disconnect disks on node %s" % node)
3945 def _GoReconnect(self, multimaster):
3946 """Reconnect to the network.
3952 msg = "single-master"
3953 self.feedback_fn("* changing disks into %s mode" % msg)
3954 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3955 self.instance.disks,
3956 self.instance.name, multimaster)
3957 for node, nres in result.items():
3958 nres.Raise("Cannot change disks config on node %s" % node)
3960 def _ExecCleanup(self):
3961 """Try to cleanup after a failed migration.
3963 The cleanup is done by:
3964 - check that the instance is running only on one node
3965 (and update the config if needed)
3966 - change disks on its secondary node to secondary
3967 - wait until disks are fully synchronized
3968 - disconnect from the network
3969 - change disks into single-master mode
3970 - wait again until disks are fully synchronized
3973 instance = self.instance
3974 target_node = self.target_node
3975 source_node = self.source_node
3977 # check running on only one node
3978 self.feedback_fn("* checking where the instance actually runs"
3979 " (if this hangs, the hypervisor might be in"
3981 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3982 for node, result in ins_l.items():
3983 result.Raise("Can't contact node %s" % node)
3985 runningon_source = instance.name in ins_l[source_node].payload
3986 runningon_target = instance.name in ins_l[target_node].payload
3988 if runningon_source and runningon_target:
3989 raise errors.OpExecError("Instance seems to be running on two nodes,"
3990 " or the hypervisor is confused. You will have"
3991 " to ensure manually that it runs only on one"
3992 " and restart this operation.")
3994 if not (runningon_source or runningon_target):
3995 raise errors.OpExecError("Instance does not seem to be running at all."
3996 " In this case, it's safer to repair by"
3997 " running 'gnt-instance stop' to ensure disk"
3998 " shutdown, and then restarting it.")
4000 if runningon_target:
4001 # the migration has actually succeeded, we need to update the config
4002 self.feedback_fn("* instance running on secondary node (%s),"
4003 " updating config" % target_node)
4004 instance.primary_node = target_node
4005 self.cfg.Update(instance)
4006 demoted_node = source_node
4008 self.feedback_fn("* instance confirmed to be running on its"
4009 " primary node (%s)" % source_node)
4010 demoted_node = target_node
4012 self._EnsureSecondary(demoted_node)
4014 self._WaitUntilSync()
4015 except errors.OpExecError:
4016 # we ignore here errors, since if the device is standalone, it
4017 # won't be able to sync
4019 self._GoStandalone()
4020 self._GoReconnect(False)
4021 self._WaitUntilSync()
4023 self.feedback_fn("* done")
4025 def _RevertDiskStatus(self):
4026 """Try to revert the disk status after a failed migration.
4029 target_node = self.target_node
4031 self._EnsureSecondary(target_node)
4032 self._GoStandalone()
4033 self._GoReconnect(False)
4034 self._WaitUntilSync()
4035 except errors.OpExecError, err:
4036 self.LogWarning("Migration failed and I can't reconnect the"
4037 " drives: error '%s'\n"
4038 "Please look and recover the instance status" %
4041 def _AbortMigration(self):
4042 """Call the hypervisor code to abort a started migration.
4045 instance = self.instance
4046 target_node = self.target_node
4047 migration_info = self.migration_info
4049 abort_result = self.rpc.call_finalize_migration(target_node,
4053 abort_msg = abort_result.fail_msg
4055 logging.error("Aborting migration failed on target node %s: %s" %
4056 (target_node, abort_msg))
4057 # Don't raise an exception here, as we still have to try to revert the
4058 # disk status, even if this step failed.
4060 def _ExecMigration(self):
4061 """Migrate an instance.
4063 The migration is done by:
4064 - change the disks into dual-master mode
4065 - wait until disks are fully synchronized again
4066 - migrate the instance
4067 - change disks on the new secondary node (the old primary) to secondary
4068 - wait until disks are fully synchronized
4069 - change disks into single-master mode
4072 instance = self.instance
4073 target_node = self.target_node
4074 source_node = self.source_node
4076 self.feedback_fn("* checking disk consistency between source and target")
4077 for dev in instance.disks:
4078 if not _CheckDiskConsistency(self, dev, target_node, False):
4079 raise errors.OpExecError("Disk %s is degraded or not fully"
4080 " synchronized on target node,"
4081 " aborting migrate." % dev.iv_name)
4083 # First get the migration information from the remote node
4084 result = self.rpc.call_migration_info(source_node, instance)
4085 msg = result.fail_msg
4087 log_err = ("Failed fetching source migration information from %s: %s" %
4089 logging.error(log_err)
4090 raise errors.OpExecError(log_err)
4092 self.migration_info = migration_info = result.payload
4094 # Then switch the disks to master/master mode
4095 self._EnsureSecondary(target_node)
4096 self._GoStandalone()
4097 self._GoReconnect(True)
4098 self._WaitUntilSync()
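# at this point both nodes hold the disks in dual-primary (master/master)
# mode, which is what lets the hypervisor move the running instance without
# shutting it down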
4100 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4101 result = self.rpc.call_accept_instance(target_node,
4104 self.nodes_ip[target_node])
4106 msg = result.fail_msg
4108 logging.error("Instance pre-migration failed, trying to revert"
4109 " disk status: %s", msg)
4110 self._AbortMigration()
4111 self._RevertDiskStatus()
4112 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4113 (instance.name, msg))
4115 self.feedback_fn("* migrating instance to %s" % target_node)
4117 result = self.rpc.call_instance_migrate(source_node, instance,
4118 self.nodes_ip[target_node],
4120 msg = result.fail_msg
4122 logging.error("Instance migration failed, trying to revert"
4123 " disk status: %s", msg)
4124 self._AbortMigration()
4125 self._RevertDiskStatus()
4126 raise errors.OpExecError("Could not migrate instance %s: %s" %
4127 (instance.name, msg))
4130 instance.primary_node = target_node
4131 # distribute new instance config to the other nodes
4132 self.cfg.Update(instance)
4134 result = self.rpc.call_finalize_migration(target_node,
4138 msg = result.fail_msg
4140 logging.error("Instance migration succeeded, but finalization failed:"
4142 raise errors.OpExecError("Could not finalize instance migration: %s" %
4145 self._EnsureSecondary(source_node)
4146 self._WaitUntilSync()
4147 self._GoStandalone()
4148 self._GoReconnect(False)
4149 self._WaitUntilSync()
4151 self.feedback_fn("* done")
4153 def Exec(self, feedback_fn):
4154 """Perform the migration.
4157 self.feedback_fn = feedback_fn
4159 self.source_node = self.instance.primary_node
4160 self.target_node = self.instance.secondary_nodes[0]
4161 self.all_nodes = [self.source_node, self.target_node]
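# the DRBD devices communicate over the secondary (replication) network, so
# the rpc calls below are given each node's secondary IP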
4163 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4164 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4167 return self._ExecCleanup()
4169 return self._ExecMigration()
4172 def _CreateBlockDev(lu, node, instance, device, force_create,
4174 """Create a tree of block devices on a given node.
4176 If this device type has to be created on secondaries, create it and
4177 all of its children.
4179 If not, just recurse to children keeping the same 'force' value.
4181 @param lu: the lu on whose behalf we execute
4182 @param node: the node on which to create the device
4183 @type instance: L{objects.Instance}
4184 @param instance: the instance which owns the device
4185 @type device: L{objects.Disk}
4186 @param device: the device to create
4187 @type force_create: boolean
4188 @param force_create: whether to force creation of this device; this
4189 will be changed to True whenever we find a device which has
4190 CreateOnSecondary() attribute
4191 @param info: the extra 'metadata' we should attach to the device
4192 (this will be represented as a LVM tag)
4193 @type force_open: boolean
4194 @param force_open: this parameter will be passed to the
4195 L{backend.BlockdevCreate} function where it specifies
4196 whether we run on primary or not, and it affects both
4197 the child assembly and the device's own Open() execution
4200 if device.CreateOnSecondary():
4204 for child in device.children:
4205 _CreateBlockDev(lu, node, instance, child, force_create,
4208 if not force_create:
4211 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4214 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4215 """Create a single block device on a given node.
4217 This will not recurse over children of the device, so they must be
4218 created in advance.
4220 @param lu: the lu on whose behalf we execute
4221 @param node: the node on which to create the device
4222 @type instance: L{objects.Instance}
4223 @param instance: the instance which owns the device
4224 @type device: L{objects.Disk}
4225 @param device: the device to create
4226 @param info: the extra 'metadata' we should attach to the device
4227 (this will be represented as a LVM tag)
4228 @type force_open: boolean
4229 @param force_open: this parameter will be passed to the
4230 L{backend.BlockdevCreate} function where it specifies
4231 whether we run on primary or not, and it affects both
4232 the child assembly and the device's own Open() execution
4235 lu.cfg.SetDiskID(device, node)
4236 result = lu.rpc.call_blockdev_create(node, device, device.size,
4237 instance.name, force_open, info)
4238 result.Raise("Can't create block device %s on"
4239 " node %s for instance %s" % (device, node, instance.name))
4240 if device.physical_id is None:
4241 device.physical_id = result.payload
4244 def _GenerateUniqueNames(lu, exts):
4245 """Generate a suitable LV name.
4247 This will generate a logical volume name for the given instance.
4252 new_id = lu.cfg.GenerateUniqueID()
4253 results.append("%s%s" % (new_id, val))
4257 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4259 """Generate a drbd8 device complete with its children.
4262 port = lu.cfg.AllocatePort()
4263 vgname = lu.cfg.GetVGName()
4264 shared_secret = lu.cfg.GenerateDRBDSecret()
4265 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4266 logical_id=(vgname, names[0]))
4267 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4268 logical_id=(vgname, names[1]))
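# a DRBD8 disk is backed by two logical volumes: one holding the data and a
# small 128 MB one holding the DRBD metadata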
4269 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4270 logical_id=(primary, secondary, port,
4273 children=[dev_data, dev_meta],
4278 def _GenerateDiskTemplate(lu, template_name,
4279 instance_name, primary_node,
4280 secondary_nodes, disk_info,
4281 file_storage_dir, file_driver,
4283 """Generate the entire disk layout for a given template type.
4286 # TODO: compute space requirements
4288 vgname = lu.cfg.GetVGName()
4289 disk_count = len(disk_info)
4291 if template_name == constants.DT_DISKLESS:
4293 elif template_name == constants.DT_PLAIN:
4294 if len(secondary_nodes) != 0:
4295 raise errors.ProgrammerError("Wrong template configuration")
4297 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4298 for i in range(disk_count)])
4299 for idx, disk in enumerate(disk_info):
4300 disk_index = idx + base_index
4301 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4302 logical_id=(vgname, names[idx]),
4303 iv_name="disk/%d" % disk_index,
4305 disks.append(disk_dev)
4306 elif template_name == constants.DT_DRBD8:
4307 if len(secondary_nodes) != 1:
4308 raise errors.ProgrammerError("Wrong template configuration")
4309 remote_node = secondary_nodes[0]
4310 minors = lu.cfg.AllocateDRBDMinor(
4311 [primary_node, remote_node] * len(disk_info), instance_name)
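# two DRBD minors are reserved per disk, one on the primary and one on the
# remote node, in the same order in which the disks are generated below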
4314 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4315 for i in range(disk_count)]):
4316 names.append(lv_prefix + "_data")
4317 names.append(lv_prefix + "_meta")
4318 for idx, disk in enumerate(disk_info):
4319 disk_index = idx + base_index
4320 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4321 disk["size"], names[idx*2:idx*2+2],
4322 "disk/%d" % disk_index,
4323 minors[idx*2], minors[idx*2+1])
4324 disk_dev.mode = disk["mode"]
4325 disks.append(disk_dev)
4326 elif template_name == constants.DT_FILE:
4327 if len(secondary_nodes) != 0:
4328 raise errors.ProgrammerError("Wrong template configuration")
4330 for idx, disk in enumerate(disk_info):
4331 disk_index = idx + base_index
4332 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4333 iv_name="disk/%d" % disk_index,
4334 logical_id=(file_driver,
4335 "%s/disk%d" % (file_storage_dir,
4338 disks.append(disk_dev)
4340 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4344 def _GetInstanceInfoText(instance):
4345 """Compute that text that should be added to the disk's metadata.
4348 return "originstname+%s" % instance.name
4351 def _CreateDisks(lu, instance):
4352 """Create all disks for an instance.
4354 This abstracts away some work from AddInstance.
4356 @type lu: L{LogicalUnit}
4357 @param lu: the logical unit on whose behalf we execute
4358 @type instance: L{objects.Instance}
4359 @param instance: the instance whose disks we should create
4361 @return: the success of the creation
4364 info = _GetInstanceInfoText(instance)
4365 pnode = instance.primary_node
4367 if instance.disk_template == constants.DT_FILE:
4368 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4369 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4371 result.Raise("Failed to create directory '%s' on"
4372 " node %s: %s" % (file_storage_dir, pnode))
4374 # Note: this needs to be kept in sync with adding of disks in
4375 # LUSetInstanceParams
4376 for device in instance.disks:
4377 logging.info("Creating volume %s for instance %s",
4378 device.iv_name, instance.name)
4380 for node in instance.all_nodes:
4381 f_create = node == pnode
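# creation and opening are only forced on the primary node; on the other
# nodes the device's own CreateOnSecondary() decides what gets created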
4382 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4385 def _RemoveDisks(lu, instance):
4386 """Remove all disks for an instance.
4388 This abstracts away some work from `AddInstance()` and
4389 `RemoveInstance()`. Note that in case some of the devices couldn't
4390 be removed, the removal will continue with the other ones (compare
4391 with `_CreateDisks()`).
4393 @type lu: L{LogicalUnit}
4394 @param lu: the logical unit on whose behalf we execute
4395 @type instance: L{objects.Instance}
4396 @param instance: the instance whose disks we should remove
4398 @return: the success of the removal
4401 logging.info("Removing block devices for instance %s", instance.name)
4404 for device in instance.disks:
4405 for node, disk in device.ComputeNodeTree(instance.primary_node):
4406 lu.cfg.SetDiskID(disk, node)
4407 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4409 lu.LogWarning("Could not remove block device %s on node %s,"
4410 " continuing anyway: %s", device.iv_name, node, msg)
4413 if instance.disk_template == constants.DT_FILE:
4414 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4415 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4417 msg = result.fail_msg
4419 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4420 file_storage_dir, instance.primary_node, msg)
4426 def _ComputeDiskSize(disk_template, disks):
4427 """Compute disk size requirements in the volume group
4430 # Required free disk space as a function of disk and swap space
4432 constants.DT_DISKLESS: None,
4433 constants.DT_PLAIN: sum(d["size"] for d in disks),
4434 # 128 MB are added for drbd metadata for each disk
4435 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4436 constants.DT_FILE: None,
4439 if disk_template not in req_size_dict:
4440 raise errors.ProgrammerError("Disk template '%s' size requirement"
4441 " is unknown" % disk_template)
4443 return req_size_dict[disk_template]
4446 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4447 """Hypervisor parameter validation.
4449 This function abstracts the hypervisor parameter validation to be
4450 used in both instance create and instance modify.
4452 @type lu: L{LogicalUnit}
4453 @param lu: the logical unit for which we check
4454 @type nodenames: list
4455 @param nodenames: the list of nodes on which we should check
4456 @type hvname: string
4457 @param hvname: the name of the hypervisor we should use
4458 @type hvparams: dict
4459 @param hvparams: the parameters which we need to check
4460 @raise errors.OpPrereqError: if the parameters are not valid
4463 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4466 for node in nodenames:
4470 info.Raise("Hypervisor parameter validation failed on node %s" % node)
4473 class LUCreateInstance(LogicalUnit):
4474 """Create an instance.
4477 HPATH = "instance-add"
4478 HTYPE = constants.HTYPE_INSTANCE
4479 _OP_REQP = ["instance_name", "disks", "disk_template",
4481 "wait_for_sync", "ip_check", "nics",
4482 "hvparams", "beparams"]
4485 def _ExpandNode(self, node):
4486 """Expands and checks one node name.
4489 node_full = self.cfg.ExpandNodeName(node)
4490 if node_full is None:
4491 raise errors.OpPrereqError("Unknown node %s" % node)
4494 def ExpandNames(self):
4495 """ExpandNames for CreateInstance.
4497 Figure out the right locks for instance creation.
4500 self.needed_locks = {}
4502 # set optional parameters to none if they don't exist
4503 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4504 if not hasattr(self.op, attr):
4505 setattr(self.op, attr, None)
4507 # cheap checks, mostly valid constants given
4509 # verify creation mode
4510 if self.op.mode not in (constants.INSTANCE_CREATE,
4511 constants.INSTANCE_IMPORT):
4512 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4515 # disk template and mirror node verification
4516 if self.op.disk_template not in constants.DISK_TEMPLATES:
4517 raise errors.OpPrereqError("Invalid disk template name")
4519 if self.op.hypervisor is None:
4520 self.op.hypervisor = self.cfg.GetHypervisorType()
4522 cluster = self.cfg.GetClusterInfo()
4523 enabled_hvs = cluster.enabled_hypervisors
4524 if self.op.hypervisor not in enabled_hvs:
4525 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4526 " cluster (%s)" % (self.op.hypervisor,
4527 ",".join(enabled_hvs)))
4529 # check hypervisor parameter syntax (locally)
4530 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4531 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4533 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4534 hv_type.CheckParameterSyntax(filled_hvp)
4535 self.hv_full = filled_hvp
4537 # fill and remember the beparams dict
4538 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4539 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4542 #### instance parameters check
4544 # instance name verification
4545 hostname1 = utils.HostInfo(self.op.instance_name)
4546 self.op.instance_name = instance_name = hostname1.name
4548 # this is just a preventive check, but someone might still add this
4549 # instance in the meantime, and creation will fail at lock-add time
4550 if instance_name in self.cfg.GetInstanceList():
4551 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4554 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4558 for idx, nic in enumerate(self.op.nics):
4559 nic_mode_req = nic.get("mode", None)
4560 nic_mode = nic_mode_req
4561 if nic_mode is None:
4562 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4564 # in routed mode, for the first nic, the default ip is 'auto'
4565 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4566 default_ip_mode = constants.VALUE_AUTO
4568 default_ip_mode = constants.VALUE_NONE
4570 # ip validity checks
4571 ip = nic.get("ip", default_ip_mode)
4572 if ip is None or ip.lower() == constants.VALUE_NONE:
4574 elif ip.lower() == constants.VALUE_AUTO:
4575 nic_ip = hostname1.ip
4577 if not utils.IsValidIP(ip):
4578 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4579 " like a valid IP" % ip)
4582 # TODO: check the ip for uniqueness !!
4583 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4584 raise errors.OpPrereqError("Routed nic mode requires an ip address")
4586 # MAC address verification
4587 mac = nic.get("mac", constants.VALUE_AUTO)
4588 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4589 if not utils.IsValidMac(mac.lower()):
4590 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4592 # bridge verification
4593 bridge = nic.get("bridge", None)
4594 link = nic.get("link", None)
4596 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
4597 " at the same time")
4598 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4599 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4605 nicparams[constants.NIC_MODE] = nic_mode_req
4607 nicparams[constants.NIC_LINK] = link
4609 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4611 objects.NIC.CheckParameterSyntax(check_params)
4612 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4614 # disk checks/pre-build
4616 for disk in self.op.disks:
4617 mode = disk.get("mode", constants.DISK_RDWR)
4618 if mode not in constants.DISK_ACCESS_SET:
4619 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4621 size = disk.get("size", None)
4623 raise errors.OpPrereqError("Missing disk size")
4627 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4628 self.disks.append({"size": size, "mode": mode})
4630 # used in CheckPrereq for ip ping check
4631 self.check_ip = hostname1.ip
4633 # file storage checks
4634 if (self.op.file_driver and
4635 not self.op.file_driver in constants.FILE_DRIVER):
4636 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4637 self.op.file_driver)
4639 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4640 raise errors.OpPrereqError("File storage directory path not absolute")
4642 ### Node/iallocator related checks
4643 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4644 raise errors.OpPrereqError("One and only one of iallocator and primary"
4645 " node must be given")
4647 if self.op.iallocator:
4648 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4650 self.op.pnode = self._ExpandNode(self.op.pnode)
4651 nodelist = [self.op.pnode]
4652 if self.op.snode is not None:
4653 self.op.snode = self._ExpandNode(self.op.snode)
4654 nodelist.append(self.op.snode)
4655 self.needed_locks[locking.LEVEL_NODE] = nodelist
4657 # in case of import lock the source node too
4658 if self.op.mode == constants.INSTANCE_IMPORT:
4659 src_node = getattr(self.op, "src_node", None)
4660 src_path = getattr(self.op, "src_path", None)
4662 if src_path is None:
4663 self.op.src_path = src_path = self.op.instance_name
4665 if src_node is None:
4666 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4667 self.op.src_node = None
4668 if os.path.isabs(src_path):
4669 raise errors.OpPrereqError("Importing an instance from an absolute"
4670 " path requires a source node option.")
4672 self.op.src_node = src_node = self._ExpandNode(src_node)
4673 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4674 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4675 if not os.path.isabs(src_path):
4676 self.op.src_path = src_path = \
4677 os.path.join(constants.EXPORT_DIR, src_path)
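# a relative import path is interpreted under the cluster-wide export
# directory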
4679 else: # INSTANCE_CREATE
4680 if getattr(self.op, "os_type", None) is None:
4681 raise errors.OpPrereqError("No guest OS specified")
4683 def _RunAllocator(self):
4684 """Run the allocator based on input opcode.
4687 nics = [n.ToDict() for n in self.nics]
4688 ial = IAllocator(self,
4689 mode=constants.IALLOCATOR_MODE_ALLOC,
4690 name=self.op.instance_name,
4691 disk_template=self.op.disk_template,
4694 vcpus=self.be_full[constants.BE_VCPUS],
4695 mem_size=self.be_full[constants.BE_MEMORY],
4698 hypervisor=self.op.hypervisor,
4701 ial.Run(self.op.iallocator)
4704 raise errors.OpPrereqError("Can't compute nodes using"
4705 " iallocator '%s': %s" % (self.op.iallocator,
4707 if len(ial.nodes) != ial.required_nodes:
4708 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4709 " of nodes (%s), required %s" %
4710 (self.op.iallocator, len(ial.nodes),
4711 ial.required_nodes))
4712 self.op.pnode = ial.nodes[0]
4713 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4714 self.op.instance_name, self.op.iallocator,
4715 ", ".join(ial.nodes))
4716 if ial.required_nodes == 2:
4717 self.op.snode = ial.nodes[1]
4719 def BuildHooksEnv(self):
4722 This runs on master, primary and secondary nodes of the instance.
4726 "ADD_MODE": self.op.mode,
4728 if self.op.mode == constants.INSTANCE_IMPORT:
4729 env["SRC_NODE"] = self.op.src_node
4730 env["SRC_PATH"] = self.op.src_path
4731 env["SRC_IMAGES"] = self.src_images
4733 env.update(_BuildInstanceHookEnv(
4734 name=self.op.instance_name,
4735 primary_node=self.op.pnode,
4736 secondary_nodes=self.secondaries,
4737 status=self.op.start,
4738 os_type=self.op.os_type,
4739 memory=self.be_full[constants.BE_MEMORY],
4740 vcpus=self.be_full[constants.BE_VCPUS],
4741 nics=_NICListToTuple(self, self.nics),
4742 disk_template=self.op.disk_template,
4743 disks=[(d["size"], d["mode"]) for d in self.disks],
4746 hypervisor_name=self.op.hypervisor,
4749 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4754 def CheckPrereq(self):
4755 """Check prerequisites.
4758 if (not self.cfg.GetVGName() and
4759 self.op.disk_template not in constants.DTS_NOT_LVM):
4760 raise errors.OpPrereqError("Cluster does not support lvm-based"
4763 if self.op.mode == constants.INSTANCE_IMPORT:
4764 src_node = self.op.src_node
4765 src_path = self.op.src_path
4767 if src_node is None:
4768 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4769 exp_list = self.rpc.call_export_list(locked_nodes)
4771 for node in exp_list:
4772 if exp_list[node].fail_msg:
4774 if src_path in exp_list[node].payload:
4776 self.op.src_node = src_node = node
4777 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4781 raise errors.OpPrereqError("No export found for relative path %s" %
4784 _CheckNodeOnline(self, src_node)
4785 result = self.rpc.call_export_info(src_node, src_path)
4786 result.Raise("No export or invalid export found in dir %s" % src_path)
4788 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4789 if not export_info.has_section(constants.INISECT_EXP):
4790 raise errors.ProgrammerError("Corrupted export config")
4792 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4793 if (int(ei_version) != constants.EXPORT_VERSION):
4794 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4795 (ei_version, constants.EXPORT_VERSION))
4797 # Check that the new instance doesn't have less disks than the export
4798 instance_disks = len(self.disks)
4799 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4800 if instance_disks < export_disks:
4801 raise errors.OpPrereqError("Not enough disks to import."
4802 " (instance: %d, export: %d)" %
4803 (instance_disks, export_disks))
4805 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4807 for idx in range(export_disks):
4808 option = 'disk%d_dump' % idx
4809 if export_info.has_option(constants.INISECT_INS, option):
4810 # FIXME: are the old os-es, disk sizes, etc. useful?
4811 export_name = export_info.get(constants.INISECT_INS, option)
4812 image = os.path.join(src_path, export_name)
4813 disk_images.append(image)
4815 disk_images.append(False)
4817 self.src_images = disk_images
4819 old_name = export_info.get(constants.INISECT_INS, 'name')
4820 # FIXME: int() here could throw a ValueError on broken exports
4821 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4822 if self.op.instance_name == old_name:
4823 for idx, nic in enumerate(self.nics):
4824 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4825 nic_mac_ini = 'nic%d_mac' % idx
4826 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4828 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4829 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4830 if self.op.start and not self.op.ip_check:
4831 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4832 " adding an instance in start mode")
4834 if self.op.ip_check:
4835 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4836 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4837 (self.check_ip, self.op.instance_name))
4839 #### mac address generation
4840 # By generating here the mac address both the allocator and the hooks get
4841 # the real final mac address rather than the 'auto' or 'generate' value.
4842 # There is a race condition between the generation and the instance object
4843 # creation, which means that we know the mac is valid now, but we're not
4844 # sure it will be when we actually add the instance. If things go bad
4845 # adding the instance will abort because of a duplicate mac, and the
4846 # creation job will fail.
4847 for nic in self.nics:
4848 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4849 nic.mac = self.cfg.GenerateMAC()
4853 if self.op.iallocator is not None:
4854 self._RunAllocator()
4856 #### node related checks
4858 # check primary node
4859 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4860 assert self.pnode is not None, \
4861 "Cannot retrieve locked node %s" % self.op.pnode
4863 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4866 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4869 self.secondaries = []
4871 # mirror node verification
4872 if self.op.disk_template in constants.DTS_NET_MIRROR:
4873 if self.op.snode is None:
4874 raise errors.OpPrereqError("The networked disk templates need"
4876 if self.op.snode == pnode.name:
4877 raise errors.OpPrereqError("The secondary node cannot be"
4878 " the primary node.")
4879 _CheckNodeOnline(self, self.op.snode)
4880 _CheckNodeNotDrained(self, self.op.snode)
4881 self.secondaries.append(self.op.snode)
4883 nodenames = [pnode.name] + self.secondaries
4885 req_size = _ComputeDiskSize(self.op.disk_template,
4888 # Check lv size requirements
4889 if req_size is not None:
4890 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4892 for node in nodenames:
4893 info = nodeinfo[node]
4894 info.Raise("Cannot get current information from node %s" % node)
4896 vg_free = info.get('vg_free', None)
4897 if not isinstance(vg_free, int):
4898 raise errors.OpPrereqError("Can't compute free disk space on"
4900 if req_size > vg_free:
4901 raise errors.OpPrereqError("Not enough disk space on target node %s."
4902 " %d MB available, %d MB required" %
4903 (node, vg_free, req_size))
4905 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4908 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4909 result.Raise("OS '%s' not in supported os list for primary node %s" %
4910 (self.op.os_type, pnode.name), prereq=True)
4912 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4914 # memory check on primary node
4916 _CheckNodeFreeMemory(self, self.pnode.name,
4917 "creating instance %s" % self.op.instance_name,
4918 self.be_full[constants.BE_MEMORY],
4921 self.dry_run_result = list(nodenames)
4923 def Exec(self, feedback_fn):
4924 """Create and add the instance to the cluster.
4927 instance = self.op.instance_name
4928 pnode_name = self.pnode.name
4930 ht_kind = self.op.hypervisor
4931 if ht_kind in constants.HTS_REQ_PORT:
4932 network_port = self.cfg.AllocatePort()
4936 ##if self.op.vnc_bind_address is None:
4937 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4939 # this is needed because os.path.join does not accept None arguments
4940 if self.op.file_storage_dir is None:
4941 string_file_storage_dir = ""
4943 string_file_storage_dir = self.op.file_storage_dir
4945 # build the full file storage dir path
4946 file_storage_dir = os.path.normpath(os.path.join(
4947 self.cfg.GetFileStorageDir(),
4948 string_file_storage_dir, instance))
4951 disks = _GenerateDiskTemplate(self,
4952 self.op.disk_template,
4953 instance, pnode_name,
4957 self.op.file_driver,
4960 iobj = objects.Instance(name=instance, os=self.op.os_type,
4961 primary_node=pnode_name,
4962 nics=self.nics, disks=disks,
4963 disk_template=self.op.disk_template,
4965 network_port=network_port,
4966 beparams=self.op.beparams,
4967 hvparams=self.op.hvparams,
4968 hypervisor=self.op.hypervisor,
4971 feedback_fn("* creating instance disks...")
4973 _CreateDisks(self, iobj)
4974 except errors.OpExecError:
4975 self.LogWarning("Device creation failed, reverting...")
4977 _RemoveDisks(self, iobj)
4979 self.cfg.ReleaseDRBDMinors(instance)
4982 feedback_fn("adding instance %s to cluster config" % instance)
4984 self.cfg.AddInstance(iobj)
4985 # Declare that we don't want to remove the instance lock anymore, as we've
4986 # added the instance to the config
4987 del self.remove_locks[locking.LEVEL_INSTANCE]
4988 # Unlock all the nodes
4989 if self.op.mode == constants.INSTANCE_IMPORT:
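# for imports, keep the source node locked (the OS import scripts run later
# still need to read the export from it) and release all other node locks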
4990 nodes_keep = [self.op.src_node]
4991 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4992 if node != self.op.src_node]
4993 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4994 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4996 self.context.glm.release(locking.LEVEL_NODE)
4997 del self.acquired_locks[locking.LEVEL_NODE]
4999 if self.op.wait_for_sync:
5000 disk_abort = not _WaitForSync(self, iobj)
5001 elif iobj.disk_template in constants.DTS_NET_MIRROR:
5002 # make sure the disks are not degraded (still sync-ing is ok)
5004 feedback_fn("* checking mirrors status")
5005 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5010 _RemoveDisks(self, iobj)
5011 self.cfg.RemoveInstance(iobj.name)
5012 # Make sure the instance lock gets removed
5013 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5014 raise errors.OpExecError("There are some degraded disks for"
5017 feedback_fn("creating os for instance %s on node %s" %
5018 (instance, pnode_name))
5020 if iobj.disk_template != constants.DT_DISKLESS:
5021 if self.op.mode == constants.INSTANCE_CREATE:
5022 feedback_fn("* running the instance OS create scripts...")
5023 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5024 result.Raise("Could not add os for instance %s"
5025 " on node %s" % (instance, pnode_name))
5027 elif self.op.mode == constants.INSTANCE_IMPORT:
5028 feedback_fn("* running the instance OS import scripts...")
5029 src_node = self.op.src_node
5030 src_images = self.src_images
5031 cluster_name = self.cfg.GetClusterName()
5032 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5033 src_node, src_images,
5035 msg = import_result.fail_msg
5037 self.LogWarning("Error while importing the disk images for instance"
5038 " %s on node %s: %s" % (instance, pnode_name, msg))
5040 # also checked in the prereq part
5041 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5045 iobj.admin_up = True
5046 self.cfg.Update(iobj)
5047 logging.info("Starting instance %s on node %s", instance, pnode_name)
5048 feedback_fn("* starting instance...")
5049 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5050 result.Raise("Could not start instance")
5052 return list(iobj.all_nodes)
5055 class LUConnectConsole(NoHooksLU):
5056 """Connect to an instance's console.
5058 This is somewhat special in that it returns the command line that
5059 you need to run on the master node in order to connect to the
5060 console.
5063 _OP_REQP = ["instance_name"]
5066 def ExpandNames(self):
5067 self._ExpandAndLockInstance()
5069 def CheckPrereq(self):
5070 """Check prerequisites.
5072 This checks that the instance is in the cluster.
5075 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5076 assert self.instance is not None, \
5077 "Cannot retrieve locked instance %s" % self.op.instance_name
5078 _CheckNodeOnline(self, self.instance.primary_node)
5080 def Exec(self, feedback_fn):
5081 """Connect to the console of an instance
5084 instance = self.instance
5085 node = instance.primary_node
5087 node_insts = self.rpc.call_instance_list([node],
5088 [instance.hypervisor])[node]
5089 node_insts.Raise("Can't get node information from %s" % node)
5091 if instance.name not in node_insts.payload:
5092 raise errors.OpExecError("Instance %s is not running." % instance.name)
5094 logging.debug("Connecting to console of %s on %s", instance.name, node)
5096 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5097 cluster = self.cfg.GetClusterInfo()
5098 # beparams and hvparams are passed separately, to avoid editing the
5099 # instance and then saving the defaults in the instance itself.
5100 hvparams = cluster.FillHV(instance)
5101 beparams = cluster.FillBE(instance)
5102 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5105 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5108 class LUReplaceDisks(LogicalUnit):
5109 """Replace the disks of an instance.
5112 HPATH = "mirrors-replace"
5113 HTYPE = constants.HTYPE_INSTANCE
5114 _OP_REQP = ["instance_name", "mode", "disks"]
5117 def CheckArguments(self):
5118 if not hasattr(self.op, "remote_node"):
5119 self.op.remote_node = None
5120 if not hasattr(self.op, "iallocator"):
5121 self.op.iallocator = None
5123 # check for valid parameter combination
5124 cnt = [self.op.remote_node, self.op.iallocator].count(None)
5125 if self.op.mode == constants.REPLACE_DISK_CHG:
5127 raise errors.OpPrereqError("When changing the secondary either an"
5128 " iallocator script must be used or the"
5131 raise errors.OpPrereqError("Give either the iallocator or the new"
5132 " secondary, not both")
5133 else: # not replacing the secondary
5135 raise errors.OpPrereqError("The iallocator and new node options can"
5136 " be used only when changing the"
5139 def ExpandNames(self):
5140 self._ExpandAndLockInstance()
5142 if self.op.iallocator is not None:
5143 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5144 elif self.op.remote_node is not None:
5145 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5146 if remote_node is None:
5147 raise errors.OpPrereqError("Node '%s' not known" %
5148 self.op.remote_node)
5149 self.op.remote_node = remote_node
5150 # Warning: do not remove the locking of the new secondary here
5151 # unless DRBD8.AddChildren is changed to work in parallel;
5152 # currently it doesn't since parallel invocations of
5153 # FindUnusedMinor will conflict
5154 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5155 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5157 self.needed_locks[locking.LEVEL_NODE] = []
5158 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5160 def DeclareLocks(self, level):
5161 # If we're not already locking all nodes in the set we have to declare the
5162 # instance's primary/secondary nodes.
5163 if (level == locking.LEVEL_NODE and
5164 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5165 self._LockInstancesNodes()
5167 def _RunAllocator(self):
5168 """Compute a new secondary node using an IAllocator.
5171 ial = IAllocator(self,
5172 mode=constants.IALLOCATOR_MODE_RELOC,
5173 name=self.op.instance_name,
5174 relocate_from=[self.sec_node])
5176 ial.Run(self.op.iallocator)
5179 raise errors.OpPrereqError("Can't compute nodes using"
5180 " iallocator '%s': %s" % (self.op.iallocator,
5182 if len(ial.nodes) != ial.required_nodes:
5183 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5184 " of nodes (%s), required %s" %
5185 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
5186 self.op.remote_node = ial.nodes[0]
5187 self.LogInfo("Selected new secondary for the instance: %s",
5188 self.op.remote_node)
5190 def BuildHooksEnv(self):
5193 This runs on the master, the primary and all the secondaries.
5197 "MODE": self.op.mode,
5198 "NEW_SECONDARY": self.op.remote_node,
5199 "OLD_SECONDARY": self.instance.secondary_nodes[0],
5201 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5203 self.cfg.GetMasterNode(),
5204 self.instance.primary_node,
5206 if self.op.remote_node is not None:
5207 nl.append(self.op.remote_node)
5210 def CheckPrereq(self):
5211 """Check prerequisites.
5213 This checks that the instance is in the cluster.
5216 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5217 assert instance is not None, \
5218 "Cannot retrieve locked instance %s" % self.op.instance_name
5219 self.instance = instance
5221 if instance.disk_template != constants.DT_DRBD8:
5222 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5225 if len(instance.secondary_nodes) != 1:
5226 raise errors.OpPrereqError("The instance has a strange layout,"
5227 " expected one secondary but found %d" %
5228 len(instance.secondary_nodes))
5230 self.sec_node = instance.secondary_nodes[0]
5232 if self.op.iallocator is not None:
5233 self._RunAllocator()
5235 remote_node = self.op.remote_node
5236 if remote_node is not None:
5237 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5238 assert self.remote_node_info is not None, \
5239 "Cannot retrieve locked node %s" % remote_node
5241 self.remote_node_info = None
5242 if remote_node == instance.primary_node:
5243 raise errors.OpPrereqError("The specified node is the primary node of"
5245 elif remote_node == self.sec_node:
5246 raise errors.OpPrereqError("The specified node is already the"
5247 " secondary node of the instance.")
5249 if self.op.mode == constants.REPLACE_DISK_PRI:
5250 n1 = self.tgt_node = instance.primary_node
5251 n2 = self.oth_node = self.sec_node
5252 elif self.op.mode == constants.REPLACE_DISK_SEC:
5253 n1 = self.tgt_node = self.sec_node
5254 n2 = self.oth_node = instance.primary_node
5255 elif self.op.mode == constants.REPLACE_DISK_CHG:
5256 n1 = self.new_node = remote_node
5257 n2 = self.oth_node = instance.primary_node
5258 self.tgt_node = self.sec_node
5259 _CheckNodeNotDrained(self, remote_node)
5261 raise errors.ProgrammerError("Unhandled disk replace mode")
5263 _CheckNodeOnline(self, n1)
5264 _CheckNodeOnline(self, n2)
5266 if not self.op.disks:
5267 self.op.disks = range(len(instance.disks))
5269 for disk_idx in self.op.disks:
5270 instance.FindDisk(disk_idx)
5272 def _ExecD8DiskOnly(self, feedback_fn):
5273 """Replace a disk on the primary or secondary for dbrd8.
5275 The algorithm for replace is quite complicated:
5277 1. for each disk to be replaced:
5279 1. create new LVs on the target node with unique names
5280 1. detach old LVs from the drbd device
5281 1. rename old LVs to name_replaced.<time_t>
5282 1. rename new LVs to old LVs
5283 1. attach the new LVs (with the old names now) to the drbd device
5285 1. wait for sync across all devices
5287 1. for each modified disk:
5289 1. remove old LVs (which have the name name_replaced.<time_t>)
5291 Failures are not very well handled.
5295 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5296 instance = self.instance
5298 vgname = self.cfg.GetVGName()
5301 tgt_node = self.tgt_node
5302 oth_node = self.oth_node
5304 # Step: check device activation
5305 self.proc.LogStep(1, steps_total, "check device existence")
5306 info("checking volume groups")
5307 my_vg = cfg.GetVGName()
5308 results = self.rpc.call_vg_list([oth_node, tgt_node])
5310 raise errors.OpExecError("Can't list volume groups on the nodes")
5311 for node in oth_node, tgt_node:
5313 res.Raise("Error checking node %s" % node)
5314 if my_vg not in res.payload:
5315 raise errors.OpExecError("Volume group '%s' not found on %s" %
5317 for idx, dev in enumerate(instance.disks):
5318 if idx not in self.op.disks:
5320 for node in tgt_node, oth_node:
5321 info("checking disk/%d on %s" % (idx, node))
5322 cfg.SetDiskID(dev, node)
5323 result = self.rpc.call_blockdev_find(node, dev)
5324 msg = result.fail_msg
5325 if not msg and not result.payload:
5326 msg = "disk not found"
5328 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5331 # Step: check other node consistency
5332 self.proc.LogStep(2, steps_total, "check peer consistency")
5333 for idx, dev in enumerate(instance.disks):
5334 if idx not in self.op.disks:
5336 info("checking disk/%d consistency on %s" % (idx, oth_node))
5337 if not _CheckDiskConsistency(self, dev, oth_node,
5338 oth_node==instance.primary_node):
5339 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5340 " to replace disks on this node (%s)" %
5341 (oth_node, tgt_node))
5343 # Step: create new storage
5344 self.proc.LogStep(3, steps_total, "allocate new storage")
5345 for idx, dev in enumerate(instance.disks):
5346 if idx not in self.op.disks:
5349 cfg.SetDiskID(dev, tgt_node)
5350 lv_names = [".disk%d_%s" % (idx, suf)
5351 for suf in ["data", "meta"]]
5352 names = _GenerateUniqueNames(self, lv_names)
5353 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5354 logical_id=(vgname, names[0]))
5355 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5356 logical_id=(vgname, names[1]))
5357 new_lvs = [lv_data, lv_meta]
5358 old_lvs = dev.children
5359 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5360 info("creating new local storage on %s for %s" %
5361 (tgt_node, dev.iv_name))
5362 # we pass force_create=True to force the LVM creation
5363 for new_lv in new_lvs:
5364 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5365 _GetInstanceInfoText(instance), False)
5367 # Step: for each lv, detach+rename*2+attach
5368 self.proc.LogStep(4, steps_total, "change drbd configuration")
5369 for dev, old_lvs, new_lvs in iv_names.itervalues():
5370 info("detaching %s drbd from local storage" % dev.iv_name)
5371 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5372 result.Raise("Can't detach drbd from local storage on node"
5373 " %s for device %s" % (tgt_node, dev.iv_name))
5375 #cfg.Update(instance)
5377 # ok, we created the new LVs, so now we know we have the needed
5378 # storage; as such, we proceed on the target node to rename
5379 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5380 # using the assumption that logical_id == physical_id (which in
5381 # turn is the unique_id on that node)
5383 # FIXME(iustin): use a better name for the replaced LVs
5384 temp_suffix = int(time.time())
5385 ren_fn = lambda d, suff: (d.physical_id[0],
5386 d.physical_id[1] + "_replaced-%s" % suff)
5387 # build the rename list based on what LVs exist on the node
5389 for to_ren in old_lvs:
5390 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5391 if not result.fail_msg and result.payload:
5393 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5395 info("renaming the old LVs on the target node")
5396 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5397 result.Raise("Can't rename old LVs on node %s" % tgt_node)
5398 # now we rename the new LVs to the old LVs
5399 info("renaming the new LVs on the target node")
5400 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5401 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5402 result.Raise("Can't rename new LVs on node %s" % tgt_node)
5404 for old, new in zip(old_lvs, new_lvs):
5405 new.logical_id = old.logical_id
5406 cfg.SetDiskID(new, tgt_node)
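# the new LVs now carry the old names and logical ids, so when they are
# attached back to the drbd device below it finds them where the old ones
# used to be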
5408 for disk in old_lvs:
5409 disk.logical_id = ren_fn(disk, temp_suffix)
5410 cfg.SetDiskID(disk, tgt_node)
5412 # now that the new lvs have the old name, we can add them to the device
5413 info("adding new mirror component on %s" % tgt_node)
5414 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5415 msg = result.fail_msg
5417 for new_lv in new_lvs:
5418 msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg
5420 warning("Can't rollback device %s: %s", dev, msg2,
5421 hint="cleanup manually the unused logical volumes")
5422 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5424 dev.children = new_lvs
5425 cfg.Update(instance)
5427 # Step: wait for sync
5429 # this can fail as the old devices are degraded and _WaitForSync
5430 # does a combined result over all disks, so we don't check its
5431 # return value
5432 self.proc.LogStep(5, steps_total, "sync devices")
5433 _WaitForSync(self, instance, unlock=True)
5435 # so check manually all the devices
5436 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5437 cfg.SetDiskID(dev, instance.primary_node)
5438 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5439 msg = result.fail_msg
5440 if not msg and not result.payload:
5441 msg = "disk not found"
5443 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5445 if result.payload[5]:
5446 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5448 # Step: remove old storage
5449 self.proc.LogStep(6, steps_total, "removing old storage")
5450 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5451 info("remove logical volumes for %s" % name)
5453 cfg.SetDiskID(lv, tgt_node)
5454 msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg
5456 warning("Can't remove old LV: %s" % msg,
5457 hint="manually remove unused LVs")
5460 def _ExecD8Secondary(self, feedback_fn):
5461 """Replace the secondary node for drbd8.
5463 The algorithm for replace is quite complicated:
5464 - for all disks of the instance:
5465 - create new LVs on the new node with same names
5466 - shutdown the drbd device on the old secondary
5467 - disconnect the drbd network on the primary
5468 - create the drbd device on the new secondary
5469 - network attach the drbd on the primary, using an artifice:
5470 the drbd code for Attach() will connect to the network if it
5471 finds a device which is connected to the good local disks but
5472 not network enabled
5473 - wait for sync across all devices
5474 - remove all disks from the old secondary
5476 Failures are not very well handled.
5480 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5481 instance = self.instance
5485 old_node = self.tgt_node
5486 new_node = self.new_node
5487 pri_node = instance.primary_node
5489 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5490 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5491 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5494 # Step: check device activation
5495 self.proc.LogStep(1, steps_total, "check device existence")
5496 info("checking volume groups")
5497 my_vg = cfg.GetVGName()
5498 results = self.rpc.call_vg_list([pri_node, new_node])
5499 for node in pri_node, new_node:
5501 res.Raise("Error checking node %s" % node)
5502 if my_vg not in res.payload:
5503 raise errors.OpExecError("Volume group '%s' not found on %s" %
5505 for idx, dev in enumerate(instance.disks):
5506 if idx not in self.op.disks:
5508 info("checking disk/%d on %s" % (idx, pri_node))
5509 cfg.SetDiskID(dev, pri_node)
5510 result = self.rpc.call_blockdev_find(pri_node, dev)
5511 msg = result.fail_msg
5512 if not msg and not result.payload:
5513 msg = "disk not found"
5515 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5516 (idx, pri_node, msg))
5518 # Step: check other node consistency
5519 self.proc.LogStep(2, steps_total, "check peer consistency")
5520 for idx, dev in enumerate(instance.disks):
5521 if idx not in self.op.disks:
5523 info("checking disk/%d consistency on %s" % (idx, pri_node))
5524 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5525 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5526 " unsafe to replace the secondary" %
5529 # Step: create new storage
5530 self.proc.LogStep(3, steps_total, "allocate new storage")
5531 for idx, dev in enumerate(instance.disks):
5532 info("adding new local storage on %s for disk/%d" %
5534 # we pass force_create=True to force LVM creation
5535 for new_lv in dev.children:
5536 _CreateBlockDev(self, new_node, instance, new_lv, True,
5537 _GetInstanceInfoText(instance), False)
5539 # Step 4: drbd minors and drbd setup changes
5540 # after this, we must manually remove the drbd minors on both the
5541 # error and the success paths
5542 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5544 logging.debug("Allocated minors %s" % (minors,))
5545 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5546 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5547 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5548 # create new devices on new_node; note that we create two IDs:
5549 # one without port, so the drbd will be activated without
5550 # networking information on the new node at this stage, and one
5551 # with network, for the latter activation in step 4
5552 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5553 if pri_node == o_node1:
5558 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5559 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
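# new_alone_id (no port) brings the device up standalone on the new node;
# new_net_id (with port) is what gets written to the config and used when the
# primary re-attaches over the network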
5561 iv_names[idx] = (dev, dev.children, new_net_id)
5562 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5564 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5565 logical_id=new_alone_id,
5566 children=dev.children,
5569 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5570 _GetInstanceInfoText(instance), False)
5571 except errors.GenericError:
5572 self.cfg.ReleaseDRBDMinors(instance.name)
5575 for idx, dev in enumerate(instance.disks):
5576 # we have new devices, shutdown the drbd on the old secondary
5577 info("shutting down drbd for disk/%d on old node" % idx)
5578 cfg.SetDiskID(dev, old_node)
5579 msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg
5581 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5583 hint="Please cleanup this device manually as soon as possible")
5585 info("detaching primary drbds from the network (=> standalone)")
5586 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5587 instance.disks)[pri_node]
5589 msg = result.fail_msg
5591 # detaches didn't succeed (unlikely)
5592 self.cfg.ReleaseDRBDMinors(instance.name)
5593 raise errors.OpExecError("Can't detach the disks from the network on"
5594 " old node: %s" % (msg,))
5596 # if we managed to detach at least one, we update all the disks of
5597 # the instance to point to the new secondary
5598 info("updating instance configuration")
5599 for dev, _, new_logical_id in iv_names.itervalues():
5600 dev.logical_id = new_logical_id
5601 cfg.SetDiskID(dev, pri_node)
5602 cfg.Update(instance)
5604 # and now perform the drbd attach
5605 info("attaching primary drbds to new secondary (standalone => connected)")
5606 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5607 instance.disks, instance.name,
5609 for to_node, to_result in result.items():
5610 msg = to_result.fail_msg
5612 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5613 hint="please do a gnt-instance info to see the"
5616 # this can fail as the old devices are degraded and _WaitForSync
5617 # does a combined result over all disks, so we don't check its
5618 # return value
5619 self.proc.LogStep(5, steps_total, "sync devices")
5620 _WaitForSync(self, instance, unlock=True)
5622 # so check manually all the devices
5623 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5624 cfg.SetDiskID(dev, pri_node)
5625 result = self.rpc.call_blockdev_find(pri_node, dev)
5626 msg = result.fail_msg
5627 if not msg and not result.payload:
5628 msg = "disk not found"
5630 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5632 if result.payload[5]:
5633 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5635 self.proc.LogStep(6, steps_total, "removing old storage")
5636 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5637 info("remove logical volumes for disk/%d" % idx)
5639 cfg.SetDiskID(lv, old_node)
5640 msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg
5642 warning("Can't remove LV on old secondary: %s", msg,
5643 hint="Cleanup stale volumes by hand")
5645 def Exec(self, feedback_fn):
5646 """Execute disk replacement.
5648 This dispatches the disk replacement to the appropriate handler.
5651 instance = self.instance
5653 # Activate the instance disks if we're replacing them on a down instance
5654 if not instance.admin_up:
5655 _StartInstanceDisks(self, instance, True)
5657 if self.op.mode == constants.REPLACE_DISK_CHG:
5658 fn = self._ExecD8Secondary
5660 fn = self._ExecD8DiskOnly
5662 ret = fn(feedback_fn)
5664 # Deactivate the instance disks if we're replacing them on a down instance
5665 if not instance.admin_up:
5666 _SafeShutdownInstanceDisks(self, instance)
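# Illustrative example (assumed typical usage, values are examples only): an
# opcode requesting a secondary change, e.g. mode=constants.REPLACE_DISK_CHG
# with remote_node="node3.example.com", is dispatched to _ExecD8Secondary
# above, while any other replacement mode only rebuilds the local LVs in
# place via _ExecD8DiskOnly.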
5671 class LUGrowDisk(LogicalUnit):
5672 """Grow a disk of an instance.
5676 HTYPE = constants.HTYPE_INSTANCE
5677 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5680 def ExpandNames(self):
5681 self._ExpandAndLockInstance()
5682 self.needed_locks[locking.LEVEL_NODE] = []
5683 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5685 def DeclareLocks(self, level):
5686 if level == locking.LEVEL_NODE:
5687 self._LockInstancesNodes()
5689 def BuildHooksEnv(self):
5692 This runs on the master, the primary and all the secondaries.
5696 "DISK": self.op.disk,
5697 "AMOUNT": self.op.amount,
5699 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5701 self.cfg.GetMasterNode(),
5702 self.instance.primary_node,
5706 def CheckPrereq(self):
5707 """Check prerequisites.
5709 This checks that the instance is in the cluster.
5712 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5713 assert instance is not None, \
5714 "Cannot retrieve locked instance %s" % self.op.instance_name
5715 nodenames = list(instance.all_nodes)
5716 for node in nodenames:
5717 _CheckNodeOnline(self, node)
5720 self.instance = instance
5722 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5723 raise errors.OpPrereqError("Instance's disk layout does not support"
5726 self.disk = instance.FindDisk(self.op.disk)
5728 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5729 instance.hypervisor)
5730 for node in nodenames:
5731 info = nodeinfo[node]
5732 info.Raise("Cannot get current information from node %s" % node)
5733 vg_free = info.payload.get('vg_free', None)
5734 if not isinstance(vg_free, int):
5735 raise errors.OpPrereqError("Can't compute free disk space on"
5737 if self.op.amount > vg_free:
5738 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5739 " %d MiB available, %d MiB required" %
5740 (node, vg_free, self.op.amount))
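# Worked example (illustrative figures only): growing a disk by
# self.op.amount = 2048 MiB on a node whose node_info payload reports
# vg_free = 1024 MiB trips the check above (2048 > 1024), so the grow
# request is rejected before any node is touched.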
5742 def Exec(self, feedback_fn):
5743 """Execute disk grow.
5746 instance = self.instance
5748 for node in instance.all_nodes:
5749 self.cfg.SetDiskID(disk, node)
5750 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5751 result.Raise("Grow request failed to node %s" % node)
5752 disk.RecordGrow(self.op.amount)
5753 self.cfg.Update(instance)
5754 if self.op.wait_for_sync:
disk_abort = not _WaitForSync(self, instance)
if disk_abort:
  self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5758 " status.\nPlease check the instance.")
5761 class LUQueryInstanceData(NoHooksLU):
5762 """Query runtime instance data.
5765 _OP_REQP = ["instances", "static"]
5768 def ExpandNames(self):
5769 self.needed_locks = {}
5770 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
5772 if not isinstance(self.op.instances, list):
5773 raise errors.OpPrereqError("Invalid argument type 'instances'")
5775 if self.op.instances:
5776 self.wanted_names = []
5777 for name in self.op.instances:
5778 full_name = self.cfg.ExpandInstanceName(name)
5779 if full_name is None:
5780 raise errors.OpPrereqError("Instance '%s' not known" % name)
5781 self.wanted_names.append(full_name)
5782 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
else:
  self.wanted_names = None
5785 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5787 self.needed_locks[locking.LEVEL_NODE] = []
5788 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5790 def DeclareLocks(self, level):
5791 if level == locking.LEVEL_NODE:
5792 self._LockInstancesNodes()
5794 def CheckPrereq(self):
5795 """Check prerequisites.
5797 This only checks the optional instance list against the existing names.
5800 if self.wanted_names is None:
5801 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5803 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5804 in self.wanted_names]
5807 def _ComputeDiskStatus(self, instance, snode, dev):
5808 """Compute block device status.
5811 static = self.op.static
5813 self.cfg.SetDiskID(dev, instance.primary_node)
5814 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
if dev_pstatus.offline:
  dev_pstatus = None
else:
  dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
  dev_pstatus = dev_pstatus.payload
5823 if dev.dev_type in constants.LDS_DRBD:
5824 # we change the snode then (otherwise we use the one passed in)
5825 if dev.logical_id[0] == instance.primary_node:
5826 snode = dev.logical_id[1]
5828 snode = dev.logical_id[0]
5830 if snode and not static:
5831 self.cfg.SetDiskID(dev, snode)
5832 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
if dev_sstatus.offline:
  dev_sstatus = None
else:
  dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
  dev_sstatus = dev_sstatus.payload
5842 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5843 for child in dev.children]
5848 "iv_name": dev.iv_name,
5849 "dev_type": dev.dev_type,
5850 "logical_id": dev.logical_id,
5851 "physical_id": dev.physical_id,
5852 "pstatus": dev_pstatus,
5853 "sstatus": dev_sstatus,
5854 "children": dev_children,
5861 def Exec(self, feedback_fn):
5862 """Gather and return data"""
5865 cluster = self.cfg.GetClusterInfo()
5867 for instance in self.wanted_instances:
5868 if not self.op.static:
5869 remote_info = self.rpc.call_instance_info(instance.primary_node,
5871 instance.hypervisor)
5872 remote_info.Raise("Error checking node %s" % instance.primary_node)
5873 remote_info = remote_info.payload
if remote_info and "state" in remote_info:
  remote_state = "up"
else:
  remote_state = "down"
if instance.admin_up:
  config_state = "up"
else:
  config_state = "down"
5885 disks = [self._ComputeDiskStatus(instance, None, device)
5886 for device in instance.disks]
5889 "name": instance.name,
5890 "config_state": config_state,
5891 "run_state": remote_state,
5892 "pnode": instance.primary_node,
5893 "snodes": instance.secondary_nodes,
5895 # this happens to be the same format used for hooks
5896 "nics": _NICListToTuple(self, instance.nics),
5898 "hypervisor": instance.hypervisor,
5899 "network_port": instance.network_port,
5900 "hv_instance": instance.hvparams,
5901 "hv_actual": cluster.FillHV(instance),
5902 "be_instance": instance.beparams,
5903 "be_actual": cluster.FillBE(instance),
5906 result[instance.name] = idict
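# Illustrative shape of the returned mapping (values are examples only; the
# keys are the ones built just above):
#
#   {"inst1.example.com": {
#        "name": "inst1.example.com", "config_state": "up",
#        "run_state": "up", "pnode": "node1.example.com",
#        "snodes": ["node2.example.com"],
#        "disks": [per-disk dicts from _ComputeDiskStatus, including
#                  "pstatus"/"sstatus" payloads and nested "children"],
#        ...}}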
5911 class LUSetInstanceParams(LogicalUnit):
5912 """Modifies an instances's parameters.
5915 HPATH = "instance-modify"
5916 HTYPE = constants.HTYPE_INSTANCE
5917 _OP_REQP = ["instance_name"]
5920 def CheckArguments(self):
5921 if not hasattr(self.op, 'nics'):
5923 if not hasattr(self.op, 'disks'):
5925 if not hasattr(self.op, 'beparams'):
5926 self.op.beparams = {}
5927 if not hasattr(self.op, 'hvparams'):
5928 self.op.hvparams = {}
5929 self.op.force = getattr(self.op, "force", False)
5930 if not (self.op.nics or self.op.disks or
5931 self.op.hvparams or self.op.beparams):
5932 raise errors.OpPrereqError("No changes submitted")
5936 for disk_op, disk_dict in self.op.disks:
5937 if disk_op == constants.DDM_REMOVE:
5940 elif disk_op == constants.DDM_ADD:
5943 if not isinstance(disk_op, int):
5944 raise errors.OpPrereqError("Invalid disk index")
5945 if not isinstance(disk_dict, dict):
5946 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
5947 raise errors.OpPrereqError(msg)
5949 if disk_op == constants.DDM_ADD:
5950 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5951 if mode not in constants.DISK_ACCESS_SET:
5952 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
size = disk_dict.get('size', None)
if size is None:
  raise errors.OpPrereqError("Required disk parameter size missing")
try:
  size = int(size)
except ValueError, err:
  raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                             str(err))
disk_dict['size'] = size
5963 # modification of disk
5964 if 'size' in disk_dict:
5965 raise errors.OpPrereqError("Disk size change not possible, use"
5968 if disk_addremove > 1:
5969 raise errors.OpPrereqError("Only one disk add or remove operation"
5970 " supported at a time")
5974 for nic_op, nic_dict in self.op.nics:
5975 if nic_op == constants.DDM_REMOVE:
5978 elif nic_op == constants.DDM_ADD:
5981 if not isinstance(nic_op, int):
5982 raise errors.OpPrereqError("Invalid nic index")
5983 if not isinstance(nic_dict, dict):
5984 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
5985 raise errors.OpPrereqError(msg)
5987 # nic_dict should be a dict
5988 nic_ip = nic_dict.get('ip', None)
5989 if nic_ip is not None:
5990 if nic_ip.lower() == constants.VALUE_NONE:
5991 nic_dict['ip'] = None
5993 if not utils.IsValidIP(nic_ip):
5994 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5996 nic_bridge = nic_dict.get('bridge', None)
5997 nic_link = nic_dict.get('link', None)
5998 if nic_bridge and nic_link:
5999 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
6000 " at the same time")
6001 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
6002 nic_dict['bridge'] = None
6003 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
6004 nic_dict['link'] = None
6006 if nic_op == constants.DDM_ADD:
nic_mac = nic_dict.get('mac', None)
if nic_mac is None:
  nic_dict['mac'] = constants.VALUE_AUTO
6011 if 'mac' in nic_dict:
6012 nic_mac = nic_dict['mac']
6013 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6014 if not utils.IsValidMac(nic_mac):
6015 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6016 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6017 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6018 " modifying an existing nic")
6020 if nic_addremove > 1:
6021 raise errors.OpPrereqError("Only one NIC add or remove operation"
6022 " supported at a time")
6024 def ExpandNames(self):
6025 self._ExpandAndLockInstance()
6026 self.needed_locks[locking.LEVEL_NODE] = []
6027 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6029 def DeclareLocks(self, level):
6030 if level == locking.LEVEL_NODE:
6031 self._LockInstancesNodes()
6033 def BuildHooksEnv(self):
6036 This runs on the master, primary and secondaries.
6040 if constants.BE_MEMORY in self.be_new:
6041 args['memory'] = self.be_new[constants.BE_MEMORY]
6042 if constants.BE_VCPUS in self.be_new:
6043 args['vcpus'] = self.be_new[constants.BE_VCPUS]
6044 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6045 # information at all.
6048 nic_override = dict(self.op.nics)
6049 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6050 for idx, nic in enumerate(self.instance.nics):
6051 if idx in nic_override:
6052 this_nic_override = nic_override[idx]
6054 this_nic_override = {}
6055 if 'ip' in this_nic_override:
6056 ip = this_nic_override['ip']
6059 if 'mac' in this_nic_override:
6060 mac = this_nic_override['mac']
6063 if idx in self.nic_pnew:
6064 nicparams = self.nic_pnew[idx]
6066 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6067 mode = nicparams[constants.NIC_MODE]
6068 link = nicparams[constants.NIC_LINK]
6069 args['nics'].append((ip, mac, mode, link))
6070 if constants.DDM_ADD in nic_override:
6071 ip = nic_override[constants.DDM_ADD].get('ip', None)
6072 mac = nic_override[constants.DDM_ADD]['mac']
6073 nicparams = self.nic_pnew[constants.DDM_ADD]
6074 mode = nicparams[constants.NIC_MODE]
6075 link = nicparams[constants.NIC_LINK]
6076 args['nics'].append((ip, mac, mode, link))
6077 elif constants.DDM_REMOVE in nic_override:
6078 del args['nics'][-1]
6080 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6081 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6084 def _GetUpdatedParams(self, old_params, update_dict,
6085 default_values, parameter_types):
6086 """Return the new params dict for the given params.
6088 @type old_params: dict
6089 @param old_params: old parameters
6090 @type update_dict: dict
6091 @param update_dict: dict containing new parameter values,
6092 or constants.VALUE_DEFAULT to reset the
6093 parameter to its default value
6094 @type default_values: dict
6095 @param default_values: default values for the filled parameters
6096 @type parameter_types: dict
6097 @param parameter_types: dict mapping target dict keys to types
6098 in constants.ENFORCEABLE_TYPES
6099 @rtype: (dict, dict)
6100 @return: (new_parameters, filled_parameters)
6103 params_copy = copy.deepcopy(old_params)
6104 for key, val in update_dict.iteritems():
6105 if val == constants.VALUE_DEFAULT:
6107 del params_copy[key]
6111 params_copy[key] = val
6112 utils.ForceDictType(params_copy, parameter_types)
6113 params_filled = objects.FillDict(default_values, params_copy)
6114 return (params_copy, params_filled)
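# Worked example (illustrative values; type enforcement via parameter_types
# is omitted for brevity): with
#   old_params     = {"memory": 512, "vcpus": 2}
#   update_dict    = {"memory": constants.VALUE_DEFAULT, "vcpus": 4}
#   default_values = {"memory": 128, "vcpus": 1}
# the VALUE_DEFAULT entry removes "memory" from the instance-level dict, so
# the method returns ({"vcpus": 4}, {"memory": 128, "vcpus": 4}): the first
# dict is what gets stored, the second is what the instance effectively uses.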
6116 def CheckPrereq(self):
6117 """Check prerequisites.
6119 This only checks the instance list against the existing names.
6122 self.force = self.op.force
6124 # checking the new params on the primary/secondary nodes
6126 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6127 cluster = self.cluster = self.cfg.GetClusterInfo()
6128 assert self.instance is not None, \
6129 "Cannot retrieve locked instance %s" % self.op.instance_name
6130 pnode = instance.primary_node
6131 nodelist = list(instance.all_nodes)
6133 # hvparams processing
6134 if self.op.hvparams:
6135 i_hvdict, hv_new = self._GetUpdatedParams(
6136 instance.hvparams, self.op.hvparams,
6137 cluster.hvparams[instance.hypervisor],
6138 constants.HVS_PARAMETER_TYPES)
6140 hypervisor.GetHypervisor(
6141 instance.hypervisor).CheckParameterSyntax(hv_new)
6142 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6143 self.hv_new = hv_new # the new actual values
6144 self.hv_inst = i_hvdict # the new dict (without defaults)
6146 self.hv_new = self.hv_inst = {}
6148 # beparams processing
6149 if self.op.beparams:
6150 i_bedict, be_new = self._GetUpdatedParams(
6151 instance.beparams, self.op.beparams,
6152 cluster.beparams[constants.PP_DEFAULT],
6153 constants.BES_PARAMETER_TYPES)
6154 self.be_new = be_new # the new actual values
6155 self.be_inst = i_bedict # the new dict (without defaults)
6157 self.be_new = self.be_inst = {}
6161 if constants.BE_MEMORY in self.op.beparams and not self.force:
6162 mem_check_list = [pnode]
6163 if be_new[constants.BE_AUTO_BALANCE]:
6164 # either we changed auto_balance to yes or it was from before
6165 mem_check_list.extend(instance.secondary_nodes)
6166 instance_info = self.rpc.call_instance_info(pnode, instance.name,
6167 instance.hypervisor)
6168 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6169 instance.hypervisor)
6170 pninfo = nodeinfo[pnode]
6171 msg = pninfo.fail_msg
6173 # Assume the primary node is unreachable and go ahead
6174 self.warn.append("Can't get info from primary node %s: %s" %
6176 elif not isinstance(pninfo.payload.get('memory_free', None), int):
6177 self.warn.append("Node data from primary node %s doesn't contain"
6178 " free memory information" % pnode)
6179 elif instance_info.fail_msg:
6180 self.warn.append("Can't get instance runtime information: %s" %
6181 instance_info.fail_msg)
else:
  if instance_info.payload:
    current_mem = int(instance_info.payload['memory'])
  else:
    # Assume instance not running
    # (there is a slight race condition here, but it's not very probable,
    # and we have no other way to check)
    current_mem = 0
  miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
              pninfo.payload['memory_free'])
  if miss_mem > 0:
    raise errors.OpPrereqError("This change will prevent the instance"
6194 " from starting, due to %d MB of memory"
6195 " missing on its primary node" % miss_mem)
6197 if be_new[constants.BE_AUTO_BALANCE]:
6198 for node, nres in nodeinfo.items():
6199 if node not in instance.secondary_nodes:
6203 self.warn.append("Can't get info from secondary node %s: %s" %
6205 elif not isinstance(nres.payload.get('memory_free', None), int):
6206 self.warn.append("Secondary node %s didn't return free"
6207 " memory information" % node)
6208 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6209 self.warn.append("Not enough memory to failover instance to"
6210 " secondary node %s" % node)
6215 for nic_op, nic_dict in self.op.nics:
6216 if nic_op == constants.DDM_REMOVE:
6217 if not instance.nics:
6218 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6220 if nic_op != constants.DDM_ADD:
6222 if nic_op < 0 or nic_op >= len(instance.nics):
6223 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6225 (nic_op, len(instance.nics)))
6226 old_nic_params = instance.nics[nic_op].nicparams
6227 old_nic_ip = instance.nics[nic_op].ip
6232 update_params_dict = dict([(key, nic_dict[key])
6233 for key in constants.NICS_PARAMETERS
6234 if key in nic_dict])
6236 if 'bridge' in nic_dict:
6237 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6239 new_nic_params, new_filled_nic_params = \
6240 self._GetUpdatedParams(old_nic_params, update_params_dict,
6241 cluster.nicparams[constants.PP_DEFAULT],
6242 constants.NICS_PARAMETER_TYPES)
6243 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6244 self.nic_pinst[nic_op] = new_nic_params
6245 self.nic_pnew[nic_op] = new_filled_nic_params
6246 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6248 if new_nic_mode == constants.NIC_MODE_BRIDGED:
6249 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6250 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6252 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6254 self.warn.append(msg)
6256 raise errors.OpPrereqError(msg)
6257 if new_nic_mode == constants.NIC_MODE_ROUTED:
6258 if 'ip' in nic_dict:
6259 nic_ip = nic_dict['ip']
6263 raise errors.OpPrereqError('Cannot set the nic ip to None'
6265 if 'mac' in nic_dict:
6266 nic_mac = nic_dict['mac']
6268 raise errors.OpPrereqError('Cannot set the nic mac to None')
6269 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6270 # otherwise generate the mac
6271 nic_dict['mac'] = self.cfg.GenerateMAC()
6273 # or validate/reserve the current one
6274 if self.cfg.IsMacInUse(nic_mac):
6275 raise errors.OpPrereqError("MAC address %s already in use"
6276 " in cluster" % nic_mac)
6279 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6280 raise errors.OpPrereqError("Disk operations not supported for"
6281 " diskless instances")
6282 for disk_op, disk_dict in self.op.disks:
6283 if disk_op == constants.DDM_REMOVE:
6284 if len(instance.disks) == 1:
6285 raise errors.OpPrereqError("Cannot remove the last disk of"
6287 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6288 ins_l = ins_l[pnode]
6289 msg = ins_l.fail_msg
6291 raise errors.OpPrereqError("Can't contact node %s: %s" %
6293 if instance.name in ins_l.payload:
6294 raise errors.OpPrereqError("Instance is running, can't remove"
if (disk_op == constants.DDM_ADD and
    len(instance.disks) >= constants.MAX_DISKS):
6299 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6300 " add more" % constants.MAX_DISKS)
6301 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6303 if disk_op < 0 or disk_op >= len(instance.disks):
6304 raise errors.OpPrereqError("Invalid disk index %s, valid values"
6306 (disk_op, len(instance.disks)))
6310 def Exec(self, feedback_fn):
6311 """Modifies an instance.
6313 All parameters take effect only at the next restart of the instance.
6316 # Process here the warnings from CheckPrereq, as we don't have a
6317 # feedback_fn there.
6318 for warn in self.warn:
6319 feedback_fn("WARNING: %s" % warn)
result = []
instance = self.instance
6323 cluster = self.cluster
6325 for disk_op, disk_dict in self.op.disks:
6326 if disk_op == constants.DDM_REMOVE:
6327 # remove the last disk
6328 device = instance.disks.pop()
6329 device_idx = len(instance.disks)
6330 for node, disk in device.ComputeNodeTree(instance.primary_node):
6331 self.cfg.SetDiskID(disk, node)
6332 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6334 self.LogWarning("Could not remove disk/%d on node %s: %s,"
6335 " continuing anyway", device_idx, node, msg)
6336 result.append(("disk/%d" % device_idx, "remove"))
6337 elif disk_op == constants.DDM_ADD:
if instance.disk_template == constants.DT_FILE:
  file_driver, file_path = instance.disks[0].logical_id
  file_path = os.path.dirname(file_path)
else:
  file_driver = file_path = None
6344 disk_idx_base = len(instance.disks)
6345 new_disk = _GenerateDiskTemplate(self,
6346 instance.disk_template,
6347 instance.name, instance.primary_node,
6348 instance.secondary_nodes,
6353 instance.disks.append(new_disk)
6354 info = _GetInstanceInfoText(instance)
6356 logging.info("Creating volume %s for instance %s",
6357 new_disk.iv_name, instance.name)
6358 # Note: this needs to be kept in sync with _CreateDisks
6360 for node in instance.all_nodes:
6361 f_create = node == instance.primary_node
6363 _CreateBlockDev(self, node, instance, new_disk,
6364 f_create, info, f_create)
6365 except errors.OpExecError, err:
6366 self.LogWarning("Failed to create volume %s (%s) on"
6368 new_disk.iv_name, new_disk, node, err)
6369 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6370 (new_disk.size, new_disk.mode)))
6372 # change a given disk
6373 instance.disks[disk_op].mode = disk_dict['mode']
6374 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6376 for nic_op, nic_dict in self.op.nics:
6377 if nic_op == constants.DDM_REMOVE:
6378 # remove the last nic
6379 del instance.nics[-1]
6380 result.append(("nic.%d" % len(instance.nics), "remove"))
6381 elif nic_op == constants.DDM_ADD:
6382 # mac and bridge should be set, by now
6383 mac = nic_dict['mac']
6384 ip = nic_dict.get('ip', None)
6385 nicparams = self.nic_pinst[constants.DDM_ADD]
6386 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6387 instance.nics.append(new_nic)
6388 result.append(("nic.%d" % (len(instance.nics) - 1),
6389 "add:mac=%s,ip=%s,mode=%s,link=%s" %
6390 (new_nic.mac, new_nic.ip,
6391 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6392 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
for key in 'mac', 'ip':
  if key in nic_dict:
    setattr(instance.nics[nic_op], key, nic_dict[key])
6398 if nic_op in self.nic_pnew:
6399 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6400 for key, val in nic_dict.iteritems():
6401 result.append(("nic.%s/%d" % (key, nic_op), val))
6404 if self.op.hvparams:
6405 instance.hvparams = self.hv_inst
6406 for key, val in self.op.hvparams.iteritems():
6407 result.append(("hv/%s" % key, val))
6410 if self.op.beparams:
6411 instance.beparams = self.be_inst
6412 for key, val in self.op.beparams.iteritems():
6413 result.append(("be/%s" % key, val))
6415 self.cfg.Update(instance)
6420 class LUQueryExports(NoHooksLU):
6421 """Query the exports list
6424 _OP_REQP = ['nodes']
6427 def ExpandNames(self):
6428 self.needed_locks = {}
6429 self.share_locks[locking.LEVEL_NODE] = 1
6430 if not self.op.nodes:
6431 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6433 self.needed_locks[locking.LEVEL_NODE] = \
6434 _GetWantedNodes(self, self.op.nodes)
6436 def CheckPrereq(self):
6437 """Check prerequisites.
6440 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6442 def Exec(self, feedback_fn):
6443 """Compute the list of all the exported system images.
6446 @return: a dictionary with the structure node->(export-list)
6447 where export-list is a list of the instances exported on
6451 rpcresult = self.rpc.call_export_list(self.nodes)
6453 for node in rpcresult:
6454 if rpcresult[node].fail_msg:
6455 result[node] = False
6457 result[node] = rpcresult[node].payload
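# Illustrative return value (node names are examples): a node that failed to
# answer is reported as False, a healthy one maps to its export list, e.g.
#
#   {"node1.example.com": ["inst1.example.com", "old-inst.example.com"],
#    "node2.example.com": False}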
6462 class LUExportInstance(LogicalUnit):
6463 """Export an instance to an image in the cluster.
6466 HPATH = "instance-export"
6467 HTYPE = constants.HTYPE_INSTANCE
6468 _OP_REQP = ["instance_name", "target_node", "shutdown"]
6471 def ExpandNames(self):
6472 self._ExpandAndLockInstance()
6473 # FIXME: lock only instance primary and destination node
# Sad but true, for now we have to lock all nodes, as we don't know where
# the previous export might be, and in this LU we search for it and
6477 # remove it from its current node. In the future we could fix this by:
6478 # - making a tasklet to search (share-lock all), then create the new one,
6479 # then one to remove, after
6480 # - removing the removal operation altogether
6481 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6483 def DeclareLocks(self, level):
6484 """Last minute lock declaration."""
6485 # All nodes are locked anyway, so nothing to do here.
6487 def BuildHooksEnv(self):
6490 This will run on the master, primary node and target node.
6494 "EXPORT_NODE": self.op.target_node,
6495 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6497 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6498 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6499 self.op.target_node]
6502 def CheckPrereq(self):
6503 """Check prerequisites.
6505 This checks that the instance and node names are valid.
6508 instance_name = self.op.instance_name
6509 self.instance = self.cfg.GetInstanceInfo(instance_name)
6510 assert self.instance is not None, \
6511 "Cannot retrieve locked instance %s" % self.op.instance_name
6512 _CheckNodeOnline(self, self.instance.primary_node)
6514 self.dst_node = self.cfg.GetNodeInfo(
6515 self.cfg.ExpandNodeName(self.op.target_node))
6517 if self.dst_node is None:
6518 # This is wrong node name, not a non-locked node
6519 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6520 _CheckNodeOnline(self, self.dst_node.name)
6521 _CheckNodeNotDrained(self, self.dst_node.name)
6523 # instance disk type verification
6524 for disk in self.instance.disks:
6525 if disk.dev_type == constants.LD_FILE:
6526 raise errors.OpPrereqError("Export not supported for instances with"
6527 " file-based disks")
6529 def Exec(self, feedback_fn):
6530 """Export an instance to an image in the cluster.
6533 instance = self.instance
6534 dst_node = self.dst_node
6535 src_node = instance.primary_node
6536 if self.op.shutdown:
6537 # shutdown the instance, but not the disks
6538 result = self.rpc.call_instance_shutdown(src_node, instance)
6539 result.Raise("Could not shutdown instance %s on"
6540 " node %s" % (instance.name, src_node))
6542 vgname = self.cfg.GetVGName()
6546 # set the disks ID correctly since call_instance_start needs the
6547 # correct drbd minor to create the symlinks
6548 for disk in instance.disks:
6549 self.cfg.SetDiskID(disk, src_node)
6552 for idx, disk in enumerate(instance.disks):
6553 # result.payload will be a snapshot of an lvm leaf of the one we passed
6554 result = self.rpc.call_blockdev_snapshot(src_node, disk)
msg = result.fail_msg
if msg:
  self.LogWarning("Could not snapshot disk/%s on node %s: %s",
                  idx, src_node, msg)
  snap_disks.append(False)
else:
  disk_id = (vgname, result.payload)
6562 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6563 logical_id=disk_id, physical_id=disk_id,
6564 iv_name=disk.iv_name)
6565 snap_disks.append(new_dev)
6568 if self.op.shutdown and instance.admin_up:
6569 result = self.rpc.call_instance_start(src_node, instance, None, None)
msg = result.fail_msg
if msg:
  _ShutdownInstanceDisks(self, instance)
6573 raise errors.OpExecError("Could not start instance: %s" % msg)
6575 # TODO: check for size
6577 cluster_name = self.cfg.GetClusterName()
6578 for idx, dev in enumerate(snap_disks):
6580 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6581 instance, cluster_name, idx)
6582 msg = result.fail_msg
6584 self.LogWarning("Could not export disk/%s from node %s to"
6585 " node %s: %s", idx, src_node, dst_node.name, msg)
6586 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
6588 self.LogWarning("Could not remove snapshot for disk/%d from node"
6589 " %s: %s", idx, src_node, msg)
6591 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6592 msg = result.fail_msg
6594 self.LogWarning("Could not finalize export for instance %s"
6595 " on node %s: %s", instance.name, dst_node.name, msg)
6597 nodelist = self.cfg.GetNodeList()
6598 nodelist.remove(dst_node.name)
6600 # on one-node clusters nodelist will be empty after the removal
6601 # if we proceed the backup would be removed because OpQueryExports
6602 # substitutes an empty list with the full cluster node list.
iname = instance.name
if nodelist:
  exportlist = self.rpc.call_export_list(nodelist)
6606 for node in exportlist:
6607 if exportlist[node].fail_msg:
6609 if iname in exportlist[node].payload:
6610 msg = self.rpc.call_export_remove(node, iname).fail_msg
6612 self.LogWarning("Could not remove older export for instance %s"
6613 " on node %s: %s", iname, node, msg)
6616 class LURemoveExport(NoHooksLU):
6617 """Remove exports related to the named instance.
6620 _OP_REQP = ["instance_name"]
6623 def ExpandNames(self):
6624 self.needed_locks = {}
6625 # We need all nodes to be locked in order for RemoveExport to work, but we
6626 # don't need to lock the instance itself, as nothing will happen to it (and
6627 # we can remove exports also for a removed instance)
6628 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6630 def CheckPrereq(self):
6631 """Check prerequisites.
6635 def Exec(self, feedback_fn):
6636 """Remove any export.
6639 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6640 # If the instance was not found we'll try with the name that was passed in.
6641 # This will only work if it was an FQDN, though.
fqdn_warn = False
if not instance_name:
  fqdn_warn = True
  instance_name = self.op.instance_name
6647 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
exportlist = self.rpc.call_export_list(locked_nodes)
found = False
for node in exportlist:
6651 msg = exportlist[node].fail_msg
6653 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
if instance_name in exportlist[node].payload:
  found = True
  result = self.rpc.call_export_remove(node, instance_name)
6658 msg = result.fail_msg
6660 logging.error("Could not remove export for instance %s"
6661 " on node %s: %s", instance_name, node, msg)
6663 if fqdn_warn and not found:
6664 feedback_fn("Export not found. If trying to remove an export belonging"
6665 " to a deleted instance please use its Fully Qualified"
6669 class TagsLU(NoHooksLU):
6672 This is an abstract class which is the parent of all the other tags LUs.
6676 def ExpandNames(self):
6677 self.needed_locks = {}
6678 if self.op.kind == constants.TAG_NODE:
6679 name = self.cfg.ExpandNodeName(self.op.name)
6681 raise errors.OpPrereqError("Invalid node name (%s)" %
6684 self.needed_locks[locking.LEVEL_NODE] = name
6685 elif self.op.kind == constants.TAG_INSTANCE:
6686 name = self.cfg.ExpandInstanceName(self.op.name)
6688 raise errors.OpPrereqError("Invalid instance name (%s)" %
6691 self.needed_locks[locking.LEVEL_INSTANCE] = name
6693 def CheckPrereq(self):
6694 """Check prerequisites.
6697 if self.op.kind == constants.TAG_CLUSTER:
6698 self.target = self.cfg.GetClusterInfo()
6699 elif self.op.kind == constants.TAG_NODE:
6700 self.target = self.cfg.GetNodeInfo(self.op.name)
6701 elif self.op.kind == constants.TAG_INSTANCE:
6702 self.target = self.cfg.GetInstanceInfo(self.op.name)
6704 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6708 class LUGetTags(TagsLU):
6709 """Returns the tags of a given object.
6712 _OP_REQP = ["kind", "name"]
6715 def Exec(self, feedback_fn):
6716 """Returns the tag list.
6719 return list(self.target.GetTags())
6722 class LUSearchTags(NoHooksLU):
6723 """Searches the tags for a given pattern.
6726 _OP_REQP = ["pattern"]
6729 def ExpandNames(self):
6730 self.needed_locks = {}
6732 def CheckPrereq(self):
6733 """Check prerequisites.
6735 This checks the pattern passed for validity by compiling it.
6739 self.re = re.compile(self.op.pattern)
6740 except re.error, err:
6741 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6742 (self.op.pattern, err))
6744 def Exec(self, feedback_fn):
6745 """Returns the tag list.
6749 tgts = [("/cluster", cfg.GetClusterInfo())]
6750 ilist = cfg.GetAllInstancesInfo().values()
6751 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6752 nlist = cfg.GetAllNodesInfo().values()
6753 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6755 for path, target in tgts:
6756 for tag in target.GetTags():
6757 if self.re.search(tag):
6758 results.append((path, tag))
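# Illustrative example: with self.op.pattern = "^prod", a cluster where the
# instance inst1.example.com and the node node1.example.com both carry the
# tag "production" yields
#
#   [("/instances/inst1.example.com", "production"),
#    ("/nodes/node1.example.com", "production")]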
6762 class LUAddTags(TagsLU):
6763 """Sets a tag on a given object.
6766 _OP_REQP = ["kind", "name", "tags"]
6769 def CheckPrereq(self):
6770 """Check prerequisites.
6772 This checks the type and length of the tag name and value.
6775 TagsLU.CheckPrereq(self)
6776 for tag in self.op.tags:
6777 objects.TaggableObject.ValidateTag(tag)
6779 def Exec(self, feedback_fn):
6784 for tag in self.op.tags:
6785 self.target.AddTag(tag)
6786 except errors.TagError, err:
6787 raise errors.OpExecError("Error while setting tag: %s" % str(err))
6789 self.cfg.Update(self.target)
6790 except errors.ConfigurationError:
6791 raise errors.OpRetryError("There has been a modification to the"
6792 " config file and the operation has been"
6793 " aborted. Please retry.")
6796 class LUDelTags(TagsLU):
6797 """Delete a list of tags from a given object.
6800 _OP_REQP = ["kind", "name", "tags"]
6803 def CheckPrereq(self):
6804 """Check prerequisites.
6806 This checks that we have the given tag.
6809 TagsLU.CheckPrereq(self)
6810 for tag in self.op.tags:
6811 objects.TaggableObject.ValidateTag(tag)
6812 del_tags = frozenset(self.op.tags)
6813 cur_tags = self.target.GetTags()
6814 if not del_tags <= cur_tags:
6815 diff_tags = del_tags - cur_tags
6816 diff_names = ["'%s'" % tag for tag in diff_tags]
6818 raise errors.OpPrereqError("Tag(s) %s not found" %
6819 (",".join(diff_names)))
6821 def Exec(self, feedback_fn):
6822 """Remove the tag from the object.
6825 for tag in self.op.tags:
6826 self.target.RemoveTag(tag)
6828 self.cfg.Update(self.target)
6829 except errors.ConfigurationError:
6830 raise errors.OpRetryError("There has been a modification to the"
6831 " config file and the operation has been"
6832 " aborted. Please retry.")
6835 class LUTestDelay(NoHooksLU):
6836 """Sleep for a specified amount of time.
This LU sleeps on the master and/or nodes for a specified amount of time.
6842 _OP_REQP = ["duration", "on_master", "on_nodes"]
6845 def ExpandNames(self):
6846 """Expand names and set required locks.
6848 This expands the node list, if any.
6851 self.needed_locks = {}
6852 if self.op.on_nodes:
6853 # _GetWantedNodes can be used here, but is not always appropriate to use
6854 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6856 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6857 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6859 def CheckPrereq(self):
6860 """Check prerequisites.
6864 def Exec(self, feedback_fn):
6865 """Do the actual sleep.
6868 if self.op.on_master:
6869 if not utils.TestDelay(self.op.duration):
6870 raise errors.OpExecError("Error during master delay test")
6871 if self.op.on_nodes:
6872 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6873 for node, node_result in result.items():
6874 node_result.Raise("Failure during rpc call to node %s" % node)
6877 class IAllocator(object):
6878 """IAllocator framework.
An IAllocator instance has four sets of attributes:
6881 - cfg that is needed to query the cluster
6882 - input data (all members of the _KEYS class attribute are required)
6883 - four buffer attributes (in|out_data|text), that represent the
6884 input (to the external script) in text and data structure format,
6885 and the output from it, again in two formats
6886 - the result variables from the script (success, info, nodes) for
6891 "mem_size", "disks", "disk_template",
6892 "os", "tags", "nics", "vcpus", "hypervisor",
6898 def __init__(self, lu, mode, name, **kwargs):
6900 # init buffer variables
6901 self.in_text = self.out_text = self.in_data = self.out_data = None
6902 # init all input fields so that pylint is happy
6905 self.mem_size = self.disks = self.disk_template = None
6906 self.os = self.tags = self.nics = self.vcpus = None
6907 self.hypervisor = None
6908 self.relocate_from = None
6910 self.required_nodes = None
6911 # init result fields
6912 self.success = self.info = self.nodes = None
6913 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6914 keyset = self._ALLO_KEYS
6915 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6916 keyset = self._RELO_KEYS
6918 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6919 " IAllocator" % self.mode)
6921 if key not in keyset:
6922 raise errors.ProgrammerError("Invalid input parameter '%s' to"
6923 " IAllocator" % key)
6924 setattr(self, key, kwargs[key])
6926 if key not in kwargs:
6927 raise errors.ProgrammerError("Missing input parameter '%s' to"
6928 " IAllocator" % key)
6929 self._BuildInputData()
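# Minimal construction sketch (illustrative; the real call sites live in the
# instance-creation and replace-disks LUs, and "hail" is only an example
# allocator name):
#
#   ial = IAllocator(lu, constants.IALLOCATOR_MODE_RELOC,
#                    "inst1.example.com",
#                    relocate_from=["node2.example.com"])
#   ial.Run("hail")
#   if not ial.success:
#     raise errors.OpPrereqError("Allocator failed: %s" % ial.info)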
6931 def _ComputeClusterData(self):
6932 """Compute the generic allocator input data.
6934 This is the data that is independent of the actual operation.
6938 cluster_info = cfg.GetClusterInfo()
6941 "version": constants.IALLOCATOR_VERSION,
6942 "cluster_name": cfg.GetClusterName(),
6943 "cluster_tags": list(cluster_info.GetTags()),
6944 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6945 # we don't have job IDs
6947 iinfo = cfg.GetAllInstancesInfo().values()
6948 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6952 node_list = cfg.GetNodeList()
6954 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6955 hypervisor_name = self.hypervisor
6956 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6957 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6959 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6961 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6962 cluster_info.enabled_hypervisors)
6963 for nname, nresult in node_data.items():
6964 # first fill in static (config-based) values
6965 ninfo = cfg.GetNodeInfo(nname)
6967 "tags": list(ninfo.GetTags()),
6968 "primary_ip": ninfo.primary_ip,
6969 "secondary_ip": ninfo.secondary_ip,
6970 "offline": ninfo.offline,
6971 "drained": ninfo.drained,
6972 "master_candidate": ninfo.master_candidate,
6975 if not ninfo.offline:
6976 nresult.Raise("Can't get data for node %s" % nname)
6977 node_iinfo[nname].Raise("Can't get node instance info from node %s" %
6979 remote_info = nresult.payload
6980 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6981 'vg_size', 'vg_free', 'cpu_total']:
6982 if attr not in remote_info:
6983 raise errors.OpExecError("Node '%s' didn't return attribute"
6984 " '%s'" % (nname, attr))
6985 if not isinstance(remote_info[attr], int):
6986 raise errors.OpExecError("Node '%s' returned invalid value"
6988 (nname, attr, remote_info[attr]))
6989 # compute memory used by primary instances
6990 i_p_mem = i_p_up_mem = 0
6991 for iinfo, beinfo in i_list:
6992 if iinfo.primary_node == nname:
6993 i_p_mem += beinfo[constants.BE_MEMORY]
6994 if iinfo.name not in node_iinfo[nname].payload:
6997 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6998 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6999 remote_info['memory_free'] -= max(0, i_mem_diff)
7002 i_p_up_mem += beinfo[constants.BE_MEMORY]
7004 # compute memory used by instances
7006 "total_memory": remote_info['memory_total'],
7007 "reserved_memory": remote_info['memory_dom0'],
7008 "free_memory": remote_info['memory_free'],
7009 "total_disk": remote_info['vg_size'],
7010 "free_disk": remote_info['vg_free'],
7011 "total_cpus": remote_info['cpu_total'],
7012 "i_pri_memory": i_p_mem,
7013 "i_pri_up_memory": i_p_up_mem,
7017 node_results[nname] = pnr
7018 data["nodes"] = node_results
7022 for iinfo, beinfo in i_list:
7024 for nic in iinfo.nics:
7025 filled_params = objects.FillDict(
7026 cluster_info.nicparams[constants.PP_DEFAULT],
7028 nic_dict = {"mac": nic.mac,
7030 "mode": filled_params[constants.NIC_MODE],
7031 "link": filled_params[constants.NIC_LINK],
7033 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7034 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7035 nic_data.append(nic_dict)
7037 "tags": list(iinfo.GetTags()),
7038 "admin_up": iinfo.admin_up,
7039 "vcpus": beinfo[constants.BE_VCPUS],
7040 "memory": beinfo[constants.BE_MEMORY],
7042 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7044 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7045 "disk_template": iinfo.disk_template,
7046 "hypervisor": iinfo.hypervisor,
7048 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7050 instance_data[iinfo.name] = pir
7052 data["instances"] = instance_data
7056 def _AddNewInstance(self):
7057 """Add new instance data to allocator structure.
7059 This in combination with _AllocatorGetClusterData will create the
7060 correct structure needed as input for the allocator.
7062 The checks for the completeness of the opcode must have already been
7068 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7070 if self.disk_template in constants.DTS_NET_MIRROR:
7071 self.required_nodes = 2
7073 self.required_nodes = 1
7077 "disk_template": self.disk_template,
7080 "vcpus": self.vcpus,
7081 "memory": self.mem_size,
7082 "disks": self.disks,
7083 "disk_space_total": disk_space,
7085 "required_nodes": self.required_nodes,
7087 data["request"] = request
7089 def _AddRelocateInstance(self):
7090 """Add relocate instance data to allocator structure.
7092 This in combination with _IAllocatorGetClusterData will create the
7093 correct structure needed as input for the allocator.
7095 The checks for the completeness of the opcode must have already been
7099 instance = self.lu.cfg.GetInstanceInfo(self.name)
7100 if instance is None:
7101 raise errors.ProgrammerError("Unknown instance '%s' passed to"
7102 " IAllocator" % self.name)
7104 if instance.disk_template not in constants.DTS_NET_MIRROR:
7105 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7107 if len(instance.secondary_nodes) != 1:
7108 raise errors.OpPrereqError("Instance has not exactly one secondary node")
7110 self.required_nodes = 1
7111 disk_sizes = [{'size': disk.size} for disk in instance.disks]
7112 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7117 "disk_space_total": disk_space,
7118 "required_nodes": self.required_nodes,
7119 "relocate_from": self.relocate_from,
7121 self.in_data["request"] = request
7123 def _BuildInputData(self):
7124 """Build input data structures.
7127 self._ComputeClusterData()
7129 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7130 self._AddNewInstance()
7132 self._AddRelocateInstance()
7134 self.in_text = serializer.Dump(self.in_data)
7136 def Run(self, name, validate=True, call_fn=None):
7137 """Run an instance allocator and return the results.
7141 call_fn = self.lu.rpc.call_iallocator_runner
7143 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7144 result.Raise("Failure while running the iallocator script")
7146 self.out_text = result.payload
7148 self._ValidateResult()
7150 def _ValidateResult(self):
7151 """Process the allocator results.
7153 This will process and if successful save the result in
7154 self.out_data and the other parameters.
7158 rdict = serializer.Load(self.out_text)
7159 except Exception, err:
7160 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7162 if not isinstance(rdict, dict):
7163 raise errors.OpExecError("Can't parse iallocator results: not a dict")
7165 for key in "success", "info", "nodes":
7166 if key not in rdict:
7167 raise errors.OpExecError("Can't parse iallocator results:"
7168 " missing key '%s'" % key)
7169 setattr(self, key, rdict[key])
7171 if not isinstance(rdict["nodes"], list):
7172 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7174 self.out_data = rdict
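# Illustrative reply accepted by the validation above (shape only; an
# external allocator script would emit this as serialized JSON):
#
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node2.example.com", "node4.example.com"]}
#
# "nodes" must be a list; "success" false together with an explanatory
# "info" string is also a well-formed answer.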
7177 class LUTestAllocator(NoHooksLU):
7178 """Run allocator tests.
7180 This LU runs the allocator tests
7183 _OP_REQP = ["direction", "mode", "name"]
7185 def CheckPrereq(self):
7186 """Check prerequisites.
This checks the opcode parameters depending on the test direction and mode.
7191 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7192 for attr in ["name", "mem_size", "disks", "disk_template",
7193 "os", "tags", "nics", "vcpus"]:
7194 if not hasattr(self.op, attr):
7195 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7197 iname = self.cfg.ExpandInstanceName(self.op.name)
7198 if iname is not None:
7199 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7201 if not isinstance(self.op.nics, list):
7202 raise errors.OpPrereqError("Invalid parameter 'nics'")
7203 for row in self.op.nics:
7204 if (not isinstance(row, dict) or
7207 "bridge" not in row):
7208 raise errors.OpPrereqError("Invalid contents of the"
7209 " 'nics' parameter")
7210 if not isinstance(self.op.disks, list):
7211 raise errors.OpPrereqError("Invalid parameter 'disks'")
7212 for row in self.op.disks:
7213 if (not isinstance(row, dict) or
7214 "size" not in row or
7215 not isinstance(row["size"], int) or
7216 "mode" not in row or
7217 row["mode"] not in ['r', 'w']):
7218 raise errors.OpPrereqError("Invalid contents of the"
7219 " 'disks' parameter")
7220 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7221 self.op.hypervisor = self.cfg.GetHypervisorType()
7222 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7223 if not hasattr(self.op, "name"):
7224 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7225 fname = self.cfg.ExpandInstanceName(self.op.name)
7227 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7229 self.op.name = fname
7230 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7232 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7235 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7236 if not hasattr(self.op, "allocator") or self.op.allocator is None:
7237 raise errors.OpPrereqError("Missing allocator name")
7238 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7239 raise errors.OpPrereqError("Wrong allocator test '%s'" %
7242 def Exec(self, feedback_fn):
7243 """Run the allocator test.
7246 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7247 ial = IAllocator(self,
7250 mem_size=self.op.mem_size,
7251 disks=self.op.disks,
7252 disk_template=self.op.disk_template,
7256 vcpus=self.op.vcpus,
7257 hypervisor=self.op.hypervisor,
7260 ial = IAllocator(self,
7263 relocate_from=list(self.relocate_from),
7266 if self.op.direction == constants.IALLOCATOR_DIR_IN:
7267 result = ial.in_text
7269 ial.Run(self.op.allocator, validate=False)
7270 result = ial.out_text