4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  # NOTE(review): this source appears truncated; the HPATH/HTYPE/_OP_REQP
  # class attributes usually defined at this point are not visible.

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overriden in derived classes in order to check op
    validity.

    """
    self.cfg = context.cfg
    self.context = context
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    # share_locks defaults every level to exclusive (0) acquisition
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo
    self.dry_run_result = None
    # NOTE(review): lines appear to be missing here (e.g. the assignments of
    # self.op/self.rpc/self.proc and the "if attr_val is None:" guard inside
    # the loop below), leaving the raise statement syntactically incomplete.
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      raise errors.OpPrereqError("Required parameter '%s' missing" %
    self.CheckArguments()

  # NOTE(review): the "def __GetSSH(self):" header (and its lazy-init check)
  # appears to be missing before this docstring in the truncated source.
    """Returns the SshRunner object

    """
    self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensure
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separate is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    self.needed_locks = {} # Exclusive LUs don't need locks.

    raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None

    @return: the new Exec result, based on the previous result

    """
    # NOTE(review): the method body (presumably returning lu_result
    # unchanged) appears to be missing from this truncated source.

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    # NOTE(review): an "else:" branch header appears to be missing here.
    assert locking.LEVEL_INSTANCE not in self.needed_locks, \
      "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we're really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    # NOTE(review): the "wanted_nodes = []" initializer and the
    # "if not primary_only:" guard appear to be missing from this view.
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  # NOTE(review): the HPATH/HTYPE overrides usually defined here appear to
  # be missing from this truncated source.
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param nodes: list of node names or None for all nodes
  @return: the list of nodes, sorted
  @raise errors.OpProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  # NOTE(review): this function appears truncated -- the empty-list check,
  # the "wanted = []" initializer, the loop header over the node names, the
  # "if node is None:" guard and the append of the expanded name are missing.
  raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
    " non-empty list of nodes whose name is to be expanded.")
  node = lu.cfg.ExpandNodeName(name)
  raise errors.OpPrereqError("No such node name '%s'" % name)
  return utils.NiceSort(wanted)
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  # NOTE(review): truncated -- the "if instances:" / "wanted = []" lines and
  # the "if instance is None:" guard inside the loop appear to be missing.
  for name in instances:
    instance = lu.cfg.ExpandInstanceName(name)
    raise errors.OpPrereqError("No such instance name '%s'" % name)
    wanted.append(instance)
  # NOTE(review): the "else:" branch header and the final return are missing.
  wanted = utils.NiceSort(lu.cfg.GetInstanceList())
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  # NOTE(review): truncated -- the construction of the combined FieldSet
  # ("f"), the "if delta:" guard and the raise's format arguments are missing.
  delta = f.NonMatching(selected)
  raise errors.OpPrereqError("Unknown output fields selected: %s"
def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    # NOTE(review): the format-argument continuation line of this raise
    # appears to be missing from this truncated source.
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
  setattr(op, name, val)
433 def _CheckNodeOnline(lu, node):
434 """Ensure that a given node is online.
436 @param lu: the LU on behalf of which we make the check
437 @param node: the node to check
438 @raise errors.OpPrereqError: if the node is offline
441 if lu.cfg.GetNodeInfo(node).offline:
442 raise errors.OpPrereqError("Can't use offline node %s" % node)
445 def _CheckNodeNotDrained(lu, node):
446 """Ensure that a given node is not drained.
448 @param lu: the LU on behalf of which we make the check
449 @param node: the node to check
450 @raise errors.OpPrereqError: if the node is drained
453 if lu.cfg.GetNodeInfo(node).drained:
454 raise errors.OpPrereqError("Can't use drained node %s" % node)
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor):
  # NOTE(review): the "hypervisor" parameter shadows the module-level
  # "hypervisor" import -- confirm whether that is intentional.
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @param memory: the memory size of the instance
  @param vcpus: the count of VCPUs the instance has
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @param disks: the list of (size, mode) pairs
  @param bep: the backend parameters for the instance
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor: string
  @param hypervisor: the hypervisor for the instance

  @return: the hook environment for this instance

  """
  # NOTE(review): truncated -- the computation of "str_status" and the
  # "env = {" dict opener appear to be missing before these entries.
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor,
  # NOTE(review): the closing "}" of the env dict appears to be missing.

  # Per-NIC variables, indexed by NIC position
  nic_count = len(nics)
  for idx, (ip, mac, mode, link) in enumerate(nics):
    env["INSTANCE_NIC%d_IP" % idx] = ip
    env["INSTANCE_NIC%d_MAC" % idx] = mac
    env["INSTANCE_NIC%d_MODE" % idx] = mode
    env["INSTANCE_NIC%d_LINK" % idx] = link
    if mode == constants.NIC_MODE_BRIDGED:
      # for bridged nics the link value doubles as the bridge name
      env["INSTANCE_NIC%d_BRIDGE" % idx] = link

  env["INSTANCE_NIC_COUNT"] = nic_count

  # Per-disk variables, indexed by disk position
  disk_count = len(disks)
  for idx, (size, mode) in enumerate(disks):
    env["INSTANCE_DISK%d_SIZE" % idx] = size
    env["INSTANCE_DISK%d_MODE" % idx] = mode

  env["INSTANCE_DISK_COUNT"] = disk_count

  # Export all backend (BE) and hypervisor (HV) parameters
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value
  # NOTE(review): the final "return env" appears to be missing.
def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  # NOTE(review): truncated -- the "hooks_nics = []" initializer, the
  # "for nic in nics:" loop header (with the ip/mac extraction) and the
  # final "return hooks_nics" appear to be missing from this view.
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @param override: dictionary with key/values that will override
      our values

  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  # NOTE(review): truncated -- the "args = {" opener appears to be missing
  # before these keyword entries for _BuildInstanceHookEnv.
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'hypervisor': instance.hypervisor,
  # NOTE(review): the 'bep'/'hvp' entries, the closing "}" and an
  # "if override:" guard appear to be missing here.
  args.update(override)
  return _BuildInstanceHookEnv(**args)
def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  # NOTE(review): an "if mod_list:" guard appears to be missing here.
  lu.LogInfo("Promoted nodes to master candidate role: %s",
             ", ".join(node.name for node in mod_list))
  # re-add promoted nodes so the cluster context picks up their new role
  for name in mod_list:
    lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  # NOTE(review): an "if mc_now > mc_max:" guard and the LogInfo format
  # arguments appear to be missing below.
  lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param target_nics: list of nic objects whose bridges must exist
  @type target_node: string
  @param target_node: the node on which the bridges are checked
  @param profile: the nicparams profile used to fill each nic's parameters
  @raise errors.OpPrereqError: (via result.Raise with prereq=True) if a
      required bridge is missing -- presumably; confirm RpcResult semantics

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  # Only bridged nics need a bridge; for those the "link" value is the
  # bridge name.
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]

  # Avoid the RPC round-trip entirely when no nic requires a bridge.
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True)
def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose nics are checked
  @type node: string
  @param node: the node to check on; defaults to the instance's
      primary node when not given

  """
  # Honour an explicitly passed node; as written in the original the
  # parameter was unconditionally clobbered with the primary node.
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  # NOTE(review): the _OP_REQP definition appears to be missing here.

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    # only the master node itself may remain in the cluster
    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    # NOTE(review): an "if instancelist:" guard appears to be missing here.
    raise errors.OpPrereqError("There are still %d instance(s) in"
                               " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")
    # back up the cluster ssh keys before they are removed
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    # NOTE(review): the final "return master" appears to be missing.
class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  def ExpandNames(self):
    # Verify needs to look at every node and instance; acquire all of them
    # in shared mode so other (shared) operations can proceed concurrently.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    # NOTE(review): the closing "}" of this dict appears to be missing.
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
  # NOTE(review): this method appears heavily truncated -- the signature
  # continuation ("drbd_map, vg_name):"), the "node"/"bad" local
  # initializers, many "bad = True" assignments, early "return True"
  # statements and several "if"/"else:" guard lines are missing.
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existance and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the useddrbd minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn(" - ERROR: unable to verify node %s." % (node,))

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn(" - ERROR: connection to %s failed" % (node))

    if local_version != remote_version[0]:
      feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))

    # node seems compatible, we can actually try to look into its results

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn(" - WARNING: software version mismatch: master %s,"
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
            # not candidate and this is not a must-have file
            feedback_fn(" - ERROR: non master-candidate has old/wrong file"
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn(" - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    if constants.NV_NODELIST not in node_result:
      feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
      if node_result[constants.NV_NODELIST]:
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
      if node_result[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                    (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
      for minor, (iname, must_exist) in drbd_map.items():
        if minor not in used_minors and must_exist:
          feedback_fn(" - ERROR: drbd minor %d of instance %s is"
                      " not active" % (minor, iname))
      for minor in used_minors:
        if minor not in drbd_map:
          feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    # NOTE(review): truncated -- the "bad = False" accumulator, the
    # "node_vol_should = {}" initializer, several feedback continuation
    # lines, "continue"/"bad = True" statements and the final "return bad"
    # appear to be missing from this view.
    node_current = instanceconfig.primary_node

    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
           not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    # NOTE(review): truncated -- the "bad" accumulator, the feedback
    # continuation line and the final return appear to be missing.
    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    # NOTE(review): truncated -- the "bad" accumulator, the "bad = True"
    # assignment and the final return appear to be missing.
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    # NOTE(review): truncated -- the "bad" accumulator, the
    # "needed_mem = 0" reset per primary node, the "bad = True" assignment
    # and the final return appear to be missing from this view.
    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    # every requested skip must be one of the optional verify checks
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # NOTE(review): the "env = {" dict opener and its closing "}" appear to
    # be missing around the CLUSTER_TAGS entry below.
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    # no pre-execution hooks; post hooks run on every node
    return env, [], all_nodes
979 def Exec(self, feedback_fn):
980 """Verify integrity of cluster, performing various test on nodes.
984 feedback_fn("* Verifying global settings")
985 for msg in self.cfg.VerifyConfig():
986 feedback_fn(" - ERROR: %s" % msg)
988 vg_name = self.cfg.GetVGName()
989 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
990 nodelist = utils.NiceSort(self.cfg.GetNodeList())
991 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
992 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
993 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
994 for iname in instancelist)
995 i_non_redundant = [] # Non redundant instances
996 i_non_a_balanced = [] # Non auto-balanced instances
997 n_offline = [] # List of offline nodes
998 n_drained = [] # List of nodes being drained
1004 # FIXME: verify OS list
1005 # do local checksums
1006 master_files = [constants.CLUSTER_CONF_FILE]
1008 file_names = ssconf.SimpleStore().GetFileList()
1009 file_names.append(constants.SSL_CERT_FILE)
1010 file_names.append(constants.RAPI_CERT_FILE)
1011 file_names.extend(master_files)
1013 local_checksums = utils.FingerprintFiles(file_names)
1015 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1016 node_verify_param = {
1017 constants.NV_FILELIST: file_names,
1018 constants.NV_NODELIST: [node.name for node in nodeinfo
1019 if not node.offline],
1020 constants.NV_HYPERVISOR: hypervisors,
1021 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1022 node.secondary_ip) for node in nodeinfo
1023 if not node.offline],
1024 constants.NV_INSTANCELIST: hypervisors,
1025 constants.NV_VERSION: None,
1026 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1028 if vg_name is not None:
1029 node_verify_param[constants.NV_VGLIST] = None
1030 node_verify_param[constants.NV_LVLIST] = vg_name
1031 node_verify_param[constants.NV_DRBDLIST] = None
1032 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1033 self.cfg.GetClusterName())
1035 cluster = self.cfg.GetClusterInfo()
1036 master_node = self.cfg.GetMasterNode()
1037 all_drbd_map = self.cfg.ComputeDRBDMap()
1039 for node_i in nodeinfo:
1043 feedback_fn("* Skipping offline node %s" % (node,))
1044 n_offline.append(node)
1047 if node == master_node:
1049 elif node_i.master_candidate:
1050 ntype = "master candidate"
1051 elif node_i.drained:
1053 n_drained.append(node)
1056 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1058 msg = all_nvinfo[node].fail_msg
1060 feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg))
1064 nresult = all_nvinfo[node].payload
1066 for minor, instance in all_drbd_map[node].items():
1067 if instance not in instanceinfo:
1068 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1070 # ghost instance should not be running, but otherwise we
1071 # don't give double warnings (both ghost instance and
1072 # unallocated minor in use)
1073 node_drbd[minor] = (instance, False)
1075 instance = instanceinfo[instance]
1076 node_drbd[minor] = (instance.name, instance.admin_up)
1077 result = self._VerifyNode(node_i, file_names, local_checksums,
1078 nresult, feedback_fn, master_files,
1082 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1084 node_volume[node] = {}
1085 elif isinstance(lvdata, basestring):
1086 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1087 (node, utils.SafeEncode(lvdata)))
1089 node_volume[node] = {}
1090 elif not isinstance(lvdata, dict):
1091 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1095 node_volume[node] = lvdata
1098 idata = nresult.get(constants.NV_INSTANCELIST, None)
1099 if not isinstance(idata, list):
1100 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1105 node_instance[node] = idata
1108 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1109 if not isinstance(nodeinfo, dict):
1110 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1116 "mfree": int(nodeinfo['memory_free']),
1119 # dictionary holding all instances this node is secondary for,
1120 # grouped by their primary node. Each key is a cluster node, and each
1121 # value is a list of instances which have the key as primary and the
1122 # current node as secondary. this is handy to calculate N+1 memory
1123 # availability if you can only failover from a primary to its
1125 "sinst-by-pnode": {},
1127 # FIXME: devise a free space model for file based instances as well
1128 if vg_name is not None:
1129 if (constants.NV_VGLIST not in nresult or
1130 vg_name not in nresult[constants.NV_VGLIST]):
1131 feedback_fn(" - ERROR: node %s didn't return data for the"
1132 " volume group '%s' - it is either missing or broken" %
1136 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1137 except (ValueError, KeyError):
1138 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1139 " from node %s" % (node,))
1143 node_vol_should = {}
1145 for instance in instancelist:
1146 feedback_fn("* Verifying instance %s" % instance)
1147 inst_config = instanceinfo[instance]
1148 result = self._VerifyInstance(instance, inst_config, node_volume,
1149 node_instance, feedback_fn, n_offline)
1151 inst_nodes_offline = []
1153 inst_config.MapLVsByNode(node_vol_should)
1155 instance_cfg[instance] = inst_config
1157 pnode = inst_config.primary_node
1158 if pnode in node_info:
1159 node_info[pnode]['pinst'].append(instance)
1160 elif pnode not in n_offline:
1161 feedback_fn(" - ERROR: instance %s, connection to primary node"
1162 " %s failed" % (instance, pnode))
1165 if pnode in n_offline:
1166 inst_nodes_offline.append(pnode)
1168 # If the instance is non-redundant we cannot survive losing its primary
1169 # node, so we are not N+1 compliant. On the other hand we have no disk
1170 # templates with more than one secondary so that situation is not well
1172 # FIXME: does not support file-backed instances
1173 if len(inst_config.secondary_nodes) == 0:
1174 i_non_redundant.append(instance)
1175 elif len(inst_config.secondary_nodes) > 1:
1176 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1179 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1180 i_non_a_balanced.append(instance)
1182 for snode in inst_config.secondary_nodes:
1183 if snode in node_info:
1184 node_info[snode]['sinst'].append(instance)
1185 if pnode not in node_info[snode]['sinst-by-pnode']:
1186 node_info[snode]['sinst-by-pnode'][pnode] = []
1187 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1188 elif snode not in n_offline:
1189 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1190 " %s failed" % (instance, snode))
1192 if snode in n_offline:
1193 inst_nodes_offline.append(snode)
1195 if inst_nodes_offline:
1196 # warn that the instance lives on offline nodes, and set bad=True
1197 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1198 ", ".join(inst_nodes_offline))
1201 feedback_fn("* Verifying orphan volumes")
1202 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1206 feedback_fn("* Verifying remaining instances")
1207 result = self._VerifyOrphanInstances(instancelist, node_instance,
1211 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1212 feedback_fn("* Verifying N+1 Memory redundancy")
1213 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1216 feedback_fn("* Other Notes")
1218 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1219 % len(i_non_redundant))
1221 if i_non_a_balanced:
1222 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1223 % len(i_non_a_balanced))
1226 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1229 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
      for node_name in hooks_results:
        # the per-node header is printed lazily, only once a failing
        # hook is found on that node
        show_node_header = True
        res = hooks_results[node_name]
        # no need to warn or set fail return value
        # NOTE(review): the guard (offline/fail_msg check) that selects
        # this message is not visible in this view -- confirm against
        # the full file before relying on the control flow here.
        feedback_fn("    Communication failure in hooks execution: %s" %
        for script, hkr, output in res.payload:
          if hkr == constants.HKR_FAIL:
            # The node header is only shown once, if there are
            # failing hooks on that node
            if show_node_header:
              feedback_fn("  Node %s:" % node_name)
              show_node_header = False
            feedback_fn("    ERROR: Script %s failed, output:" % script)
            # re-indent the hook's raw output so it nests under the header
            output = indent_re.sub('      ', output)
            feedback_fn("%s" % output)
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  def ExpandNames(self):
    # cluster-wide check: all node and instance locks, in shared mode
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    # the three result members are aliased so the tuple can be returned whole
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    for inst in instances:
      # only running, network-mirrored (DRBD) instances are of interest
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)
      node_res = node_lvs[node]
      if node_res.offline:
      msg = node_res.fail_msg
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
      lvs = node_res.payload
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
        # pop, so anything left over in nv_dict was reported by no node
        inst = nv_dict.pop((node, lv_name), None)
        # an offline LV belonging to a known instance needs activate-disks
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better: group the missing (node, volume) pairs by instance name
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
    mn = self.cfg.GetMasterNode()
    # hooks run on the master node only, both pre and post
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    # remember the resolved IP for Exec
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      # refuse to take over an IP that already answers on the network
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    # NOTE(review): `ip` used below presumably comes from self.ip (set in
    # CheckPrereq); the assignment is not visible in this view -- confirm.

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    cluster = self.cfg.GetClusterInfo()
    cluster.cluster_name = clustername
    cluster.master_ip = ip
    self.cfg.Update(cluster)

    # update the known hosts file
    ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
    node_list = self.cfg.GetNodeList()
    # the master already has the updated file locally
    node_list.remove(master)
    result = self.rpc.call_upload_file(node_list,
                                       constants.SSH_KNOWN_HOSTS_FILE)
    for to_node, to_result in result.iteritems():
      msg = to_result.fail_msg
      msg = ("Copy of file %s to node %s failed: %s" %
             (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
      self.proc.LogWarning(msg)

    # restart the master role; failure here is only a warning, since the
    # rename itself has already been committed to the configuration
    result = self.rpc.call_node_start_master(master, False)
    msg = result.fail_msg
      self.LogWarning("Could not re-enable the master role on"
                      " the master, please restart manually: %s", msg)
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  # the visible original discarded the recursive result (the `if` branch
  # had no body); propagate it so any LVM-based descendant counts
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER

  def CheckArguments(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
        # non-numeric input is reported as an OpPrereqError below
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # an explicitly empty (but not None) vg_name means "disable lvm
    # storage", which is only allowed with no lvm-based instances
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              constants.MIN_VG_SIZE)
          raise errors.OpPrereqError("Error on node '%s': %s" %

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      # new backend params = cluster defaults overridden by the request
      self.new_beparams = objects.FillDict(
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)

    # hypervisor list/parameters
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          # hypervisor not yet known: take the requested dict wholesale
          self.new_hvparams[hv_name] = hv_dict
          # known hypervisor: merge the requested keys over the old ones
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size

    self.cfg.Update(self.cluster)

    # we want to update nodes after the cluster so that if any errors
    # happen, we have recorded and saved the cluster info
    if self.op.candidate_pool_size is not None:
      _AdjustCandidatePool(self)
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetNodeList()
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
  # the master already has all files locally, no need to upload to itself
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)
  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
  # each enabled hypervisor may require extra files (configs, certs, ...)
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())
  # 3. Perform the files upload
  for fname in dist_files:
    # optional files (e.g. RAPI users) may legitimately be absent
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.fail_msg
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.proc.LogWarning(msg)
class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  def ExpandNames(self):
    # all nodes, shared: the config must reach every node
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    # writing the (unchanged) cluster object back triggers the config
    # writer's own distribution of config/ssconf files
    self.cfg.Update(self.cfg.GetClusterInfo())
    _RedistributeAncillaryFiles(self)
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  @param lu: the calling logical unit (used for rpc, config and logging)
  @param instance: the instance whose disks are polled
  @param oneshot: if True, report status once instead of waiting for full
      sync (the enclosing loop header is not visible in this view -- confirm)
  @param unlock: not used in the visible code
  @return: True if the disks ended up not cumulatively degraded

  """
  # nothing to wait for with diskless instances
  if not instance.disks:

  lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  degr_retries = 10 # in seconds, as we sleep 1 second each time
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.fail_msg
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      # NOTE(review): the retry counter handling around this raise is not
      # visible in this view -- confirm before editing the control flow.
      raise errors.RemoteError("Can't contact node %s for mirror data,"
                               " aborting." % node)
    rstats = rstats.payload
    for i, mstat in enumerate(rstats):
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      # a device that is degraded with no progress percentage counts
      # towards the cumulative "degraded" verdict
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
    # cap the sleep at 60 seconds between polls
    time.sleep(min(60, max_time))
  lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  @param lu: the calling logical unit
  @param dev: the disk object to check
  @param node: the node on which to check the disk
  @param on_primary: whether the disk is assembled on its primary node
  @return: boolean consistency verdict (accumulated over children too)

  """
  lu.cfg.SetDiskID(dev, node)

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      # the status field selected by `idx` (is_degraded vs ldisk) must be
      # False for the disk to be considered consistent
      result = result and (not rstats.payload[idx])
  # recurse into children; note the recursion does not propagate `ldisk`
  for child in dev.children:
    result = result and _CheckDiskConsistency(lu, child, node, on_primary)
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  # NOTE(review): takes no `self`; presumably declared as a staticmethod
  # (the decorator is not visible in this view) -- confirm.
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and tuples of (path, status, diagnose) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "")]}

    """
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      # skip nodes whose RPC failed or which reported no OSes
      if nr.fail_msg or not nr.payload:
      for name, path, status, diagnose in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          for nname in good_nodes:
            all_os[name][nname] = []
        all_os[name][node_name].append((path, status, diagnose))

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    for os_name, os_data in pol.items():
      for field in self.op.output_fields:
        elif field == "valid":
          # valid only if the first variant of the OS is good on every node
          val = utils.all([osl and osl[0][1] for osl in os_data.values()])
        elif field == "node_status":
          # this is just a copy of the dict
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
          raise errors.ParameterError(field)
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
    all_nodes = self.cfg.GetNodeList()
    # the node being removed takes no part in its own removal hooks
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the node exists in the configuration
      - it does not have primary or secondary instances
      - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
      # NOTE(review): Python 2.x comma-style raise, inconsistent with the
      # rest of the file; should read errors.OpPrereqError("...").
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      # reject removal while the node hosts any instance (primary or
      # secondary)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    # NOTE(review): `node` below is presumably bound from state saved in
    # CheckPrereq; the assignment is not visible in this view -- confirm.
    logging.info("Stopping the node daemon and removing configs from node %s",

    self.context.RemoveNode(node.name)

    result = self.rpc.call_node_leave_cluster(node.name)
    msg = result.fail_msg
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  # fields answered from live per-node RPC data
  _FIELDS_DYNAMIC = utils.FieldSet(
    "mtotal", "mnode", "mfree",
    "ctotal", "cnodes", "csockets",
  # fields answered from the configuration alone
  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

      self.wanted = _GetWantedNodes(self, self.op.names)
      self.wanted = locking.ALL_SET

    # live (RPC) data is only needed when a non-static field was requested
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      # without locking, explicitly named nodes may have vanished meanwhile
      missing = set(nodenames).difference(all_info.keys())
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      # gather live node data via RPC
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.fail_msg and nodeinfo.payload:
          nodeinfo = nodeinfo.payload
          # TryConvert leaves the value as-is if the conversion fails
          fn = utils.TryConvert
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
          # node did not answer usefully: empty live data for it
          live_data[name] = {}
      # no live data requested: every node gets an empty dict
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    # the instance maps are only filled when an instance-related field
    # was actually requested, to avoid a useless full-config scan
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    for node in nodelist:
      for field in self.op.output_fields:
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          val = node.name == master_node
        elif field == "offline":
        elif field == "drained":
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      # no explicit node list: query all nodes
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    # per-instance map of node -> LV names, used to attribute each
    # reported volume back to the instance owning it
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    for node in nodenames:
      nresult = volumes[node]
      msg = nresult.fail_msg
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)

      # copy before sorting, so the RPC payload is not mutated
      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        for field in self.op.output_fields:
          elif field == "phys":
          elif field == "name":
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            if node not in lv_by_node[inst]:
            if vol['name'] in lv_by_node[inst][node]:
            raise errors.ParameterError(field)
          node_output.append(str(val))

      output.append(node_output)
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  Verifies the new node's name/IP setup against the cluster, copies the
  master's ssh host keys over, checks connectivity and finally adds (or,
  with readd, re-registers) the node in the configuration.

  NOTE(review): this extract is missing some original lines (e.g. the
  HPATH attribute, several dict/list openers and "else:" branches); only
  comments and docstrings were changed here.
  """
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    # Hook environment describing the node being added.
    "OP_TARGET": self.op.node_name,
    "NODE_NAME": self.op.node_name,
    "NODE_PIP": self.op.primary_ip,
    "NODE_SIP": self.op.secondary_ip,
    # Pre-hooks run on the current node list; post-hooks also include
    # the node that is being added.
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the new node is not already in the config
      - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    # Resolve via DNS; this also canonicalizes the name.
    dns_data = utils.HostInfo(node_name)
    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      # Single-homed setup: the secondary IP defaults to the primary.
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        # A readded node must keep its previous IP configuration.
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")

      # Neither of the new node's IPs may collide with an existing node.
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      # Only reached when the master is dual-homed (above raise is terminal).
      raise errors.OpPrereqError("The master has a private ip but the"
                                 " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    # The new node becomes a master candidate iff the candidate pool
    # is not yet full.
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    mc_now, _ = self.cfg.GetMasterCandidateStats()
    master_candidate = mc_now < cp_size

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip,
                                 master_candidate=master_candidate,
                                 offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    # NOTE(review): an "else:" branch appears elided before this raise.
    raise errors.OpExecError("Version mismatch master version %s,"
                             " node version %s" %
                             (constants.PROTOCOL_VERSION, result.payload))

    # Transfer the master's ssh host keys so the new node assumes the
    # cluster ssh identity.
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
      keyarray.append(f.read())

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[3], keyarray[4], keyarray[5])
    result.Raise("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      # Dual-homed: verify the node actually owns the secondary IP.
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
      if not result.payload:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    # Ask the master to re-verify ssh/hostname reachability.
    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      # TODO: do a node-net-test as well?

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload['nodelist']
      for failed in nl_payload:
        feedback_fn("ssh/hostname verification failed %s -> %s" %
                    (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # NOTE(review): the "if self.op.readd:" / "else:" split between these
    # two pairs of calls appears elided in this extract.
    _RedistributeAncillaryFiles(self)
    self.context.ReaddNode(new_node)
    _RedistributeAncillaryFiles(self, additional_nodes=[node])
    self.context.AddNode(new_node)
class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  Supports toggling the master_candidate, offline and drained flags,
  with mutual auto-demotion between the states.

  NOTE(review): this extract is missing some original lines (the env
  dict opener, "if self.op.force:"/"else:" around the warning/raise,
  and the result accumulator in Exec); only comments/docstrings changed.
  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def CheckArguments(self):
    # Canonicalize the node name and validate the flag combination.
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      # Nothing requested at all.
      raise errors.OpPrereqError("Please pass at least one modification")
    if all_mods.count(True) > 1:
      # offline/drained/master_candidate=True are mutually exclusive.
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time")

  def ExpandNames(self):
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    # Hook environment describing the requested modifications.
    "OP_TARGET": self.op.node_name,
    "MASTER_CANDIDATE": str(self.op.master_candidate),
    "OFFLINE": str(self.op.offline),
    "DRAINED": str(self.op.drained),
    nl = [self.cfg.GetMasterNode(),

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if ((self.op.master_candidate == False or self.op.offline == True or
         self.op.drained == True) and node.master_candidate):
      # we will demote the node from master_candidate
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master node has to be a"
                                   " master candidate, online and not drained")
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
      if num_candidates <= cp_size:
        # Demoting would shrink the candidate pool below the desired size.
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
        self.LogWarning(msg)
        raise errors.OpPrereqError(msg)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      # Promotion requires the node to be (or become) online and undrained.
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        # Offlining auto-demotes from master candidate and clears drained.
        if node.master_candidate:
          node.master_candidate = False
          result.append(("master_candidate", "auto-demotion due to offline"))
        node.drained = False
        result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        # Tell the node itself it was demoted; failure is non-fatal.
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        # Draining auto-demotes from master candidate and clears offline.
        if node.master_candidate:
          node.master_candidate = False
          result.append(("master_candidate", "auto-demotion due to drain"))
        node.offline = False
        result.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node)
    # this will trigger job queue propagation or cleanup
    self.context.ReaddNode(node)
class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  Schedules a hard reboot of the node via its hypervisor; intended as a
  last-resort recovery action.
  """
  _OP_REQP = ["node_name", "force"]

  def CheckArguments(self):
    # Canonicalize the node name; powercycling the master node is only
    # allowed when force was explicitly given.
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set")

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resource option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """

  def Exec(self, feedback_fn):
    """Schedule the powercycle on the target node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  Returns a dict with static software/protocol versions plus the live
  cluster parameters; no locks are needed.

  NOTE(review): the result-dict opener/closer and the return statement
  appear elided from this extract; only comments/docstrings changed.
  """
  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequsites needed for this LU.

    """

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    # Build-time constants first, then the cluster's stored parameters.
    "software_version": constants.RELEASE_VERSION,
    "protocol_version": constants.PROTOCOL_VERSION,
    "config_version": constants.CONFIG_VERSION,
    "os_api_version": max(constants.OS_API_VERSIONS),
    "export_version": constants.EXPORT_VERSION,
    "architecture": (platform.architecture()[0], platform.machine()),
    "name": cluster.cluster_name,
    "master": cluster.master_node,
    "default_hypervisor": cluster.default_hypervisor,
    "enabled_hypervisors": cluster.enabled_hypervisors,
    # Expose hvparams only for the enabled hypervisors.
    "hvparams": dict([(hvname, cluster.hvparams[hvname])
                      for hvname in cluster.enabled_hypervisors]),
    "beparams": cluster.beparams,
    "nicparams": cluster.nicparams,
    "candidate_pool_size": cluster.candidate_pool_size,
    "master_netdev": cluster.master_netdev,
    "volume_group_name": cluster.volume_group_name,
    "file_storage_dir": cluster.file_storage_dir,
class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  Supports a small set of static fields; takes no locks.

  NOTE(review): the "values" accumulator, the "else:" before the raise
  and the return appear elided; only comments/docstrings changed.
  """
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        # The drain flag is signalled by the queue drain file's existence.
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      raise errors.ParameterError(field)
      values.append(entry)
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  NOTE(review): the "if not disks_ok:" guard and the return of
  disks_info appear elided; only comments/docstrings changed.
  """
  _OP_REQP = ["instance_name"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # Node locks are filled in later, once the instance lock is held.
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    raise errors.OpExecError("Cannot activate block devices")
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  # NOTE(review): the result accumulators and several "if msg:" guards
  # appear elided from this extract; only comments/docstrings changed.
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      lu.proc.LogWarning("Could not prepare block device %s on node %s"
                         " (is_primary=False, pass=1): %s",
                         inst_disk.iv_name, node, msg)
      if not ignore_secondaries:

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      # Skip secondaries: they were already assembled in pass 1.
      if node != instance.primary_node:
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      lu.proc.LogWarning("Could not prepare block device %s on node %s"
                         " (is_primary=True, pass=2): %s",
                         inst_disk.iv_name, node, msg)
    device_info.append((instance.primary_node, inst_disk.iv_name,

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  Assembles all of the instance's disks; on assembly failure the disks
  are shut back down and OpExecError is raised.

  NOTE(review): the "if not disks_ok:" guard appears elided from this
  extract; only comments/docstrings were changed.
  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  # Failure path: roll back whatever was assembled, hint about --force
  # when it was not used, then abort.
  _ShutdownInstanceDisks(lu, instance)
  if force is not None and not force:
    lu.proc.LogWarning("", hint="If the message above refers to a"
                       " you can retry the operation using '--force'.")
  raise errors.OpExecError("Disk consistency error")
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # Node locks are filled in later, once the instance lock is held.
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    # Refuses to shut the disks down while the instance is running.
    _SafeShutdownInstanceDisks(self, instance)
def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  pnode = instance.primary_node
  # Ask the primary node which instances it is currently running.
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s" % pnode)

  if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"

  _ShutdownInstanceDisks(lu, instance)
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node count against
  the overall result (the visible guard suggests this — TODO confirm
  against the unabridged source).

  """
  # NOTE(review): the result accumulator and the "if msg:" guard appear
  # elided from this extract; only comments/docstrings were changed.
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                    disk.iv_name, node, msg)
      if not ignore_primary or node != instance.primary_node:
2837 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2838 """Checks if a node has enough free memory.
2840 This function check if a given node has the needed amount of free
2841 memory. In case the node has less memory or we cannot get the
2842 information from the node, this function raise an OpPrereqError
2845 @type lu: C{LogicalUnit}
2846 @param lu: a logical unit from which we get configuration data
2848 @param node: the node to check
2849 @type reason: C{str}
2850 @param reason: string to use in the error message
2851 @type requested: C{int}
2852 @param requested: the amount of memory in MiB to check for
2853 @type hypervisor_name: C{str}
2854 @param hypervisor_name: the hypervisor to ask for memory stats
2855 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2856 we cannot check the node
2859 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2860 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2861 free_mem = nodeinfo[node].payload.get('memory_free', None)
2862 if not isinstance(free_mem, int):
2863 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2864 " was '%s'" % (node, free_mem))
2865 if requested > free_mem:
2866 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2867 " needed %s MiB, available %s MiB" %
2868 (node, reason, requested, free_mem))
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  NOTE(review): this extract is missing some original lines (the env
  dict opener, some "if" guards around parameter handling and the
  failure check in Exec); only comments/docstrings were changed.
  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    "FORCE": self.op.force,
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # Optional per-start backend parameter overrides.
    self.beparams = getattr(self.op, "beparams", {})
    if not isinstance(self.beparams, dict):
      raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                 " dict" % (type(self.beparams), ))
    # fill the beparams dict
    utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
    self.op.beparams = self.beparams

    # Optional per-start hypervisor parameter overrides.
    self.hvparams = getattr(self.op, "hvparams", {})
    if not isinstance(self.hvparams, dict):
      raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                 " dict" % (type(self.hvparams), ))

    # check hypervisor parameter syntax (locally)
    cluster = self.cfg.GetClusterInfo()
    utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
    filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
    filled_hvp.update(self.hvparams)
    hv_type = hypervisor.GetHypervisor(instance.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)
    _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
    self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existance
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
    if not remote_info.payload: # not running already
      # Only need free memory if the instance is not running yet.
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.fail_msg
    # NOTE(review): an "if msg:" guard appears elided here — on failure
    # the disks are rolled back before raising.
    _ShutdownInstanceDisks(self, instance)
    raise errors.OpExecError("Could not start instance: %s" % msg)
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  NOTE(review): this extract is missing some original lines (the env
  dict opener, the "else:" separating the soft/hard branch from the
  full-reboot branch, and an "if msg:" guard); comments only changed.
  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def ExpandNames(self):
    # Validate the reboot type before taking any locks.
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    "IGNORE_SECONDARIES": self.op.ignore_secondaries,
    "REBOOT_TYPE": self.op.reboot_type,
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existance
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      # Soft/hard reboot is delegated to the hypervisor on the node.
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
      result.Raise("Could not reboot instance")
      # Full reboot: stop the instance, recycle its disks, start again.
      result = self.rpc.call_instance_shutdown(node_current, instance)
      result.Raise("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance for"
                               " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    # Mark the intent in the config first, then ask the node to stop it.
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.fail_msg
    # A shutdown failure is non-fatal: warn and still release the disks.
    self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  Re-runs the OS create scripts on a stopped instance, optionally
  switching it to a different OS first.

  NOTE(review): this extract is missing some original lines (hook env
  return, "if pnode is None:" guard, argument continuation lines);
  only comments/docstrings were changed.
  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      # Nothing to reinstall onto.
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
    if remote_info.payload:
      # Config says down but the hypervisor reports it running.
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # Verify the requested OS exists on the primary node.
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise("OS '%s' not in supported OS list for primary node %s" %
                   (self.op.os_type, pnode.name), prereq=True)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    feedback_fn("Running the instance OS create scripts...")
    result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
    result.Raise("Could not install OS for instance %s on node %s" %
                 (inst.name, inst.primary_node))
    _ShutdownInstanceDisks(self, inst)
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  Renames a stopped instance in the configuration, moves its
  file-storage directory when applicable, and runs the OS rename script.

  NOTE(review): this extract is missing some original lines (argument
  continuation lines, an "if msg:" guard); comments/docstrings only
  were changed.
  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
    if remote_info.payload:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %

    if not getattr(self.op, "ignore_ip", False):
      # Make sure the new name's IP is not already in use.
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      # Remember the old file-storage dir so it can be renamed on disk.
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise("Could not rename on node %s directory '%s' to '%s'"
                   " (but the instance has been renamed in Ganeti)" %
                   (inst.primary_node, old_file_storage_dir,
                    new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)

    result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
    msg = result.fail_msg
    # A rename-script failure is non-fatal: the config rename stands.
    msg = ("Could not run OS rename script for instance %s on node %s"
           " (but the instance has been renamed in Ganeti): %s" %
           (inst.name, inst.primary_node, msg))
    self.proc.LogWarning(msg)

    _ShutdownInstanceDisks(self, inst)
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  Shuts the instance down, removes its disks and deletes it from the
  configuration; ignore_failures turns hard errors into warnings.

  NOTE(review): this extract is missing some original lines ("if msg:"
  and "else:" around the failure handling); comments only were changed.
  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # Node locks are filled in later, once the instance lock is held.
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    # Only the master runs the removal hooks.
    nl = [self.cfg.GetMasterNode()]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.fail_msg
    if self.op.ignore_failures:
      # Best-effort mode: report and continue with the removal.
      feedback_fn("Warning: can't shutdown instance: %s" % msg)
    raise errors.OpExecError("Could not shutdown instance %s on"
                             (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    # Drop the now-stale instance lock together with the config entry.
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  Computes, for each selected instance, the values of the requested
  output fields; static fields come from the configuration, dynamic
  fields (oper_state, oper_ram, status) from a live node query.

  NOTE(review): many interior lines of this class (branch headers,
  list-comprehension heads, ``else:`` clauses) are elided from this
  excerpt; the code below is transcribed as-is -- consult the full
  file before modifying.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  # NOTE(review): the two trailing "for name in ..." fragments are the
  # remnants of elided list-comprehension heads ("hv/%s" / "be/%s").
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "disk_template", "ip", "mac", "bridge",
                                    "nic_mode", "nic_link",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
                                    r"(nic)\.(bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|modes|links|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                   for name in constants.HVS_PARAMETERS] +
                                   for name in constants.BES_PARAMETERS])
  # fields that require a live query to the nodes
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    # queries only ever need shared locks
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    # NOTE(review): the ``if self.op.names: ... else:`` wrapper around
    # the next two assignments is elided -- confirm.
    self.wanted = _GetWantedInstances(self, self.op.names)
    self.wanted = locking.ALL_SET

    # only lock nodes if a dynamic (live) field was requested and the
    # caller asked for locking
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    # NOTE(review): originally guarded by ``if self.do_locking:``
    self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    NOTE(review): several branch headers (``if self.do_locking:``,
    ``for name in nodes:``, try/except, ``else:``) are elided from this
    excerpt; transcribed as-is.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
      # caller did specify names, so we must keep the ordering
      tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      raise errors.OpExecError("Some instances were removed before"
                               " retrieving their data: %s" % missing)
      instance_names = self.wanted
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    if self.do_node_query:
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      result = node_data[name]
      # offline nodes will be in both lists
      off_nodes.append(name)
      if result.failed or result.fail_msg:
        bad_nodes.append(name)
      live_data.update(result.payload)
      # else no instance is alive
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    cluster = self.cfg.GetClusterInfo()
    for instance in instance_list:
      # filled (cluster defaults + instance overrides) parameter dicts
      i_hv = cluster.FillHV(instance)
      i_be = cluster.FillBE(instance)
      i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
                                 nic.nicparams) for nic in instance.nics]
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
            running = bool(live_data.get(instance.name))
            if instance.admin_up:
            if instance.admin_up:
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
        elif field == "disk_template":
          val = instance.disk_template
          val = instance.nics[0].ip
        elif field == "nic_mode":
          val = i_nicp[0][constants.NIC_MODE]
        elif field == "nic_link":
          val = i_nicp[0][constants.NIC_LINK]
        elif field == "bridge":
          # "bridge" is only meaningful for bridged NICs
          if (instance.nics and
              i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
            val = i_nicp[0][constants.NIC_LINK]
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          # legacy names: sda/sdb map to disk indices 0/1
          idx = ord(field[2]) - ord('a')
          val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "modes":
              val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
            elif st_groups[1] == "links":
              val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
            elif st_groups[1] == "bridges":
              if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
                val.append(nicp[constants.NIC_LINK])
              # per-NIC indexed fields, e.g. nic.mac/0
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
              if st_groups[1] == "mac":
                val = instance.nics[nic_idx].mac
              elif st_groups[1] == "ip":
                val = instance.nics[nic_idx].ip
              elif st_groups[1] == "mode":
                val = i_nicp[nic_idx][constants.NIC_MODE]
              elif st_groups[1] == "link":
                val = i_nicp[nic_idx][constants.NIC_LINK]
              elif st_groups[1] == "bridge":
                nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
                if nic_mode == constants.NIC_MODE_BRIDGED:
                  val = i_nicp[nic_idx][constants.NIC_LINK]
                assert False, "Unhandled NIC parameter"
            assert False, "Unhandled variable parameter"
          raise errors.ParameterError(field)
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  Moves a net-mirrored (DRBD) instance to its secondary node by
  shutting it down on the primary and starting it on the secondary.

  NOTE(review): a few guard lines (``if msg:``, ``else:``,
  ``if not disks_ok:``, the ``env = {`` dict wrapper) are elided from
  this excerpt -- confirm against the full file.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    # NOTE(review): the ``env = {`` / ``}`` wrapper lines are elided.
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    # NOTE(review): the return of (env, nl, nl) is elided.

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # failover only makes sense for network-mirrored disk templates
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)
    if instance.admin_up:
      # check memory requirements on the secondary node
      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                           instance.name, bep[constants.BE_MEMORY],
                           instance.hypervisor)
      # NOTE(review): the next message originally lives in an ``else:``
      # branch (instance marked down) -- confirm.
      self.LogInfo("Not checking memory on the secondary node as"
                   " instance will not be started")

    # check bridge existance
    _CheckInstanceBridgesExist(self, instance, node=target_node)

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.fail_msg
    # NOTE(review): an ``if msg:`` guard and ``else:`` are elided here.
    if self.op.ignore_consistency:
      self.proc.LogWarning("Could not shutdown instance %s on node %s."
                           " Proceeding anyway. Please make sure node"
                           " %s is down. Error details: %s",
                           instance.name, source_node, source_node, msg)
    raise errors.OpExecError("Could not shutdown instance %s on"
                             (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      # NOTE(review): ``if not disks_ok:`` guard elided before the next
      # two lines.
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None, None)
      msg = result.fail_msg
      # NOTE(review): ``if msg:`` guard elided before the next two lines.
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                               (instance.name, target_node, msg))
class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  NOTE(review): several structural lines (try/except wrappers, ``if``
  headers, argument continuations of RPC calls, dict braces) are elided
  from this excerpt; the bodies below are transcribed as-is -- consult
  the full file before modifying.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    # NOTE(review): the return of (env, nl, nl) is elided.

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # live migration requires a DRBD8 disk layout
    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existance
    _CheckInstanceBridgesExist(self, instance, node=target_node)

    if not self.op.cleanup:
      _CheckNodeNotDrained(self, target_node)
      # NOTE(review): the second argument line of this RPC call is
      # elided (the call is unterminated in this excerpt).
      result = self.rpc.call_instance_migratable(instance.primary_node,
      result.Raise("Can't migrate, please use failover", prereq=True)

    self.instance = instance

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    # NOTE(review): the retry loop and the all_done/min_percent
    # initialisations are elided from this excerpt.
    result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                          self.instance.disks)
    for node, nres in result.items():
      nres.Raise("Cannot resync disks on node %s" % node)
      node_done, node_percent = nres.payload
      all_done = all_done and node_done
      if node_percent is not None:
        min_percent = min(min_percent, node_percent)
    if min_percent < 100:
      self.feedback_fn(" - progress: %.1f%%" % min_percent)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    result.Raise("Cannot change disk to secondary on node %s" % node)

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      nres.Raise("Cannot disconnect disks node %s" % node)

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    # NOTE(review): the ``if multimaster: msg = "dual-master" else:``
    # selection is elided; only the else value is visible.
    msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      nres.Raise("Cannot change disks config on node %s" % node)

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    # NOTE(review): the closing fragment of this feedback message is
    # elided (the call is unterminated in this excerpt).
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise("Can't contact node %s" % node)

    runningon_source = instance.name in ins_l[source_node].payload
    runningon_target = instance.name in ins_l[target_node].payload

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
      # NOTE(review): the next two statements originally live in the
      # ``else:`` branch (instance still on its primary) -- confirm.
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    self._EnsureSecondary(demoted_node)
    # NOTE(review): ``try:`` wrapper elided before the next call.
    self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore here errors, since if the device is standalone, it
      # won't be able to sync
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    # NOTE(review): ``try:`` wrapper elided before the next four calls,
    # and the ``str(err))`` closing argument of LogWarning is elided.
    self._EnsureSecondary(target_node)
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()
    except errors.OpExecError, err:
      self.LogWarning("Migration failed and I can't reconnect the"
                      " drives: error '%s'\n"
                      "Please look and recover the instance status" %

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    migration_info = self.migration_info

    # NOTE(review): the remaining argument lines of this RPC call and
    # the ``if abort_msg:`` guard are elided.
    abort_result = self.rpc.call_finalize_migration(target_node,
    abort_msg = abort_result.fail_msg
    logging.error("Aborting migration failed on target node %s: %s" %
                  (target_node, abort_msg))
    # Don't raise an exception here, as we stil have to try to revert the
    # disk status, even if this step failed.

  def _ExecMigration(self):
    """Migrate an instance.

    The migrate is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    self.feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      if not _CheckDiskConsistency(self, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migrate." % dev.iv_name)

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.fail_msg
    # NOTE(review): ``if msg:`` guard and the closing argument of the
    # log_err format are elided around the next three lines.
    log_err = ("Failed fetching source migration information from %s: %s" %
    logging.error(log_err)
    raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    # Then switch the disks to master/master mode
    self._EnsureSecondary(target_node)
    self._GoStandalone()
    self._GoReconnect(True)
    self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    # NOTE(review): intermediate argument lines of this RPC call are
    # elided.
    result = self.rpc.call_accept_instance(target_node,
                                           self.nodes_ip[target_node])

    msg = result.fail_msg
    # NOTE(review): ``if msg:`` guard elided before this error path.
    logging.error("Instance pre-migration failed, trying to revert"
                  " disk status: %s", msg)
    self._AbortMigration()
    self._RevertDiskStatus()
    raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                             (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    # NOTE(review): the final argument line (live flag) of this RPC
    # call is elided.
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
    msg = result.fail_msg
    # NOTE(review): ``if msg:`` guard elided before this error path.
    logging.error("Instance migration failed, trying to revert"
                  " disk status: %s", msg)
    self._AbortMigration()
    self._RevertDiskStatus()
    raise errors.OpExecError("Could not migrate instance %s: %s" %
                             (instance.name, msg))

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # NOTE(review): the remaining argument lines of this RPC call, the
    # ``if msg:`` guard and the message/format continuations are elided.
    result = self.rpc.call_finalize_migration(target_node,
    msg = result.fail_msg
    logging.error("Instance migration succeeded, but finalization failed:"
    raise errors.OpExecError("Could not finalize instance migration: %s" %

    self._EnsureSecondary(source_node)
    self._WaitUntilSync()
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn

    self.source_node = self.instance.primary_node
    self.target_node = self.instance.secondary_nodes[0]
    self.all_nodes = [self.source_node, self.target_node]
    # NOTE(review): the ``self.nodes_ip = {`` / ``}`` wrapper and the
    # ``if self.op.cleanup: ... else:`` dispatch are elided around the
    # following lines.
    self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
    self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
    return self._ExecCleanup()
    return self._ExecMigration()
4117 def _CreateBlockDev(lu, node, instance, device, force_create,
4119 """Create a tree of block devices on a given node.
4121 If this device type has to be created on secondaries, create it and
4124 If not, just recurse to children keeping the same 'force' value.
4126 @param lu: the lu on whose behalf we execute
4127 @param node: the node on which to create the device
4128 @type instance: L{objects.Instance}
4129 @param instance: the instance which owns the device
4130 @type device: L{objects.Disk}
4131 @param device: the device to create
4132 @type force_create: boolean
4133 @param force_create: whether to force creation of this device; this
4134 will be change to True whenever we find a device which has
4135 CreateOnSecondary() attribute
4136 @param info: the extra 'metadata' we should attach to the device
4137 (this will be represented as a LVM tag)
4138 @type force_open: boolean
4139 @param force_open: this parameter will be passes to the
4140 L{backend.BlockdevCreate} function where it specifies
4141 whether we run on primary or not, and it affects both
4142 the child assembly and the device own Open() execution
4145 if device.CreateOnSecondary():
4149 for child in device.children:
4150 _CreateBlockDev(lu, node, instance, child, force_create,
4153 if not force_create:
4156 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4159 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4160 """Create a single block device on a given node.
4162 This will not recurse over children of the device, so they must be
4165 @param lu: the lu on whose behalf we execute
4166 @param node: the node on which to create the device
4167 @type instance: L{objects.Instance}
4168 @param instance: the instance which owns the device
4169 @type device: L{objects.Disk}
4170 @param device: the device to create
4171 @param info: the extra 'metadata' we should attach to the device
4172 (this will be represented as a LVM tag)
4173 @type force_open: boolean
4174 @param force_open: this parameter will be passes to the
4175 L{backend.BlockdevCreate} function where it specifies
4176 whether we run on primary or not, and it affects both
4177 the child assembly and the device own Open() execution
4180 lu.cfg.SetDiskID(device, node)
4181 result = lu.rpc.call_blockdev_create(node, device, device.size,
4182 instance.name, force_open, info)
4183 result.Raise("Can't create block device %s on"
4184 " node %s for instance %s" % (device, node, instance.name))
4185 if device.physical_id is None:
4186 device.physical_id = result.payload
4189 def _GenerateUniqueNames(lu, exts):
4190 """Generate a suitable LV name.
4192 This will generate a logical volume name for the given instance.
4197 new_id = lu.cfg.GenerateUniqueID()
4198 results.append("%s%s" % (new_id, val))
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  @param lu: the lu on whose behalf we execute
  @param primary: the primary node name
  @param secondary: the secondary node name
  @type size: int
  @param size: the size of the data device, in mebibytes
  @type names: list
  @param names: the two LV names ([data, meta]) for the child volumes
  @param iv_name: the instance-visible name (e.g. "disk/0")
  @param p_minor: the DRBD minor to use on the primary node
  @param s_minor: the DRBD minor to use on the secondary node
  @rtype: L{objects.Disk}
  @return: the DRBD8 disk object with its two LV children

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  # 128 MiB metadata volume, as required by DRBD8
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
  """Generate the entire disk layout for a given template type.

  NOTE(review): several lines are elided from this excerpt: the final
  signature argument (``base_index):``), the ``disks = []`` /
  ``names = []`` initialisations, the DT_DISKLESS body, the closing
  arguments of the objects.Disk calls, the ``else:`` before the final
  raise and the ``return disks`` -- consult the full file.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  if template_name == constants.DT_DISKLESS:
  elif template_name == constants.DT_PLAIN:
    # plain LVM disks cannot have secondaries
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index,
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    # DRBD8 needs exactly one secondary node
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    # two minors (primary and secondary side) per disk
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    # each disk needs a data LV and a metadata LV
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disk_dev.mode = disk["mode"]
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    # file-backed disks cannot have secondaries
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
      disks.append(disk_dev)
  raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4289 def _GetInstanceInfoText(instance):
4290 """Compute that text that should be added to the disk's metadata.
4293 return "originstname+%s" % instance.name
def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @rtype: boolean
  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)
  pnode = instance.primary_node

  # file-based instances need their storage directory created first
  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    # bug fix: the message had three '%s' placeholders but only two
    # arguments, so building it raised TypeError instead of reporting
    # the real RPC error
    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir, pnode))

  # Note: this needs to be kept in sync with adding of disks in
  # LUSetInstanceParams
  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    for node in instance.all_nodes:
      # the primary node gets the device force-created and force-opened
      f_create = node == pnode
      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  NOTE(review): the ``all_result`` success-tracking variable, the
  ``if msg:`` guards before the warnings, the closing argument of the
  dir-remove RPC and the final ``return all_result`` are elided from
  this excerpt -- consult the full file.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @rtype: boolean
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  for device in instance.disks:
    # remove the device from every node it lives on (primary and, for
    # DRBD, the secondary)
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
      lu.LogWarning("Could not remove block device %s on node %s,"
                    " continuing anyway: %s", device.iv_name, node, msg)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
    msg = result.fail_msg
    lu.LogWarning("Could not remove directory '%s' on node %s: %s",
                  file_storage_dir, instance.primary_node, msg)
def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  @type disk_template: string
  @param disk_template: one of the constants.DT_* disk templates
  @type disks: list of dicts
  @param disks: disk specifications, each with a "size" key (MiB)
  @rtype: int or None
  @return: the required space in the volume group, or None for
      templates which use no VG space (diskless, file)
  @raise errors.ProgrammerError: for an unknown disk template

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstract the hypervisor parameter validation to be
  used in both instance create and instance modify.

  NOTE(review): the trailing arguments of the RPC call (hvname,
  hvparams) and the per-node ``info = hvinfo[node]`` / offline-skip
  lines are elided from this excerpt -- consult the full file.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
  for node in nodenames:
    info.Raise("Hypervisor parameter validation failed on node %s" % node)
4418 class LUCreateInstance(LogicalUnit):
4419 """Create an instance.
# NOTE(review): this whole class is a line-numbered dump with elided lines
# (gaps in the embedded numbers mark missing statements, e.g. `else:`
# branches and `try:` lines) — reconcile with the original before running.
4422 HPATH = "instance-add"
4423 HTYPE = constants.HTYPE_INSTANCE
# Required opcode attributes; missing ones abort the LU early.
4424 _OP_REQP = ["instance_name", "disks", "disk_template",
4426 "wait_for_sync", "ip_check", "nics",
4427 "hvparams", "beparams"]
# Helper: resolve a (possibly short) node name via the config, aborting
# with OpPrereqError when unknown.
4430 def _ExpandNode(self, node):
4431 """Expands and checks one node name.
4434 node_full = self.cfg.ExpandNodeName(node)
4435 if node_full is None:
4436 raise errors.OpPrereqError("Unknown node %s" % node)
4439 def ExpandNames(self):
4440 """ExpandNames for CreateInstance.
4442 Figure out the right locks for instance creation.
4445 self.needed_locks = {}
4447 # set optional parameters to none if they don't exist
4448 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4449 if not hasattr(self.op, attr):
4450 setattr(self.op, attr, None)
4452 # cheap checks, mostly valid constants given
4454 # verify creation mode
4455 if self.op.mode not in (constants.INSTANCE_CREATE,
4456 constants.INSTANCE_IMPORT):
4457 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4460 # disk template and mirror node verification
4461 if self.op.disk_template not in constants.DISK_TEMPLATES:
4462 raise errors.OpPrereqError("Invalid disk template name")
4464 if self.op.hypervisor is None:
4465 self.op.hypervisor = self.cfg.GetHypervisorType()
4467 cluster = self.cfg.GetClusterInfo()
4468 enabled_hvs = cluster.enabled_hypervisors
4469 if self.op.hypervisor not in enabled_hvs:
4470 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4471 " cluster (%s)" % (self.op.hypervisor,
4472 ",".join(enabled_hvs)))
4474 # check hypervisor parameter syntax (locally)
4475 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
# Layer the opcode hvparams over the cluster defaults before syntax check.
4476 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4478 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4479 hv_type.CheckParameterSyntax(filled_hvp)
4480 self.hv_full = filled_hvp
4482 # fill and remember the beparams dict
4483 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4484 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4487 #### instance parameters check
4489 # instance name verification
# HostInfo resolves the name; the resolved FQDN replaces the user input.
4490 hostname1 = utils.HostInfo(self.op.instance_name)
4491 self.op.instance_name = instance_name = hostname1.name
4493 # this is just a preventive check, but someone might still add this
4494 # instance in the meantime, and creation will fail at lock-add time
4495 if instance_name in self.cfg.GetInstanceList():
4496 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4499 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
# NIC parsing/validation: build objects.NIC instances from the opcode's
# nic dicts (mode/ip/mac/bridge/link keys).
4503 for idx, nic in enumerate(self.op.nics):
4504 nic_mode_req = nic.get("mode", None)
4505 nic_mode = nic_mode_req
4506 if nic_mode is None:
4507 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4509 # in routed mode, for the first nic, the default ip is 'auto'
4510 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4511 default_ip_mode = constants.VALUE_AUTO
4513 default_ip_mode = constants.VALUE_NONE
4515 # ip validity checks
4516 ip = nic.get("ip", default_ip_mode)
4517 if ip is None or ip.lower() == constants.VALUE_NONE:
4519 elif ip.lower() == constants.VALUE_AUTO:
# 'auto' means: use the IP the instance name resolves to.
4520 nic_ip = hostname1.ip
4522 if not utils.IsValidIP(ip):
4523 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4524 " like a valid IP" % ip)
4527 # TODO: check the ip for uniqueness !!
4528 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4529 raise errors.OpPrereqError("Routed nic mode requires an ip address")
4531 # MAC address verification
4532 mac = nic.get("mac", constants.VALUE_AUTO)
4533 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4534 if not utils.IsValidMac(mac.lower()):
4535 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4537 # bridge verification
4538 bridge = nic.get("bridge", None)
4539 link = nic.get("link", None)
# 'bridge' is the legacy spelling of 'link'; the two are exclusive.
4541 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
4542 " at the same time")
4543 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4544 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4550 nicparams[constants.NIC_MODE] = nic_mode_req
4552 nicparams[constants.NIC_LINK] = link
# Syntax-check the merged nicparams against cluster defaults before use.
4554 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4556 objects.NIC.CheckParameterSyntax(check_params)
4557 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4559 # disk checks/pre-build
4561 for disk in self.op.disks:
4562 mode = disk.get("mode", constants.DISK_RDWR)
4563 if mode not in constants.DISK_ACCESS_SET:
4564 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4566 size = disk.get("size", None)
4568 raise errors.OpPrereqError("Missing disk size")
4572 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4573 self.disks.append({"size": size, "mode": mode})
4575 # used in CheckPrereq for ip ping check
4576 self.check_ip = hostname1.ip
4578 # file storage checks
4579 if (self.op.file_driver and
4580 not self.op.file_driver in constants.FILE_DRIVER):
4581 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4582 self.op.file_driver)
# The user-given dir must be relative; it is joined under the cluster's
# file storage root in Exec().
4584 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4585 raise errors.OpPrereqError("File storage directory path not absolute")
4587 ### Node/iallocator related checks
4588 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4589 raise errors.OpPrereqError("One and only one of iallocator and primary"
4590 " node must be given")
4592 if self.op.iallocator:
# The allocator may pick any node, so lock them all.
4593 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4595 self.op.pnode = self._ExpandNode(self.op.pnode)
4596 nodelist = [self.op.pnode]
4597 if self.op.snode is not None:
4598 self.op.snode = self._ExpandNode(self.op.snode)
4599 nodelist.append(self.op.snode)
4600 self.needed_locks[locking.LEVEL_NODE] = nodelist
4602 # in case of import lock the source node too
4603 if self.op.mode == constants.INSTANCE_IMPORT:
4604 src_node = getattr(self.op, "src_node", None)
4605 src_path = getattr(self.op, "src_path", None)
4607 if src_path is None:
4608 self.op.src_path = src_path = self.op.instance_name
4610 if src_node is None:
# Unknown source node: search all nodes for the export in CheckPrereq.
4611 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4612 self.op.src_node = None
4613 if os.path.isabs(src_path):
4614 raise errors.OpPrereqError("Importing an instance from an absolute"
4615 " path requires a source node option.")
4617 self.op.src_node = src_node = self._ExpandNode(src_node)
4618 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4619 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4620 if not os.path.isabs(src_path):
4621 self.op.src_path = src_path = \
4622 os.path.join(constants.EXPORT_DIR, src_path)
4624 else: # INSTANCE_CREATE
4625 if getattr(self.op, "os_type", None) is None:
4626 raise errors.OpPrereqError("No guest OS specified")
4628 def _RunAllocator(self):
4629 """Run the allocator based on input opcode.
4632 nics = [n.ToDict() for n in self.nics]
4633 ial = IAllocator(self,
4634 mode=constants.IALLOCATOR_MODE_ALLOC,
4635 name=self.op.instance_name,
4636 disk_template=self.op.disk_template,
4639 vcpus=self.be_full[constants.BE_VCPUS],
4640 mem_size=self.be_full[constants.BE_MEMORY],
4643 hypervisor=self.op.hypervisor,
4646 ial.Run(self.op.iallocator)
# Elided `if not ial.success:` guard (orig. 4648) precedes this raise.
4649 raise errors.OpPrereqError("Can't compute nodes using"
4650 " iallocator '%s': %s" % (self.op.iallocator,
4652 if len(ial.nodes) != ial.required_nodes:
4653 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4654 " of nodes (%s), required %s" %
4655 (self.op.iallocator, len(ial.nodes),
4656 ial.required_nodes))
# Adopt the allocator's choices as if the user had passed them.
4657 self.op.pnode = ial.nodes[0]
4658 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4659 self.op.instance_name, self.op.iallocator,
4660 ", ".join(ial.nodes))
4661 if ial.required_nodes == 2:
4662 self.op.snode = ial.nodes[1]
4664 def BuildHooksEnv(self):
# Build the environment for the instance-add hooks.
4667 This runs on master, primary and secondary nodes of the instance.
4671 "ADD_MODE": self.op.mode,
4673 if self.op.mode == constants.INSTANCE_IMPORT:
4674 env["SRC_NODE"] = self.op.src_node
4675 env["SRC_PATH"] = self.op.src_path
4676 env["SRC_IMAGES"] = self.src_images
4678 env.update(_BuildInstanceHookEnv(
4679 name=self.op.instance_name,
4680 primary_node=self.op.pnode,
4681 secondary_nodes=self.secondaries,
4682 status=self.op.start,
4683 os_type=self.op.os_type,
4684 memory=self.be_full[constants.BE_MEMORY],
4685 vcpus=self.be_full[constants.BE_VCPUS],
4686 nics=_NICListToTuple(self, self.nics),
4687 disk_template=self.op.disk_template,
4688 disks=[(d["size"], d["mode"]) for d in self.disks],
4691 hypervisor=self.op.hypervisor,
# Node list: master + primary (+ secondaries, continuation elided).
4694 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4699 def CheckPrereq(self):
4700 """Check prerequisites.
4703 if (not self.cfg.GetVGName() and
4704 self.op.disk_template not in constants.DTS_NOT_LVM):
4705 raise errors.OpPrereqError("Cluster does not support lvm-based"
4708 if self.op.mode == constants.INSTANCE_IMPORT:
4709 src_node = self.op.src_node
4710 src_path = self.op.src_path
# Source node unknown: search all locked nodes for an export matching
# the (relative) source path.
4712 if src_node is None:
4713 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4714 exp_list = self.rpc.call_export_list(locked_nodes)
4716 for node in exp_list:
4717 if exp_list[node].fail_msg:
4719 if src_path in exp_list[node].payload:
4721 self.op.src_node = src_node = node
4722 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4726 raise errors.OpPrereqError("No export found for relative path %s" %
4729 _CheckNodeOnline(self, src_node)
4730 result = self.rpc.call_export_info(src_node, src_path)
4731 result.Raise("No export or invalid export found in dir %s" % src_path)
# The export metadata is an INI-style config serialized as a string.
4733 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4734 if not export_info.has_section(constants.INISECT_EXP):
4735 raise errors.ProgrammerError("Corrupted export config")
4737 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4738 if (int(ei_version) != constants.EXPORT_VERSION):
4739 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4740 (ei_version, constants.EXPORT_VERSION))
4742 # Check that the new instance doesn't have less disks than the export
4743 instance_disks = len(self.disks)
4744 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4745 if instance_disks < export_disks:
4746 raise errors.OpPrereqError("Not enough disks to import."
4747 " (instance: %d, export: %d)" %
4748 (instance_disks, export_disks))
4750 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
# Collect per-disk dump image paths; False marks a disk with no image.
4752 for idx in range(export_disks):
4753 option = 'disk%d_dump' % idx
4754 if export_info.has_option(constants.INISECT_INS, option):
4755 # FIXME: are the old os-es, disk sizes, etc. useful?
4756 export_name = export_info.get(constants.INISECT_INS, option)
4757 image = os.path.join(src_path, export_name)
4758 disk_images.append(image)
4760 disk_images.append(False)
4762 self.src_images = disk_images
4764 old_name = export_info.get(constants.INISECT_INS, 'name')
4765 # FIXME: int() here could throw a ValueError on broken exports
4766 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
# When re-importing under the same name, reuse the exported MACs for
# nics still set to 'auto'.
4767 if self.op.instance_name == old_name:
4768 for idx, nic in enumerate(self.nics):
4769 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4770 nic_mac_ini = 'nic%d_mac' % idx
4771 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4773 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4774 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4775 if self.op.start and not self.op.ip_check:
4776 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4777 " adding an instance in start mode")
4779 if self.op.ip_check:
# If anything answers on the noded port at that IP, it is in use.
4780 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4781 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4782 (self.check_ip, self.op.instance_name))
4784 #### mac address generation
4785 # By generating here the mac address both the allocator and the hooks get
4786 # the real final mac address rather than the 'auto' or 'generate' value.
4787 # There is a race condition between the generation and the instance object
4788 # creation, which means that we know the mac is valid now, but we're not
4789 # sure it will be when we actually add the instance. If things go bad
4790 # adding the instance will abort because of a duplicate mac, and the
4791 # creation job will fail.
4792 for nic in self.nics:
4793 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4794 nic.mac = self.cfg.GenerateMAC()
4798 if self.op.iallocator is not None:
4799 self._RunAllocator()
4801 #### node related checks
4803 # check primary node
4804 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4805 assert self.pnode is not None, \
4806 "Cannot retrieve locked node %s" % self.op.pnode
# Elided guards (orig. 4807/4810): `if pnode.offline:` / `if pnode.drained:`
4808 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4811 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4814 self.secondaries = []
4816 # mirror node verification
4817 if self.op.disk_template in constants.DTS_NET_MIRROR:
4818 if self.op.snode is None:
4819 raise errors.OpPrereqError("The networked disk templates need"
4821 if self.op.snode == pnode.name:
4822 raise errors.OpPrereqError("The secondary node cannot be"
4823 " the primary node.")
4824 _CheckNodeOnline(self, self.op.snode)
4825 _CheckNodeNotDrained(self, self.op.snode)
4826 self.secondaries.append(self.op.snode)
4828 nodenames = [pnode.name] + self.secondaries
4830 req_size = _ComputeDiskSize(self.op.disk_template,
4833 # Check lv size requirements
4834 if req_size is not None:
4835 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4837 for node in nodenames:
4838 info = nodeinfo[node]
4839 info.Raise("Cannot get current information from node %s" % node)
4841 vg_free = info.get('vg_free', None)
# A non-int vg_free means the node could not report LVM data.
4842 if not isinstance(vg_free, int):
4843 raise errors.OpPrereqError("Can't compute free disk space on"
4845 if req_size > vg_free:
4846 raise errors.OpPrereqError("Not enough disk space on target node %s."
4847 " %d MB available, %d MB required" %
4848 (node, vg_free, req_size))
4850 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
# OS verification on the primary node.
4853 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4854 result.Raise("OS '%s' not in supported os list for primary node %s" %
4855 (self.op.os_type, pnode.name), prereq=True)
4857 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4859 # memory check on primary node
4861 _CheckNodeFreeMemory(self, self.pnode.name,
4862 "creating instance %s" % self.op.instance_name,
4863 self.be_full[constants.BE_MEMORY],
# Expose the chosen nodes as the dry-run result (see LogicalUnit docs).
4866 self.dry_run_result = list(nodenames)
4868 def Exec(self, feedback_fn):
4869 """Create and add the instance to the cluster.
4872 instance = self.op.instance_name
4873 pnode_name = self.pnode.name
4875 ht_kind = self.op.hypervisor
# Some hypervisors (e.g. ones needing a console port) get a cluster-wide
# unique network port; otherwise network_port stays at its elided default.
4876 if ht_kind in constants.HTS_REQ_PORT:
4877 network_port = self.cfg.AllocatePort()
4881 ##if self.op.vnc_bind_address is None:
4882 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4884 # this is needed because os.path.join does not accept None arguments
4885 if self.op.file_storage_dir is None:
4886 string_file_storage_dir = ""
4888 string_file_storage_dir = self.op.file_storage_dir
4890 # build the full file storage dir path
4891 file_storage_dir = os.path.normpath(os.path.join(
4892 self.cfg.GetFileStorageDir(),
4893 string_file_storage_dir, instance))
4896 disks = _GenerateDiskTemplate(self,
4897 self.op.disk_template,
4898 instance, pnode_name,
4902 self.op.file_driver,
4905 iobj = objects.Instance(name=instance, os=self.op.os_type,
4906 primary_node=pnode_name,
4907 nics=self.nics, disks=disks,
4908 disk_template=self.op.disk_template,
4910 network_port=network_port,
4911 beparams=self.op.beparams,
4912 hvparams=self.op.hvparams,
4913 hypervisor=self.op.hypervisor,
4916 feedback_fn("* creating instance disks...")
# Elided `try:` (orig. 4917) wraps disk creation; on failure we roll back
# any disks already created and free the reserved DRBD minors.
4918 _CreateDisks(self, iobj)
4919 except errors.OpExecError:
4920 self.LogWarning("Device creation failed, reverting...")
4922 _RemoveDisks(self, iobj)
4924 self.cfg.ReleaseDRBDMinors(instance)
4927 feedback_fn("adding instance %s to cluster config" % instance)
4929 self.cfg.AddInstance(iobj)
4930 # Declare that we don't want to remove the instance lock anymore, as we've
4931 # added the instance to the config
4932 del self.remove_locks[locking.LEVEL_INSTANCE]
4933 # Unlock all the nodes
4934 if self.op.mode == constants.INSTANCE_IMPORT:
# Keep the source node locked (we still read images from it below).
4935 nodes_keep = [self.op.src_node]
4936 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4937 if node != self.op.src_node]
4938 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4939 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4941 self.context.glm.release(locking.LEVEL_NODE)
4942 del self.acquired_locks[locking.LEVEL_NODE]
4944 if self.op.wait_for_sync:
4945 disk_abort = not _WaitForSync(self, iobj)
4946 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4947 # make sure the disks are not degraded (still sync-ing is ok)
4949 feedback_fn("* checking mirrors status")
4950 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
# Elided `if disk_abort:` (orig. ~4954): on degraded disks undo everything.
4955 _RemoveDisks(self, iobj)
4956 self.cfg.RemoveInstance(iobj.name)
4957 # Make sure the instance lock gets removed
4958 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4959 raise errors.OpExecError("There are some degraded disks for"
4962 feedback_fn("creating os for instance %s on node %s" %
4963 (instance, pnode_name))
4965 if iobj.disk_template != constants.DT_DISKLESS:
4966 if self.op.mode == constants.INSTANCE_CREATE:
4967 feedback_fn("* running the instance OS create scripts...")
4968 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4969 result.Raise("Could not add os for instance %s"
4970 " on node %s" % (instance, pnode_name))
4972 elif self.op.mode == constants.INSTANCE_IMPORT:
4973 feedback_fn("* running the instance OS import scripts...")
4974 src_node = self.op.src_node
4975 src_images = self.src_images
4976 cluster_name = self.cfg.GetClusterName()
4977 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4978 src_node, src_images,
4980 msg = import_result.fail_msg
# Import errors only warn — the instance exists, its data may be partial.
4982 self.LogWarning("Error while importing the disk images for instance"
4983 " %s on node %s: %s" % (instance, pnode_name, msg))
4985 # also checked in the prereq part
4986 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
# Elided `if self.op.start:` (orig. 4989) presumably guards the start
# sequence below — confirm against the original.
4990 iobj.admin_up = True
4991 self.cfg.Update(iobj)
4992 logging.info("Starting instance %s on node %s", instance, pnode_name)
4993 feedback_fn("* starting instance...")
4994 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4995 result.Raise("Could not start instance")
4997 return list(iobj.all_nodes)
5000 class LUConnectConsole(NoHooksLU):
5001 """Connect to an instance's console.
5003 This is somewhat special in that it returns the command line that
5004 you need to run on the master node in order to connect to the
# Required opcode attributes for this LU.
5008 _OP_REQP = ["instance_name"]
5011 def ExpandNames(self):
5012 self._ExpandAndLockInstance()
5014 def CheckPrereq(self):
5015 """Check prerequisites.
5017 This checks that the instance is in the cluster.
5020 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5021 assert self.instance is not None, \
5022 "Cannot retrieve locked instance %s" % self.op.instance_name
5023 _CheckNodeOnline(self, self.instance.primary_node)
5025 def Exec(self, feedback_fn):
5026 """Connect to the console of an instance
5029 instance = self.instance
5030 node = instance.primary_node
# Ask the primary node which instances it runs; the instance must be up.
5032 node_insts = self.rpc.call_instance_list([node],
5033 [instance.hypervisor])[node]
5034 node_insts.Raise("Can't get node information from %s" % node)
5036 if instance.name not in node_insts.payload:
5037 raise errors.OpExecError("Instance %s is not running." % instance.name)
5039 logging.debug("Connecting to console of %s on %s", instance.name, node)
5041 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5042 cluster = self.cfg.GetClusterInfo()
5043 # beparams and hvparams are passed separately, to avoid editing the
5044 # instance and then saving the defaults in the instance itself.
5045 hvparams = cluster.FillHV(instance)
5046 beparams = cluster.FillBE(instance)
5047 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
# Return the ssh command line the caller must run on the master node.
5050 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5053 class LUReplaceDisks(LogicalUnit):
5054 """Replace the disks of an instance.
5057 HPATH = "mirrors-replace"
5058 HTYPE = constants.HTYPE_INSTANCE
5059 _OP_REQP = ["instance_name", "mode", "disks"]
# Validate the remote_node/iallocator combination for this replace mode.
# NOTE(review): numbered dump with elided guard lines (e.g. the `if cnt != 1:`
# tests at orig. 5071/5075/5079) — restore from the original before running.
5062 def CheckArguments(self):
5063 if not hasattr(self.op, "remote_node"):
5064 self.op.remote_node = None
5065 if not hasattr(self.op, "iallocator"):
5066 self.op.iallocator = None
5068 # check for valid parameter combination
5069 cnt = [self.op.remote_node, self.op.iallocator].count(None)
5070 if self.op.mode == constants.REPLACE_DISK_CHG:
# Changing the secondary needs exactly one of iallocator / remote_node.
5072 raise errors.OpPrereqError("When changing the secondary either an"
5073 " iallocator script must be used or the"
5076 raise errors.OpPrereqError("Give either the iallocator or the new"
5077 " secondary, not both")
5078 else: # not replacing the secondary
5080 raise errors.OpPrereqError("The iallocator and new node options can"
5081 " be used only when changing the"
# Compute the locks needed for the disk replacement.
5084 def ExpandNames(self):
5085 self._ExpandAndLockInstance()
5087 if self.op.iallocator is not None:
# The allocator may choose any node as the new secondary: lock all.
5088 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5089 elif self.op.remote_node is not None:
5090 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5091 if remote_node is None:
5092 raise errors.OpPrereqError("Node '%s' not known" %
5093 self.op.remote_node)
5094 self.op.remote_node = remote_node
5095 # Warning: do not remove the locking of the new secondary here
5096 # unless DRBD8.AddChildren is changed to work in parallel;
5097 # currently it doesn't since parallel invocations of
5098 # FindUnusedMinor will conflict
5099 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5100 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5102 self.needed_locks[locking.LEVEL_NODE] = []
5103 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
# Late lock declaration: add the instance's own nodes at node level.
5105 def DeclareLocks(self, level):
5106 # If we're not already locking all nodes in the set we have to declare the
5107 # instance's primary/secondary nodes.
5108 if (level == locking.LEVEL_NODE and
5109 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5110 self._LockInstancesNodes()
5112 def _RunAllocator(self):
5113 """Compute a new secondary node using an IAllocator.
5116 ial = IAllocator(self,
5117 mode=constants.IALLOCATOR_MODE_RELOC,
5118 name=self.op.instance_name,
5119 relocate_from=[self.sec_node])
5121 ial.Run(self.op.iallocator)
# Elided `if not ial.success:` guard (orig. ~5123) precedes this raise.
5124 raise errors.OpPrereqError("Can't compute nodes using"
5125 " iallocator '%s': %s" % (self.op.iallocator,
5127 if len(ial.nodes) != ial.required_nodes:
5128 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5129 " of nodes (%s), required %s" %
5130 (len(ial.nodes), ial.required_nodes))
# Adopt the allocator's pick as the new secondary.
5131 self.op.remote_node = ial.nodes[0]
5132 self.LogInfo("Selected new secondary for the instance: %s",
5133 self.op.remote_node)
# Build the hooks environment for the replace-disks operation.
5135 def BuildHooksEnv(self):
5138 This runs on the master, the primary and all the secondaries.
5142 "MODE": self.op.mode,
5143 "NEW_SECONDARY": self.op.remote_node,
5144 "OLD_SECONDARY": self.instance.secondary_nodes[0],
5146 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
# Node list (opening of `nl = [` elided at orig. 5147).
5148 self.cfg.GetMasterNode(),
5149 self.instance.primary_node,
5151 if self.op.remote_node is not None:
5152 nl.append(self.op.remote_node)
5155 def CheckPrereq(self):
5156 """Check prerequisites.
5158 This checks that the instance is in the cluster.
5161 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5162 assert instance is not None, \
5163 "Cannot retrieve locked instance %s" % self.op.instance_name
5164 self.instance = instance
# Only DRBD8 instances can have their disks replaced this way.
5166 if instance.disk_template != constants.DT_DRBD8:
5167 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5170 if len(instance.secondary_nodes) != 1:
5171 raise errors.OpPrereqError("The instance has a strange layout,"
5172 " expected one secondary but found %d" %
5173 len(instance.secondary_nodes))
5175 self.sec_node = instance.secondary_nodes[0]
5177 if self.op.iallocator is not None:
5178 self._RunAllocator()
5180 remote_node = self.op.remote_node
5181 if remote_node is not None:
5182 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5183 assert self.remote_node_info is not None, \
5184 "Cannot retrieve locked node %s" % remote_node
5186 self.remote_node_info = None
5187 if remote_node == instance.primary_node:
5188 raise errors.OpPrereqError("The specified node is the primary node of"
5190 elif remote_node == self.sec_node:
5191 raise errors.OpPrereqError("The specified node is already the"
5192 " secondary node of the instance.")
# Map the replace mode onto target node (gets new LVs), other node
# (consistency peer) and, for CHG, the new secondary.
5194 if self.op.mode == constants.REPLACE_DISK_PRI:
5195 n1 = self.tgt_node = instance.primary_node
5196 n2 = self.oth_node = self.sec_node
5197 elif self.op.mode == constants.REPLACE_DISK_SEC:
5198 n1 = self.tgt_node = self.sec_node
5199 n2 = self.oth_node = instance.primary_node
5200 elif self.op.mode == constants.REPLACE_DISK_CHG:
5201 n1 = self.new_node = remote_node
5202 n2 = self.oth_node = instance.primary_node
5203 self.tgt_node = self.sec_node
5204 _CheckNodeNotDrained(self, remote_node)
5206 raise errors.ProgrammerError("Unhandled disk replace mode")
5208 _CheckNodeOnline(self, n1)
5209 _CheckNodeOnline(self, n2)
# An empty disk list means "replace all disks".
5211 if not self.op.disks:
5212 self.op.disks = range(len(instance.disks))
5214 for disk_idx in self.op.disks:
5215 instance.FindDisk(disk_idx)
5217 def _ExecD8DiskOnly(self, feedback_fn):
5218 """Replace a disk on the primary or secondary for dbrd8.
5220 The algorithm for replace is quite complicated:
5222 1. for each disk to be replaced:
5224 1. create new LVs on the target node with unique names
5225 1. detach old LVs from the drbd device
5226 1. rename old LVs to name_replaced.<time_t>
5227 1. rename new LVs to old LVs
5228 1. attach the new LVs (with the old names now) to the drbd device
5230 1. wait for sync across all devices
5232 1. for each modified disk:
5234 1. remove old LVs (which have the name name_replaces.<time_t>)
5236 Failures are not very well handled.
# NOTE(review): numbered dump with elided lines (e.g. `steps_total`,
# `cfg = self.cfg`, `iv_names = {}` and several guards) — reconcile with
# the original before running.
5240 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5241 instance = self.instance
5243 vgname = self.cfg.GetVGName()
5246 tgt_node = self.tgt_node
5247 oth_node = self.oth_node
5249 # Step: check device activation
5250 self.proc.LogStep(1, steps_total, "check device existence")
5251 info("checking volume groups")
5252 my_vg = cfg.GetVGName()
5253 results = self.rpc.call_vg_list([oth_node, tgt_node])
5255 raise errors.OpExecError("Can't list volume groups on the nodes")
5256 for node in oth_node, tgt_node:
# `res` presumably is results[node] (assignment elided) — confirm.
5258 res.Raise("Error checking node %s" % node)
5259 if my_vg not in res.payload:
5260 raise errors.OpExecError("Volume group '%s' not found on %s" %
5262 for idx, dev in enumerate(instance.disks):
5263 if idx not in self.op.disks:
5265 for node in tgt_node, oth_node:
5266 info("checking disk/%d on %s" % (idx, node))
5267 cfg.SetDiskID(dev, node)
5268 result = self.rpc.call_blockdev_find(node, dev)
5269 msg = result.fail_msg
5270 if not msg and not result.payload:
5271 msg = "disk not found"
5273 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5276 # Step: check other node consistency
5277 self.proc.LogStep(2, steps_total, "check peer consistency")
5278 for idx, dev in enumerate(instance.disks):
5279 if idx not in self.op.disks:
5281 info("checking disk/%d consistency on %s" % (idx, oth_node))
5282 if not _CheckDiskConsistency(self, dev, oth_node,
5283 oth_node==instance.primary_node):
5284 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5285 " to replace disks on this node (%s)" %
5286 (oth_node, tgt_node))
5288 # Step: create new storage
5289 self.proc.LogStep(3, steps_total, "allocate new storage")
5290 for idx, dev in enumerate(instance.disks):
5291 if idx not in self.op.disks:
5294 cfg.SetDiskID(dev, tgt_node)
# Fresh unique LV names for the data and the DRBD metadata volume.
5295 lv_names = [".disk%d_%s" % (idx, suf)
5296 for suf in ["data", "meta"]]
5297 names = _GenerateUniqueNames(self, lv_names)
5298 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5299 logical_id=(vgname, names[0]))
# 128 MB is the fixed DRBD metadata volume size (cf. _ComputeDiskSize).
5300 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5301 logical_id=(vgname, names[1]))
5302 new_lvs = [lv_data, lv_meta]
5303 old_lvs = dev.children
5304 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5305 info("creating new local storage on %s for %s" %
5306 (tgt_node, dev.iv_name))
5307 # we pass force_create=True to force the LVM creation
5308 for new_lv in new_lvs:
5309 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5310 _GetInstanceInfoText(instance), False)
5312 # Step: for each lv, detach+rename*2+attach
5313 self.proc.LogStep(4, steps_total, "change drbd configuration")
5314 for dev, old_lvs, new_lvs in iv_names.itervalues():
5315 info("detaching %s drbd from local storage" % dev.iv_name)
5316 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5317 result.Raise("Can't detach drbd from local storage on node"
5318 " %s for device %s" % (tgt_node, dev.iv_name))
5320 #cfg.Update(instance)
5322 # ok, we created the new LVs, so now we know we have the needed
5323 # storage; as such, we proceed on the target node to rename
5324 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5325 # using the assumption that logical_id == physical_id (which in
5326 # turn is the unique_id on that node)
5328 # FIXME(iustin): use a better name for the replaced LVs
5329 temp_suffix = int(time.time())
5330 ren_fn = lambda d, suff: (d.physical_id[0],
5331 d.physical_id[1] + "_replaced-%s" % suff)
5332 # build the rename list based on what LVs exist on the node
5334 for to_ren in old_lvs:
5335 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5336 if not result.fail_msg and result.payload:
5338 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5340 info("renaming the old LVs on the target node")
5341 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5342 result.Raise("Can't rename old LVs on node %s" % tgt_node)
5343 # now we rename the new LVs to the old LVs
5344 info("renaming the new LVs on the target node")
5345 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5346 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5347 result.Raise("Can't rename new LVs on node %s" % tgt_node)
# Keep the in-memory disk objects in sync with the on-node renames.
5349 for old, new in zip(old_lvs, new_lvs):
5350 new.logical_id = old.logical_id
5351 cfg.SetDiskID(new, tgt_node)
5353 for disk in old_lvs:
5354 disk.logical_id = ren_fn(disk, temp_suffix)
5355 cfg.SetDiskID(disk, tgt_node)
5357 # now that the new lvs have the old name, we can add them to the device
5358 info("adding new mirror component on %s" % tgt_node)
5359 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5360 msg = result.fail_msg
# On attach failure, best-effort rollback: remove the just-created LVs.
5362 for new_lv in new_lvs:
5363 msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg
5365 warning("Can't rollback device %s: %s", dev, msg2,
5366 hint="cleanup manually the unused logical volumes")
5367 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5369 dev.children = new_lvs
5370 cfg.Update(instance)
5372 # Step: wait for sync
5374 # this can fail as the old devices are degraded and _WaitForSync
5375 # does a combined result over all disks, so we don't check its
5377 self.proc.LogStep(5, steps_total, "sync devices")
5378 _WaitForSync(self, instance, unlock=True)
5380 # so check manually all the devices
5381 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5382 cfg.SetDiskID(dev, instance.primary_node)
5383 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5384 msg = result.fail_msg
5385 if not msg and not result.payload:
5386 msg = "disk not found"
5388 raise errors.OpExecError("Can't find DRBD device %s: %s" %
# payload[5] presumably is the is_degraded flag of blockdev_find — confirm.
5390 if result.payload[5]:
5391 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5393 # Step: remove old storage
5394 self.proc.LogStep(6, steps_total, "removing old storage")
5395 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5396 info("remove logical volumes for %s" % name)
# Elided `for lv in old_lvs:` (orig. 5397) presumably binds `lv` here.
5398 cfg.SetDiskID(lv, tgt_node)
5399 msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg
5401 warning("Can't remove old LV: %s" % msg,
5402 hint="manually remove unused LVs")
# NOTE(review): this is a numbered source listing with gaps (the embedded line
# numbers skip, e.g. 5444 -> 5446), so several statements of this method are
# not visible here; comments below describe only what the visible code shows.
# Purpose: replace the secondary node of a DRBD8-backed instance, in six
# logged steps (LogStep 1..6): existence check, peer consistency check, new
# storage allocation, drbd reconfiguration, sync, old-storage removal.
5405 def _ExecD8Secondary(self, feedback_fn):
5406 """Replace the secondary node for drbd8.
5408 The algorithm for replace is quite complicated:
5409 - for all disks of the instance:
5410 - create new LVs on the new node with same names
5411 - shutdown the drbd device on the old secondary
5412 - disconnect the drbd network on the primary
5413 - create the drbd device on the new secondary
5414 - network attach the drbd on the primary, using an artifice:
5415 the drbd code for Attach() will connect to the network if it
5416 finds a device which is connected to the good local disks but
5418 - wait for sync across all devices
5419 - remove all disks from the old secondary
5421 Failures are not very well handled.
5425 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5426 instance = self.instance
5430 old_node = self.tgt_node
5431 new_node = self.new_node
5432 pri_node = instance.primary_node
# Map node name -> secondary IP, used by the drbd disconnect/attach RPCs below.
5434 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5435 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5436 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5439 # Step: check device activation
5440 self.proc.LogStep(1, steps_total, "check device existence")
5441 info("checking volume groups")
5442 my_vg = cfg.GetVGName()
5443 results = self.rpc.call_vg_list([pri_node, new_node])
5444 for node in pri_node, new_node:
# NOTE(review): the assignment of `res` (presumably res = results[node]) is
# elided from this listing.
5446 res.Raise("Error checking node %s" % node)
5447 if my_vg not in res.payload:
5448 raise errors.OpExecError("Volume group '%s' not found on %s" %
5450 for idx, dev in enumerate(instance.disks):
5451 if idx not in self.op.disks:
5453 info("checking disk/%d on %s" % (idx, pri_node))
5454 cfg.SetDiskID(dev, pri_node)
5455 result = self.rpc.call_blockdev_find(pri_node, dev)
5456 msg = result.fail_msg
# An RPC that succeeded but returned an empty payload also counts as failure.
5457 if not msg and not result.payload:
5458 msg = "disk not found"
5460 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5461 (idx, pri_node, msg))
5463 # Step: check other node consistency
5464 self.proc.LogStep(2, steps_total, "check peer consistency")
5465 for idx, dev in enumerate(instance.disks):
5466 if idx not in self.op.disks:
5468 info("checking disk/%d consistency on %s" % (idx, pri_node))
# ldisk=True: require the local disk to be consistent, not just the mirror.
5469 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5470 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5471 " unsafe to replace the secondary" %
5474 # Step: create new storage
5475 self.proc.LogStep(3, steps_total, "allocate new storage")
5476 for idx, dev in enumerate(instance.disks):
5477 info("adding new local storage on %s for disk/%d" %
5479 # we pass force_create=True to force LVM creation
5480 for new_lv in dev.children:
5481 _CreateBlockDev(self, new_node, instance, new_lv, True,
5482 _GetInstanceInfoText(instance), False)
5484 # Step 4: dbrd minors and drbd setups changes
5485 # after this, we must manually remove the drbd minors on both the
5486 # error and the success paths
5487 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5489 logging.debug("Allocated minors %s" % (minors,))
5490 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5491 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5493 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5494 # create new devices on new_node; note that we create two IDs:
5495 # one without port, so the drbd will be activated without
5496 # networking information on the new node at this stage, and one
5497 # with network, for the latter activation in step 4
5498 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
# NOTE(review): the branch bodies selecting p_minor from o_minor1/o_minor2
# are elided from this listing.
5499 if pri_node == o_node1:
5504 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5505 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5507 iv_names[idx] = (dev, dev.children, new_net_id)
5508 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5510 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5511 logical_id=new_alone_id,
5512 children=dev.children,
# On creation failure, release the reserved drbd minors before propagating.
5515 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5516 _GetInstanceInfoText(instance), False)
5517 except errors.GenericError:
5518 self.cfg.ReleaseDRBDMinors(instance.name)
5521 for idx, dev in enumerate(instance.disks):
5522 # we have new devices, shutdown the drbd on the old secondary
5523 info("shutting down drbd for disk/%d on old node" % idx)
5524 cfg.SetDiskID(dev, old_node)
5525 msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg
# Shutdown failures on the old node are only warnings - replacement proceeds.
5527 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5529 hint="Please cleanup this device manually as soon as possible")
5531 info("detaching primary drbds from the network (=> standalone)")
5532 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5533 instance.disks)[pri_node]
5535 msg = result.fail_msg
5537 # detaches didn't succeed (unlikely)
5538 self.cfg.ReleaseDRBDMinors(instance.name)
5539 raise errors.OpExecError("Can't detach the disks from the network on"
5540 " old node: %s" % (msg,))
5542 # if we managed to detach at least one, we update all the disks of
5543 # the instance to point to the new secondary
5544 info("updating instance configuration")
5545 for dev, _, new_logical_id in iv_names.itervalues():
5546 dev.logical_id = new_logical_id
5547 cfg.SetDiskID(dev, pri_node)
5548 cfg.Update(instance)
5550 # and now perform the drbd attach
5551 info("attaching primary drbds to new secondary (standalone => connected)")
5552 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5553 instance.disks, instance.name,
5555 for to_node, to_result in result.items():
5556 msg = to_result.fail_msg
5558 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5559 hint="please do a gnt-instance info to see the"
5562 # this can fail as the old devices are degraded and _WaitForSync
5563 # does a combined result over all disks, so we don't check its
5565 self.proc.LogStep(5, steps_total, "sync devices")
5566 _WaitForSync(self, instance, unlock=True)
5568 # so check manually all the devices
5569 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5570 cfg.SetDiskID(dev, pri_node)
5571 result = self.rpc.call_blockdev_find(pri_node, dev)
5572 msg = result.fail_msg
5573 if not msg and not result.payload:
5574 msg = "disk not found"
5576 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
# payload[5] is the degraded flag of the blockdev_find status tuple.
5578 if result.payload[5]:
5579 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5581 self.proc.LogStep(6, steps_total, "removing old storage")
5582 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5583 info("remove logical volumes for disk/%d" % idx)
# NOTE(review): the inner `for lv in old_lvs:` line is elided from this listing.
5585 cfg.SetDiskID(lv, old_node)
5586 msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg
5588 warning("Can't remove LV on old secondary: %s", msg,
5589 hint="Cleanup stale volumes by hand")
# Dispatcher for the replace-disks operation: picks the secondary-replacement
# or disk-only handler based on op.mode, bracketing the call with disk
# activation/deactivation when the instance is administratively down.
5591 def Exec(self, feedback_fn):
5592 """Execute disk replacement.
5594 This dispatches the disk replacement to the appropriate handler.
5597 instance = self.instance
5599 # Activate the instance disks if we're replacing them on a down instance
5600 if not instance.admin_up:
5601 _StartInstanceDisks(self, instance, True)
5603 if self.op.mode == constants.REPLACE_DISK_CHG:
5604 fn = self._ExecD8Secondary
# NOTE(review): the `else:` line between the two assignments is elided from
# this numbered listing (line 5605 is missing).
5606 fn = self._ExecD8DiskOnly
5608 ret = fn(feedback_fn)
5610 # Deactivate the instance disks if we're replacing them on a down instance
5611 if not instance.admin_up:
5612 _SafeShutdownInstanceDisks(self, instance)
# LU that grows a single disk of an instance by op.amount MiB on every node
# holding the disk, after verifying free space in the volume group.
# NOTE(review): this numbered listing has gaps; some statements (e.g. the
# HPATH attribute, env/nl dict construction in BuildHooksEnv) are elided.
5617 class LUGrowDisk(LogicalUnit):
5618 """Grow a disk of an instance.
5622 HTYPE = constants.HTYPE_INSTANCE
5623 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5626 def ExpandNames(self):
5627 self._ExpandAndLockInstance()
5628 self.needed_locks[locking.LEVEL_NODE] = []
5629 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5631 def DeclareLocks(self, level):
5632 if level == locking.LEVEL_NODE:
5633 self._LockInstancesNodes()
5635 def BuildHooksEnv(self):
5638 This runs on the master, the primary and all the secondaries.
5642 "DISK": self.op.disk,
5643 "AMOUNT": self.op.amount,
5645 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5647 self.cfg.GetMasterNode(),
5648 self.instance.primary_node,
5652 def CheckPrereq(self):
5653 """Check prerequisites.
5655 This checks that the instance is in the cluster.
5658 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5659 assert instance is not None, \
5660 "Cannot retrieve locked instance %s" % self.op.instance_name
5661 nodenames = list(instance.all_nodes)
5662 for node in nodenames:
5663 _CheckNodeOnline(self, node)
5666 self.instance = instance
# Growing is only supported for LVM-backed templates (plain and drbd8).
5668 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5669 raise errors.OpPrereqError("Instance's disk layout does not support"
5672 self.disk = instance.FindDisk(self.op.disk)
# Verify each involved node reports enough free space in the VG.
5674 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5675 instance.hypervisor)
5676 for node in nodenames:
5677 info = nodeinfo[node]
5678 info.Raise("Cannot get current information from node %s" % node)
5679 vg_free = info.payload.get('vg_free', None)
5680 if not isinstance(vg_free, int):
5681 raise errors.OpPrereqError("Can't compute free disk space on"
5683 if self.op.amount > vg_free:
5684 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5685 " %d MiB available, %d MiB required" %
5686 (node, vg_free, self.op.amount))
5688 def Exec(self, feedback_fn):
5689 """Execute disk grow.
5692 instance = self.instance
# Grow on every node holding the disk, then record the new size in the config.
5694 for node in instance.all_nodes:
5695 self.cfg.SetDiskID(disk, node)
5696 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5697 result.Raise("Grow request failed to node %s" % node)
5698 disk.RecordGrow(self.op.amount)
5699 self.cfg.Update(instance)
5700 if self.op.wait_for_sync:
5701 disk_abort = not _WaitForSync(self, instance)
5703 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5704 " status.\nPlease check the instance.")
# Read-only LU returning detailed runtime/config data for a set of instances
# (all instances when op.instances is empty). op.static suppresses the
# per-node blockdev_find / instance_info RPCs.
# NOTE(review): this numbered listing has gaps; some statements (e.g. `else:`
# lines and several assignments) are elided.
5707 class LUQueryInstanceData(NoHooksLU):
5708 """Query runtime instance data.
5711 _OP_REQP = ["instances", "static"]
5714 def ExpandNames(self):
5715 self.needed_locks = {}
# Shared (read) locks at every level - this LU never modifies state.
5716 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5718 if not isinstance(self.op.instances, list):
5719 raise errors.OpPrereqError("Invalid argument type 'instances'")
5721 if self.op.instances:
5722 self.wanted_names = []
5723 for name in self.op.instances:
5724 full_name = self.cfg.ExpandInstanceName(name)
5725 if full_name is None:
5726 raise errors.OpPrereqError("Instance '%s' not known" % name)
5727 self.wanted_names.append(full_name)
5728 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
# wanted_names is None => "all instances", resolved later from acquired locks.
5730 self.wanted_names = None
5731 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5733 self.needed_locks[locking.LEVEL_NODE] = []
5734 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5736 def DeclareLocks(self, level):
5737 if level == locking.LEVEL_NODE:
5738 self._LockInstancesNodes()
5740 def CheckPrereq(self):
5741 """Check prerequisites.
5743 This only checks the optional instance list against the existing names.
5746 if self.wanted_names is None:
5747 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5749 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5750 in self.wanted_names]
# Recursively build a status dict for one block device (and its children).
5753 def _ComputeDiskStatus(self, instance, snode, dev):
5754 """Compute block device status.
5757 static = self.op.static
5759 self.cfg.SetDiskID(dev, instance.primary_node)
5760 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5761 if dev_pstatus.offline:
5764 dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
5765 dev_pstatus = dev_pstatus.payload
5769 if dev.dev_type in constants.LDS_DRBD:
5770 # we change the snode then (otherwise we use the one passed in)
5771 if dev.logical_id[0] == instance.primary_node:
5772 snode = dev.logical_id[1]
5774 snode = dev.logical_id[0]
5776 if snode and not static:
5777 self.cfg.SetDiskID(dev, snode)
5778 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5779 if dev_sstatus.offline:
5782 dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
5783 dev_sstatus = dev_sstatus.payload
5788 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5789 for child in dev.children]
5794 "iv_name": dev.iv_name,
5795 "dev_type": dev.dev_type,
5796 "logical_id": dev.logical_id,
5797 "physical_id": dev.physical_id,
5798 "pstatus": dev_pstatus,
5799 "sstatus": dev_sstatus,
5800 "children": dev_children,
5806 def Exec(self, feedback_fn):
5807 """Gather and return data"""
5810 cluster = self.cfg.GetClusterInfo()
5812 for instance in self.wanted_instances:
5813 if not self.op.static:
5814 remote_info = self.rpc.call_instance_info(instance.primary_node,
5816 instance.hypervisor)
5817 remote_info.Raise("Error checking node %s" % instance.primary_node)
5818 remote_info = remote_info.payload
5819 if remote_info and "state" in remote_info:
5822 remote_state = "down"
5825 if instance.admin_up:
5828 config_state = "down"
5830 disks = [self._ComputeDiskStatus(instance, None, device)
5831 for device in instance.disks]
5834 "name": instance.name,
5835 "config_state": config_state,
5836 "run_state": remote_state,
5837 "pnode": instance.primary_node,
5838 "snodes": instance.secondary_nodes,
5840 # this happens to be the same format used for hooks
5841 "nics": _NICListToTuple(self, instance.nics),
5843 "hypervisor": instance.hypervisor,
5844 "network_port": instance.network_port,
5845 "hv_instance": instance.hvparams,
5846 "hv_actual": cluster.FillHV(instance),
5847 "be_instance": instance.beparams,
5848 "be_actual": cluster.FillBE(instance),
5851 result[instance.name] = idict
# LU that modifies an existing instance: NIC and disk add/remove/modify plus
# hypervisor (hvparams) and backend (beparams) parameter changes. Exec returns
# a list of (parameter, new-value) pairs describing what was changed.
# NOTE(review): this numbered listing has gaps; a number of statements
# (counters like disk_addremove/nic_addremove initialisation, several
# `else:`/`continue`/`try:` lines) are elided from view.
5856 class LUSetInstanceParams(LogicalUnit):
5857 """Modifies an instances's parameters.
5860 HPATH = "instance-modify"
5861 HTYPE = constants.HTYPE_INSTANCE
5862 _OP_REQP = ["instance_name"]
# Syntactic validation of the opcode only; cluster-state checks happen in
# CheckPrereq.
5865 def CheckArguments(self):
5866 if not hasattr(self.op, 'nics'):
5868 if not hasattr(self.op, 'disks'):
5870 if not hasattr(self.op, 'beparams'):
5871 self.op.beparams = {}
5872 if not hasattr(self.op, 'hvparams'):
5873 self.op.hvparams = {}
5874 self.op.force = getattr(self.op, "force", False)
5875 if not (self.op.nics or self.op.disks or
5876 self.op.hvparams or self.op.beparams):
5877 raise errors.OpPrereqError("No changes submitted")
# Disk modifications: each entry is (op, dict) where op is DDM_ADD/DDM_REMOVE
# or an integer disk index.
5881 for disk_op, disk_dict in self.op.disks:
5882 if disk_op == constants.DDM_REMOVE:
5885 elif disk_op == constants.DDM_ADD:
5888 if not isinstance(disk_op, int):
5889 raise errors.OpPrereqError("Invalid disk index")
5890 if not isinstance(disk_dict, dict):
5891 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
5892 raise errors.OpPrereqError(msg)
5894 if disk_op == constants.DDM_ADD:
5895 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5896 if mode not in constants.DISK_ACCESS_SET:
5897 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5898 size = disk_dict.get('size', None)
5900 raise errors.OpPrereqError("Required disk parameter size missing")
5903 except ValueError, err:
5904 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5906 disk_dict['size'] = size
5908 # modification of disk
5909 if 'size' in disk_dict:
5910 raise errors.OpPrereqError("Disk size change not possible, use"
5913 if disk_addremove > 1:
5914 raise errors.OpPrereqError("Only one disk add or remove operation"
5915 " supported at a time")
# NIC modifications, same (op, dict) shape as disks.
5919 for nic_op, nic_dict in self.op.nics:
5920 if nic_op == constants.DDM_REMOVE:
5923 elif nic_op == constants.DDM_ADD:
5926 if not isinstance(nic_op, int):
5927 raise errors.OpPrereqError("Invalid nic index")
5928 if not isinstance(nic_dict, dict):
5929 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
5930 raise errors.OpPrereqError(msg)
5932 # nic_dict should be a dict
5933 nic_ip = nic_dict.get('ip', None)
5934 if nic_ip is not None:
# The literal string "none" clears the IP.
5935 if nic_ip.lower() == constants.VALUE_NONE:
5936 nic_dict['ip'] = None
5938 if not utils.IsValidIP(nic_ip):
5939 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5941 nic_bridge = nic_dict.get('bridge', None)
5942 nic_link = nic_dict.get('link', None)
5943 if nic_bridge and nic_link:
5944 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5945 " at the same time")
5946 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5947 nic_dict['bridge'] = None
5948 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5949 nic_dict['link'] = None
5951 if nic_op == constants.DDM_ADD:
5952 nic_mac = nic_dict.get('mac', None)
5954 nic_dict['mac'] = constants.VALUE_AUTO
5956 if 'mac' in nic_dict:
5957 nic_mac = nic_dict['mac']
5958 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5959 if not utils.IsValidMac(nic_mac):
5960 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5961 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5962 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5963 " modifying an existing nic")
5965 if nic_addremove > 1:
5966 raise errors.OpPrereqError("Only one NIC add or remove operation"
5967 " supported at a time")
5969 def ExpandNames(self):
5970 self._ExpandAndLockInstance()
5971 self.needed_locks[locking.LEVEL_NODE] = []
5972 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5974 def DeclareLocks(self, level):
5975 if level == locking.LEVEL_NODE:
5976 self._LockInstancesNodes()
# Build the hooks environment, reflecting the post-modification NIC list and
# any new memory/vcpus values computed in CheckPrereq.
5978 def BuildHooksEnv(self):
5981 This runs on the master, primary and secondaries.
5985 if constants.BE_MEMORY in self.be_new:
5986 args['memory'] = self.be_new[constants.BE_MEMORY]
5987 if constants.BE_VCPUS in self.be_new:
5988 args['vcpus'] = self.be_new[constants.BE_VCPUS]
5989 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5990 # information at all.
5993 nic_override = dict(self.op.nics)
5994 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
5995 for idx, nic in enumerate(self.instance.nics):
5996 if idx in nic_override:
5997 this_nic_override = nic_override[idx]
5999 this_nic_override = {}
6000 if 'ip' in this_nic_override:
6001 ip = this_nic_override['ip']
6004 if 'mac' in this_nic_override:
6005 mac = this_nic_override['mac']
6008 if idx in self.nic_pnew:
6009 nicparams = self.nic_pnew[idx]
6011 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6012 mode = nicparams[constants.NIC_MODE]
6013 link = nicparams[constants.NIC_LINK]
6014 args['nics'].append((ip, mac, mode, link))
6015 if constants.DDM_ADD in nic_override:
6016 ip = nic_override[constants.DDM_ADD].get('ip', None)
6017 mac = nic_override[constants.DDM_ADD]['mac']
6018 nicparams = self.nic_pnew[constants.DDM_ADD]
6019 mode = nicparams[constants.NIC_MODE]
6020 link = nicparams[constants.NIC_LINK]
6021 args['nics'].append((ip, mac, mode, link))
6022 elif constants.DDM_REMOVE in nic_override:
6023 del args['nics'][-1]
6025 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6026 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6029 def _GetUpdatedParams(self, old_params, update_dict,
6030 default_values, parameter_types):
6031 """Return the new params dict for the given params.
6033 @type old_params: dict
6034 @param old_params: old parameters
6035 @type update_dict: dict
6036 @param update_dict: dict containing new parameter values,
6037 or constants.VALUE_DEFAULT to reset the
6038 parameter to its default value
6039 @type default_values: dict
6040 @param default_values: default values for the filled parameters
6041 @type parameter_types: dict
6042 @param parameter_types: dict mapping target dict keys to types
6043 in constants.ENFORCEABLE_TYPES
6044 @rtype: (dict, dict)
6045 @return: (new_parameters, filled_parameters)
6048 params_copy = copy.deepcopy(old_params)
6049 for key, val in update_dict.iteritems():
6050 if val == constants.VALUE_DEFAULT:
6052 del params_copy[key]
6056 params_copy[key] = val
6057 utils.ForceDictType(params_copy, parameter_types)
6058 params_filled = objects.FillDict(default_values, params_copy)
6059 return (params_copy, params_filled)
6061 def CheckPrereq(self):
6062 """Check prerequisites.
6064 This only checks the instance list against the existing names.
6067 force = self.force = self.op.force
6069 # checking the new params on the primary/secondary nodes
6071 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6072 cluster = self.cluster = self.cfg.GetClusterInfo()
6073 assert self.instance is not None, \
6074 "Cannot retrieve locked instance %s" % self.op.instance_name
6075 pnode = instance.primary_node
6076 nodelist = list(instance.all_nodes)
6078 # hvparams processing
6079 if self.op.hvparams:
6080 i_hvdict, hv_new = self._GetUpdatedParams(
6081 instance.hvparams, self.op.hvparams,
6082 cluster.hvparams[instance.hypervisor],
6083 constants.HVS_PARAMETER_TYPES)
6085 hypervisor.GetHypervisor(
6086 instance.hypervisor).CheckParameterSyntax(hv_new)
6087 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6088 self.hv_new = hv_new # the new actual values
6089 self.hv_inst = i_hvdict # the new dict (without defaults)
6091 self.hv_new = self.hv_inst = {}
6093 # beparams processing
6094 if self.op.beparams:
6095 i_bedict, be_new = self._GetUpdatedParams(
6096 instance.beparams, self.op.beparams,
6097 cluster.beparams[constants.PP_DEFAULT],
6098 constants.BES_PARAMETER_TYPES)
6099 self.be_new = be_new # the new actual values
6100 self.be_inst = i_bedict # the new dict (without defaults)
6102 self.be_new = self.be_inst = {}
# Memory change without --force: verify the new memory fits on the primary
# (and, with auto_balance, on the secondaries for failover headroom).
6106 if constants.BE_MEMORY in self.op.beparams and not self.force:
6107 mem_check_list = [pnode]
6108 if be_new[constants.BE_AUTO_BALANCE]:
6109 # either we changed auto_balance to yes or it was from before
6110 mem_check_list.extend(instance.secondary_nodes)
6111 instance_info = self.rpc.call_instance_info(pnode, instance.name,
6112 instance.hypervisor)
6113 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6114 instance.hypervisor)
6115 pninfo = nodeinfo[pnode]
6116 msg = pninfo.fail_msg
6118 # Assume the primary node is unreachable and go ahead
6119 self.warn.append("Can't get info from primary node %s: %s" %
6121 elif not isinstance(pninfo.payload.get('memory_free', None), int):
6122 self.warn.append("Node data from primary node %s doesn't contain"
6123 " free memory information" % pnode)
6124 elif instance_info.fail_msg:
6125 self.warn.append("Can't get instance runtime information: %s" %
6126 instance_info.fail_msg)
6128 if instance_info.payload:
6129 current_mem = int(instance_info.payload['memory'])
6131 # Assume instance not running
6132 # (there is a slight race condition here, but it's not very probable,
6133 # and we have no other way to check)
6135 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6136 pninfo.payload['memory_free'])
6138 raise errors.OpPrereqError("This change will prevent the instance"
6139 " from starting, due to %d MB of memory"
6140 " missing on its primary node" % miss_mem)
6142 if be_new[constants.BE_AUTO_BALANCE]:
6143 for node, nres in nodeinfo.items():
6144 if node not in instance.secondary_nodes:
6148 self.warn.append("Can't get info from secondary node %s: %s" %
6150 elif not isinstance(nres.payload.get('memory_free', None), int):
6151 self.warn.append("Secondary node %s didn't return free"
6152 " memory information" % node)
6153 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6154 self.warn.append("Not enough memory to failover instance to"
6155 " secondary node %s" % node)
# NIC checks: validate indices, compute the new (and default-filled) nic
# params, verify bridges exist, and validate/generate MAC addresses.
6160 for nic_op, nic_dict in self.op.nics:
6161 if nic_op == constants.DDM_REMOVE:
6162 if not instance.nics:
6163 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6165 if nic_op != constants.DDM_ADD:
6167 if nic_op < 0 or nic_op >= len(instance.nics):
6168 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6170 (nic_op, len(instance.nics)))
6171 old_nic_params = instance.nics[nic_op].nicparams
6172 old_nic_ip = instance.nics[nic_op].ip
6177 update_params_dict = dict([(key, nic_dict[key])
6178 for key in constants.NICS_PARAMETERS
6179 if key in nic_dict])
# 'bridge' is a legacy alias for the 'link' nic parameter.
6181 if 'bridge' in nic_dict:
6182 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6184 new_nic_params, new_filled_nic_params = \
6185 self._GetUpdatedParams(old_nic_params, update_params_dict,
6186 cluster.nicparams[constants.PP_DEFAULT],
6187 constants.NICS_PARAMETER_TYPES)
6188 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6189 self.nic_pinst[nic_op] = new_nic_params
6190 self.nic_pnew[nic_op] = new_filled_nic_params
6191 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6193 if new_nic_mode == constants.NIC_MODE_BRIDGED:
6194 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6195 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6197 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6199 self.warn.append(msg)
6201 raise errors.OpPrereqError(msg)
6202 if new_nic_mode == constants.NIC_MODE_ROUTED:
6203 if 'ip' in nic_dict:
6204 nic_ip = nic_dict['ip']
6208 raise errors.OpPrereqError('Cannot set the nic ip to None'
6210 if 'mac' in nic_dict:
6211 nic_mac = nic_dict['mac']
6213 raise errors.OpPrereqError('Cannot set the nic mac to None')
6214 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6215 # otherwise generate the mac
6216 nic_dict['mac'] = self.cfg.GenerateMAC()
6218 # or validate/reserve the current one
6219 if self.cfg.IsMacInUse(nic_mac):
6220 raise errors.OpPrereqError("MAC address %s already in use"
6221 " in cluster" % nic_mac)
# DISK checks.
6224 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6225 raise errors.OpPrereqError("Disk operations not supported for"
6226 " diskless instances")
6227 for disk_op, disk_dict in self.op.disks:
6228 if disk_op == constants.DDM_REMOVE:
6229 if len(instance.disks) == 1:
6230 raise errors.OpPrereqError("Cannot remove the last disk of"
6232 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6233 ins_l = ins_l[pnode]
6234 msg = ins_l.fail_msg
6236 raise errors.OpPrereqError("Can't contact node %s: %s" %
6238 if instance.name in ins_l.payload:
6239 raise errors.OpPrereqError("Instance is running, can't remove"
# NOTE(review): this compares len(instance.nics) against MAX_DISKS — likely
# a bug (should be len(instance.disks)); cannot confirm/fix from this elided
# listing. TODO verify against upstream.
6242 if (disk_op == constants.DDM_ADD and
6243 len(instance.nics) >= constants.MAX_DISKS):
6244 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6245 " add more" % constants.MAX_DISKS)
6246 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6248 if disk_op < 0 or disk_op >= len(instance.disks):
6249 raise errors.OpPrereqError("Invalid disk index %s, valid values"
6251 (disk_op, len(instance.disks)))
6255 def Exec(self, feedback_fn):
6256 """Modifies an instance.
6258 All parameters take effect only at the next restart of the instance.
6261 # Process here the warnings from CheckPrereq, as we don't have a
6262 # feedback_fn there.
6263 for warn in self.warn:
6264 feedback_fn("WARNING: %s" % warn)
6267 instance = self.instance
6268 cluster = self.cluster
# Apply disk changes first (remove last / add new / change mode).
6270 for disk_op, disk_dict in self.op.disks:
6271 if disk_op == constants.DDM_REMOVE:
6272 # remove the last disk
6273 device = instance.disks.pop()
6274 device_idx = len(instance.disks)
6275 for node, disk in device.ComputeNodeTree(instance.primary_node):
6276 self.cfg.SetDiskID(disk, node)
6277 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6279 self.LogWarning("Could not remove disk/%d on node %s: %s,"
6280 " continuing anyway", device_idx, node, msg)
6281 result.append(("disk/%d" % device_idx, "remove"))
6282 elif disk_op == constants.DDM_ADD:
6284 if instance.disk_template == constants.DT_FILE:
6285 file_driver, file_path = instance.disks[0].logical_id
6286 file_path = os.path.dirname(file_path)
6288 file_driver = file_path = None
6289 disk_idx_base = len(instance.disks)
6290 new_disk = _GenerateDiskTemplate(self,
6291 instance.disk_template,
6292 instance.name, instance.primary_node,
6293 instance.secondary_nodes,
6298 instance.disks.append(new_disk)
6299 info = _GetInstanceInfoText(instance)
6301 logging.info("Creating volume %s for instance %s",
6302 new_disk.iv_name, instance.name)
6303 # Note: this needs to be kept in sync with _CreateDisks
6305 for node in instance.all_nodes:
6306 f_create = node == instance.primary_node
6308 _CreateBlockDev(self, node, instance, new_disk,
6309 f_create, info, f_create)
6310 except errors.OpExecError, err:
6311 self.LogWarning("Failed to create volume %s (%s) on"
6313 new_disk.iv_name, new_disk, node, err)
6314 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6315 (new_disk.size, new_disk.mode)))
6317 # change a given disk
6318 instance.disks[disk_op].mode = disk_dict['mode']
6319 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
# NIC changes.
6321 for nic_op, nic_dict in self.op.nics:
6322 if nic_op == constants.DDM_REMOVE:
6323 # remove the last nic
6324 del instance.nics[-1]
6325 result.append(("nic.%d" % len(instance.nics), "remove"))
6326 elif nic_op == constants.DDM_ADD:
6327 # mac and bridge should be set, by now
6328 mac = nic_dict['mac']
6329 ip = nic_dict.get('ip', None)
6330 nicparams = self.nic_pinst[constants.DDM_ADD]
6331 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6332 instance.nics.append(new_nic)
6333 result.append(("nic.%d" % (len(instance.nics) - 1),
6334 "add:mac=%s,ip=%s,mode=%s,link=%s" %
6335 (new_nic.mac, new_nic.ip,
6336 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6337 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6340 for key in 'mac', 'ip':
6342 setattr(instance.nics[nic_op], key, nic_dict[key])
6343 if nic_op in self.nic_pnew:
6344 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6345 for key, val in nic_dict.iteritems():
6346 result.append(("nic.%s/%d" % (key, nic_op), val))
# hvparams / beparams changes (validated in CheckPrereq).
6349 if self.op.hvparams:
6350 instance.hvparams = self.hv_inst
6351 for key, val in self.op.hvparams.iteritems():
6352 result.append(("hv/%s" % key, val))
6355 if self.op.beparams:
6356 instance.beparams = self.be_inst
6357 for key, val in self.op.beparams.iteritems():
6358 result.append(("be/%s" % key, val))
6360 self.cfg.Update(instance)
# Read-only LU listing exported instance images per node (all nodes when
# op.nodes is empty). A node whose export_list RPC failed maps to False.
# NOTE(review): this numbered listing has gaps; `else:` lines and the
# initialisation/return of `result` are elided.
6365 class LUQueryExports(NoHooksLU):
6366 """Query the exports list
6369 _OP_REQP = ['nodes']
6372 def ExpandNames(self):
6373 self.needed_locks = {}
# Shared node locks only - pure query.
6374 self.share_locks[locking.LEVEL_NODE] = 1
6375 if not self.op.nodes:
6376 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6378 self.needed_locks[locking.LEVEL_NODE] = \
6379 _GetWantedNodes(self, self.op.nodes)
6381 def CheckPrereq(self):
6382 """Check prerequisites.
6385 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6387 def Exec(self, feedback_fn):
6388 """Compute the list of all the exported system images.
6391 @return: a dictionary with the structure node->(export-list)
6392 where export-list is a list of the instances exported on
6396 rpcresult = self.rpc.call_export_list(self.nodes)
6398 for node in rpcresult:
6399 if rpcresult[node].fail_msg:
6400 result[node] = False
6402 result[node] = rpcresult[node].payload
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      result.Raise("Could not shutdown instance %s on"
                   " node %s" % (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    try:
      for idx, disk in enumerate(instance.disks):
        # result.payload will be a snapshot of an lvm leaf of the one we passed
        result = self.rpc.call_blockdev_snapshot(src_node, disk)
        msg = result.fail_msg
        if msg:
          # snapshot failures are non-fatal; the disk is recorded as False
          self.LogWarning("Could not snapshot disk/%s on node %s: %s",
                          idx, src_node, msg)
          snap_disks.append(False)
        else:
          disk_id = (vgname, result.payload)
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=disk_id, physical_id=disk_id,
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      # restart the instance even if snapshotting failed
      if self.op.shutdown and instance.admin_up:
        result = self.rpc.call_instance_start(src_node, instance, None, None)
        msg = result.fail_msg
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Could not export disk/%s from node %s to"
                          " node %s: %s", idx, src_node, dst_node.name, msg)
        # the snapshot is removed regardless of whether the export worked
        msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
        if msg:
          self.LogWarning("Could not remove snapshot for disk/%d from node"
                          " %s: %s", idx, src_node, msg)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Could not finalize export for instance %s"
                      " on node %s: %s", instance.name, dst_node.name, msg)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    iname = instance.name
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].fail_msg:
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname, node, msg)
class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # resolve the target object based on the tag kind; subclasses rely
    # on self.target being set after this runs
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    # self.target is resolved by TagsLU.CheckPrereq
    return list(self.target.GetTags())
6667 class LUSearchTags(NoHooksLU):
6668 """Searches the tags for a given pattern.
6671 _OP_REQP = ["pattern"]
6674 def ExpandNames(self):
6675 self.needed_locks = {}
6677 def CheckPrereq(self):
6678 """Check prerequisites.
6680 This checks the pattern passed for validity by compiling it.
6684 self.re = re.compile(self.op.pattern)
6685 except re.error, err:
6686 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6687 (self.op.pattern, err))
6689 def Exec(self, feedback_fn):
6690 """Returns the tag list.
6694 tgts = [("/cluster", cfg.GetClusterInfo())]
6695 ilist = cfg.GetAllInstancesInfo().values()
6696 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6697 nlist = cfg.GetAllNodesInfo().values()
6698 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6700 for path, target in tgts:
6701 for tag in target.GetTags():
6702 if self.re.search(tag):
6703 results.append((path, tag))
6707 class LUAddTags(TagsLU):
6708 """Sets a tag on a given object.
6711 _OP_REQP = ["kind", "name", "tags"]
6714 def CheckPrereq(self):
6715 """Check prerequisites.
6717 This checks the type and length of the tag name and value.
6720 TagsLU.CheckPrereq(self)
6721 for tag in self.op.tags:
6722 objects.TaggableObject.ValidateTag(tag)
6724 def Exec(self, feedback_fn):
6729 for tag in self.op.tags:
6730 self.target.AddTag(tag)
6731 except errors.TagError, err:
6732 raise errors.OpExecError("Error while setting tag: %s" % str(err))
6734 self.cfg.Update(self.target)
6735 except errors.ConfigurationError:
6736 raise errors.OpRetryError("There has been a modification to the"
6737 " config file and the operation has been"
6738 " aborted. Please retry.")
class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      # report the tags which are not present, in a stable order
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      for node, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" % node)
6822 class IAllocator(object):
6823 """IAllocator framework.
6825 An IAllocator instance has three sets of attributes:
6826 - cfg that is needed to query the cluster
6827 - input data (all members of the _KEYS class attribute are required)
6828 - four buffer attributes (in|out_data|text), that represent the
6829 input (to the external script) in text and data structure format,
6830 and the output from it, again in two formats
6831 - the result variables from the script (success, info, nodes) for
6836 "mem_size", "disks", "disk_template",
6837 "os", "tags", "nics", "vcpus", "hypervisor",
6843 def __init__(self, lu, mode, name, **kwargs):
6845 # init buffer variables
6846 self.in_text = self.out_text = self.in_data = self.out_data = None
6847 # init all input fields so that pylint is happy
6850 self.mem_size = self.disks = self.disk_template = None
6851 self.os = self.tags = self.nics = self.vcpus = None
6852 self.hypervisor = None
6853 self.relocate_from = None
6855 self.required_nodes = None
6856 # init result fields
6857 self.success = self.info = self.nodes = None
6858 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6859 keyset = self._ALLO_KEYS
6860 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6861 keyset = self._RELO_KEYS
6863 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6864 " IAllocator" % self.mode)
6866 if key not in keyset:
6867 raise errors.ProgrammerError("Invalid input parameter '%s' to"
6868 " IAllocator" % key)
6869 setattr(self, key, kwargs[key])
6871 if key not in kwargs:
6872 raise errors.ProgrammerError("Missing input parameter '%s' to"
6873 " IAllocator" % key)
6874 self._BuildInputData()
6876 def _ComputeClusterData(self):
6877 """Compute the generic allocator input data.
6879 This is the data that is independent of the actual operation.
6883 cluster_info = cfg.GetClusterInfo()
6886 "version": constants.IALLOCATOR_VERSION,
6887 "cluster_name": cfg.GetClusterName(),
6888 "cluster_tags": list(cluster_info.GetTags()),
6889 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6890 # we don't have job IDs
6892 iinfo = cfg.GetAllInstancesInfo().values()
6893 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6897 node_list = cfg.GetNodeList()
6899 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6900 hypervisor_name = self.hypervisor
6901 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6902 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6904 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6906 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6907 cluster_info.enabled_hypervisors)
6908 for nname, nresult in node_data.items():
6909 # first fill in static (config-based) values
6910 ninfo = cfg.GetNodeInfo(nname)
6912 "tags": list(ninfo.GetTags()),
6913 "primary_ip": ninfo.primary_ip,
6914 "secondary_ip": ninfo.secondary_ip,
6915 "offline": ninfo.offline,
6916 "drained": ninfo.drained,
6917 "master_candidate": ninfo.master_candidate,
6920 if not ninfo.offline:
6921 nresult.Raise("Can't get data for node %s" % nname)
6922 node_iinfo[nname].Raise("Can't get node instance info from node %s" %
6924 remote_info = nresult.payload
6925 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6926 'vg_size', 'vg_free', 'cpu_total']:
6927 if attr not in remote_info:
6928 raise errors.OpExecError("Node '%s' didn't return attribute"
6929 " '%s'" % (nname, attr))
6930 if not isinstance(remote_info[attr], int):
6931 raise errors.OpExecError("Node '%s' returned invalid value"
6933 (nname, attr, remote_info[attr]))
6934 # compute memory used by primary instances
6935 i_p_mem = i_p_up_mem = 0
6936 for iinfo, beinfo in i_list:
6937 if iinfo.primary_node == nname:
6938 i_p_mem += beinfo[constants.BE_MEMORY]
6939 if iinfo.name not in node_iinfo[nname].payload:
6942 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6943 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6944 remote_info['memory_free'] -= max(0, i_mem_diff)
6947 i_p_up_mem += beinfo[constants.BE_MEMORY]
6949 # compute memory used by instances
6951 "total_memory": remote_info['memory_total'],
6952 "reserved_memory": remote_info['memory_dom0'],
6953 "free_memory": remote_info['memory_free'],
6954 "total_disk": remote_info['vg_size'],
6955 "free_disk": remote_info['vg_free'],
6956 "total_cpus": remote_info['cpu_total'],
6957 "i_pri_memory": i_p_mem,
6958 "i_pri_up_memory": i_p_up_mem,
6962 node_results[nname] = pnr
6963 data["nodes"] = node_results
6967 for iinfo, beinfo in i_list:
6969 for nic in iinfo.nics:
6970 filled_params = objects.FillDict(
6971 cluster_info.nicparams[constants.PP_DEFAULT],
6973 nic_dict = {"mac": nic.mac,
6975 "mode": filled_params[constants.NIC_MODE],
6976 "link": filled_params[constants.NIC_LINK],
6978 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
6979 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
6980 nic_data.append(nic_dict)
6982 "tags": list(iinfo.GetTags()),
6983 "admin_up": iinfo.admin_up,
6984 "vcpus": beinfo[constants.BE_VCPUS],
6985 "memory": beinfo[constants.BE_MEMORY],
6987 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6989 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6990 "disk_template": iinfo.disk_template,
6991 "hypervisor": iinfo.hypervisor,
6993 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6995 instance_data[iinfo.name] = pir
6997 data["instances"] = instance_data
7001 def _AddNewInstance(self):
7002 """Add new instance data to allocator structure.
7004 This in combination with _AllocatorGetClusterData will create the
7005 correct structure needed as input for the allocator.
7007 The checks for the completeness of the opcode must have already been
7013 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7015 if self.disk_template in constants.DTS_NET_MIRROR:
7016 self.required_nodes = 2
7018 self.required_nodes = 1
7022 "disk_template": self.disk_template,
7025 "vcpus": self.vcpus,
7026 "memory": self.mem_size,
7027 "disks": self.disks,
7028 "disk_space_total": disk_space,
7030 "required_nodes": self.required_nodes,
7032 data["request"] = request
7034 def _AddRelocateInstance(self):
7035 """Add relocate instance data to allocator structure.
7037 This in combination with _IAllocatorGetClusterData will create the
7038 correct structure needed as input for the allocator.
7040 The checks for the completeness of the opcode must have already been
7044 instance = self.lu.cfg.GetInstanceInfo(self.name)
7045 if instance is None:
7046 raise errors.ProgrammerError("Unknown instance '%s' passed to"
7047 " IAllocator" % self.name)
7049 if instance.disk_template not in constants.DTS_NET_MIRROR:
7050 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7052 if len(instance.secondary_nodes) != 1:
7053 raise errors.OpPrereqError("Instance has not exactly one secondary node")
7055 self.required_nodes = 1
7056 disk_sizes = [{'size': disk.size} for disk in instance.disks]
7057 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7062 "disk_space_total": disk_space,
7063 "required_nodes": self.required_nodes,
7064 "relocate_from": self.relocate_from,
7066 self.in_data["request"] = request
7068 def _BuildInputData(self):
7069 """Build input data structures.
7072 self._ComputeClusterData()
7074 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7075 self._AddNewInstance()
7077 self._AddRelocateInstance()
7079 self.in_text = serializer.Dump(self.in_data)
7081 def Run(self, name, validate=True, call_fn=None):
7082 """Run an instance allocator and return the results.
7086 call_fn = self.lu.rpc.call_iallocator_runner
7089 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7090 result.Raise("Failure while running the iallocator script")
7092 self.out_text = result.payload
7094 self._ValidateResult()
7096 def _ValidateResult(self):
7097 """Process the allocator results.
7099 This will process and if successful save the result in
7100 self.out_data and the other parameters.
7104 rdict = serializer.Load(self.out_text)
7105 except Exception, err:
7106 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7108 if not isinstance(rdict, dict):
7109 raise errors.OpExecError("Can't parse iallocator results: not a dict")
7111 for key in "success", "info", "nodes":
7112 if key not in rdict:
7113 raise errors.OpExecError("Can't parse iallocator results:"
7114 " missing key '%s'" % key)
7115 setattr(self, key, rdict[key])
7117 if not isinstance(rdict["nodes"], list):
7118 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7120 self.out_data = rdict
7123 class LUTestAllocator(NoHooksLU):
7124 """Run allocator tests.
7126 This LU runs the allocator tests
7129 _OP_REQP = ["direction", "mode", "name"]
7131 def CheckPrereq(self):
7132 """Check prerequisites.
7134 This checks the opcode parameters depending on the director and mode test.
7137 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7138 for attr in ["name", "mem_size", "disks", "disk_template",
7139 "os", "tags", "nics", "vcpus"]:
7140 if not hasattr(self.op, attr):
7141 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7143 iname = self.cfg.ExpandInstanceName(self.op.name)
7144 if iname is not None:
7145 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7147 if not isinstance(self.op.nics, list):
7148 raise errors.OpPrereqError("Invalid parameter 'nics'")
7149 for row in self.op.nics:
7150 if (not isinstance(row, dict) or
7153 "bridge" not in row):
7154 raise errors.OpPrereqError("Invalid contents of the"
7155 " 'nics' parameter")
7156 if not isinstance(self.op.disks, list):
7157 raise errors.OpPrereqError("Invalid parameter 'disks'")
7158 for row in self.op.disks:
7159 if (not isinstance(row, dict) or
7160 "size" not in row or
7161 not isinstance(row["size"], int) or
7162 "mode" not in row or
7163 row["mode"] not in ['r', 'w']):
7164 raise errors.OpPrereqError("Invalid contents of the"
7165 " 'disks' parameter")
7166 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7167 self.op.hypervisor = self.cfg.GetHypervisorType()
7168 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7169 if not hasattr(self.op, "name"):
7170 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7171 fname = self.cfg.ExpandInstanceName(self.op.name)
7173 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7175 self.op.name = fname
7176 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7178 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7181 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7182 if not hasattr(self.op, "allocator") or self.op.allocator is None:
7183 raise errors.OpPrereqError("Missing allocator name")
7184 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7185 raise errors.OpPrereqError("Wrong allocator test '%s'" %
7188 def Exec(self, feedback_fn):
7189 """Run the allocator test.
7192 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7193 ial = IAllocator(self,
7196 mem_size=self.op.mem_size,
7197 disks=self.op.disks,
7198 disk_template=self.op.disk_template,
7202 vcpus=self.op.vcpus,
7203 hypervisor=self.op.hypervisor,
7206 ial = IAllocator(self,
7209 relocate_from=list(self.relocate_from),
7212 if self.op.direction == constants.IALLOCATOR_DIR_IN:
7213 result = ial.in_text
7215 ial.Run(self.op.allocator, validate=False)
7216 result = ial.out_text