4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
45 class LogicalUnit(object):
46 """Logical Unit base class.
48 Subclasses must follow these rules:
49 - implement ExpandNames
50 - implement CheckPrereq
52 - implement BuildHooksEnv
53 - redefine HPATH and HTYPE
54 - optionally redefine their run requirements:
55 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
57 Note that all commands require root permissions.
59 @ivar dry_run_result: the value (if any) that will be returned to the caller
60 in dry-run mode (signalled by opcode dry_run parameter)
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overridden in derived classes in order to check op
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
93 self.dry_run_result = None
95 for attr_name in self._OP_REQP:
96 attr_val = getattr(op, attr_name, None)
98 raise errors.OpPrereqError("Required parameter '%s' missing" %
100 self.CheckArguments()
103 """Returns the SshRunner object
107 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
110 ssh = property(fget=__GetSSH)
112 def CheckArguments(self):
113 """Check syntactic validity for the opcode arguments.
115 This method is for doing a simple syntactic check and ensuring the
116 validity of opcode parameters, without any cluster-related
117 checks. While the same can be accomplished in ExpandNames and/or
118 CheckPrereq, doing them separately is better because:
120 - ExpandNames is left as purely a lock-related function
121 - CheckPrereq is run after we have acquired locks (and possible
124 The function is allowed to change the self.op attribute so that
125 later methods need no longer worry about missing parameters.
130 def ExpandNames(self):
131 """Expand names for this LU.
133 This method is called before starting to execute the opcode, and it should
134 update all the parameters of the opcode to their canonical form (e.g. a
135 short node name must be fully expanded after this method has successfully
136 completed). This way locking, hooks, logging, etc. can work correctly.
138 LUs which implement this method must also populate the self.needed_locks
139 member, as a dict with lock levels as keys, and a list of needed lock names
142 - use an empty dict if you don't need any lock
143 - if you don't need any lock at a particular level omit that level
144 - don't put anything for the BGL level
145 - if you want all locks at a level use locking.ALL_SET as a value
147 If you need to share locks (rather than acquire them exclusively) at one
148 level you can modify self.share_locks, setting a true value (usually 1) for
149 that level. By default locks are not shared.
153 # Acquire all nodes and one instance
154 self.needed_locks = {
155 locking.LEVEL_NODE: locking.ALL_SET,
156 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
158 # Acquire just two nodes
159 self.needed_locks = {
160 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
163 self.needed_locks = {} # No, you can't leave it to the default value None
166 # The implementation of this method is mandatory only if the new LU is
167 # concurrent, so that old LUs don't need to be changed all at the same
170 self.needed_locks = {} # Exclusive LUs don't need locks.
172 raise NotImplementedError
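  # A minimal sketch (not part of the original code) of how a hypothetical
  # concurrent LU might combine needed_locks with share_locks to read node
  # data without blocking other readers; the class name is made up:
  #
  #   class LUExampleNodeQuery(NoHooksLU):
  #     def ExpandNames(self):
  #       self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
  #       self.share_locks[locking.LEVEL_NODE] = 1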
174 def DeclareLocks(self, level):
175 """Declare LU locking needs for a level
177 While most LUs can just declare their locking needs at ExpandNames time,
178 sometimes there's the need to calculate some locks after having acquired
179 the ones before. This function is called just before acquiring locks at a
180 particular level, but after acquiring the ones at lower levels, and permits
181 such calculations. It can be used to modify self.needed_locks, and by
182 default it does nothing.
184 This function is only called if you have something already set in
185 self.needed_locks for the level.
187 @param level: Locking level which is going to be locked
188 @type level: member of ganeti.locking.LEVELS
192 def CheckPrereq(self):
193 """Check prerequisites for this LU.
195 This method should check that the prerequisites for the execution
196 of this LU are fulfilled. It can do internode communication, but
197 it should be idempotent - no cluster or system changes are
200 The method should raise errors.OpPrereqError in case something is
201 not fulfilled. Its return value is ignored.
203 This method should also update all the parameters of the opcode to
204 their canonical form if it hasn't been done by ExpandNames before.
207 raise NotImplementedError
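  # Illustrative sketch only (hypothetical parameter names): a typical
  # CheckPrereq validates and canonicalizes parameters, raising
  # errors.OpPrereqError on failure and stashing objects for Exec, e.g.:
  #
  #   def CheckPrereq(self):
  #     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
  #     if instance is None:
  #       raise errors.OpPrereqError("Instance '%s' not known" %
  #                                  self.op.instance_name)
  #     self.instance = instance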
209 def Exec(self, feedback_fn):
212 This method should implement the actual work. It should raise
213 errors.OpExecError for failures that are somewhat dealt with in
217 raise NotImplementedError
219 def BuildHooksEnv(self):
220 """Build hooks environment for this LU.
222 This method should return a three-element tuple consisting of: a dict
223 containing the environment that will be used for running the
224 specific hook for this LU, a list of node names on which the hook
225 should run before the execution, and a list of node names on which
226 the hook should run after the execution.
228 The keys of the dict must not have the 'GANETI_' prefix, as this will
229 be handled in the hooks runner. Also note additional keys will be
230 added by the hooks runner. If the LU doesn't define any
231 environment, an empty dict (and not None) should be returned.
233 No nodes should be returned as an empty list (and not None).
235 Note that if the HPATH for a LU class is None, this function will
239 raise NotImplementedError
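  # A minimal sketch (assumed, not from the original file) of the
  # (env, pre-nodes, post-nodes) tuple described above; the environment keys
  # and the choice of nodes are illustrative only:
  #
  #   def BuildHooksEnv(self):
  #     env = {"OP_TARGET": self.op.node_name}
  #     nl = [self.cfg.GetMasterNode(), self.op.node_name]
  #     return env, nl, nl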
241 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
242 """Notify the LU about the results of its hooks.
244 This method is called every time a hooks phase is executed, and notifies
245 the Logical Unit about the hooks' result. The LU can then use it to alter
246 its result based on the hooks. By default the method does nothing and the
247 previous result is passed back unchanged but any LU can define it if it
248 wants to use the local cluster hook-scripts somehow.
250 @param phase: one of L{constants.HOOKS_PHASE_POST} or
251 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
252 @param hook_results: the results of the multi-node hooks rpc call
253 @param feedback_fn: function used to send feedback back to the caller
254 @param lu_result: the previous Exec result this LU had, or None
256 @return: the new Exec result, based on the previous result
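  # The default implementation (elided here) just returns lu_result unchanged,
  # as described above; LUVerifyCluster.HooksCallBack further down is a
  # concrete override that post-processes the hook output.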
262 def _ExpandAndLockInstance(self):
263 """Helper function to expand and lock an instance.
265 Many LUs that work on an instance take its name in self.op.instance_name
266 and need to expand it and then declare the expanded name for locking. This
267 function does it, and then updates self.op.instance_name to the expanded
268 name. It also initializes needed_locks as a dict, if this hasn't been done
272 if self.needed_locks is None:
273 self.needed_locks = {}
275 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
276 "_ExpandAndLockInstance called with instance-level locks set"
277 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
278 if expanded_name is None:
279 raise errors.OpPrereqError("Instance '%s' not known" %
280 self.op.instance_name)
281 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
282 self.op.instance_name = expanded_name
284 def _LockInstancesNodes(self, primary_only=False):
285 """Helper function to declare instances' nodes for locking.
287 This function should be called after locking one or more instances to lock
288 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
289 with all primary or secondary nodes for instances already locked and
290 present in self.needed_locks[locking.LEVEL_INSTANCE].
292 It should be called from DeclareLocks, and for safety only works if
293 self.recalculate_locks[locking.LEVEL_NODE] is set.
295 In the future it may grow parameters to just lock some instance's nodes, or
296 to just lock primaries or secondary nodes, if needed.
298 It should be called in DeclareLocks in a way similar to::
300 if level == locking.LEVEL_NODE:
301 self._LockInstancesNodes()
303 @type primary_only: boolean
304 @param primary_only: only lock primary nodes of locked instances
307 assert locking.LEVEL_NODE in self.recalculate_locks, \
308 "_LockInstancesNodes helper function called with no nodes to recalculate"
310 # TODO: check if we've really been called with the instance locks held
312 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
313 # future we might want to have different behaviors depending on the value
314 # of self.recalculate_locks[locking.LEVEL_NODE]
316 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
317 instance = self.context.cfg.GetInstanceInfo(instance_name)
318 wanted_nodes.append(instance.primary_node)
320 wanted_nodes.extend(instance.secondary_nodes)
322 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
323 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
324 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
325 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
327 del self.recalculate_locks[locking.LEVEL_NODE]
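# A minimal sketch (not part of the original module) of how a hypothetical
# instance-level LU could tie ExpandNames, DeclareLocks and
# _LockInstancesNodes together; LUExampleInstanceOp and its opcode field are
# made-up names:
#
#   class LUExampleInstanceOp(NoHooksLU):
#     _OP_REQP = ["instance_name"]
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()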
330 class NoHooksLU(LogicalUnit):
331 """Simple LU which runs no hooks.
333 This LU is intended as a parent for other LogicalUnits which will
334 run no hooks, in order to reduce duplicate code.
341 def _GetWantedNodes(lu, nodes):
342 """Returns list of checked and expanded node names.
344 @type lu: L{LogicalUnit}
345 @param lu: the logical unit on whose behalf we execute
347 @param nodes: list of node names or None for all nodes
349 @return: the list of nodes, sorted
350 @raise errors.OpPrereqError: if the nodes parameter is wrong type
353 if not isinstance(nodes, list):
354 raise errors.OpPrereqError("Invalid argument type 'nodes'")
357 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
358 " non-empty list of nodes whose name is to be expanded.")
362 node = lu.cfg.ExpandNodeName(name)
364 raise errors.OpPrereqError("No such node name '%s'" % name)
367 return utils.NiceSort(wanted)
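# Typical (illustrative) usage: expanding an optional node list taken from an
# opcode, as done e.g. in LUQueryNodes.ExpandNames below:
#
#   self.wanted = _GetWantedNodes(self, self.op.names)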
370 def _GetWantedInstances(lu, instances):
371 """Returns list of checked and expanded instance names.
373 @type lu: L{LogicalUnit}
374 @param lu: the logical unit on whose behalf we execute
375 @type instances: list
376 @param instances: list of instance names or None for all instances
378 @return: the list of instances, sorted
379 @raise errors.OpPrereqError: if the instances parameter is wrong type
380 @raise errors.OpPrereqError: if any of the passed instances is not found
383 if not isinstance(instances, list):
384 raise errors.OpPrereqError("Invalid argument type 'instances'")
389 for name in instances:
390 instance = lu.cfg.ExpandInstanceName(name)
392 raise errors.OpPrereqError("No such instance name '%s'" % name)
393 wanted.append(instance)
396 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
400 def _CheckOutputFields(static, dynamic, selected):
401 """Checks whether all selected fields are valid.
403 @type static: L{utils.FieldSet}
404 @param static: static fields set
405 @type dynamic: L{utils.FieldSet}
406 @param dynamic: dynamic fields set
413 delta = f.NonMatching(selected)
415 raise errors.OpPrereqError("Unknown output fields selected: %s"
419 def _CheckBooleanOpField(op, name):
420 """Validates boolean opcode parameters.
422 This will ensure that an opcode parameter is either a boolean value,
423 or None (but that it always exists).
426 val = getattr(op, name, None)
427 if not (val is None or isinstance(val, bool)):
428 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
430 setattr(op, name, val)
433 def _CheckNodeOnline(lu, node):
434 """Ensure that a given node is online.
436 @param lu: the LU on behalf of which we make the check
437 @param node: the node to check
438 @raise errors.OpPrereqError: if the node is offline
441 if lu.cfg.GetNodeInfo(node).offline:
442 raise errors.OpPrereqError("Can't use offline node %s" % node)
445 def _CheckNodeNotDrained(lu, node):
446 """Ensure that a given node is not drained.
448 @param lu: the LU on behalf of which we make the check
449 @param node: the node to check
450 @raise errors.OpPrereqError: if the node is drained
453 if lu.cfg.GetNodeInfo(node).drained:
454 raise errors.OpPrereqError("Can't use drained node %s" % node)
457 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
458 memory, vcpus, nics, disk_template, disks,
459 bep, hvp, hypervisor):
460 """Builds instance related env variables for hooks
462 This builds the hook environment from individual variables.
465 @param name: the name of the instance
466 @type primary_node: string
467 @param primary_node: the name of the instance's primary node
468 @type secondary_nodes: list
469 @param secondary_nodes: list of secondary nodes as strings
470 @type os_type: string
471 @param os_type: the name of the instance's OS
472 @type status: boolean
473 @param status: the should_run status of the instance
475 @param memory: the memory size of the instance
477 @param vcpus: the count of VCPUs the instance has
479 @param nics: list of tuples (ip, mac, mode, link) representing
480 the NICs the instance has
481 @type disk_template: string
482 @param disk_template: the disk template of the instance
484 @param disks: the list of (size, mode) pairs
486 @param bep: the backend parameters for the instance
488 @param hvp: the hypervisor parameters for the instance
489 @type hypervisor: string
490 @param hypervisor: the hypervisor for the instance
492 @return: the hook environment for this instance
501 "INSTANCE_NAME": name,
502 "INSTANCE_PRIMARY": primary_node,
503 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
504 "INSTANCE_OS_TYPE": os_type,
505 "INSTANCE_STATUS": str_status,
506 "INSTANCE_MEMORY": memory,
507 "INSTANCE_VCPUS": vcpus,
508 "INSTANCE_DISK_TEMPLATE": disk_template,
509 "INSTANCE_HYPERVISOR": hypervisor,
513 nic_count = len(nics)
514 for idx, (ip, mac, mode, link) in enumerate(nics):
517 env["INSTANCE_NIC%d_IP" % idx] = ip
518 env["INSTANCE_NIC%d_MAC" % idx] = mac
519 env["INSTANCE_NIC%d_MODE" % idx] = mode
520 env["INSTANCE_NIC%d_LINK" % idx] = link
521 if mode == constants.NIC_MODE_BRIDGED:
522 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
526 env["INSTANCE_NIC_COUNT"] = nic_count
529 disk_count = len(disks)
530 for idx, (size, mode) in enumerate(disks):
531 env["INSTANCE_DISK%d_SIZE" % idx] = size
532 env["INSTANCE_DISK%d_MODE" % idx] = mode
536 env["INSTANCE_DISK_COUNT"] = disk_count
538 for source, kind in [(bep, "BE"), (hvp, "HV")]:
539 for key, value in source.items():
540 env["INSTANCE_%s_%s" % (kind, key)] = value
544 def _NICListToTuple(lu, nics):
545 """Build a list of nic information tuples.
547 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
548 value in LUQueryInstanceData.
550 @type lu: L{LogicalUnit}
551 @param lu: the logical unit on whose behalf we execute
552 @type nics: list of L{objects.NIC}
553 @param nics: list of nics to convert to hooks tuples
557 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
561 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
562 mode = filled_params[constants.NIC_MODE]
563 link = filled_params[constants.NIC_LINK]
564 hooks_nics.append((ip, mac, mode, link))
567 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
568 """Builds instance related env variables for hooks from an object.
570 @type lu: L{LogicalUnit}
571 @param lu: the logical unit on whose behalf we execute
572 @type instance: L{objects.Instance}
573 @param instance: the instance for which we should build the
576 @param override: dictionary with key/values that will override
579 @return: the hook environment dictionary
582 cluster = lu.cfg.GetClusterInfo()
583 bep = cluster.FillBE(instance)
584 hvp = cluster.FillHV(instance)
586 'name': instance.name,
587 'primary_node': instance.primary_node,
588 'secondary_nodes': instance.secondary_nodes,
589 'os_type': instance.os,
590 'status': instance.admin_up,
591 'memory': bep[constants.BE_MEMORY],
592 'vcpus': bep[constants.BE_VCPUS],
593 'nics': _NICListToTuple(lu, instance.nics),
594 'disk_template': instance.disk_template,
595 'disks': [(disk.size, disk.mode) for disk in instance.disks],
598 'hypervisor': instance.hypervisor,
601 args.update(override)
602 return _BuildInstanceHookEnv(**args)
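# Illustrative sketch (hypothetical LU, assumed node list): instance LUs
# typically build their hook environment from the instance object, e.g.:
#
#   def BuildHooksEnv(self):
#     env = _BuildInstanceHookEnvByObject(self, self.instance)
#     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
#     return env, nl, nl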
605 def _AdjustCandidatePool(lu):
606 """Adjust the candidate pool after node operations.
609 mod_list = lu.cfg.MaintainCandidatePool()
611 lu.LogInfo("Promoted nodes to master candidate role: %s",
612 ", ".join(node.name for node in mod_list))
613 for name in mod_list:
614 lu.context.ReaddNode(name)
615 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
617 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
621 def _CheckNicsBridgesExist(lu, target_nics, target_node,
622 profile=constants.PP_DEFAULT):
623 """Check that the brigdes needed by a list of nics exist.
626 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
627 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
628 for nic in target_nics]
629 brlist = [params[constants.NIC_LINK] for params in paramslist
630 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
632 result = lu.rpc.call_bridges_exist(target_node, brlist)
633 result.Raise("Error checking bridges on destination node '%s'" %
634 target_node, prereq=True)
637 def _CheckInstanceBridgesExist(lu, instance, node=None):
638 """Check that the brigdes needed by an instance exist.
642 node = instance.primary_node
643 _CheckNicsBridgesExist(lu, instance.nics, node)
646 class LUDestroyCluster(NoHooksLU):
647 """Logical unit for destroying the cluster.
652 def CheckPrereq(self):
653 """Check prerequisites.
655 This checks whether the cluster is empty.
657 Any errors are signalled by raising errors.OpPrereqError.
660 master = self.cfg.GetMasterNode()
662 nodelist = self.cfg.GetNodeList()
663 if len(nodelist) != 1 or nodelist[0] != master:
664 raise errors.OpPrereqError("There are still %d node(s) in"
665 " this cluster." % (len(nodelist) - 1))
666 instancelist = self.cfg.GetInstanceList()
668 raise errors.OpPrereqError("There are still %d instance(s) in"
669 " this cluster." % len(instancelist))
671 def Exec(self, feedback_fn):
672 """Destroys the cluster.
675 master = self.cfg.GetMasterNode()
676 result = self.rpc.call_node_stop_master(master, False)
677 result.Raise("Could not disable the master role")
678 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
679 utils.CreateBackup(priv_key)
680 utils.CreateBackup(pub_key)
684 class LUVerifyCluster(LogicalUnit):
685 """Verifies the cluster status.
688 HPATH = "cluster-verify"
689 HTYPE = constants.HTYPE_CLUSTER
690 _OP_REQP = ["skip_checks"]
693 def ExpandNames(self):
694 self.needed_locks = {
695 locking.LEVEL_NODE: locking.ALL_SET,
696 locking.LEVEL_INSTANCE: locking.ALL_SET,
698 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
700 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
701 node_result, feedback_fn, master_files,
703 """Run multiple tests against a node.
707 - compares ganeti version
708 - checks vg existence and size > 20G
709 - checks config file checksum
710 - checks ssh to other nodes
712 @type nodeinfo: L{objects.Node}
713 @param nodeinfo: the node to check
714 @param file_list: required list of files
715 @param local_cksum: dictionary of local files and their checksums
716 @param node_result: the results from the node
717 @param feedback_fn: function used to accumulate results
718 @param master_files: list of files that only masters should have
719 @param drbd_map: the used DRBD minors for this node, in
720 form of minor: (instance, must_exist) which correspond to instances
721 and their running status
722 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
727 # main result, node_result should be a non-empty dict
728 if not node_result or not isinstance(node_result, dict):
729 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
732 # compares ganeti version
733 local_version = constants.PROTOCOL_VERSION
734 remote_version = node_result.get('version', None)
735 if not (remote_version and isinstance(remote_version, (list, tuple)) and
736 len(remote_version) == 2):
737 feedback_fn(" - ERROR: connection to %s failed" % (node))
740 if local_version != remote_version[0]:
741 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
742 " node %s %s" % (local_version, node, remote_version[0]))
745 # node seems compatible, we can actually try to look into its results
749 # full package version
750 if constants.RELEASE_VERSION != remote_version[1]:
751 feedback_fn(" - WARNING: software version mismatch: master %s,"
753 (constants.RELEASE_VERSION, node, remote_version[1]))
755 # checks vg existence and size > 20G
756 if vg_name is not None:
757 vglist = node_result.get(constants.NV_VGLIST, None)
759 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
763 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
764 constants.MIN_VG_SIZE)
766 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
769 # checks config file checksum
771 remote_cksum = node_result.get(constants.NV_FILELIST, None)
772 if not isinstance(remote_cksum, dict):
774 feedback_fn(" - ERROR: node hasn't returned file checksum data")
776 for file_name in file_list:
777 node_is_mc = nodeinfo.master_candidate
778 must_have_file = file_name not in master_files
779 if file_name not in remote_cksum:
780 if node_is_mc or must_have_file:
782 feedback_fn(" - ERROR: file '%s' missing" % file_name)
783 elif remote_cksum[file_name] != local_cksum[file_name]:
784 if node_is_mc or must_have_file:
786 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
788 # not candidate and this is not a must-have file
790 feedback_fn(" - ERROR: non master-candidate has old/wrong file"
793 # all good, except non-master/non-must have combination
794 if not node_is_mc and not must_have_file:
795 feedback_fn(" - ERROR: file '%s' should not exist on non master"
796 " candidates" % file_name)
800 if constants.NV_NODELIST not in node_result:
802 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
804 if node_result[constants.NV_NODELIST]:
806 for node in node_result[constants.NV_NODELIST]:
807 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
808 (node, node_result[constants.NV_NODELIST][node]))
810 if constants.NV_NODENETTEST not in node_result:
812 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
814 if node_result[constants.NV_NODENETTEST]:
816 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
818 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
819 (node, node_result[constants.NV_NODENETTEST][node]))
821 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
822 if isinstance(hyp_result, dict):
823 for hv_name, hv_result in hyp_result.iteritems():
824 if hv_result is not None:
825 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
826 (hv_name, hv_result))
828 # check used drbd list
829 if vg_name is not None:
830 used_minors = node_result.get(constants.NV_DRBDLIST, [])
831 if not isinstance(used_minors, (tuple, list)):
832 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
835 for minor, (iname, must_exist) in drbd_map.items():
836 if minor not in used_minors and must_exist:
837 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
838 " not active" % (minor, iname))
840 for minor in used_minors:
841 if minor not in drbd_map:
842 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
848 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
849 node_instance, feedback_fn, n_offline):
850 """Verify an instance.
852 This function checks to see if the required block devices are
853 available on the instance's node.
858 node_current = instanceconfig.primary_node
861 instanceconfig.MapLVsByNode(node_vol_should)
863 for node in node_vol_should:
864 if node in n_offline:
865 # ignore missing volumes on offline nodes
867 for volume in node_vol_should[node]:
868 if node not in node_vol_is or volume not in node_vol_is[node]:
869 feedback_fn(" - ERROR: volume %s missing on node %s" %
873 if instanceconfig.admin_up:
874 if ((node_current not in node_instance or
875 not instance in node_instance[node_current]) and
876 node_current not in n_offline):
877 feedback_fn(" - ERROR: instance %s not running on node %s" %
878 (instance, node_current))
881 for node in node_instance:
882 if (not node == node_current):
883 if instance in node_instance[node]:
884 feedback_fn(" - ERROR: instance %s should not run on node %s" %
890 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
891 """Verify if there are any unknown volumes in the cluster.
893 The .os, .swap and backup volumes are ignored. All other volumes are
899 for node in node_vol_is:
900 for volume in node_vol_is[node]:
901 if node not in node_vol_should or volume not in node_vol_should[node]:
902 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
907 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
908 """Verify the list of running instances.
910 This checks what instances are running but unknown to the cluster.
914 for node in node_instance:
915 for runninginstance in node_instance[node]:
916 if runninginstance not in instancelist:
917 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
918 (runninginstance, node))
922 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
923 """Verify N+1 Memory Resilience.
925 Check that if one single node dies we can still start all the instances it
931 for node, nodeinfo in node_info.iteritems():
932 # This code checks that every node which is now listed as secondary has
933 enough memory to host all instances it is supposed to, should a single
934 # other node in the cluster fail.
935 # FIXME: not ready for failover to an arbitrary node
936 # FIXME: does not support file-backed instances
937 # WARNING: we currently take into account down instances as well as up
938 # ones, considering that even if they're down someone might want to start
939 # them even in the event of a node failure.
940 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
942 for instance in instances:
943 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
944 if bep[constants.BE_AUTO_BALANCE]:
945 needed_mem += bep[constants.BE_MEMORY]
946 if nodeinfo['mfree'] < needed_mem:
947 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
948 " failovers should node %s fail" % (node, prinode))
952 def CheckPrereq(self):
953 """Check prerequisites.
955 Transform the list of checks we're going to skip into a set and check that
956 all its members are valid.
959 self.skip_set = frozenset(self.op.skip_checks)
960 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
961 raise errors.OpPrereqError("Invalid checks to be skipped specified")
963 def BuildHooksEnv(self):
966 Cluster-Verify hooks just run in the post phase and their failure makes
967 the output be logged in the verify output and the verification to fail.
970 all_nodes = self.cfg.GetNodeList()
972 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
974 for node in self.cfg.GetAllNodesInfo().values():
975 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
977 return env, [], all_nodes
979 def Exec(self, feedback_fn):
980 """Verify integrity of cluster, performing various test on nodes.
984 feedback_fn("* Verifying global settings")
985 for msg in self.cfg.VerifyConfig():
986 feedback_fn(" - ERROR: %s" % msg)
988 vg_name = self.cfg.GetVGName()
989 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
990 nodelist = utils.NiceSort(self.cfg.GetNodeList())
991 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
992 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
993 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
994 for iname in instancelist)
995 i_non_redundant = [] # Non redundant instances
996 i_non_a_balanced = [] # Non auto-balanced instances
997 n_offline = [] # List of offline nodes
998 n_drained = [] # List of nodes being drained
1004 # FIXME: verify OS list
1005 # do local checksums
1006 master_files = [constants.CLUSTER_CONF_FILE]
1008 file_names = ssconf.SimpleStore().GetFileList()
1009 file_names.append(constants.SSL_CERT_FILE)
1010 file_names.append(constants.RAPI_CERT_FILE)
1011 file_names.extend(master_files)
1013 local_checksums = utils.FingerprintFiles(file_names)
1015 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1016 node_verify_param = {
1017 constants.NV_FILELIST: file_names,
1018 constants.NV_NODELIST: [node.name for node in nodeinfo
1019 if not node.offline],
1020 constants.NV_HYPERVISOR: hypervisors,
1021 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1022 node.secondary_ip) for node in nodeinfo
1023 if not node.offline],
1024 constants.NV_INSTANCELIST: hypervisors,
1025 constants.NV_VERSION: None,
1026 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1028 if vg_name is not None:
1029 node_verify_param[constants.NV_VGLIST] = None
1030 node_verify_param[constants.NV_LVLIST] = vg_name
1031 node_verify_param[constants.NV_DRBDLIST] = None
1032 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1033 self.cfg.GetClusterName())
1035 cluster = self.cfg.GetClusterInfo()
1036 master_node = self.cfg.GetMasterNode()
1037 all_drbd_map = self.cfg.ComputeDRBDMap()
1039 for node_i in nodeinfo:
1043 feedback_fn("* Skipping offline node %s" % (node,))
1044 n_offline.append(node)
1047 if node == master_node:
1049 elif node_i.master_candidate:
1050 ntype = "master candidate"
1051 elif node_i.drained:
1053 n_drained.append(node)
1056 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1058 msg = all_nvinfo[node].fail_msg
1060 feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg))
1064 nresult = all_nvinfo[node].payload
1066 for minor, instance in all_drbd_map[node].items():
1067 if instance not in instanceinfo:
1068 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1070 # ghost instance should not be running, but otherwise we
1071 # don't give double warnings (both ghost instance and
1072 # unallocated minor in use)
1073 node_drbd[minor] = (instance, False)
1075 instance = instanceinfo[instance]
1076 node_drbd[minor] = (instance.name, instance.admin_up)
1077 result = self._VerifyNode(node_i, file_names, local_checksums,
1078 nresult, feedback_fn, master_files,
1082 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1084 node_volume[node] = {}
1085 elif isinstance(lvdata, basestring):
1086 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1087 (node, utils.SafeEncode(lvdata)))
1089 node_volume[node] = {}
1090 elif not isinstance(lvdata, dict):
1091 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1095 node_volume[node] = lvdata
1098 idata = nresult.get(constants.NV_INSTANCELIST, None)
1099 if not isinstance(idata, list):
1100 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1105 node_instance[node] = idata
1108 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1109 if not isinstance(nodeinfo, dict):
1110 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1116 "mfree": int(nodeinfo['memory_free']),
1119 # dictionary holding all instances this node is secondary for,
1120 # grouped by their primary node. Each key is a cluster node, and each
1121 # value is a list of instances which have the key as primary and the
1122 # current node as secondary. this is handy to calculate N+1 memory
1123 # availability if you can only failover from a primary to its
1125 "sinst-by-pnode": {},
1127 # FIXME: devise a free space model for file based instances as well
1128 if vg_name is not None:
1129 if (constants.NV_VGLIST not in nresult or
1130 vg_name not in nresult[constants.NV_VGLIST]):
1131 feedback_fn(" - ERROR: node %s didn't return data for the"
1132 " volume group '%s' - it is either missing or broken" %
1136 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1137 except (ValueError, KeyError):
1138 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1139 " from node %s" % (node,))
1143 node_vol_should = {}
1145 for instance in instancelist:
1146 feedback_fn("* Verifying instance %s" % instance)
1147 inst_config = instanceinfo[instance]
1148 result = self._VerifyInstance(instance, inst_config, node_volume,
1149 node_instance, feedback_fn, n_offline)
1151 inst_nodes_offline = []
1153 inst_config.MapLVsByNode(node_vol_should)
1155 instance_cfg[instance] = inst_config
1157 pnode = inst_config.primary_node
1158 if pnode in node_info:
1159 node_info[pnode]['pinst'].append(instance)
1160 elif pnode not in n_offline:
1161 feedback_fn(" - ERROR: instance %s, connection to primary node"
1162 " %s failed" % (instance, pnode))
1165 if pnode in n_offline:
1166 inst_nodes_offline.append(pnode)
1168 # If the instance is non-redundant we cannot survive losing its primary
1169 # node, so we are not N+1 compliant. On the other hand we have no disk
1170 # templates with more than one secondary so that situation is not well
1172 # FIXME: does not support file-backed instances
1173 if len(inst_config.secondary_nodes) == 0:
1174 i_non_redundant.append(instance)
1175 elif len(inst_config.secondary_nodes) > 1:
1176 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1179 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1180 i_non_a_balanced.append(instance)
1182 for snode in inst_config.secondary_nodes:
1183 if snode in node_info:
1184 node_info[snode]['sinst'].append(instance)
1185 if pnode not in node_info[snode]['sinst-by-pnode']:
1186 node_info[snode]['sinst-by-pnode'][pnode] = []
1187 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1188 elif snode not in n_offline:
1189 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1190 " %s failed" % (instance, snode))
1192 if snode in n_offline:
1193 inst_nodes_offline.append(snode)
1195 if inst_nodes_offline:
1196 # warn that the instance lives on offline nodes, and set bad=True
1197 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1198 ", ".join(inst_nodes_offline))
1201 feedback_fn("* Verifying orphan volumes")
1202 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1206 feedback_fn("* Verifying remaining instances")
1207 result = self._VerifyOrphanInstances(instancelist, node_instance,
1211 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1212 feedback_fn("* Verifying N+1 Memory redundancy")
1213 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1216 feedback_fn("* Other Notes")
1218 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1219 % len(i_non_redundant))
1221 if i_non_a_balanced:
1222 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1223 % len(i_non_a_balanced))
1226 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1229 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1233 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1234 """Analize the post-hooks' result
1236 This method analyses the hook result, handles it, and sends some
1237 nicely-formatted feedback back to the user.
1239 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1240 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1241 @param hooks_results: the results of the multi-node hooks rpc call
1242 @param feedback_fn: function used to send feedback back to the caller
1243 @param lu_result: previous Exec result
1244 @return: the new Exec result, based on the previous result
1248 # We only really run POST phase hooks, and are only interested in
1250 if phase == constants.HOOKS_PHASE_POST:
1251 # Used to change hooks' output to proper indentation
1252 indent_re = re.compile('^', re.M)
1253 feedback_fn("* Hooks Results")
1254 if not hooks_results:
1255 feedback_fn(" - ERROR: general communication failure")
1258 for node_name in hooks_results:
1259 show_node_header = True
1260 res = hooks_results[node_name]
1264 # no need to warn or set fail return value
1266 feedback_fn(" Communication failure in hooks execution: %s" %
1270 for script, hkr, output in res.payload:
1271 if hkr == constants.HKR_FAIL:
1272 # The node header is only shown once, if there are
1273 # failing hooks on that node
1274 if show_node_header:
1275 feedback_fn(" Node %s:" % node_name)
1276 show_node_header = False
1277 feedback_fn(" ERROR: Script %s failed, output:" % script)
1278 output = indent_re.sub(' ', output)
1279 feedback_fn("%s" % output)
1285 class LUVerifyDisks(NoHooksLU):
1286 """Verifies the cluster disks status.
1292 def ExpandNames(self):
1293 self.needed_locks = {
1294 locking.LEVEL_NODE: locking.ALL_SET,
1295 locking.LEVEL_INSTANCE: locking.ALL_SET,
1297 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1299 def CheckPrereq(self):
1300 """Check prerequisites.
1302 This has no prerequisites.
1307 def Exec(self, feedback_fn):
1308 """Verify integrity of cluster disks.
1310 @rtype: tuple of three items
1311 @return: a tuple of (dict of node-to-node_error, list of instances
1312 which need activate-disks, dict of instance: (node, volume) for
1316 result = res_nodes, res_instances, res_missing = {}, [], {}
1318 vg_name = self.cfg.GetVGName()
1319 nodes = utils.NiceSort(self.cfg.GetNodeList())
1320 instances = [self.cfg.GetInstanceInfo(name)
1321 for name in self.cfg.GetInstanceList()]
1324 for inst in instances:
1326 if (not inst.admin_up or
1327 inst.disk_template not in constants.DTS_NET_MIRROR):
1329 inst.MapLVsByNode(inst_lvs)
1330 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1331 for node, vol_list in inst_lvs.iteritems():
1332 for vol in vol_list:
1333 nv_dict[(node, vol)] = inst
1338 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1343 node_res = node_lvs[node]
1344 if node_res.offline:
1346 msg = node_res.fail_msg
1348 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1349 res_nodes[node] = msg
1352 lvs = node_res.payload
1353 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1354 inst = nv_dict.pop((node, lv_name), None)
1355 if (not lv_online and inst is not None
1356 and inst.name not in res_instances):
1357 res_instances.append(inst.name)
1359 # any leftover items in nv_dict are missing LVs, let's arrange the
1361 for key, inst in nv_dict.iteritems():
1362 if inst.name not in res_missing:
1363 res_missing[inst.name] = []
1364 res_missing[inst.name].append(key)
1369 class LURenameCluster(LogicalUnit):
1370 """Rename the cluster.
1373 HPATH = "cluster-rename"
1374 HTYPE = constants.HTYPE_CLUSTER
1377 def BuildHooksEnv(self):
1382 "OP_TARGET": self.cfg.GetClusterName(),
1383 "NEW_NAME": self.op.name,
1385 mn = self.cfg.GetMasterNode()
1386 return env, [mn], [mn]
1388 def CheckPrereq(self):
1389 """Verify that the passed name is a valid one.
1392 hostname = utils.HostInfo(self.op.name)
1394 new_name = hostname.name
1395 self.ip = new_ip = hostname.ip
1396 old_name = self.cfg.GetClusterName()
1397 old_ip = self.cfg.GetMasterIP()
1398 if new_name == old_name and new_ip == old_ip:
1399 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1400 " cluster has changed")
1401 if new_ip != old_ip:
1402 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1403 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1404 " reachable on the network. Aborting." %
1407 self.op.name = new_name
1409 def Exec(self, feedback_fn):
1410 """Rename the cluster.
1413 clustername = self.op.name
1416 # shutdown the master IP
1417 master = self.cfg.GetMasterNode()
1418 result = self.rpc.call_node_stop_master(master, False)
1419 result.Raise("Could not disable the master role")
1422 cluster = self.cfg.GetClusterInfo()
1423 cluster.cluster_name = clustername
1424 cluster.master_ip = ip
1425 self.cfg.Update(cluster)
1427 # update the known hosts file
1428 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1429 node_list = self.cfg.GetNodeList()
1431 node_list.remove(master)
1434 result = self.rpc.call_upload_file(node_list,
1435 constants.SSH_KNOWN_HOSTS_FILE)
1436 for to_node, to_result in result.iteritems():
1437 msg = to_result.fail_msg
1439 msg = ("Copy of file %s to node %s failed: %s" %
1440 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1441 self.proc.LogWarning(msg)
1444 result = self.rpc.call_node_start_master(master, False)
1445 msg = result.fail_msg
1447 self.LogWarning("Could not re-enable the master role on"
1448 " the master, please restart manually: %s", msg)
1451 def _RecursiveCheckIfLVMBased(disk):
1452 """Check if the given disk or its children are lvm-based.
1454 @type disk: L{objects.Disk}
1455 @param disk: the disk to check
1457 @return: boolean indicating whether a LD_LV dev_type was found or not
1461 for chdisk in disk.children:
1462 if _RecursiveCheckIfLVMBased(chdisk):
1464 return disk.dev_type == constants.LD_LV
1467 class LUSetClusterParams(LogicalUnit):
1468 """Change the parameters of the cluster.
1471 HPATH = "cluster-modify"
1472 HTYPE = constants.HTYPE_CLUSTER
1476 def CheckArguments(self):
1480 if not hasattr(self.op, "candidate_pool_size"):
1481 self.op.candidate_pool_size = None
1482 if self.op.candidate_pool_size is not None:
1484 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1485 except (ValueError, TypeError), err:
1486 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1488 if self.op.candidate_pool_size < 1:
1489 raise errors.OpPrereqError("At least one master candidate needed")
1491 def ExpandNames(self):
1492 # FIXME: in the future maybe other cluster params won't require checking on
1493 # all nodes to be modified.
1494 self.needed_locks = {
1495 locking.LEVEL_NODE: locking.ALL_SET,
1497 self.share_locks[locking.LEVEL_NODE] = 1
1499 def BuildHooksEnv(self):
1504 "OP_TARGET": self.cfg.GetClusterName(),
1505 "NEW_VG_NAME": self.op.vg_name,
1507 mn = self.cfg.GetMasterNode()
1508 return env, [mn], [mn]
1510 def CheckPrereq(self):
1511 """Check prerequisites.
1513 This checks whether the given params don't conflict and
1514 if the given volume group is valid.
1517 if self.op.vg_name is not None and not self.op.vg_name:
1518 instances = self.cfg.GetAllInstancesInfo().values()
1519 for inst in instances:
1520 for disk in inst.disks:
1521 if _RecursiveCheckIfLVMBased(disk):
1522 raise errors.OpPrereqError("Cannot disable lvm storage while"
1523 " lvm-based instances exist")
1525 node_list = self.acquired_locks[locking.LEVEL_NODE]
1527 # if vg_name not None, checks given volume group on all nodes
1529 vglist = self.rpc.call_vg_list(node_list)
1530 for node in node_list:
1531 msg = vglist[node].fail_msg
1533 # ignoring down node
1534 self.LogWarning("Error while gathering data on node %s"
1535 " (ignoring node): %s", node, msg)
1537 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1539 constants.MIN_VG_SIZE)
1541 raise errors.OpPrereqError("Error on node '%s': %s" %
1544 self.cluster = cluster = self.cfg.GetClusterInfo()
1545 # validate params changes
1546 if self.op.beparams:
1547 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1548 self.new_beparams = objects.FillDict(
1549 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1551 if self.op.nicparams:
1552 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1553 self.new_nicparams = objects.FillDict(
1554 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1555 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1557 # hypervisor list/parameters
1558 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1559 if self.op.hvparams:
1560 if not isinstance(self.op.hvparams, dict):
1561 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1562 for hv_name, hv_dict in self.op.hvparams.items():
1563 if hv_name not in self.new_hvparams:
1564 self.new_hvparams[hv_name] = hv_dict
1566 self.new_hvparams[hv_name].update(hv_dict)
1568 if self.op.enabled_hypervisors is not None:
1569 self.hv_list = self.op.enabled_hypervisors
1571 self.hv_list = cluster.enabled_hypervisors
1573 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1574 # either the enabled list has changed, or the parameters have, validate
1575 for hv_name, hv_params in self.new_hvparams.items():
1576 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1577 (self.op.enabled_hypervisors and
1578 hv_name in self.op.enabled_hypervisors)):
1579 # either this is a new hypervisor, or its parameters have changed
1580 hv_class = hypervisor.GetHypervisor(hv_name)
1581 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1582 hv_class.CheckParameterSyntax(hv_params)
1583 _CheckHVParams(self, node_list, hv_name, hv_params)
1585 def Exec(self, feedback_fn):
1586 """Change the parameters of the cluster.
1589 if self.op.vg_name is not None:
1590 new_volume = self.op.vg_name
1593 if new_volume != self.cfg.GetVGName():
1594 self.cfg.SetVGName(new_volume)
1596 feedback_fn("Cluster LVM configuration already in desired"
1597 " state, not changing")
1598 if self.op.hvparams:
1599 self.cluster.hvparams = self.new_hvparams
1600 if self.op.enabled_hypervisors is not None:
1601 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1602 if self.op.beparams:
1603 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1604 if self.op.nicparams:
1605 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1607 if self.op.candidate_pool_size is not None:
1608 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1609 # we need to update the pool size here, otherwise the save will fail
1610 _AdjustCandidatePool(self)
1612 self.cfg.Update(self.cluster)
1615 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1616 """Distribute additional files which are part of the cluster configuration.
1618 ConfigWriter takes care of distributing the config and ssconf files, but
1619 there are more files which should be distributed to all nodes. This function
1620 makes sure those are copied.
1622 @param lu: calling logical unit
1623 @param additional_nodes: list of nodes not in the config to distribute to
1626 # 1. Gather target nodes
1627 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1628 dist_nodes = lu.cfg.GetNodeList()
1629 if additional_nodes is not None:
1630 dist_nodes.extend(additional_nodes)
1631 if myself.name in dist_nodes:
1632 dist_nodes.remove(myself.name)
1633 # 2. Gather files to distribute
1634 dist_files = set([constants.ETC_HOSTS,
1635 constants.SSH_KNOWN_HOSTS_FILE,
1636 constants.RAPI_CERT_FILE,
1637 constants.RAPI_USERS_FILE,
1640 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1641 for hv_name in enabled_hypervisors:
1642 hv_class = hypervisor.GetHypervisor(hv_name)
1643 dist_files.update(hv_class.GetAncillaryFiles())
1645 # 3. Perform the files upload
1646 for fname in dist_files:
1647 if os.path.exists(fname):
1648 result = lu.rpc.call_upload_file(dist_nodes, fname)
1649 for to_node, to_result in result.items():
1650 msg = to_result.fail_msg
1652 msg = ("Copy of file %s to node %s failed: %s" %
1653 (fname, to_node, msg))
1654 lu.proc.LogWarning(msg)
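# Illustrative usage: LURedistributeConfig below calls this helper after
# triggering a configuration write; a caller that is in the process of adding
# a node could pass it via additional_nodes (an assumption here, since no such
# caller appears in this excerpt):
#
#   _RedistributeAncillaryFiles(lu, additional_nodes=[new_node.name])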
1657 class LURedistributeConfig(NoHooksLU):
1658 """Force the redistribution of cluster configuration.
1660 This is a very simple LU.
1666 def ExpandNames(self):
1667 self.needed_locks = {
1668 locking.LEVEL_NODE: locking.ALL_SET,
1670 self.share_locks[locking.LEVEL_NODE] = 1
1672 def CheckPrereq(self):
1673 """Check prerequisites.
1677 def Exec(self, feedback_fn):
1678 """Redistribute the configuration.
1681 self.cfg.Update(self.cfg.GetClusterInfo())
1682 _RedistributeAncillaryFiles(self)
1685 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1686 """Sleep and poll for an instance's disk to sync.
1689 if not instance.disks:
1693 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1695 node = instance.primary_node
1697 for dev in instance.disks:
1698 lu.cfg.SetDiskID(dev, node)
1701 degr_retries = 10 # in seconds, as we sleep 1 second each time
1705 cumul_degraded = False
1706 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1707 msg = rstats.fail_msg
1709 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1712 raise errors.RemoteError("Can't contact node %s for mirror data,"
1713 " aborting." % node)
1716 rstats = rstats.payload
1718 for i, mstat in enumerate(rstats):
1720 lu.LogWarning("Can't compute data for node %s/%s",
1721 node, instance.disks[i].iv_name)
1723 # we ignore the ldisk parameter
1724 perc_done, est_time, is_degraded, _ = mstat
1725 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1726 if perc_done is not None:
1728 if est_time is not None:
1729 rem_time = "%d estimated seconds remaining" % est_time
1732 rem_time = "no time estimate"
1733 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1734 (instance.disks[i].iv_name, perc_done, rem_time))
1736 # if we're done but degraded, let's do a few small retries, to
1737 # make sure we see a stable and not transient situation; therefore
1738 # we force restart of the loop
1739 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1740 logging.info("Degraded disks found, %d retries left", degr_retries)
1748 time.sleep(min(60, max_time))
1751 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1752 return not cumul_degraded
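# Note: _WaitForSync returns True only if no disk is left degraded after the
# polling loop above, i.e. the instance's mirrors are fully in sync.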
1755 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1756 """Check that mirrors are not degraded.
1758 The ldisk parameter, if True, will change the test from the
1759 is_degraded attribute (which represents overall non-ok status for
1760 the device(s)) to the ldisk (representing the local storage status).
1763 lu.cfg.SetDiskID(dev, node)
1770 if on_primary or dev.AssembleOnSecondary():
1771 rstats = lu.rpc.call_blockdev_find(node, dev)
1772 msg = rstats.fail_msg
1774 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1776 elif not rstats.payload:
1777 lu.LogWarning("Can't find disk on node %s", node)
1780 result = result and (not rstats.payload[idx])
1782 for child in dev.children:
1783 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1788 class LUDiagnoseOS(NoHooksLU):
1789 """Logical unit for OS diagnose/query.
1792 _OP_REQP = ["output_fields", "names"]
1794 _FIELDS_STATIC = utils.FieldSet()
1795 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1797 def ExpandNames(self):
1799 raise errors.OpPrereqError("Selective OS query not supported")
1801 _CheckOutputFields(static=self._FIELDS_STATIC,
1802 dynamic=self._FIELDS_DYNAMIC,
1803 selected=self.op.output_fields)
1805 # Lock all nodes, in shared mode
1806 # Temporary removal of locks, should be reverted later
1807 # TODO: reintroduce locks when they are lighter-weight
1808 self.needed_locks = {}
1809 #self.share_locks[locking.LEVEL_NODE] = 1
1810 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1812 def CheckPrereq(self):
1813 """Check prerequisites.
1818 def _DiagnoseByOS(node_list, rlist):
1819 """Remaps a per-node return list into an a per-os per-node dictionary
1821 @param node_list: a list with the names of all nodes
1822 @param rlist: a map with node names as keys and OS objects as values
1825 @return: a dictionary with osnames as keys and as value another map, with
1826 nodes as keys and tuples of (path, status, diagnose) as values, e.g.::
1828 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1829 (/srv/..., False, "invalid api")],
1830 "node2": [(/srv/..., True, "")]}
1835 # we build here the list of nodes that didn't fail the RPC (at RPC
1836 # level), so that nodes with a non-responding node daemon don't
1837 # make all OSes invalid
1838 good_nodes = [node_name for node_name in rlist
1839 if not rlist[node_name].fail_msg]
1840 for node_name, nr in rlist.items():
1841 if nr.fail_msg or not nr.payload:
1843 for name, path, status, diagnose in nr.payload:
1844 if name not in all_os:
1845 # build a list of nodes for this os containing empty lists
1846 # for each node in node_list
1848 for nname in good_nodes:
1849 all_os[name][nname] = []
1850 all_os[name][node_name].append((path, status, diagnose))
1853 def Exec(self, feedback_fn):
1854 """Compute the list of OSes.
1857 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1858 node_data = self.rpc.call_os_diagnose(valid_nodes)
1859 pol = self._DiagnoseByOS(valid_nodes, node_data)
1861 for os_name, os_data in pol.items():
1863 for field in self.op.output_fields:
1866 elif field == "valid":
1867 val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1868 elif field == "node_status":
1869 # this is just a copy of the dict
1871 for node_name, nos_list in os_data.items():
1872 val[node_name] = nos_list
1874 raise errors.ParameterError(field)
1881 class LURemoveNode(LogicalUnit):
1882 """Logical unit for removing a node.
1885 HPATH = "node-remove"
1886 HTYPE = constants.HTYPE_NODE
1887 _OP_REQP = ["node_name"]
1889 def BuildHooksEnv(self):
1892 This doesn't run on the target node in the pre phase as a failed
1893 node would then be impossible to remove.
1897 "OP_TARGET": self.op.node_name,
1898 "NODE_NAME": self.op.node_name,
1900 all_nodes = self.cfg.GetNodeList()
1901 all_nodes.remove(self.op.node_name)
1902 return env, all_nodes, all_nodes
1904 def CheckPrereq(self):
1905 """Check prerequisites.
1908 - the node exists in the configuration
1909 - it does not have primary or secondary instances
1910 - it's not the master
1912 Any errors are signalled by raising errors.OpPrereqError.
1915 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1917 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1919 instance_list = self.cfg.GetInstanceList()
1921 masternode = self.cfg.GetMasterNode()
1922 if node.name == masternode:
1923 raise errors.OpPrereqError("Node is the master node,"
1924 " you need to failover first.")
1926 for instance_name in instance_list:
1927 instance = self.cfg.GetInstanceInfo(instance_name)
1928 if node.name in instance.all_nodes:
1929 raise errors.OpPrereqError("Instance %s is still running on the node,"
1930 " please remove first." % instance_name)
1931 self.op.node_name = node.name
1934 def Exec(self, feedback_fn):
1935 """Removes the node from the cluster.
1939 logging.info("Stopping the node daemon and removing configs from node %s",
1942 self.context.RemoveNode(node.name)
1944 result = self.rpc.call_node_leave_cluster(node.name)
1945 msg = result.fail_msg
1947 self.LogWarning("Errors encountered on the remote node while leaving"
1948 " the cluster: %s", msg)
1950 # Promote nodes to master candidate as needed
1951 _AdjustCandidatePool(self)
1954 class LUQueryNodes(NoHooksLU):
1955 """Logical unit for querying nodes.
1958 _OP_REQP = ["output_fields", "names", "use_locking"]
1960 _FIELDS_DYNAMIC = utils.FieldSet(
1962 "mtotal", "mnode", "mfree",
1964 "ctotal", "cnodes", "csockets",
1967 _FIELDS_STATIC = utils.FieldSet(
1968 "name", "pinst_cnt", "sinst_cnt",
1969 "pinst_list", "sinst_list",
1970 "pip", "sip", "tags",
1979 def ExpandNames(self):
1980 _CheckOutputFields(static=self._FIELDS_STATIC,
1981 dynamic=self._FIELDS_DYNAMIC,
1982 selected=self.op.output_fields)
1984 self.needed_locks = {}
1985 self.share_locks[locking.LEVEL_NODE] = 1
1988 self.wanted = _GetWantedNodes(self, self.op.names)
1990 self.wanted = locking.ALL_SET
1992 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1993 self.do_locking = self.do_node_query and self.op.use_locking
1995 # if we don't request only static fields, we need to lock the nodes
1996 self.needed_locks[locking.LEVEL_NODE] = self.wanted
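# Illustrative example: asking only for static fields such as "name" and
# "pip" leaves do_node_query false, so no live RPC and no node locks are
# needed; adding a dynamic field such as "mfree" turns do_node_query on,
# and with use_locking set, do_locking as well.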
1999 def CheckPrereq(self):
2000 """Check prerequisites.
2003 # The validation of the node list is done in _GetWantedNodes if the
2004 # list is non-empty; if it is empty, there is no validation to do
2007 def Exec(self, feedback_fn):
2008 """Computes the list of nodes and their attributes.
2011 all_info = self.cfg.GetAllNodesInfo()
2013 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2014 elif self.wanted != locking.ALL_SET:
2015 nodenames = self.wanted
2016 missing = set(nodenames).difference(all_info.keys())
2018 raise errors.OpExecError(
2019 "Some nodes were removed before retrieving their data: %s" % missing)
2021 nodenames = all_info.keys()
2023 nodenames = utils.NiceSort(nodenames)
2024 nodelist = [all_info[name] for name in nodenames]
2026 # begin data gathering
2028 if self.do_node_query:
2030 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2031 self.cfg.GetHypervisorType())
2032 for name in nodenames:
2033 nodeinfo = node_data[name]
2034 if not nodeinfo.fail_msg and nodeinfo.payload:
2035 nodeinfo = nodeinfo.payload
2036 fn = utils.TryConvert
2038 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2039 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2040 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2041 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2042 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2043 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2044 "bootid": nodeinfo.get('bootid', None),
2045 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2046 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2049 live_data[name] = {}
2051 live_data = dict.fromkeys(nodenames, {})
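# Illustrative sketch (hypothetical values): for a responding node,
# live_data["node1"] looks like
#   {"mtotal": 4096, "mnode": 512, "mfree": 3072, "ctotal": 4, ...}
# nodes that failed the RPC, and all nodes when no dynamic fields were
# requested, simply map to an empty dict.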
2053 node_to_primary = dict([(name, set()) for name in nodenames])
2054 node_to_secondary = dict([(name, set()) for name in nodenames])
2056 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2057 "sinst_cnt", "sinst_list"))
2058 if inst_fields & frozenset(self.op.output_fields):
2059 instancelist = self.cfg.GetInstanceList()
2061 for instance_name in instancelist:
2062 inst = self.cfg.GetInstanceInfo(instance_name)
2063 if inst.primary_node in node_to_primary:
2064 node_to_primary[inst.primary_node].add(inst.name)
2065 for secnode in inst.secondary_nodes:
2066 if secnode in node_to_secondary:
2067 node_to_secondary[secnode].add(inst.name)
2069 master_node = self.cfg.GetMasterNode()
2071 # end data gathering
2074 for node in nodelist:
2076 for field in self.op.output_fields:
2079 elif field == "pinst_list":
2080 val = list(node_to_primary[node.name])
2081 elif field == "sinst_list":
2082 val = list(node_to_secondary[node.name])
2083 elif field == "pinst_cnt":
2084 val = len(node_to_primary[node.name])
2085 elif field == "sinst_cnt":
2086 val = len(node_to_secondary[node.name])
2087 elif field == "pip":
2088 val = node.primary_ip
2089 elif field == "sip":
2090 val = node.secondary_ip
2091 elif field == "tags":
2092 val = list(node.GetTags())
2093 elif field == "serial_no":
2094 val = node.serial_no
2095 elif field == "master_candidate":
2096 val = node.master_candidate
2097 elif field == "master":
2098 val = node.name == master_node
2099 elif field == "offline":
2101 elif field == "drained":
2103 elif self._FIELDS_DYNAMIC.Matches(field):
2104 val = live_data[node.name].get(field, None)
2105 elif field == "role":
2106 if node.name == master_node:
2108 elif node.master_candidate:
2117 raise errors.ParameterError(field)
2118 node_output.append(val)
2119 output.append(node_output)
2124 class LUQueryNodeVolumes(NoHooksLU):
2125 """Logical unit for getting volumes on node(s).
2128 _OP_REQP = ["nodes", "output_fields"]
2130 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2131 _FIELDS_STATIC = utils.FieldSet("node")
2133 def ExpandNames(self):
2134 _CheckOutputFields(static=self._FIELDS_STATIC,
2135 dynamic=self._FIELDS_DYNAMIC,
2136 selected=self.op.output_fields)
2138 self.needed_locks = {}
2139 self.share_locks[locking.LEVEL_NODE] = 1
2140 if not self.op.nodes:
2141 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2143 self.needed_locks[locking.LEVEL_NODE] = \
2144 _GetWantedNodes(self, self.op.nodes)
2146 def CheckPrereq(self):
2147 """Check prerequisites.
2149 This checks that the fields required are valid output fields.
2152 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2154 def Exec(self, feedback_fn):
2155 """Computes the list of nodes and their attributes.
2158 nodenames = self.nodes
2159 volumes = self.rpc.call_node_volumes(nodenames)
2161 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2162 in self.cfg.GetInstanceList()]
2164 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2167 for node in nodenames:
2168 nresult = volumes[node]
2171 msg = nresult.fail_msg
2173 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2176 node_vols = nresult.payload[:]
2177 node_vols.sort(key=lambda vol: vol['dev'])
2179 for vol in node_vols:
2181 for field in self.op.output_fields:
2184 elif field == "phys":
2188 elif field == "name":
2190 elif field == "size":
2191 val = int(float(vol['size']))
2192 elif field == "instance":
2194 if node not in lv_by_node[inst]:
2196 if vol['name'] in lv_by_node[inst][node]:
2202 raise errors.ParameterError(field)
2203 node_output.append(str(val))
2205 output.append(node_output)
2210 class LUAddNode(LogicalUnit):
2211 """Logical unit for adding node to the cluster.
2215 HTYPE = constants.HTYPE_NODE
2216 _OP_REQP = ["node_name"]
2218 def BuildHooksEnv(self):
2221 This will run on all nodes before, and on all nodes + the new node after.
2225 "OP_TARGET": self.op.node_name,
2226 "NODE_NAME": self.op.node_name,
2227 "NODE_PIP": self.op.primary_ip,
2228 "NODE_SIP": self.op.secondary_ip,
2230 nodes_0 = self.cfg.GetNodeList()
2231 nodes_1 = nodes_0 + [self.op.node_name, ]
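# Illustrative result (hypothetical names): env carries OP_TARGET,
# NODE_NAME, NODE_PIP and NODE_SIP; the pre-hook list is the current
# nodes only, the post-hook list additionally includes the new node,
# e.g. (["node1", "node2"], ["node1", "node2", "node3"]).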
2232 return env, nodes_0, nodes_1
2234 def CheckPrereq(self):
2235 """Check prerequisites.
2238 - the new node is not already in the config
2240 - its parameters (single/dual homed) match the cluster
2242 Any errors are signalled by raising errors.OpPrereqError.
2245 node_name = self.op.node_name
2248 dns_data = utils.HostInfo(node_name)
2250 node = dns_data.name
2251 primary_ip = self.op.primary_ip = dns_data.ip
2252 secondary_ip = getattr(self.op, "secondary_ip", None)
2253 if secondary_ip is None:
2254 secondary_ip = primary_ip
2255 if not utils.IsValidIP(secondary_ip):
2256 raise errors.OpPrereqError("Invalid secondary IP given")
2257 self.op.secondary_ip = secondary_ip
2259 node_list = cfg.GetNodeList()
2260 if not self.op.readd and node in node_list:
2261 raise errors.OpPrereqError("Node %s is already in the configuration" %
2263 elif self.op.readd and node not in node_list:
2264 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2266 for existing_node_name in node_list:
2267 existing_node = cfg.GetNodeInfo(existing_node_name)
2269 if self.op.readd and node == existing_node_name:
2270 if (existing_node.primary_ip != primary_ip or
2271 existing_node.secondary_ip != secondary_ip):
2272 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2273 " address configuration as before")
2276 if (existing_node.primary_ip == primary_ip or
2277 existing_node.secondary_ip == primary_ip or
2278 existing_node.primary_ip == secondary_ip or
2279 existing_node.secondary_ip == secondary_ip):
2280 raise errors.OpPrereqError("New node ip address(es) conflict with"
2281 " existing node %s" % existing_node.name)
2283 # check that the type of the node (single versus dual homed) is the
2284 # same as for the master
2285 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2286 master_singlehomed = myself.secondary_ip == myself.primary_ip
2287 newbie_singlehomed = secondary_ip == primary_ip
2288 if master_singlehomed != newbie_singlehomed:
2289 if master_singlehomed:
2290 raise errors.OpPrereqError("The master has no private ip but the"
2291 " new node has one")
2293 raise errors.OpPrereqError("The master has a private ip but the"
2294 " new node doesn't have one")
2296 # checks reachability
2297 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2298 raise errors.OpPrereqError("Node not reachable by ping")
2300 if not newbie_singlehomed:
2301 # check reachability from my secondary ip to newbie's secondary ip
2302 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2303 source=myself.secondary_ip):
2304 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2305 " based ping to noded port")
2307 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2308 mc_now, _ = self.cfg.GetMasterCandidateStats()
2309 master_candidate = mc_now < cp_size
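# Illustrative example (hypothetical numbers): with candidate_pool_size 10
# and 3 current master candidates, mc_now < cp_size holds and the new node
# joins as a master candidate; once the pool is full it joins as a plain
# node.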
2311 self.new_node = objects.Node(name=node,
2312 primary_ip=primary_ip,
2313 secondary_ip=secondary_ip,
2314 master_candidate=master_candidate,
2315 offline=False, drained=False)
2317 def Exec(self, feedback_fn):
2318 """Adds the new node to the cluster.
2321 new_node = self.new_node
2322 node = new_node.name
2324 # check connectivity
2325 result = self.rpc.call_version([node])[node]
2326 result.Raise("Can't get version information from node %s" % node)
2327 if constants.PROTOCOL_VERSION == result.payload:
2328 logging.info("Communication to node %s fine, sw version %s match",
2329 node, result.payload)
2331 raise errors.OpExecError("Version mismatch master version %s,"
2332 " node version %s" %
2333 (constants.PROTOCOL_VERSION, result.payload))
2336 logging.info("Copy ssh key to node %s", node)
2337 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2339 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2340 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2346 keyarray.append(f.read())
2350 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2352 keyarray[3], keyarray[4], keyarray[5])
2353 result.Raise("Cannot transfer ssh keys to the new node")
2355 # Add node to our /etc/hosts, and add key to known_hosts
2356 if self.cfg.GetClusterInfo().modify_etc_hosts:
2357 utils.AddHostToEtcHosts(new_node.name)
2359 if new_node.secondary_ip != new_node.primary_ip:
2360 result = self.rpc.call_node_has_ip_address(new_node.name,
2361 new_node.secondary_ip)
2362 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2364 if not result.payload:
2365 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2366 " you gave (%s). Please fix and re-run this"
2367 " command." % new_node.secondary_ip)
2369 node_verify_list = [self.cfg.GetMasterNode()]
2370 node_verify_param = {
2372 # TODO: do a node-net-test as well?
2375 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2376 self.cfg.GetClusterName())
2377 for verifier in node_verify_list:
2378 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2379 nl_payload = result[verifier].payload['nodelist']
2381 for failed in nl_payload:
2382 feedback_fn("ssh/hostname verification failed %s -> %s" %
2383 (verifier, nl_payload[failed]))
2384 raise errors.OpExecError("ssh/hostname verification failed.")
2387 _RedistributeAncillaryFiles(self)
2388 self.context.ReaddNode(new_node)
2390 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2391 self.context.AddNode(new_node)
2394 class LUSetNodeParams(LogicalUnit):
2395 """Modifies the parameters of a node.
2398 HPATH = "node-modify"
2399 HTYPE = constants.HTYPE_NODE
2400 _OP_REQP = ["node_name"]
2403 def CheckArguments(self):
2404 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2405 if node_name is None:
2406 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2407 self.op.node_name = node_name
2408 _CheckBooleanOpField(self.op, 'master_candidate')
2409 _CheckBooleanOpField(self.op, 'offline')
2410 _CheckBooleanOpField(self.op, 'drained')
2411 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2412 if all_mods.count(None) == 3:
2413 raise errors.OpPrereqError("Please pass at least one modification")
2414 if all_mods.count(True) > 1:
2415 raise errors.OpPrereqError("Can't set the node into more than one"
2416 " state at the same time")
2418 def ExpandNames(self):
2419 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2421 def BuildHooksEnv(self):
2424 This runs on the master node.
2428 "OP_TARGET": self.op.node_name,
2429 "MASTER_CANDIDATE": str(self.op.master_candidate),
2430 "OFFLINE": str(self.op.offline),
2431 "DRAINED": str(self.op.drained),
2433 nl = [self.cfg.GetMasterNode(),
2437 def CheckPrereq(self):
2438 """Check prerequisites.
2440 This only checks the instance list against the existing names.
2443 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2445 if ((self.op.master_candidate == False or self.op.offline == True or
2446 self.op.drained == True) and node.master_candidate):
2447 # we will demote the node from master_candidate
2448 if self.op.node_name == self.cfg.GetMasterNode():
2449 raise errors.OpPrereqError("The master node has to be a"
2450 " master candidate, online and not drained")
2451 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2452 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2453 if num_candidates <= cp_size:
2454 msg = ("Not enough master candidates (desired"
2455 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2457 self.LogWarning(msg)
2459 raise errors.OpPrereqError(msg)
2461 if (self.op.master_candidate == True and
2462 ((node.offline and not self.op.offline == False) or
2463 (node.drained and not self.op.drained == False))):
2464 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2465 " to master_candidate" % node.name)
2469 def Exec(self, feedback_fn):
2478 if self.op.offline is not None:
2479 node.offline = self.op.offline
2480 result.append(("offline", str(self.op.offline)))
2481 if self.op.offline == True:
2482 if node.master_candidate:
2483 node.master_candidate = False
2485 result.append(("master_candidate", "auto-demotion due to offline"))
2487 node.drained = False
2488 result.append(("drained", "clear drained status due to offline"))
2490 if self.op.master_candidate is not None:
2491 node.master_candidate = self.op.master_candidate
2493 result.append(("master_candidate", str(self.op.master_candidate)))
2494 if self.op.master_candidate == False:
2495 rrc = self.rpc.call_node_demote_from_mc(node.name)
2498 self.LogWarning("Node failed to demote itself: %s" % msg)
2500 if self.op.drained is not None:
2501 node.drained = self.op.drained
2502 result.append(("drained", str(self.op.drained)))
2503 if self.op.drained == True:
2504 if node.master_candidate:
2505 node.master_candidate = False
2507 result.append(("master_candidate", "auto-demotion due to drain"))
2509 node.offline = False
2510 result.append(("offline", "clear offline status due to drain"))
2512 # this will trigger configuration file update, if needed
2513 self.cfg.Update(node)
2514 # this will trigger job queue propagation or cleanup
2516 self.context.ReaddNode(node)
2521 class LUPowercycleNode(NoHooksLU):
2522 """Powercycles a node.
2525 _OP_REQP = ["node_name", "force"]
2528 def CheckArguments(self):
2529 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2530 if node_name is None:
2531 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2532 self.op.node_name = node_name
2533 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2534 raise errors.OpPrereqError("The node is the master and the force"
2535 " parameter was not set")
2537 def ExpandNames(self):
2538 """Locking for PowercycleNode.
2540 This is a last-resort option and shouldn't block on other
2541 jobs. Therefore, we grab no locks.
2544 self.needed_locks = {}
2546 def CheckPrereq(self):
2547 """Check prerequisites.
2549 This LU has no prereqs.
2554 def Exec(self, feedback_fn):
2558 result = self.rpc.call_node_powercycle(self.op.node_name,
2559 self.cfg.GetHypervisorType())
2560 result.Raise("Failed to schedule the reboot")
2561 return result.payload
2564 class LUQueryClusterInfo(NoHooksLU):
2565 """Query cluster configuration.
2571 def ExpandNames(self):
2572 self.needed_locks = {}
2574 def CheckPrereq(self):
2575 """No prerequsites needed for this LU.
2580 def Exec(self, feedback_fn):
2581 """Return cluster config.
2584 cluster = self.cfg.GetClusterInfo()
2586 "software_version": constants.RELEASE_VERSION,
2587 "protocol_version": constants.PROTOCOL_VERSION,
2588 "config_version": constants.CONFIG_VERSION,
2589 "os_api_version": max(constants.OS_API_VERSIONS),
2590 "export_version": constants.EXPORT_VERSION,
2591 "architecture": (platform.architecture()[0], platform.machine()),
2592 "name": cluster.cluster_name,
2593 "master": cluster.master_node,
2594 "default_hypervisor": cluster.default_hypervisor,
2595 "enabled_hypervisors": cluster.enabled_hypervisors,
2596 "hvparams": dict([(hvname, cluster.hvparams[hvname])
2597 for hvname in cluster.enabled_hypervisors]),
2598 "beparams": cluster.beparams,
2599 "nicparams": cluster.nicparams,
2600 "candidate_pool_size": cluster.candidate_pool_size,
2601 "master_netdev": cluster.master_netdev,
2602 "volume_group_name": cluster.volume_group_name,
2603 "file_storage_dir": cluster.file_storage_dir,
2609 class LUQueryConfigValues(NoHooksLU):
2610 """Return configuration values.
2615 _FIELDS_DYNAMIC = utils.FieldSet()
2616 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2618 def ExpandNames(self):
2619 self.needed_locks = {}
2621 _CheckOutputFields(static=self._FIELDS_STATIC,
2622 dynamic=self._FIELDS_DYNAMIC,
2623 selected=self.op.output_fields)
2625 def CheckPrereq(self):
2626 """No prerequisites.
2631 def Exec(self, feedback_fn):
2632 """Dump a representation of the cluster config to the standard output.
2636 for field in self.op.output_fields:
2637 if field == "cluster_name":
2638 entry = self.cfg.GetClusterName()
2639 elif field == "master_node":
2640 entry = self.cfg.GetMasterNode()
2641 elif field == "drain_flag":
2642 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2644 raise errors.ParameterError(field)
2645 values.append(entry)
2649 class LUActivateInstanceDisks(NoHooksLU):
2650 """Bring up an instance's disks.
2653 _OP_REQP = ["instance_name"]
2656 def ExpandNames(self):
2657 self._ExpandAndLockInstance()
2658 self.needed_locks[locking.LEVEL_NODE] = []
2659 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2661 def DeclareLocks(self, level):
2662 if level == locking.LEVEL_NODE:
2663 self._LockInstancesNodes()
2665 def CheckPrereq(self):
2666 """Check prerequisites.
2668 This checks that the instance is in the cluster.
2671 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2672 assert self.instance is not None, \
2673 "Cannot retrieve locked instance %s" % self.op.instance_name
2674 _CheckNodeOnline(self, self.instance.primary_node)
2676 def Exec(self, feedback_fn):
2677 """Activate the disks.
2680 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2682 raise errors.OpExecError("Cannot activate block devices")
2687 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2688 """Prepare the block devices for an instance.
2690 This sets up the block devices on all nodes.
2692 @type lu: L{LogicalUnit}
2693 @param lu: the logical unit on whose behalf we execute
2694 @type instance: L{objects.Instance}
2695 @param instance: the instance for whose disks we assemble
2696 @type ignore_secondaries: boolean
2697 @param ignore_secondaries: if true, errors on secondary nodes
2698 won't result in an error return from the function
2699 @return: False if the operation failed, otherwise a list of
2700 (host, instance_visible_name, node_visible_name)
2701 with the mapping from node devices to instance devices
2706 iname = instance.name
2707 # With the two passes mechanism we try to reduce the window of
2708 # opportunity for the race condition of switching DRBD to primary
2709 # before handshaking occurred, but we do not eliminate it
2711 # The proper fix would be to wait (with some limits) until the
2712 # connection has been made and drbd transitions from WFConnection
2713 # into any other network-connected state (Connected, SyncTarget,
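# Sketch of the two passes below (hypothetical two-node DRBD disk):
#   pass 1: call_blockdev_assemble(node_a, disk, iname, False)
#           call_blockdev_assemble(node_b, disk, iname, False)
#   pass 2: call_blockdev_assemble(primary, disk, iname, True)
# i.e. both sides are brought up as secondaries before any promotion.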
2716 # 1st pass, assemble on all nodes in secondary mode
2717 for inst_disk in instance.disks:
2718 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2719 lu.cfg.SetDiskID(node_disk, node)
2720 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2721 msg = result.fail_msg
2723 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2724 " (is_primary=False, pass=1): %s",
2725 inst_disk.iv_name, node, msg)
2726 if not ignore_secondaries:
2729 # FIXME: race condition on drbd migration to primary
2731 # 2nd pass, do only the primary node
2732 for inst_disk in instance.disks:
2733 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2734 if node != instance.primary_node:
2736 lu.cfg.SetDiskID(node_disk, node)
2737 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2738 msg = result.fail_msg
2740 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2741 " (is_primary=True, pass=2): %s",
2742 inst_disk.iv_name, node, msg)
2744 device_info.append((instance.primary_node, inst_disk.iv_name,
2747 # leave the disks configured for the primary node
2748 # this is a workaround that would be fixed better by
2749 # improving the logical/physical id handling
2750 for disk in instance.disks:
2751 lu.cfg.SetDiskID(disk, instance.primary_node)
2753 return disks_ok, device_info
2756 def _StartInstanceDisks(lu, instance, force):
2757 """Start the disks of an instance.
2760 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2761 ignore_secondaries=force)
2763 _ShutdownInstanceDisks(lu, instance)
2764 if force is not None and not force:
2765 lu.proc.LogWarning("", hint="If the message above refers to a"
2767 " you can retry the operation using '--force'.")
2768 raise errors.OpExecError("Disk consistency error")
2771 class LUDeactivateInstanceDisks(NoHooksLU):
2772 """Shutdown an instance's disks.
2775 _OP_REQP = ["instance_name"]
2778 def ExpandNames(self):
2779 self._ExpandAndLockInstance()
2780 self.needed_locks[locking.LEVEL_NODE] = []
2781 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2783 def DeclareLocks(self, level):
2784 if level == locking.LEVEL_NODE:
2785 self._LockInstancesNodes()
2787 def CheckPrereq(self):
2788 """Check prerequisites.
2790 This checks that the instance is in the cluster.
2793 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2794 assert self.instance is not None, \
2795 "Cannot retrieve locked instance %s" % self.op.instance_name
2797 def Exec(self, feedback_fn):
2798 """Deactivate the disks
2801 instance = self.instance
2802 _SafeShutdownInstanceDisks(self, instance)
2805 def _SafeShutdownInstanceDisks(lu, instance):
2806 """Shutdown block devices of an instance.
2808 This function checks if an instance is running, before calling
2809 _ShutdownInstanceDisks.
2812 pnode = instance.primary_node
2813 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2814 ins_l.Raise("Can't contact node %s" % pnode)
2816 if instance.name in ins_l.payload:
2817 raise errors.OpExecError("Instance is running, can't shutdown"
2820 _ShutdownInstanceDisks(lu, instance)
2823 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2824 """Shutdown block devices of an instance.
2826 This does the shutdown on all nodes of the instance.
2828 If ignore_primary is false, errors on the primary node are
2833 for disk in instance.disks:
2834 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2835 lu.cfg.SetDiskID(top_disk, node)
2836 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2837 msg = result.fail_msg
2839 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2840 disk.iv_name, node, msg)
2841 if not ignore_primary or node != instance.primary_node:
2846 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2847 """Checks if a node has enough free memory.
2849 This function checks if a given node has the needed amount of free
2850 memory. In case the node has less memory or we cannot get the
2851 information from the node, this function raises an OpPrereqError
2854 @type lu: C{LogicalUnit}
2855 @param lu: a logical unit from which we get configuration data
2857 @param node: the node to check
2858 @type reason: C{str}
2859 @param reason: string to use in the error message
2860 @type requested: C{int}
2861 @param requested: the amount of memory in MiB to check for
2862 @type hypervisor_name: C{str}
2863 @param hypervisor_name: the hypervisor to ask for memory stats
2864 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2865 we cannot check the node
2868 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2869 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2870 free_mem = nodeinfo[node].payload.get('memory_free', None)
2871 if not isinstance(free_mem, int):
2872 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2873 " was '%s'" % (node, free_mem))
2874 if requested > free_mem:
2875 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2876 " needed %s MiB, available %s MiB" %
2877 (node, reason, requested, free_mem))
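# Illustrative usage, as done by the startup/failover LUs below
# (the memory value comes from the instance's filled beparams):
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)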
2880 class LUStartupInstance(LogicalUnit):
2881 """Starts an instance.
2884 HPATH = "instance-start"
2885 HTYPE = constants.HTYPE_INSTANCE
2886 _OP_REQP = ["instance_name", "force"]
2889 def ExpandNames(self):
2890 self._ExpandAndLockInstance()
2892 def BuildHooksEnv(self):
2895 This runs on master, primary and secondary nodes of the instance.
2899 "FORCE": self.op.force,
2901 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2902 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2905 def CheckPrereq(self):
2906 """Check prerequisites.
2908 This checks that the instance is in the cluster.
2911 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2912 assert self.instance is not None, \
2913 "Cannot retrieve locked instance %s" % self.op.instance_name
2916 self.beparams = getattr(self.op, "beparams", {})
2918 if not isinstance(self.beparams, dict):
2919 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2920 " dict" % (type(self.beparams), ))
2921 # fill the beparams dict
2922 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2923 self.op.beparams = self.beparams
2926 self.hvparams = getattr(self.op, "hvparams", {})
2928 if not isinstance(self.hvparams, dict):
2929 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2930 " dict" % (type(self.hvparams), ))
2932 # check hypervisor parameter syntax (locally)
2933 cluster = self.cfg.GetClusterInfo()
2934 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2935 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2937 filled_hvp.update(self.hvparams)
2938 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2939 hv_type.CheckParameterSyntax(filled_hvp)
2940 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2941 self.op.hvparams = self.hvparams
2943 _CheckNodeOnline(self, instance.primary_node)
2945 bep = self.cfg.GetClusterInfo().FillBE(instance)
2946 # check bridge existence
2947 _CheckInstanceBridgesExist(self, instance)
2949 remote_info = self.rpc.call_instance_info(instance.primary_node,
2951 instance.hypervisor)
2952 remote_info.Raise("Error checking node %s" % instance.primary_node,
2954 if not remote_info.payload: # not running already
2955 _CheckNodeFreeMemory(self, instance.primary_node,
2956 "starting instance %s" % instance.name,
2957 bep[constants.BE_MEMORY], instance.hypervisor)
2959 def Exec(self, feedback_fn):
2960 """Start the instance.
2963 instance = self.instance
2964 force = self.op.force
2966 self.cfg.MarkInstanceUp(instance.name)
2968 node_current = instance.primary_node
2970 _StartInstanceDisks(self, instance, force)
2972 result = self.rpc.call_instance_start(node_current, instance,
2973 self.hvparams, self.beparams)
2974 msg = result.fail_msg
2976 _ShutdownInstanceDisks(self, instance)
2977 raise errors.OpExecError("Could not start instance: %s" % msg)
2980 class LURebootInstance(LogicalUnit):
2981 """Reboot an instance.
2984 HPATH = "instance-reboot"
2985 HTYPE = constants.HTYPE_INSTANCE
2986 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2989 def ExpandNames(self):
2990 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2991 constants.INSTANCE_REBOOT_HARD,
2992 constants.INSTANCE_REBOOT_FULL]:
2993 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2994 (constants.INSTANCE_REBOOT_SOFT,
2995 constants.INSTANCE_REBOOT_HARD,
2996 constants.INSTANCE_REBOOT_FULL))
2997 self._ExpandAndLockInstance()
2999 def BuildHooksEnv(self):
3002 This runs on master, primary and secondary nodes of the instance.
3006 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3007 "REBOOT_TYPE": self.op.reboot_type,
3009 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3010 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3013 def CheckPrereq(self):
3014 """Check prerequisites.
3016 This checks that the instance is in the cluster.
3019 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3020 assert self.instance is not None, \
3021 "Cannot retrieve locked instance %s" % self.op.instance_name
3023 _CheckNodeOnline(self, instance.primary_node)
3026 # check bridge existence
3026 _CheckInstanceBridgesExist(self, instance)
3028 def Exec(self, feedback_fn):
3029 """Reboot the instance.
3032 instance = self.instance
3033 ignore_secondaries = self.op.ignore_secondaries
3034 reboot_type = self.op.reboot_type
3036 node_current = instance.primary_node
3038 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3039 constants.INSTANCE_REBOOT_HARD]:
3040 for disk in instance.disks:
3041 self.cfg.SetDiskID(disk, node_current)
3042 result = self.rpc.call_instance_reboot(node_current, instance,
3044 result.Raise("Could not reboot instance")
3046 result = self.rpc.call_instance_shutdown(node_current, instance)
3047 result.Raise("Could not shutdown instance for full reboot")
3048 _ShutdownInstanceDisks(self, instance)
3049 _StartInstanceDisks(self, instance, ignore_secondaries)
3050 result = self.rpc.call_instance_start(node_current, instance, None, None)
3051 msg = result.fail_msg
3053 _ShutdownInstanceDisks(self, instance)
3054 raise errors.OpExecError("Could not start instance for"
3055 " full reboot: %s" % msg)
3057 self.cfg.MarkInstanceUp(instance.name)
3060 class LUShutdownInstance(LogicalUnit):
3061 """Shutdown an instance.
3064 HPATH = "instance-stop"
3065 HTYPE = constants.HTYPE_INSTANCE
3066 _OP_REQP = ["instance_name"]
3069 def ExpandNames(self):
3070 self._ExpandAndLockInstance()
3072 def BuildHooksEnv(self):
3075 This runs on master, primary and secondary nodes of the instance.
3078 env = _BuildInstanceHookEnvByObject(self, self.instance)
3079 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3082 def CheckPrereq(self):
3083 """Check prerequisites.
3085 This checks that the instance is in the cluster.
3088 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3089 assert self.instance is not None, \
3090 "Cannot retrieve locked instance %s" % self.op.instance_name
3091 _CheckNodeOnline(self, self.instance.primary_node)
3093 def Exec(self, feedback_fn):
3094 """Shutdown the instance.
3097 instance = self.instance
3098 node_current = instance.primary_node
3099 self.cfg.MarkInstanceDown(instance.name)
3100 result = self.rpc.call_instance_shutdown(node_current, instance)
3101 msg = result.fail_msg
3103 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3105 _ShutdownInstanceDisks(self, instance)
3108 class LUReinstallInstance(LogicalUnit):
3109 """Reinstall an instance.
3112 HPATH = "instance-reinstall"
3113 HTYPE = constants.HTYPE_INSTANCE
3114 _OP_REQP = ["instance_name"]
3117 def ExpandNames(self):
3118 self._ExpandAndLockInstance()
3120 def BuildHooksEnv(self):
3123 This runs on master, primary and secondary nodes of the instance.
3126 env = _BuildInstanceHookEnvByObject(self, self.instance)
3127 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3130 def CheckPrereq(self):
3131 """Check prerequisites.
3133 This checks that the instance is in the cluster and is not running.
3136 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3137 assert instance is not None, \
3138 "Cannot retrieve locked instance %s" % self.op.instance_name
3139 _CheckNodeOnline(self, instance.primary_node)
3141 if instance.disk_template == constants.DT_DISKLESS:
3142 raise errors.OpPrereqError("Instance '%s' has no disks" %
3143 self.op.instance_name)
3144 if instance.admin_up:
3145 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3146 self.op.instance_name)
3147 remote_info = self.rpc.call_instance_info(instance.primary_node,
3149 instance.hypervisor)
3150 remote_info.Raise("Error checking node %s" % instance.primary_node,
3152 if remote_info.payload:
3153 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3154 (self.op.instance_name,
3155 instance.primary_node))
3157 self.op.os_type = getattr(self.op, "os_type", None)
3158 if self.op.os_type is not None:
3160 pnode = self.cfg.GetNodeInfo(
3161 self.cfg.ExpandNodeName(instance.primary_node))
3163 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3165 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3166 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3167 (self.op.os_type, pnode.name), prereq=True)
3169 self.instance = instance
3171 def Exec(self, feedback_fn):
3172 """Reinstall the instance.
3175 inst = self.instance
3177 if self.op.os_type is not None:
3178 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3179 inst.os = self.op.os_type
3180 self.cfg.Update(inst)
3182 _StartInstanceDisks(self, inst, None)
3184 feedback_fn("Running the instance OS create scripts...")
3185 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3186 result.Raise("Could not install OS for instance %s on node %s" %
3187 (inst.name, inst.primary_node))
3189 _ShutdownInstanceDisks(self, inst)
3192 class LURenameInstance(LogicalUnit):
3193 """Rename an instance.
3196 HPATH = "instance-rename"
3197 HTYPE = constants.HTYPE_INSTANCE
3198 _OP_REQP = ["instance_name", "new_name"]
3200 def BuildHooksEnv(self):
3203 This runs on master, primary and secondary nodes of the instance.
3206 env = _BuildInstanceHookEnvByObject(self, self.instance)
3207 env["INSTANCE_NEW_NAME"] = self.op.new_name
3208 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3211 def CheckPrereq(self):
3212 """Check prerequisites.
3214 This checks that the instance is in the cluster and is not running.
3217 instance = self.cfg.GetInstanceInfo(
3218 self.cfg.ExpandInstanceName(self.op.instance_name))
3219 if instance is None:
3220 raise errors.OpPrereqError("Instance '%s' not known" %
3221 self.op.instance_name)
3222 _CheckNodeOnline(self, instance.primary_node)
3224 if instance.admin_up:
3225 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3226 self.op.instance_name)
3227 remote_info = self.rpc.call_instance_info(instance.primary_node,
3229 instance.hypervisor)
3230 remote_info.Raise("Error checking node %s" % instance.primary_node,
3232 if remote_info.payload:
3233 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3234 (self.op.instance_name,
3235 instance.primary_node))
3236 self.instance = instance
3238 # new name verification
3239 name_info = utils.HostInfo(self.op.new_name)
3241 self.op.new_name = new_name = name_info.name
3242 instance_list = self.cfg.GetInstanceList()
3243 if new_name in instance_list:
3244 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3247 if not getattr(self.op, "ignore_ip", False):
3248 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3249 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3250 (name_info.ip, new_name))
3253 def Exec(self, feedback_fn):
3254 """Reinstall the instance.
3257 inst = self.instance
3258 old_name = inst.name
3260 if inst.disk_template == constants.DT_FILE:
3261 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3263 self.cfg.RenameInstance(inst.name, self.op.new_name)
3264 # Change the instance lock. This is definitely safe while we hold the BGL
3265 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3266 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3268 # re-read the instance from the configuration after rename
3269 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3271 if inst.disk_template == constants.DT_FILE:
3272 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3273 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3274 old_file_storage_dir,
3275 new_file_storage_dir)
3276 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3277 " (but the instance has been renamed in Ganeti)" %
3278 (inst.primary_node, old_file_storage_dir,
3279 new_file_storage_dir))
3281 _StartInstanceDisks(self, inst, None)
3283 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3285 msg = result.fail_msg
3287 msg = ("Could not run OS rename script for instance %s on node %s"
3288 " (but the instance has been renamed in Ganeti): %s" %
3289 (inst.name, inst.primary_node, msg))
3290 self.proc.LogWarning(msg)
3292 _ShutdownInstanceDisks(self, inst)
3295 class LURemoveInstance(LogicalUnit):
3296 """Remove an instance.
3299 HPATH = "instance-remove"
3300 HTYPE = constants.HTYPE_INSTANCE
3301 _OP_REQP = ["instance_name", "ignore_failures"]
3304 def ExpandNames(self):
3305 self._ExpandAndLockInstance()
3306 self.needed_locks[locking.LEVEL_NODE] = []
3307 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3309 def DeclareLocks(self, level):
3310 if level == locking.LEVEL_NODE:
3311 self._LockInstancesNodes()
3313 def BuildHooksEnv(self):
3316 This runs on master, primary and secondary nodes of the instance.
3319 env = _BuildInstanceHookEnvByObject(self, self.instance)
3320 nl = [self.cfg.GetMasterNode()]
3323 def CheckPrereq(self):
3324 """Check prerequisites.
3326 This checks that the instance is in the cluster.
3329 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3330 assert self.instance is not None, \
3331 "Cannot retrieve locked instance %s" % self.op.instance_name
3333 def Exec(self, feedback_fn):
3334 """Remove the instance.
3337 instance = self.instance
3338 logging.info("Shutting down instance %s on node %s",
3339 instance.name, instance.primary_node)
3341 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3342 msg = result.fail_msg
3344 if self.op.ignore_failures:
3345 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3347 raise errors.OpExecError("Could not shutdown instance %s on"
3349 (instance.name, instance.primary_node, msg))
3351 logging.info("Removing block devices for instance %s", instance.name)
3353 if not _RemoveDisks(self, instance):
3354 if self.op.ignore_failures:
3355 feedback_fn("Warning: can't remove instance's disks")
3357 raise errors.OpExecError("Can't remove instance's disks")
3359 logging.info("Removing instance %s out of cluster config", instance.name)
3361 self.cfg.RemoveInstance(instance.name)
3362 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3365 class LUQueryInstances(NoHooksLU):
3366 """Logical unit for querying instances.
3369 _OP_REQP = ["output_fields", "names", "use_locking"]
3371 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3373 "disk_template", "ip", "mac", "bridge",
3374 "nic_mode", "nic_link",
3375 "sda_size", "sdb_size", "vcpus", "tags",
3376 "network_port", "beparams",
3377 r"(disk)\.(size)/([0-9]+)",
3378 r"(disk)\.(sizes)", "disk_usage",
3379 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3380 r"(nic)\.(bridge)/([0-9]+)",
3381 r"(nic)\.(macs|ips|modes|links|bridges)",
3382 r"(disk|nic)\.(count)",
3383 "serial_no", "hypervisor", "hvparams",] +
3385 for name in constants.HVS_PARAMETERS] +
3387 for name in constants.BES_PARAMETERS])
3388 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3391 def ExpandNames(self):
3392 _CheckOutputFields(static=self._FIELDS_STATIC,
3393 dynamic=self._FIELDS_DYNAMIC,
3394 selected=self.op.output_fields)
3396 self.needed_locks = {}
3397 self.share_locks[locking.LEVEL_INSTANCE] = 1
3398 self.share_locks[locking.LEVEL_NODE] = 1
3401 self.wanted = _GetWantedInstances(self, self.op.names)
3403 self.wanted = locking.ALL_SET
3405 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3406 self.do_locking = self.do_node_query and self.op.use_locking
3408 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3409 self.needed_locks[locking.LEVEL_NODE] = []
3410 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3412 def DeclareLocks(self, level):
3413 if level == locking.LEVEL_NODE and self.do_locking:
3414 self._LockInstancesNodes()
3416 def CheckPrereq(self):
3417 """Check prerequisites.
3422 def Exec(self, feedback_fn):
3423 """Computes the list of nodes and their attributes.
3426 all_info = self.cfg.GetAllInstancesInfo()
3427 if self.wanted == locking.ALL_SET:
3428 # caller didn't specify instance names, so ordering is not important
3430 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3432 instance_names = all_info.keys()
3433 instance_names = utils.NiceSort(instance_names)
3435 # caller did specify names, so we must keep the ordering
3437 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3439 tgt_set = all_info.keys()
3440 missing = set(self.wanted).difference(tgt_set)
3442 raise errors.OpExecError("Some instances were removed before"
3443 " retrieving their data: %s" % missing)
3444 instance_names = self.wanted
3446 instance_list = [all_info[iname] for iname in instance_names]
3448 # begin data gathering
3450 nodes = frozenset([inst.primary_node for inst in instance_list])
3451 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3455 if self.do_node_query:
3457 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3459 result = node_data[name]
3461 # offline nodes will be in both lists
3462 off_nodes.append(name)
3463 if result.failed or result.fail_msg:
3464 bad_nodes.append(name)
3467 live_data.update(result.payload)
3468 # else no instance is alive
3470 live_data = dict([(name, {}) for name in instance_names])
3472 # end data gathering
3477 cluster = self.cfg.GetClusterInfo()
3478 for instance in instance_list:
3480 i_hv = cluster.FillHV(instance)
3481 i_be = cluster.FillBE(instance)
3482 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
3483 nic.nicparams) for nic in instance.nics]
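# Illustrative sketch (hypothetical values): FillDict overlays the per-NIC
# parameters on the cluster defaults, so a NIC with {"link": "br1"} over
# defaults {"mode": "bridged", "link": "xen-br0"} yields
#   {"mode": "bridged", "link": "br1"}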
3484 for field in self.op.output_fields:
3485 st_match = self._FIELDS_STATIC.Matches(field)
3490 elif field == "pnode":
3491 val = instance.primary_node
3492 elif field == "snodes":
3493 val = list(instance.secondary_nodes)
3494 elif field == "admin_state":
3495 val = instance.admin_up
3496 elif field == "oper_state":
3497 if instance.primary_node in bad_nodes:
3500 val = bool(live_data.get(instance.name))
3501 elif field == "status":
3502 if instance.primary_node in off_nodes:
3503 val = "ERROR_nodeoffline"
3504 elif instance.primary_node in bad_nodes:
3505 val = "ERROR_nodedown"
3507 running = bool(live_data.get(instance.name))
3509 if instance.admin_up:
3514 if instance.admin_up:
3518 elif field == "oper_ram":
3519 if instance.primary_node in bad_nodes:
3521 elif instance.name in live_data:
3522 val = live_data[instance.name].get("memory", "?")
3525 elif field == "vcpus":
3526 val = i_be[constants.BE_VCPUS]
3527 elif field == "disk_template":
3528 val = instance.disk_template
3531 val = instance.nics[0].ip
3534 elif field == "nic_mode":
3536 val = i_nicp[0][constants.NIC_MODE]
3539 elif field == "nic_link":
3541 val = i_nicp[0][constants.NIC_LINK]
3544 elif field == "bridge":
3545 if (instance.nics and
3546 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
3547 val = i_nicp[0][constants.NIC_LINK]
3550 elif field == "mac":
3552 val = instance.nics[0].mac
3555 elif field == "sda_size" or field == "sdb_size":
3556 idx = ord(field[2]) - ord('a')
3558 val = instance.FindDisk(idx).size
3559 except errors.OpPrereqError:
3561 elif field == "disk_usage": # total disk usage per node
3562 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3563 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3564 elif field == "tags":
3565 val = list(instance.GetTags())
3566 elif field == "serial_no":
3567 val = instance.serial_no
3568 elif field == "network_port":
3569 val = instance.network_port
3570 elif field == "hypervisor":
3571 val = instance.hypervisor
3572 elif field == "hvparams":
3574 elif (field.startswith(HVPREFIX) and
3575 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3576 val = i_hv.get(field[len(HVPREFIX):], None)
3577 elif field == "beparams":
3579 elif (field.startswith(BEPREFIX) and
3580 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3581 val = i_be.get(field[len(BEPREFIX):], None)
3582 elif st_match and st_match.groups():
3583 # matches a variable list
3584 st_groups = st_match.groups()
3585 if st_groups and st_groups[0] == "disk":
3586 if st_groups[1] == "count":
3587 val = len(instance.disks)
3588 elif st_groups[1] == "sizes":
3589 val = [disk.size for disk in instance.disks]
3590 elif st_groups[1] == "size":
3592 val = instance.FindDisk(st_groups[2]).size
3593 except errors.OpPrereqError:
3596 assert False, "Unhandled disk parameter"
3597 elif st_groups[0] == "nic":
3598 if st_groups[1] == "count":
3599 val = len(instance.nics)
3600 elif st_groups[1] == "macs":
3601 val = [nic.mac for nic in instance.nics]
3602 elif st_groups[1] == "ips":
3603 val = [nic.ip for nic in instance.nics]
3604 elif st_groups[1] == "modes":
3605 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
3606 elif st_groups[1] == "links":
3607 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
3608 elif st_groups[1] == "bridges":
3611 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3612 val.append(nicp[constants.NIC_LINK])
3617 nic_idx = int(st_groups[2])
3618 if nic_idx >= len(instance.nics):
3621 if st_groups[1] == "mac":
3622 val = instance.nics[nic_idx].mac
3623 elif st_groups[1] == "ip":
3624 val = instance.nics[nic_idx].ip
3625 elif st_groups[1] == "mode":
3626 val = i_nicp[nic_idx][constants.NIC_MODE]
3627 elif st_groups[1] == "link":
3628 val = i_nicp[nic_idx][constants.NIC_LINK]
3629 elif st_groups[1] == "bridge":
3630 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
3631 if nic_mode == constants.NIC_MODE_BRIDGED:
3632 val = i_nicp[nic_idx][constants.NIC_LINK]
3636 assert False, "Unhandled NIC parameter"
3638 assert False, ("Declared but unhandled variable parameter '%s'" %
3641 assert False, "Declared but unhandled parameter '%s'" % field
3648 class LUFailoverInstance(LogicalUnit):
3649 """Failover an instance.
3652 HPATH = "instance-failover"
3653 HTYPE = constants.HTYPE_INSTANCE
3654 _OP_REQP = ["instance_name", "ignore_consistency"]
3657 def ExpandNames(self):
3658 self._ExpandAndLockInstance()
3659 self.needed_locks[locking.LEVEL_NODE] = []
3660 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3662 def DeclareLocks(self, level):
3663 if level == locking.LEVEL_NODE:
3664 self._LockInstancesNodes()
3666 def BuildHooksEnv(self):
3669 This runs on master, primary and secondary nodes of the instance.
3673 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3675 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3676 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3679 def CheckPrereq(self):
3680 """Check prerequisites.
3682 This checks that the instance is in the cluster.
3685 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3686 assert self.instance is not None, \
3687 "Cannot retrieve locked instance %s" % self.op.instance_name
3689 bep = self.cfg.GetClusterInfo().FillBE(instance)
3690 if instance.disk_template not in constants.DTS_NET_MIRROR:
3691 raise errors.OpPrereqError("Instance's disk layout is not"
3692 " network mirrored, cannot failover.")
3694 secondary_nodes = instance.secondary_nodes
3695 if not secondary_nodes:
3696 raise errors.ProgrammerError("no secondary node but using "
3697 "a mirrored disk template")
3699 target_node = secondary_nodes[0]
3700 _CheckNodeOnline(self, target_node)
3701 _CheckNodeNotDrained(self, target_node)
3702 if instance.admin_up:
3703 # check memory requirements on the secondary node
3704 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3705 instance.name, bep[constants.BE_MEMORY],
3706 instance.hypervisor)
3708 self.LogInfo("Not checking memory on the secondary node as"
3709 " instance will not be started")
3711 # check bridge existence
3712 _CheckInstanceBridgesExist(self, instance, node=target_node)
3714 def Exec(self, feedback_fn):
3715 """Failover an instance.
3717 The failover is done by shutting it down on its present node and
3718 starting it on the secondary.
3721 instance = self.instance
3723 source_node = instance.primary_node
3724 target_node = instance.secondary_nodes[0]
3726 feedback_fn("* checking disk consistency between source and target")
3727 for dev in instance.disks:
3728 # for drbd, these are drbd over lvm
3729 if not _CheckDiskConsistency(self, dev, target_node, False):
3730 if instance.admin_up and not self.op.ignore_consistency:
3731 raise errors.OpExecError("Disk %s is degraded on target node,"
3732 " aborting failover." % dev.iv_name)
3734 feedback_fn("* shutting down instance on source node")
3735 logging.info("Shutting down instance %s on node %s",
3736 instance.name, source_node)
3738 result = self.rpc.call_instance_shutdown(source_node, instance)
3739 msg = result.fail_msg
3741 if self.op.ignore_consistency:
3742 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3743 " Proceeding anyway. Please make sure node"
3744 " %s is down. Error details: %s",
3745 instance.name, source_node, source_node, msg)
3747 raise errors.OpExecError("Could not shutdown instance %s on"
3749 (instance.name, source_node, msg))
3751 feedback_fn("* deactivating the instance's disks on source node")
3752 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3753 raise errors.OpExecError("Can't shut down the instance's disks.")
3755 instance.primary_node = target_node
3756 # distribute new instance config to the other nodes
3757 self.cfg.Update(instance)
3759 # Only start the instance if it's marked as up
3760 if instance.admin_up:
3761 feedback_fn("* activating the instance's disks on target node")
3762 logging.info("Starting instance %s on node %s",
3763 instance.name, target_node)
3765 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3766 ignore_secondaries=True)
3768 _ShutdownInstanceDisks(self, instance)
3769 raise errors.OpExecError("Can't activate the instance's disks")
3771 feedback_fn("* starting the instance on the target node")
3772 result = self.rpc.call_instance_start(target_node, instance, None, None)
3773 msg = result.fail_msg
3775 _ShutdownInstanceDisks(self, instance)
3776 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3777 (instance.name, target_node, msg))
3780 class LUMigrateInstance(LogicalUnit):
3781 """Migrate an instance.
3783 This is migration without shutting the instance down, as opposed to
3784 failover, which is done with a shutdown.
3787 HPATH = "instance-migrate"
3788 HTYPE = constants.HTYPE_INSTANCE
3789 _OP_REQP = ["instance_name", "live", "cleanup"]
3793 def ExpandNames(self):
3794 self._ExpandAndLockInstance()
3795 self.needed_locks[locking.LEVEL_NODE] = []
3796 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3798 def DeclareLocks(self, level):
3799 if level == locking.LEVEL_NODE:
3800 self._LockInstancesNodes()
3802 def BuildHooksEnv(self):
3805 This runs on master, primary and secondary nodes of the instance.
3808 env = _BuildInstanceHookEnvByObject(self, self.instance)
3809 env["MIGRATE_LIVE"] = self.op.live
3810 env["MIGRATE_CLEANUP"] = self.op.cleanup
3811 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3814 def CheckPrereq(self):
3815 """Check prerequisites.
3817 This checks that the instance is in the cluster.
3820 instance = self.cfg.GetInstanceInfo(
3821 self.cfg.ExpandInstanceName(self.op.instance_name))
3822 if instance is None:
3823 raise errors.OpPrereqError("Instance '%s' not known" %
3824 self.op.instance_name)
3826 if instance.disk_template != constants.DT_DRBD8:
3827 raise errors.OpPrereqError("Instance's disk layout is not"
3828 " drbd8, cannot migrate.")
3830 secondary_nodes = instance.secondary_nodes
3831 if not secondary_nodes:
3832 raise errors.ConfigurationError("No secondary node but using"
3833 " drbd8 disk template")
3835 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3837 target_node = secondary_nodes[0]
3838 # check memory requirements on the secondary node
3839 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3840 instance.name, i_be[constants.BE_MEMORY],
3841 instance.hypervisor)
3843 # check bridge existence
3844 _CheckInstanceBridgesExist(self, instance, node=target_node)
3846 if not self.op.cleanup:
3847 _CheckNodeNotDrained(self, target_node)
3848 result = self.rpc.call_instance_migratable(instance.primary_node,
3850 result.Raise("Can't migrate, please use failover", prereq=True)
3852 self.instance = instance
3854 def _WaitUntilSync(self):
3855 """Poll with custom rpc for disk sync.
3857 This uses our own step-based rpc call.
3860 self.feedback_fn("* wait until resync is done")
3864 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3866 self.instance.disks)
3868 for node, nres in result.items():
3869 nres.Raise("Cannot resync disks on node %s" % node)
3870 node_done, node_percent = nres.payload
3871 all_done = all_done and node_done
3872 if node_percent is not None:
3873 min_percent = min(min_percent, node_percent)
3875 if min_percent < 100:
3876 self.feedback_fn(" - progress: %.1f%%" % min_percent)
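# Illustrative payload shape (hypothetical values): each node returns a
# (done, sync_percent) pair, e.g. (False, 87.5) while still syncing;
# a None percentage is simply skipped when computing the minimum.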
3879 def _EnsureSecondary(self, node):
3880 """Demote a node to secondary.
3883 self.feedback_fn("* switching node %s to secondary mode" % node)
3885 for dev in self.instance.disks:
3886 self.cfg.SetDiskID(dev, node)
3888 result = self.rpc.call_blockdev_close(node, self.instance.name,
3889 self.instance.disks)
3890 result.Raise("Cannot change disk to secondary on node %s" % node)
3892 def _GoStandalone(self):
3893 """Disconnect from the network.
3896 self.feedback_fn("* changing into standalone mode")
3897 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3898 self.instance.disks)
3899 for node, nres in result.items():
3900 nres.Raise("Cannot disconnect disks node %s" % node)
3902 def _GoReconnect(self, multimaster):
3903 """Reconnect to the network.
3909 msg = "single-master"
3910 self.feedback_fn("* changing disks into %s mode" % msg)
3911 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3912 self.instance.disks,
3913 self.instance.name, multimaster)
3914 for node, nres in result.items():
3915 nres.Raise("Cannot change disks config on node %s" % node)
3917 def _ExecCleanup(self):
3918 """Try to cleanup after a failed migration.
3920 The cleanup is done by:
3921 - check that the instance is running only on one node
3922 (and update the config if needed)
3923 - change disks on its secondary node to secondary
3924 - wait until disks are fully synchronized
3925 - disconnect from the network
3926 - change disks into single-master mode
3927 - wait again until disks are fully synchronized
3930 instance = self.instance
3931 target_node = self.target_node
3932 source_node = self.source_node
3934 # check running on only one node
3935 self.feedback_fn("* checking where the instance actually runs"
3936 " (if this hangs, the hypervisor might be in"
3938 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3939 for node, result in ins_l.items():
3940 result.Raise("Can't contact node %s" % node)
3942 runningon_source = instance.name in ins_l[source_node].payload
3943 runningon_target = instance.name in ins_l[target_node].payload
3945 if runningon_source and runningon_target:
3946 raise errors.OpExecError("Instance seems to be running on two nodes,"
3947 " or the hypervisor is confused. You will have"
3948 " to ensure manually that it runs only on one"
3949 " and restart this operation.")
3951 if not (runningon_source or runningon_target):
3952 raise errors.OpExecError("Instance does not seem to be running at all."
3953 " In this case, it's safer to repair by"
3954 " running 'gnt-instance stop' to ensure disk"
3955 " shutdown, and then restarting it.")
3957 if runningon_target:
3958 # the migration has actually succeeded, we need to update the config
3959 self.feedback_fn("* instance running on secondary node (%s),"
3960 " updating config" % target_node)
3961 instance.primary_node = target_node
3962 self.cfg.Update(instance)
3963 demoted_node = source_node
3965 self.feedback_fn("* instance confirmed to be running on its"
3966 " primary node (%s)" % source_node)
3967 demoted_node = target_node
3969 self._EnsureSecondary(demoted_node)
3971 self._WaitUntilSync()
3972 except errors.OpExecError:
3973 # we ignore errors here, since if the device is standalone, it
3974 # won't be able to sync
3976 self._GoStandalone()
3977 self._GoReconnect(False)
3978 self._WaitUntilSync()
3980 self.feedback_fn("* done")
3982 def _RevertDiskStatus(self):
3983 """Try to revert the disk status after a failed migration.
3986 target_node = self.target_node
3988 self._EnsureSecondary(target_node)
3989 self._GoStandalone()
3990 self._GoReconnect(False)
3991 self._WaitUntilSync()
3992 except errors.OpExecError, err:
3993 self.LogWarning("Migration failed and I can't reconnect the"
3994 " drives: error '%s'\n"
3995 "Please look and recover the instance status" %
3998 def _AbortMigration(self):
3999 """Call the hypervisor code to abort a started migration.
4002 instance = self.instance
4003 target_node = self.target_node
4004 migration_info = self.migration_info
4006 abort_result = self.rpc.call_finalize_migration(target_node,
4010 abort_msg = abort_result.fail_msg
4012 logging.error("Aborting migration failed on target node %s: %s" %
4013 (target_node, abort_msg))
4014 # Don't raise an exception here, as we still have to try to revert the
4015 # disk status, even if this step failed.
4017 def _ExecMigration(self):
4018 """Migrate an instance.
4020 The migration is done by:
4021 - change the disks into dual-master mode
4022 - wait until disks are fully synchronized again
4023 - migrate the instance
4024 - change disks on the new secondary node (the old primary) to secondary
4025 - wait until disks are fully synchronized
4026 - change disks into single-master mode
4029 instance = self.instance
4030 target_node = self.target_node
4031 source_node = self.source_node
4033 self.feedback_fn("* checking disk consistency between source and target")
4034 for dev in instance.disks:
4035 if not _CheckDiskConsistency(self, dev, target_node, False):
4036 raise errors.OpExecError("Disk %s is degraded or not fully"
4037 " synchronized on target node,"
4038 " aborting migrate." % dev.iv_name)
4040 # First get the migration information from the remote node
4041 result = self.rpc.call_migration_info(source_node, instance)
4042 msg = result.fail_msg
4044 log_err = ("Failed fetching source migration information from %s: %s" %
4046 logging.error(log_err)
4047 raise errors.OpExecError(log_err)
4049 self.migration_info = migration_info = result.payload
4051 # Then switch the disks to master/master mode
4052 self._EnsureSecondary(target_node)
4053 self._GoStandalone()
4054 self._GoReconnect(True)
4055 self._WaitUntilSync()
4057 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4058 result = self.rpc.call_accept_instance(target_node,
4061 self.nodes_ip[target_node])
4063 msg = result.fail_msg
4065 logging.error("Instance pre-migration failed, trying to revert"
4066 " disk status: %s", msg)
4067 self._AbortMigration()
4068 self._RevertDiskStatus()
4069 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4070 (instance.name, msg))
4072 self.feedback_fn("* migrating instance to %s" % target_node)
4074 result = self.rpc.call_instance_migrate(source_node, instance,
4075 self.nodes_ip[target_node],
4077 msg = result.fail_msg
4079 logging.error("Instance migration failed, trying to revert"
4080 " disk status: %s", msg)
4081 self._AbortMigration()
4082 self._RevertDiskStatus()
4083 raise errors.OpExecError("Could not migrate instance %s: %s" %
4084 (instance.name, msg))
4087 instance.primary_node = target_node
4088 # distribute new instance config to the other nodes
4089 self.cfg.Update(instance)
4091 result = self.rpc.call_finalize_migration(target_node,
4095 msg = result.fail_msg
4097 logging.error("Instance migration succeeded, but finalization failed:"
4099 raise errors.OpExecError("Could not finalize instance migration: %s" %
4102 self._EnsureSecondary(source_node)
4103 self._WaitUntilSync()
4104 self._GoStandalone()
4105 self._GoReconnect(False)
4106 self._WaitUntilSync()
4108 self.feedback_fn("* done")
4110 def Exec(self, feedback_fn):
4111 """Perform the migration.
4114 self.feedback_fn = feedback_fn
4116 self.source_node = self.instance.primary_node
4117 self.target_node = self.instance.secondary_nodes[0]
4118 self.all_nodes = [self.source_node, self.target_node]
4120 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4121 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4124 return self._ExecCleanup()
4126 return self._ExecMigration()
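
# Illustrative sketch (not part of the original module): how the per-node
# (done, percent) payloads polled by _WaitUntilSync above can be reduced to a
# single "all done" flag plus a minimum progress figure. The payload shape
# {node: (done, percent)} mirrors that loop; the function name and the sample
# data are assumptions made only for this example.
def _ExampleAggregateSyncStatus(node_payloads):
  """Reduce {node: (done, percent)} into (all_done, min_percent)."""
  all_done = True
  min_percent = 100
  for _, (node_done, node_percent) in node_payloads.items():
    all_done = all_done and node_done
    if node_percent is not None:
      min_percent = min(min_percent, node_percent)
  return (all_done, min_percent)

# _ExampleAggregateSyncStatus({"node1": (False, 42.5), "node2": (True, 100)})
# would return (False, 42.5), i.e. keep polling and report 42.5% progress.
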
4129 def _CreateBlockDev(lu, node, instance, device, force_create,
4131 """Create a tree of block devices on a given node.
4133 If this device type has to be created on secondaries, create it and
4136 If not, just recurse to children keeping the same 'force' value.
4138 @param lu: the lu on whose behalf we execute
4139 @param node: the node on which to create the device
4140 @type instance: L{objects.Instance}
4141 @param instance: the instance which owns the device
4142 @type device: L{objects.Disk}
4143 @param device: the device to create
4144 @type force_create: boolean
4145 @param force_create: whether to force creation of this device; this
4146 will be changed to True whenever we find a device which has the
4147 CreateOnSecondary() attribute
4148 @param info: the extra 'metadata' we should attach to the device
4149 (this will be represented as an LVM tag)
4150 @type force_open: boolean
4151 @param force_open: this parameter will be passed to the
4152 L{backend.BlockdevCreate} function where it specifies
4153 whether we run on primary or not, and it affects both
4154 the child assembly and the device's own Open() execution
4157 if device.CreateOnSecondary():
4161 for child in device.children:
4162 _CreateBlockDev(lu, node, instance, child, force_create,
4165 if not force_create:
4168 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
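
# Illustrative sketch (not part of the original module): the recursion pattern
# used by _CreateBlockDev above, reduced to plain data. The force flag is
# switched on as soon as a device wants to exist on secondaries and is then
# propagated to the whole subtree; children are handled before their parent.
# The _ExampleDev container and function name are assumptions for this sketch.
class _ExampleDev(object):
  def __init__(self, name, on_secondary, children=None):
    self.name = name
    self.on_secondary = on_secondary
    self.children = children or []


def _ExampleCollectCreations(device, force_create, created):
  """Append to 'created' the names that would be built, children first."""
  if device.on_secondary:
    force_create = True
  for child in device.children:
    _ExampleCollectCreations(child, force_create, created)
  if not force_create:
    return
  created.append(device.name)

# For a DRBD-like device (on_secondary=True) with two LV children, calling
# _ExampleCollectCreations(dev, False, result) lists the children before the
# parent, all of them forced because of the parent's flag.
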
4171 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4172 """Create a single block device on a given node.
4174 This will not recurse over children of the device, so they must be
4177 @param lu: the lu on whose behalf we execute
4178 @param node: the node on which to create the device
4179 @type instance: L{objects.Instance}
4180 @param instance: the instance which owns the device
4181 @type device: L{objects.Disk}
4182 @param device: the device to create
4183 @param info: the extra 'metadata' we should attach to the device
4184 (this will be represented as an LVM tag)
4185 @type force_open: boolean
4186 @param force_open: this parameter will be passed to the
4187 L{backend.BlockdevCreate} function where it specifies
4188 whether we run on primary or not, and it affects both
4189 the child assembly and the device's own Open() execution
4192 lu.cfg.SetDiskID(device, node)
4193 result = lu.rpc.call_blockdev_create(node, device, device.size,
4194 instance.name, force_open, info)
4195 result.Raise("Can't create block device %s on"
4196 " node %s for instance %s" % (device, node, instance.name))
4197 if device.physical_id is None:
4198 device.physical_id = result.payload
4201 def _GenerateUniqueNames(lu, exts):
4202 """Generate a suitable LV name.
4204 This will generate a logical volume name for the given instance.
4209 new_id = lu.cfg.GenerateUniqueID()
4210 results.append("%s%s" % (new_id, val))
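
# Illustrative sketch (not part of the original module): the shape of the
# names produced by _GenerateUniqueNames above -- a cluster-unique identifier
# followed by the requested extension, e.g. "<id>.disk0". uuid.uuid4() merely
# stands in for the configuration's unique-ID generator in this sketch.
def _ExampleUniqueNames(exts):
  import uuid
  return ["%s%s" % (uuid.uuid4(), ext) for ext in exts]

# _ExampleUniqueNames([".disk0", ".disk1"]) returns two names with distinct
# identifier prefixes, which is what keeps LV names unique across the cluster.
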
4214 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4216 """Generate a drbd8 device complete with its children.
4219 port = lu.cfg.AllocatePort()
4220 vgname = lu.cfg.GetVGName()
4221 shared_secret = lu.cfg.GenerateDRBDSecret()
4222 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4223 logical_id=(vgname, names[0]))
4224 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4225 logical_id=(vgname, names[1]))
4226 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4227 logical_id=(primary, secondary, port,
4230 children=[dev_data, dev_meta],
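
# Illustrative sketch (not part of the original module): the layout of a DRBD8
# disk's logical_id as assembled above and unpacked elsewhere in this module:
# (node_a, node_b, port, minor_a, minor_b, shared_secret). The helper below
# only illustrates how the peer of a given node can be read from that tuple.
def _ExampleDrbdPeer(logical_id, this_node):
  """Return the other node listed in a DRBD8 logical_id tuple."""
  node_a, node_b = logical_id[0], logical_id[1]
  if this_node == node_a:
    return node_b
  return node_a
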
4235 def _GenerateDiskTemplate(lu, template_name,
4236 instance_name, primary_node,
4237 secondary_nodes, disk_info,
4238 file_storage_dir, file_driver,
4240 """Generate the entire disk layout for a given template type.
4243 #TODO: compute space requirements
4245 vgname = lu.cfg.GetVGName()
4246 disk_count = len(disk_info)
4248 if template_name == constants.DT_DISKLESS:
4250 elif template_name == constants.DT_PLAIN:
4251 if len(secondary_nodes) != 0:
4252 raise errors.ProgrammerError("Wrong template configuration")
4254 names = _GenerateUniqueNames(lu, [".disk%d" % i
4255 for i in range(disk_count)])
4256 for idx, disk in enumerate(disk_info):
4257 disk_index = idx + base_index
4258 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4259 logical_id=(vgname, names[idx]),
4260 iv_name="disk/%d" % disk_index,
4262 disks.append(disk_dev)
4263 elif template_name == constants.DT_DRBD8:
4264 if len(secondary_nodes) != 1:
4265 raise errors.ProgrammerError("Wrong template configuration")
4266 remote_node = secondary_nodes[0]
4267 minors = lu.cfg.AllocateDRBDMinor(
4268 [primary_node, remote_node] * len(disk_info), instance_name)
4271 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4272 for i in range(disk_count)]):
4273 names.append(lv_prefix + "_data")
4274 names.append(lv_prefix + "_meta")
4275 for idx, disk in enumerate(disk_info):
4276 disk_index = idx + base_index
4277 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4278 disk["size"], names[idx*2:idx*2+2],
4279 "disk/%d" % disk_index,
4280 minors[idx*2], minors[idx*2+1])
4281 disk_dev.mode = disk["mode"]
4282 disks.append(disk_dev)
4283 elif template_name == constants.DT_FILE:
4284 if len(secondary_nodes) != 0:
4285 raise errors.ProgrammerError("Wrong template configuration")
4287 for idx, disk in enumerate(disk_info):
4288 disk_index = idx + base_index
4289 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4290 iv_name="disk/%d" % disk_index,
4291 logical_id=(file_driver,
4292 "%s/disk%d" % (file_storage_dir,
4295 disks.append(disk_dev)
4297 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
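
# Illustrative sketch (not part of the original module): how the flat name and
# minor lists built in the DT_DRBD8 branch above pair up per disk. Every disk
# consumes two logical volume names (data, meta) and two DRBD minors (one per
# node), all addressed via [idx * 2] and [idx * 2 + 1].
def _ExampleDrbdPairs(prefixes):
  """Expand LV name prefixes into per-disk (data, meta) name pairs."""
  names = []
  for prefix in prefixes:
    names.append(prefix + "_data")
    names.append(prefix + "_meta")
  return [(names[idx * 2], names[idx * 2 + 1])
          for idx in range(len(prefixes))]

# _ExampleDrbdPairs([".disk0", ".disk1"]) ->
#   [(".disk0_data", ".disk0_meta"), (".disk1_data", ".disk1_meta")]
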
4301 def _GetInstanceInfoText(instance):
4302 """Compute that text that should be added to the disk's metadata.
4305 return "originstname+%s" % instance.name
4308 def _CreateDisks(lu, instance):
4309 """Create all disks for an instance.
4311 This abstracts away some work from AddInstance.
4313 @type lu: L{LogicalUnit}
4314 @param lu: the logical unit on whose behalf we execute
4315 @type instance: L{objects.Instance}
4316 @param instance: the instance whose disks we should create
4318 @return: the success of the creation
4321 info = _GetInstanceInfoText(instance)
4322 pnode = instance.primary_node
4324 if instance.disk_template == constants.DT_FILE:
4325 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4326 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4328 result.Raise("Failed to create directory '%s' on"
4329 " node %s: %s" % (file_storage_dir, pnode))
4331 # Note: this needs to be kept in sync with adding of disks in
4332 # LUSetInstanceParams
4333 for device in instance.disks:
4334 logging.info("Creating volume %s for instance %s",
4335 device.iv_name, instance.name)
4337 for node in instance.all_nodes:
4338 f_create = node == pnode
4339 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
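
# Illustrative sketch (not part of the original module): the per-node flags
# computed by _CreateDisks above. Both force_create and force_open are true
# only on the primary node, so secondaries only build the pieces flagged
# CreateOnSecondary() and never open the devices.
def _ExampleCreationFlags(all_nodes, primary_node):
  return [(node, node == primary_node) for node in all_nodes]

# _ExampleCreationFlags(["nodeA", "nodeB"], "nodeA")
#   -> [("nodeA", True), ("nodeB", False)]
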
4342 def _RemoveDisks(lu, instance):
4343 """Remove all disks for an instance.
4345 This abstracts away some work from `AddInstance()` and
4346 `RemoveInstance()`. Note that in case some of the devices couldn't
4347 be removed, the removal will continue with the other ones (compare
4348 with `_CreateDisks()`).
4350 @type lu: L{LogicalUnit}
4351 @param lu: the logical unit on whose behalf we execute
4352 @type instance: L{objects.Instance}
4353 @param instance: the instance whose disks we should remove
4355 @return: the success of the removal
4358 logging.info("Removing block devices for instance %s", instance.name)
4361 for device in instance.disks:
4362 for node, disk in device.ComputeNodeTree(instance.primary_node):
4363 lu.cfg.SetDiskID(disk, node)
4364 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4366 lu.LogWarning("Could not remove block device %s on node %s,"
4367 " continuing anyway: %s", device.iv_name, node, msg)
4370 if instance.disk_template == constants.DT_FILE:
4371 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4372 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4374 msg = result.fail_msg
4376 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4377 file_storage_dir, instance.primary_node, msg)
4383 def _ComputeDiskSize(disk_template, disks):
4384 """Compute disk size requirements in the volume group
4387 # Required free disk space as a function of disk and swap space
4389 constants.DT_DISKLESS: None,
4390 constants.DT_PLAIN: sum(d["size"] for d in disks),
4391 # 128 MB are added for drbd metadata for each disk
4392 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4393 constants.DT_FILE: None,
4396 if disk_template not in req_size_dict:
4397 raise errors.ProgrammerError("Disk template '%s' size requirement"
4398 " is unknown" % disk_template)
4400 return req_size_dict[disk_template]
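
# Illustrative sketch (not part of the original module): the space formula of
# _ComputeDiskSize above applied to concrete numbers. For two disks of 1024 MB
# and 512 MB, DT_PLAIN needs 1536 MB of volume group space, while DT_DRBD8
# needs 1536 + 2 * 128 = 1792 MB because each disk carries a 128 MB metadata
# volume. The helper name and its overhead parameter exist only for the
# example.
def _ExampleRequiredVgSpace(disk_sizes, per_disk_overhead=0):
  return sum(size + per_disk_overhead for size in disk_sizes)

# _ExampleRequiredVgSpace([1024, 512]) == 1536          (plain LVM)
# _ExampleRequiredVgSpace([1024, 512], 128) == 1792     (drbd8)
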
4403 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4404 """Hypervisor parameter validation.
4406 This function abstracts the hypervisor parameter validation to be
4407 used in both instance create and instance modify.
4409 @type lu: L{LogicalUnit}
4410 @param lu: the logical unit for which we check
4411 @type nodenames: list
4412 @param nodenames: the list of nodes on which we should check
4413 @type hvname: string
4414 @param hvname: the name of the hypervisor we should use
4415 @type hvparams: dict
4416 @param hvparams: the parameters which we need to check
4417 @raise errors.OpPrereqError: if the parameters are not valid
4420 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4423 for node in nodenames:
4427 info.Raise("Hypervisor parameter validation failed on node %s" % node)
4430 class LUCreateInstance(LogicalUnit):
4431 """Create an instance.
4434 HPATH = "instance-add"
4435 HTYPE = constants.HTYPE_INSTANCE
4436 _OP_REQP = ["instance_name", "disks", "disk_template",
4438 "wait_for_sync", "ip_check", "nics",
4439 "hvparams", "beparams"]
4442 def _ExpandNode(self, node):
4443 """Expands and checks one node name.
4446 node_full = self.cfg.ExpandNodeName(node)
4447 if node_full is None:
4448 raise errors.OpPrereqError("Unknown node %s" % node)
4451 def ExpandNames(self):
4452 """ExpandNames for CreateInstance.
4454 Figure out the right locks for instance creation.
4457 self.needed_locks = {}
4459 # set optional parameters to none if they don't exist
4460 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4461 if not hasattr(self.op, attr):
4462 setattr(self.op, attr, None)
4464 # cheap checks, mostly valid constants given
4466 # verify creation mode
4467 if self.op.mode not in (constants.INSTANCE_CREATE,
4468 constants.INSTANCE_IMPORT):
4469 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4472 # disk template and mirror node verification
4473 if self.op.disk_template not in constants.DISK_TEMPLATES:
4474 raise errors.OpPrereqError("Invalid disk template name")
4476 if self.op.hypervisor is None:
4477 self.op.hypervisor = self.cfg.GetHypervisorType()
4479 cluster = self.cfg.GetClusterInfo()
4480 enabled_hvs = cluster.enabled_hypervisors
4481 if self.op.hypervisor not in enabled_hvs:
4482 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4483 " cluster (%s)" % (self.op.hypervisor,
4484 ",".join(enabled_hvs)))
4486 # check hypervisor parameter syntax (locally)
4487 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4488 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4490 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4491 hv_type.CheckParameterSyntax(filled_hvp)
4492 self.hv_full = filled_hvp
4494 # fill and remember the beparams dict
4495 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4496 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4499 #### instance parameters check
4501 # instance name verification
4502 hostname1 = utils.HostInfo(self.op.instance_name)
4503 self.op.instance_name = instance_name = hostname1.name
4505 # this is just a preventive check, but someone might still add this
4506 # instance in the meantime, and creation will fail at lock-add time
4507 if instance_name in self.cfg.GetInstanceList():
4508 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4511 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4515 for idx, nic in enumerate(self.op.nics):
4516 nic_mode_req = nic.get("mode", None)
4517 nic_mode = nic_mode_req
4518 if nic_mode is None:
4519 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4521 # in routed mode, for the first nic, the default ip is 'auto'
4522 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4523 default_ip_mode = constants.VALUE_AUTO
4525 default_ip_mode = constants.VALUE_NONE
4527 # ip validity checks
4528 ip = nic.get("ip", default_ip_mode)
4529 if ip is None or ip.lower() == constants.VALUE_NONE:
4531 elif ip.lower() == constants.VALUE_AUTO:
4532 nic_ip = hostname1.ip
4534 if not utils.IsValidIP(ip):
4535 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4536 " like a valid IP" % ip)
4539 # TODO: check the ip for uniqueness !!
4540 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4541 raise errors.OpPrereqError("Routed nic mode requires an ip address")
4543 # MAC address verification
4544 mac = nic.get("mac", constants.VALUE_AUTO)
4545 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4546 if not utils.IsValidMac(mac.lower()):
4547 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4549 # bridge verification
4550 bridge = nic.get("bridge", None)
4551 link = nic.get("link", None)
4553 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
4554 " at the same time")
4555 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4556 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4562 nicparams[constants.NIC_MODE] = nic_mode_req
4564 nicparams[constants.NIC_LINK] = link
4566 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4568 objects.NIC.CheckParameterSyntax(check_params)
4569 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4571 # disk checks/pre-build
4573 for disk in self.op.disks:
4574 mode = disk.get("mode", constants.DISK_RDWR)
4575 if mode not in constants.DISK_ACCESS_SET:
4576 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4578 size = disk.get("size", None)
4580 raise errors.OpPrereqError("Missing disk size")
4584 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4585 self.disks.append({"size": size, "mode": mode})
4587 # used in CheckPrereq for ip ping check
4588 self.check_ip = hostname1.ip
4590 # file storage checks
4591 if (self.op.file_driver and
4592 not self.op.file_driver in constants.FILE_DRIVER):
4593 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4594 self.op.file_driver)
4596 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4597 raise errors.OpPrereqError("File storage directory path not absolute")
4599 ### Node/iallocator related checks
4600 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4601 raise errors.OpPrereqError("One and only one of iallocator and primary"
4602 " node must be given")
4604 if self.op.iallocator:
4605 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4607 self.op.pnode = self._ExpandNode(self.op.pnode)
4608 nodelist = [self.op.pnode]
4609 if self.op.snode is not None:
4610 self.op.snode = self._ExpandNode(self.op.snode)
4611 nodelist.append(self.op.snode)
4612 self.needed_locks[locking.LEVEL_NODE] = nodelist
4614 # in case of an import, lock the source node too
4615 if self.op.mode == constants.INSTANCE_IMPORT:
4616 src_node = getattr(self.op, "src_node", None)
4617 src_path = getattr(self.op, "src_path", None)
4619 if src_path is None:
4620 self.op.src_path = src_path = self.op.instance_name
4622 if src_node is None:
4623 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4624 self.op.src_node = None
4625 if os.path.isabs(src_path):
4626 raise errors.OpPrereqError("Importing an instance from an absolute"
4627 " path requires a source node option.")
4629 self.op.src_node = src_node = self._ExpandNode(src_node)
4630 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4631 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4632 if not os.path.isabs(src_path):
4633 self.op.src_path = src_path = \
4634 os.path.join(constants.EXPORT_DIR, src_path)
4636 else: # INSTANCE_CREATE
4637 if getattr(self.op, "os_type", None) is None:
4638 raise errors.OpPrereqError("No guest OS specified")
4640 def _RunAllocator(self):
4641 """Run the allocator based on input opcode.
4644 nics = [n.ToDict() for n in self.nics]
4645 ial = IAllocator(self,
4646 mode=constants.IALLOCATOR_MODE_ALLOC,
4647 name=self.op.instance_name,
4648 disk_template=self.op.disk_template,
4651 vcpus=self.be_full[constants.BE_VCPUS],
4652 mem_size=self.be_full[constants.BE_MEMORY],
4655 hypervisor=self.op.hypervisor,
4658 ial.Run(self.op.iallocator)
4661 raise errors.OpPrereqError("Can't compute nodes using"
4662 " iallocator '%s': %s" % (self.op.iallocator,
4664 if len(ial.nodes) != ial.required_nodes:
4665 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4666 " of nodes (%s), required %s" %
4667 (self.op.iallocator, len(ial.nodes),
4668 ial.required_nodes))
4669 self.op.pnode = ial.nodes[0]
4670 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4671 self.op.instance_name, self.op.iallocator,
4672 ", ".join(ial.nodes))
4673 if ial.required_nodes == 2:
4674 self.op.snode = ial.nodes[1]
4676 def BuildHooksEnv(self):
4679 This runs on master, primary and secondary nodes of the instance.
4683 "ADD_MODE": self.op.mode,
4685 if self.op.mode == constants.INSTANCE_IMPORT:
4686 env["SRC_NODE"] = self.op.src_node
4687 env["SRC_PATH"] = self.op.src_path
4688 env["SRC_IMAGES"] = self.src_images
4690 env.update(_BuildInstanceHookEnv(
4691 name=self.op.instance_name,
4692 primary_node=self.op.pnode,
4693 secondary_nodes=self.secondaries,
4694 status=self.op.start,
4695 os_type=self.op.os_type,
4696 memory=self.be_full[constants.BE_MEMORY],
4697 vcpus=self.be_full[constants.BE_VCPUS],
4698 nics=_NICListToTuple(self, self.nics),
4699 disk_template=self.op.disk_template,
4700 disks=[(d["size"], d["mode"]) for d in self.disks],
4703 hypervisor=self.op.hypervisor,
4706 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4711 def CheckPrereq(self):
4712 """Check prerequisites.
4715 if (not self.cfg.GetVGName() and
4716 self.op.disk_template not in constants.DTS_NOT_LVM):
4717 raise errors.OpPrereqError("Cluster does not support lvm-based"
4720 if self.op.mode == constants.INSTANCE_IMPORT:
4721 src_node = self.op.src_node
4722 src_path = self.op.src_path
4724 if src_node is None:
4725 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4726 exp_list = self.rpc.call_export_list(locked_nodes)
4728 for node in exp_list:
4729 if exp_list[node].fail_msg:
4731 if src_path in exp_list[node].payload:
4733 self.op.src_node = src_node = node
4734 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4738 raise errors.OpPrereqError("No export found for relative path %s" %
4741 _CheckNodeOnline(self, src_node)
4742 result = self.rpc.call_export_info(src_node, src_path)
4743 result.Raise("No export or invalid export found in dir %s" % src_path)
4745 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4746 if not export_info.has_section(constants.INISECT_EXP):
4747 raise errors.ProgrammerError("Corrupted export config")
4749 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4750 if (int(ei_version) != constants.EXPORT_VERSION):
4751 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4752 (ei_version, constants.EXPORT_VERSION))
4754 # Check that the new instance doesn't have fewer disks than the export
4755 instance_disks = len(self.disks)
4756 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4757 if instance_disks < export_disks:
4758 raise errors.OpPrereqError("Not enough disks to import."
4759 " (instance: %d, export: %d)" %
4760 (instance_disks, export_disks))
4762 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4764 for idx in range(export_disks):
4765 option = 'disk%d_dump' % idx
4766 if export_info.has_option(constants.INISECT_INS, option):
4767 # FIXME: are the old os-es, disk sizes, etc. useful?
4768 export_name = export_info.get(constants.INISECT_INS, option)
4769 image = os.path.join(src_path, export_name)
4770 disk_images.append(image)
4772 disk_images.append(False)
4774 self.src_images = disk_images
4776 old_name = export_info.get(constants.INISECT_INS, 'name')
4777 # FIXME: int() here could throw a ValueError on broken exports
4778 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4779 if self.op.instance_name == old_name:
4780 for idx, nic in enumerate(self.nics):
4781 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4782 nic_mac_ini = 'nic%d_mac' % idx
4783 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4785 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4786 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4787 if self.op.start and not self.op.ip_check:
4788 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4789 " adding an instance in start mode")
4791 if self.op.ip_check:
4792 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4793 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4794 (self.check_ip, self.op.instance_name))
4796 #### mac address generation
4797 # By generating the mac address here, both the allocator and the hooks get
4798 # the real final mac address rather than the 'auto' or 'generate' value.
4799 # There is a race condition between the generation and the instance object
4800 # creation, which means that we know the mac is valid now, but we're not
4801 # sure it will be when we actually add the instance. If things go bad
4802 # adding the instance will abort because of a duplicate mac, and the
4803 # creation job will fail.
4804 for nic in self.nics:
4805 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4806 nic.mac = self.cfg.GenerateMAC()
4810 if self.op.iallocator is not None:
4811 self._RunAllocator()
4813 #### node related checks
4815 # check primary node
4816 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4817 assert self.pnode is not None, \
4818 "Cannot retrieve locked node %s" % self.op.pnode
4820 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4823 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4826 self.secondaries = []
4828 # mirror node verification
4829 if self.op.disk_template in constants.DTS_NET_MIRROR:
4830 if self.op.snode is None:
4831 raise errors.OpPrereqError("The networked disk templates need"
4833 if self.op.snode == pnode.name:
4834 raise errors.OpPrereqError("The secondary node cannot be"
4835 " the primary node.")
4836 _CheckNodeOnline(self, self.op.snode)
4837 _CheckNodeNotDrained(self, self.op.snode)
4838 self.secondaries.append(self.op.snode)
4840 nodenames = [pnode.name] + self.secondaries
4842 req_size = _ComputeDiskSize(self.op.disk_template,
4845 # Check lv size requirements
4846 if req_size is not None:
4847 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4849 for node in nodenames:
4850 info = nodeinfo[node]
4851 info.Raise("Cannot get current information from node %s" % node)
4853 vg_free = info.get('vg_free', None)
4854 if not isinstance(vg_free, int):
4855 raise errors.OpPrereqError("Can't compute free disk space on"
4857 if req_size > vg_free:
4858 raise errors.OpPrereqError("Not enough disk space on target node %s."
4859 " %d MB available, %d MB required" %
4860 (node, vg_free, req_size))
4862 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4865 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4866 result.Raise("OS '%s' not in supported os list for primary node %s" %
4867 (self.op.os_type, pnode.name), prereq=True)
4869 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4871 # memory check on primary node
4873 _CheckNodeFreeMemory(self, self.pnode.name,
4874 "creating instance %s" % self.op.instance_name,
4875 self.be_full[constants.BE_MEMORY],
4878 self.dry_run_result = list(nodenames)
4880 def Exec(self, feedback_fn):
4881 """Create and add the instance to the cluster.
4884 instance = self.op.instance_name
4885 pnode_name = self.pnode.name
4887 ht_kind = self.op.hypervisor
4888 if ht_kind in constants.HTS_REQ_PORT:
4889 network_port = self.cfg.AllocatePort()
4893 ##if self.op.vnc_bind_address is None:
4894 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4896 # this is needed because os.path.join does not accept None arguments
4897 if self.op.file_storage_dir is None:
4898 string_file_storage_dir = ""
4900 string_file_storage_dir = self.op.file_storage_dir
4902 # build the full file storage dir path
4903 file_storage_dir = os.path.normpath(os.path.join(
4904 self.cfg.GetFileStorageDir(),
4905 string_file_storage_dir, instance))
4908 disks = _GenerateDiskTemplate(self,
4909 self.op.disk_template,
4910 instance, pnode_name,
4914 self.op.file_driver,
4917 iobj = objects.Instance(name=instance, os=self.op.os_type,
4918 primary_node=pnode_name,
4919 nics=self.nics, disks=disks,
4920 disk_template=self.op.disk_template,
4922 network_port=network_port,
4923 beparams=self.op.beparams,
4924 hvparams=self.op.hvparams,
4925 hypervisor=self.op.hypervisor,
4928 feedback_fn("* creating instance disks...")
4930 _CreateDisks(self, iobj)
4931 except errors.OpExecError:
4932 self.LogWarning("Device creation failed, reverting...")
4934 _RemoveDisks(self, iobj)
4936 self.cfg.ReleaseDRBDMinors(instance)
4939 feedback_fn("adding instance %s to cluster config" % instance)
4941 self.cfg.AddInstance(iobj)
4942 # Declare that we don't want to remove the instance lock anymore, as we've
4943 # added the instance to the config
4944 del self.remove_locks[locking.LEVEL_INSTANCE]
4945 # Unlock all the nodes
4946 if self.op.mode == constants.INSTANCE_IMPORT:
4947 nodes_keep = [self.op.src_node]
4948 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4949 if node != self.op.src_node]
4950 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4951 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4953 self.context.glm.release(locking.LEVEL_NODE)
4954 del self.acquired_locks[locking.LEVEL_NODE]
4956 if self.op.wait_for_sync:
4957 disk_abort = not _WaitForSync(self, iobj)
4958 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4959 # make sure the disks are not degraded (still sync-ing is ok)
4961 feedback_fn("* checking mirrors status")
4962 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4967 _RemoveDisks(self, iobj)
4968 self.cfg.RemoveInstance(iobj.name)
4969 # Make sure the instance lock gets removed
4970 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4971 raise errors.OpExecError("There are some degraded disks for"
4974 feedback_fn("creating os for instance %s on node %s" %
4975 (instance, pnode_name))
4977 if iobj.disk_template != constants.DT_DISKLESS:
4978 if self.op.mode == constants.INSTANCE_CREATE:
4979 feedback_fn("* running the instance OS create scripts...")
4980 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4981 result.Raise("Could not add os for instance %s"
4982 " on node %s" % (instance, pnode_name))
4984 elif self.op.mode == constants.INSTANCE_IMPORT:
4985 feedback_fn("* running the instance OS import scripts...")
4986 src_node = self.op.src_node
4987 src_images = self.src_images
4988 cluster_name = self.cfg.GetClusterName()
4989 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4990 src_node, src_images,
4992 msg = import_result.fail_msg
4994 self.LogWarning("Error while importing the disk images for instance"
4995 " %s on node %s: %s" % (instance, pnode_name, msg))
4997 # also checked in the prereq part
4998 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5002 iobj.admin_up = True
5003 self.cfg.Update(iobj)
5004 logging.info("Starting instance %s on node %s", instance, pnode_name)
5005 feedback_fn("* starting instance...")
5006 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5007 result.Raise("Could not start instance")
5009 return list(iobj.all_nodes)
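
# Illustrative sketch (not part of the original module): the defaults-plus-
# overrides fill used when LUCreateInstance builds filled_hvp and be_full
# above (objects.FillDict(cluster defaults, opcode values)). The local helper
# below only restates the assumed semantics: start from the cluster-level
# defaults and let the per-opcode values win.
def _ExampleFillDict(defaults, overrides):
  filled = dict(defaults)
  filled.update(overrides)
  return filled

# _ExampleFillDict({"memory": 128, "vcpus": 1}, {"memory": 512})
#   -> {"memory": 512, "vcpus": 1}
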
5012 class LUConnectConsole(NoHooksLU):
5013 """Connect to an instance's console.
5015 This is somewhat special in that it returns the command line that
5016 you need to run on the master node in order to connect to the
5020 _OP_REQP = ["instance_name"]
5023 def ExpandNames(self):
5024 self._ExpandAndLockInstance()
5026 def CheckPrereq(self):
5027 """Check prerequisites.
5029 This checks that the instance is in the cluster.
5032 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5033 assert self.instance is not None, \
5034 "Cannot retrieve locked instance %s" % self.op.instance_name
5035 _CheckNodeOnline(self, self.instance.primary_node)
5037 def Exec(self, feedback_fn):
5038 """Connect to the console of an instance
5041 instance = self.instance
5042 node = instance.primary_node
5044 node_insts = self.rpc.call_instance_list([node],
5045 [instance.hypervisor])[node]
5046 node_insts.Raise("Can't get node information from %s" % node)
5048 if instance.name not in node_insts.payload:
5049 raise errors.OpExecError("Instance %s is not running." % instance.name)
5051 logging.debug("Connecting to console of %s on %s", instance.name, node)
5053 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5054 cluster = self.cfg.GetClusterInfo()
5055 # beparams and hvparams are passed separately, to avoid editing the
5056 # instance and then saving the defaults in the instance itself.
5057 hvparams = cluster.FillHV(instance)
5058 beparams = cluster.FillBE(instance)
5059 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5062 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5065 class LUReplaceDisks(LogicalUnit):
5066 """Replace the disks of an instance.
5069 HPATH = "mirrors-replace"
5070 HTYPE = constants.HTYPE_INSTANCE
5071 _OP_REQP = ["instance_name", "mode", "disks"]
5074 def CheckArguments(self):
5075 if not hasattr(self.op, "remote_node"):
5076 self.op.remote_node = None
5077 if not hasattr(self.op, "iallocator"):
5078 self.op.iallocator = None
5080 # check for valid parameter combination
5081 cnt = [self.op.remote_node, self.op.iallocator].count(None)
5082 if self.op.mode == constants.REPLACE_DISK_CHG:
5084 raise errors.OpPrereqError("When changing the secondary either an"
5085 " iallocator script must be used or the"
5088 raise errors.OpPrereqError("Give either the iallocator or the new"
5089 " secondary, not both")
5090 else: # not replacing the secondary
5092 raise errors.OpPrereqError("The iallocator and new node options can"
5093 " be used only when changing the"
5096 def ExpandNames(self):
5097 self._ExpandAndLockInstance()
5099 if self.op.iallocator is not None:
5100 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5101 elif self.op.remote_node is not None:
5102 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5103 if remote_node is None:
5104 raise errors.OpPrereqError("Node '%s' not known" %
5105 self.op.remote_node)
5106 self.op.remote_node = remote_node
5107 # Warning: do not remove the locking of the new secondary here
5108 # unless DRBD8.AddChildren is changed to work in parallel;
5109 # currently it doesn't since parallel invocations of
5110 # FindUnusedMinor will conflict
5111 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5112 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5114 self.needed_locks[locking.LEVEL_NODE] = []
5115 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5117 def DeclareLocks(self, level):
5118 # If we're not already locking all nodes in the set we have to declare the
5119 # instance's primary/secondary nodes.
5120 if (level == locking.LEVEL_NODE and
5121 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5122 self._LockInstancesNodes()
5124 def _RunAllocator(self):
5125 """Compute a new secondary node using an IAllocator.
5128 ial = IAllocator(self,
5129 mode=constants.IALLOCATOR_MODE_RELOC,
5130 name=self.op.instance_name,
5131 relocate_from=[self.sec_node])
5133 ial.Run(self.op.iallocator)
5136 raise errors.OpPrereqError("Can't compute nodes using"
5137 " iallocator '%s': %s" % (self.op.iallocator,
5139 if len(ial.nodes) != ial.required_nodes:
5140 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5141 " of nodes (%s), required %s" %
5142 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
5143 self.op.remote_node = ial.nodes[0]
5144 self.LogInfo("Selected new secondary for the instance: %s",
5145 self.op.remote_node)
5147 def BuildHooksEnv(self):
5150 This runs on the master, the primary and all the secondaries.
5154 "MODE": self.op.mode,
5155 "NEW_SECONDARY": self.op.remote_node,
5156 "OLD_SECONDARY": self.instance.secondary_nodes[0],
5158 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5160 self.cfg.GetMasterNode(),
5161 self.instance.primary_node,
5163 if self.op.remote_node is not None:
5164 nl.append(self.op.remote_node)
5167 def CheckPrereq(self):
5168 """Check prerequisites.
5170 This checks that the instance is in the cluster.
5173 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5174 assert instance is not None, \
5175 "Cannot retrieve locked instance %s" % self.op.instance_name
5176 self.instance = instance
5178 if instance.disk_template != constants.DT_DRBD8:
5179 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5182 if len(instance.secondary_nodes) != 1:
5183 raise errors.OpPrereqError("The instance has a strange layout,"
5184 " expected one secondary but found %d" %
5185 len(instance.secondary_nodes))
5187 self.sec_node = instance.secondary_nodes[0]
5189 if self.op.iallocator is not None:
5190 self._RunAllocator()
5192 remote_node = self.op.remote_node
5193 if remote_node is not None:
5194 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5195 assert self.remote_node_info is not None, \
5196 "Cannot retrieve locked node %s" % remote_node
5198 self.remote_node_info = None
5199 if remote_node == instance.primary_node:
5200 raise errors.OpPrereqError("The specified node is the primary node of"
5202 elif remote_node == self.sec_node:
5203 raise errors.OpPrereqError("The specified node is already the"
5204 " secondary node of the instance.")
5206 if self.op.mode == constants.REPLACE_DISK_PRI:
5207 n1 = self.tgt_node = instance.primary_node
5208 n2 = self.oth_node = self.sec_node
5209 elif self.op.mode == constants.REPLACE_DISK_SEC:
5210 n1 = self.tgt_node = self.sec_node
5211 n2 = self.oth_node = instance.primary_node
5212 elif self.op.mode == constants.REPLACE_DISK_CHG:
5213 n1 = self.new_node = remote_node
5214 n2 = self.oth_node = instance.primary_node
5215 self.tgt_node = self.sec_node
5216 _CheckNodeNotDrained(self, remote_node)
5218 raise errors.ProgrammerError("Unhandled disk replace mode")
5220 _CheckNodeOnline(self, n1)
5221 _CheckNodeOnline(self, n2)
5223 if not self.op.disks:
5224 self.op.disks = range(len(instance.disks))
5226 for disk_idx in self.op.disks:
5227 instance.FindDisk(disk_idx)
5229 def _ExecD8DiskOnly(self, feedback_fn):
5230 """Replace a disk on the primary or secondary for dbrd8.
5232 The algorithm for replace is quite complicated:
5234 1. for each disk to be replaced:
5236 1. create new LVs on the target node with unique names
5237 1. detach old LVs from the drbd device
5238 1. rename old LVs to name_replaced.<time_t>
5239 1. rename new LVs to old LVs
5240 1. attach the new LVs (with the old names now) to the drbd device
5242 1. wait for sync across all devices
5244 1. for each modified disk:
5246 1. remove old LVs (which have the name name_replaced.<time_t>)
5248 Failures are not very well handled.
5252 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5253 instance = self.instance
5255 vgname = self.cfg.GetVGName()
5258 tgt_node = self.tgt_node
5259 oth_node = self.oth_node
5261 # Step: check device activation
5262 self.proc.LogStep(1, steps_total, "check device existence")
5263 info("checking volume groups")
5264 my_vg = cfg.GetVGName()
5265 results = self.rpc.call_vg_list([oth_node, tgt_node])
5267 raise errors.OpExecError("Can't list volume groups on the nodes")
5268 for node in oth_node, tgt_node:
5270 res.Raise("Error checking node %s" % node)
5271 if my_vg not in res.payload:
5272 raise errors.OpExecError("Volume group '%s' not found on %s" %
5274 for idx, dev in enumerate(instance.disks):
5275 if idx not in self.op.disks:
5277 for node in tgt_node, oth_node:
5278 info("checking disk/%d on %s" % (idx, node))
5279 cfg.SetDiskID(dev, node)
5280 result = self.rpc.call_blockdev_find(node, dev)
5281 msg = result.fail_msg
5282 if not msg and not result.payload:
5283 msg = "disk not found"
5285 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5288 # Step: check other node consistency
5289 self.proc.LogStep(2, steps_total, "check peer consistency")
5290 for idx, dev in enumerate(instance.disks):
5291 if idx not in self.op.disks:
5293 info("checking disk/%d consistency on %s" % (idx, oth_node))
5294 if not _CheckDiskConsistency(self, dev, oth_node,
5295 oth_node==instance.primary_node):
5296 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5297 " to replace disks on this node (%s)" %
5298 (oth_node, tgt_node))
5300 # Step: create new storage
5301 self.proc.LogStep(3, steps_total, "allocate new storage")
5302 for idx, dev in enumerate(instance.disks):
5303 if idx not in self.op.disks:
5306 cfg.SetDiskID(dev, tgt_node)
5307 lv_names = [".disk%d_%s" % (idx, suf)
5308 for suf in ["data", "meta"]]
5309 names = _GenerateUniqueNames(self, lv_names)
5310 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5311 logical_id=(vgname, names[0]))
5312 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5313 logical_id=(vgname, names[1]))
5314 new_lvs = [lv_data, lv_meta]
5315 old_lvs = dev.children
5316 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5317 info("creating new local storage on %s for %s" %
5318 (tgt_node, dev.iv_name))
5319 # we pass force_create=True to force the LVM creation
5320 for new_lv in new_lvs:
5321 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5322 _GetInstanceInfoText(instance), False)
5324 # Step: for each lv, detach+rename*2+attach
5325 self.proc.LogStep(4, steps_total, "change drbd configuration")
5326 for dev, old_lvs, new_lvs in iv_names.itervalues():
5327 info("detaching %s drbd from local storage" % dev.iv_name)
5328 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5329 result.Raise("Can't detach drbd from local storage on node"
5330 " %s for device %s" % (tgt_node, dev.iv_name))
5332 #cfg.Update(instance)
5334 # ok, we created the new LVs, so now we know we have the needed
5335 # storage; as such, we proceed on the target node to rename
5336 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5337 # using the assumption that logical_id == physical_id (which in
5338 # turn is the unique_id on that node)
5340 # FIXME(iustin): use a better name for the replaced LVs
5341 temp_suffix = int(time.time())
5342 ren_fn = lambda d, suff: (d.physical_id[0],
5343 d.physical_id[1] + "_replaced-%s" % suff)
5344 # build the rename list based on what LVs exist on the node
5346 for to_ren in old_lvs:
5347 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5348 if not result.fail_msg and result.payload:
5350 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5352 info("renaming the old LVs on the target node")
5353 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5354 result.Raise("Can't rename old LVs on node %s" % tgt_node)
5355 # now we rename the new LVs to the old LVs
5356 info("renaming the new LVs on the target node")
5357 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5358 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5359 result.Raise("Can't rename new LVs on node %s" % tgt_node)
5361 for old, new in zip(old_lvs, new_lvs):
5362 new.logical_id = old.logical_id
5363 cfg.SetDiskID(new, tgt_node)
5365 for disk in old_lvs:
5366 disk.logical_id = ren_fn(disk, temp_suffix)
5367 cfg.SetDiskID(disk, tgt_node)
5369 # now that the new lvs have the old name, we can add them to the device
5370 info("adding new mirror component on %s" % tgt_node)
5371 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5372 msg = result.fail_msg
5374 for new_lv in new_lvs:
5375 msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg
5377 warning("Can't rollback device %s: %s", dev, msg2,
5378 hint="cleanup manually the unused logical volumes")
5379 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5381 dev.children = new_lvs
5382 cfg.Update(instance)
5384 # Step: wait for sync
5386 # this can fail as the old devices are degraded and _WaitForSync
5387 # does a combined result over all disks, so we don't check its
5389 self.proc.LogStep(5, steps_total, "sync devices")
5390 _WaitForSync(self, instance, unlock=True)
5392 # so check manually all the devices
5393 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5394 cfg.SetDiskID(dev, instance.primary_node)
5395 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5396 msg = result.fail_msg
5397 if not msg and not result.payload:
5398 msg = "disk not found"
5400 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5402 if result.payload[5]:
5403 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5405 # Step: remove old storage
5406 self.proc.LogStep(6, steps_total, "removing old storage")
5407 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5408 info("remove logical volumes for %s" % name)
5410 cfg.SetDiskID(lv, tgt_node)
5411 msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg
5413 warning("Can't remove old LV: %s" % msg,
5414 hint="manually remove unused LVs")
5417 def _ExecD8Secondary(self, feedback_fn):
5418 """Replace the secondary node for drbd8.
5420 The algorithm for replace is quite complicated:
5421 - for all disks of the instance:
5422 - create new LVs on the new node with same names
5423 - shutdown the drbd device on the old secondary
5424 - disconnect the drbd network on the primary
5425 - create the drbd device on the new secondary
5426 - network attach the drbd on the primary, using an artifice:
5427 the drbd code for Attach() will connect to the network if it
5428 finds a device which is connected to the good local disks but
5430 - wait for sync across all devices
5431 - remove all disks from the old secondary
5433 Failures are not very well handled.
5437 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5438 instance = self.instance
5442 old_node = self.tgt_node
5443 new_node = self.new_node
5444 pri_node = instance.primary_node
5446 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5447 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5448 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5451 # Step: check device activation
5452 self.proc.LogStep(1, steps_total, "check device existence")
5453 info("checking volume groups")
5454 my_vg = cfg.GetVGName()
5455 results = self.rpc.call_vg_list([pri_node, new_node])
5456 for node in pri_node, new_node:
5458 res.Raise("Error checking node %s" % node)
5459 if my_vg not in res.payload:
5460 raise errors.OpExecError("Volume group '%s' not found on %s" %
5462 for idx, dev in enumerate(instance.disks):
5463 if idx not in self.op.disks:
5465 info("checking disk/%d on %s" % (idx, pri_node))
5466 cfg.SetDiskID(dev, pri_node)
5467 result = self.rpc.call_blockdev_find(pri_node, dev)
5468 msg = result.fail_msg
5469 if not msg and not result.payload:
5470 msg = "disk not found"
5472 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5473 (idx, pri_node, msg))
5475 # Step: check other node consistency
5476 self.proc.LogStep(2, steps_total, "check peer consistency")
5477 for idx, dev in enumerate(instance.disks):
5478 if idx not in self.op.disks:
5480 info("checking disk/%d consistency on %s" % (idx, pri_node))
5481 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5482 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5483 " unsafe to replace the secondary" %
5486 # Step: create new storage
5487 self.proc.LogStep(3, steps_total, "allocate new storage")
5488 for idx, dev in enumerate(instance.disks):
5489 info("adding new local storage on %s for disk/%d" %
5491 # we pass force_create=True to force LVM creation
5492 for new_lv in dev.children:
5493 _CreateBlockDev(self, new_node, instance, new_lv, True,
5494 _GetInstanceInfoText(instance), False)
5496 # Step 4: drbd minors and drbd setup changes
5497 # after this, we must manually remove the drbd minors on both the
5498 # error and the success paths
5499 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5501 logging.debug("Allocated minors %s" % (minors,))
5502 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5503 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5505 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5506 # create new devices on new_node; note that we create two IDs:
5507 # one without port, so the drbd will be activated without
5508 # networking information on the new node at this stage, and one
5509 # with network, for the later activation in step 4
5510 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5511 if pri_node == o_node1:
5516 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5517 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5519 iv_names[idx] = (dev, dev.children, new_net_id)
5520 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5522 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5523 logical_id=new_alone_id,
5524 children=dev.children,
5527 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5528 _GetInstanceInfoText(instance), False)
5529 except errors.GenericError:
5530 self.cfg.ReleaseDRBDMinors(instance.name)
5533 for idx, dev in enumerate(instance.disks):
5534 # we have new devices, shutdown the drbd on the old secondary
5535 info("shutting down drbd for disk/%d on old node" % idx)
5536 cfg.SetDiskID(dev, old_node)
5537 msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg
5539 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5541 hint="Please cleanup this device manually as soon as possible")
5543 info("detaching primary drbds from the network (=> standalone)")
5544 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5545 instance.disks)[pri_node]
5547 msg = result.fail_msg
5549 # detaches didn't succeed (unlikely)
5550 self.cfg.ReleaseDRBDMinors(instance.name)
5551 raise errors.OpExecError("Can't detach the disks from the network on"
5552 " old node: %s" % (msg,))
5554 # if we managed to detach at least one, we update all the disks of
5555 # the instance to point to the new secondary
5556 info("updating instance configuration")
5557 for dev, _, new_logical_id in iv_names.itervalues():
5558 dev.logical_id = new_logical_id
5559 cfg.SetDiskID(dev, pri_node)
5560 cfg.Update(instance)
5562 # and now perform the drbd attach
5563 info("attaching primary drbds to new secondary (standalone => connected)")
5564 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5565 instance.disks, instance.name,
5567 for to_node, to_result in result.items():
5568 msg = to_result.fail_msg
5570 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5571 hint="please do a gnt-instance info to see the"
5574 # this can fail as the old devices are degraded and _WaitForSync
5575 # does a combined result over all disks, so we don't check its
5577 self.proc.LogStep(5, steps_total, "sync devices")
5578 _WaitForSync(self, instance, unlock=True)
5580 # so check manually all the devices
5581 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5582 cfg.SetDiskID(dev, pri_node)
5583 result = self.rpc.call_blockdev_find(pri_node, dev)
5584 msg = result.fail_msg
5585 if not msg and not result.payload:
5586 msg = "disk not found"
5588 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5590 if result.payload[5]:
5591 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5593 self.proc.LogStep(6, steps_total, "removing old storage")
5594 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5595 info("remove logical volumes for disk/%d" % idx)
5597 cfg.SetDiskID(lv, old_node)
5598 msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg
5600 warning("Can't remove LV on old secondary: %s", msg,
5601 hint="Cleanup stale volumes by hand")
5603 def Exec(self, feedback_fn):
5604 """Execute disk replacement.
5606 This dispatches the disk replacement to the appropriate handler.
5609 instance = self.instance
5611 # Activate the instance disks if we're replacing them on a down instance
5612 if not instance.admin_up:
5613 _StartInstanceDisks(self, instance, True)
5615 if self.op.mode == constants.REPLACE_DISK_CHG:
5616 fn = self._ExecD8Secondary
5618 fn = self._ExecD8DiskOnly
5620 ret = fn(feedback_fn)
5622 # Deactivate the instance disks if we're replacing them on a down instance
5623 if not instance.admin_up:
5624 _SafeShutdownInstanceDisks(self, instance)
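
# Illustrative sketch (not part of the original module): the two-step rename
# performed during disk replacement above. The old LV is first moved aside
# under a "_replaced-<time_t>" suffix, then the freshly created LV takes over
# the old name, so the DRBD device keeps referring to the same LV names. The
# (vg, lv_name) pairs mimic the physical_id tuples used by the real code.
def _ExampleReplacementRenames(vg, old_lv, new_lv):
  import time
  suffix = int(time.time())
  move_old_aside = ((vg, old_lv), (vg, "%s_replaced-%s" % (old_lv, suffix)))
  promote_new = ((vg, new_lv), (vg, old_lv))
  return [move_old_aside, promote_new]
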
5629 class LUGrowDisk(LogicalUnit):
5630 """Grow a disk of an instance.
5634 HTYPE = constants.HTYPE_INSTANCE
5635 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5638 def ExpandNames(self):
5639 self._ExpandAndLockInstance()
5640 self.needed_locks[locking.LEVEL_NODE] = []
5641 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5643 def DeclareLocks(self, level):
5644 if level == locking.LEVEL_NODE:
5645 self._LockInstancesNodes()
5647 def BuildHooksEnv(self):
5650 This runs on the master, the primary and all the secondaries.
5654 "DISK": self.op.disk,
5655 "AMOUNT": self.op.amount,
5657 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5659 self.cfg.GetMasterNode(),
5660 self.instance.primary_node,
5664 def CheckPrereq(self):
5665 """Check prerequisites.
5667 This checks that the instance is in the cluster.
5670 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5671 assert instance is not None, \
5672 "Cannot retrieve locked instance %s" % self.op.instance_name
5673 nodenames = list(instance.all_nodes)
5674 for node in nodenames:
5675 _CheckNodeOnline(self, node)
5678 self.instance = instance
5680 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5681 raise errors.OpPrereqError("Instance's disk layout does not support"
5684 self.disk = instance.FindDisk(self.op.disk)
5686 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5687 instance.hypervisor)
5688 for node in nodenames:
5689 info = nodeinfo[node]
5690 info.Raise("Cannot get current information from node %s" % node)
5691 vg_free = info.payload.get('vg_free', None)
5692 if not isinstance(vg_free, int):
5693 raise errors.OpPrereqError("Can't compute free disk space on"
5695 if self.op.amount > vg_free:
5696 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5697 " %d MiB available, %d MiB required" %
5698 (node, vg_free, self.op.amount))
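# Illustrative example of the check above (hypothetical numbers): if a node
# reports vg_free = 20480 MiB and self.op.amount is 30720 MiB, CheckPrereq
# aborts with "Not enough disk space on target node" before any grow RPC is
# issued in Exec.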
5700 def Exec(self, feedback_fn):
5701 """Execute disk grow.
5704 instance = self.instance
5706 for node in instance.all_nodes:
5707 self.cfg.SetDiskID(disk, node)
5708 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5709 result.Raise("Grow request failed to node %s" % node)
5710 disk.RecordGrow(self.op.amount)
5711 self.cfg.Update(instance)
5712 if self.op.wait_for_sync:
5713 disk_abort = not _WaitForSync(self, instance)
5715 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5716 " status.\nPlease check the instance.")
5719 class LUQueryInstanceData(NoHooksLU):
5720 """Query runtime instance data.
5723 _OP_REQP = ["instances", "static"]
5726 def ExpandNames(self):
5727 self.needed_locks = {}
5728 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5730 if not isinstance(self.op.instances, list):
5731 raise errors.OpPrereqError("Invalid argument type 'instances'")
5733 if self.op.instances:
5734 self.wanted_names = []
5735 for name in self.op.instances:
5736 full_name = self.cfg.ExpandInstanceName(name)
5737 if full_name is None:
5738 raise errors.OpPrereqError("Instance '%s' not known" % name)
5739 self.wanted_names.append(full_name)
5740 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5742 self.wanted_names = None
5743 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5745 self.needed_locks[locking.LEVEL_NODE] = []
5746 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5748 def DeclareLocks(self, level):
5749 if level == locking.LEVEL_NODE:
5750 self._LockInstancesNodes()
5752 def CheckPrereq(self):
5753 """Check prerequisites.
5755 This only checks the optional instance list against the existing names.
5758 if self.wanted_names is None:
5759 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5761 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5762 in self.wanted_names]
5765 def _ComputeDiskStatus(self, instance, snode, dev):
5766 """Compute block device status.
5769 static = self.op.static
5771 self.cfg.SetDiskID(dev, instance.primary_node)
5772 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5773 if dev_pstatus.offline:
5776 dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
5777 dev_pstatus = dev_pstatus.payload
5781 if dev.dev_type in constants.LDS_DRBD:
5782 # we change the snode then (otherwise we use the one passed in)
5783 if dev.logical_id[0] == instance.primary_node:
5784 snode = dev.logical_id[1]
5786 snode = dev.logical_id[0]
5788 if snode and not static:
5789 self.cfg.SetDiskID(dev, snode)
5790 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5791 if dev_sstatus.offline:
5794 dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
5795 dev_sstatus = dev_sstatus.payload
5800 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5801 for child in dev.children]
5806 "iv_name": dev.iv_name,
5807 "dev_type": dev.dev_type,
5808 "logical_id": dev.logical_id,
5809 "physical_id": dev.physical_id,
5810 "pstatus": dev_pstatus,
5811 "sstatus": dev_sstatus,
5812 "children": dev_children,
5819 def Exec(self, feedback_fn):
5820 """Gather and return data"""
5823 cluster = self.cfg.GetClusterInfo()
5825 for instance in self.wanted_instances:
5826 if not self.op.static:
5827 remote_info = self.rpc.call_instance_info(instance.primary_node,
5829 instance.hypervisor)
5830 remote_info.Raise("Error checking node %s" % instance.primary_node)
5831 remote_info = remote_info.payload
5832 if remote_info and "state" in remote_info:
5835 remote_state = "down"
5838 if instance.admin_up:
5841 config_state = "down"
5843 disks = [self._ComputeDiskStatus(instance, None, device)
5844 for device in instance.disks]
5847 "name": instance.name,
5848 "config_state": config_state,
5849 "run_state": remote_state,
5850 "pnode": instance.primary_node,
5851 "snodes": instance.secondary_nodes,
5853 # this happens to be the same format used for hooks
5854 "nics": _NICListToTuple(self, instance.nics),
5856 "hypervisor": instance.hypervisor,
5857 "network_port": instance.network_port,
5858 "hv_instance": instance.hvparams,
5859 "hv_actual": cluster.FillHV(instance),
5860 "be_instance": instance.beparams,
5861 "be_actual": cluster.FillBE(instance),
5864 result[instance.name] = idict
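# Illustrative shape of one entry in the returned mapping (hypothetical
# values, abridged): result["inst1.example.com"] = {
#   "name": "inst1.example.com", "config_state": "up", "run_state": "down",
#   "pnode": "node1.example.com", "snodes": ["node2.example.com"],
#   "nics": [...], "hypervisor": "xen-pvm", "hv_instance": {...},
#   "hv_actual": {...}, "be_instance": {...}, "be_actual": {...}, ...}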
5869 class LUSetInstanceParams(LogicalUnit):
5870 """Modifies an instances's parameters.
5873 HPATH = "instance-modify"
5874 HTYPE = constants.HTYPE_INSTANCE
5875 _OP_REQP = ["instance_name"]
5878 def CheckArguments(self):
5879 if not hasattr(self.op, 'nics'):
5881 if not hasattr(self.op, 'disks'):
5883 if not hasattr(self.op, 'beparams'):
5884 self.op.beparams = {}
5885 if not hasattr(self.op, 'hvparams'):
5886 self.op.hvparams = {}
5887 self.op.force = getattr(self.op, "force", False)
5888 if not (self.op.nics or self.op.disks or
5889 self.op.hvparams or self.op.beparams):
5890 raise errors.OpPrereqError("No changes submitted")
5894 for disk_op, disk_dict in self.op.disks:
5895 if disk_op == constants.DDM_REMOVE:
5898 elif disk_op == constants.DDM_ADD:
5901 if not isinstance(disk_op, int):
5902 raise errors.OpPrereqError("Invalid disk index")
5903 if not isinstance(disk_dict, dict):
5904 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
5905 raise errors.OpPrereqError(msg)
5907 if disk_op == constants.DDM_ADD:
5908 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5909 if mode not in constants.DISK_ACCESS_SET:
5910 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5911 size = disk_dict.get('size', None)
5913 raise errors.OpPrereqError("Required disk parameter size missing")
5916 except ValueError, err:
5917 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5919 disk_dict['size'] = size
5921 # modification of disk
5922 if 'size' in disk_dict:
5923 raise errors.OpPrereqError("Disk size change not possible, use"
5926 if disk_addremove > 1:
5927 raise errors.OpPrereqError("Only one disk add or remove operation"
5928 " supported at a time")
5932 for nic_op, nic_dict in self.op.nics:
5933 if nic_op == constants.DDM_REMOVE:
5936 elif nic_op == constants.DDM_ADD:
5939 if not isinstance(nic_op, int):
5940 raise errors.OpPrereqError("Invalid nic index")
5941 if not isinstance(nic_dict, dict):
5942 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
5943 raise errors.OpPrereqError(msg)
5945 # nic_dict should be a dict
5946 nic_ip = nic_dict.get('ip', None)
5947 if nic_ip is not None:
5948 if nic_ip.lower() == constants.VALUE_NONE:
5949 nic_dict['ip'] = None
5951 if not utils.IsValidIP(nic_ip):
5952 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5954 nic_bridge = nic_dict.get('bridge', None)
5955 nic_link = nic_dict.get('link', None)
5956 if nic_bridge and nic_link:
5957 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5958 " at the same time")
5959 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5960 nic_dict['bridge'] = None
5961 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5962 nic_dict['link'] = None
5964 if nic_op == constants.DDM_ADD:
5965 nic_mac = nic_dict.get('mac', None)
5967 nic_dict['mac'] = constants.VALUE_AUTO
5969 if 'mac' in nic_dict:
5970 nic_mac = nic_dict['mac']
5971 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5972 if not utils.IsValidMac(nic_mac):
5973 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5974 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5975 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5976 " modifying an existing nic")
5978 if nic_addremove > 1:
5979 raise errors.OpPrereqError("Only one NIC add or remove operation"
5980 " supported at a time")
5982 def ExpandNames(self):
5983 self._ExpandAndLockInstance()
5984 self.needed_locks[locking.LEVEL_NODE] = []
5985 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5987 def DeclareLocks(self, level):
5988 if level == locking.LEVEL_NODE:
5989 self._LockInstancesNodes()
5991 def BuildHooksEnv(self):
5994 This runs on the master, primary and secondaries.
5998 if constants.BE_MEMORY in self.be_new:
5999 args['memory'] = self.be_new[constants.BE_MEMORY]
6000 if constants.BE_VCPUS in self.be_new:
6001 args['vcpus'] = self.be_new[constants.BE_VCPUS]
6002 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6003 # information at all.
6006 nic_override = dict(self.op.nics)
6007 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6008 for idx, nic in enumerate(self.instance.nics):
6009 if idx in nic_override:
6010 this_nic_override = nic_override[idx]
6012 this_nic_override = {}
6013 if 'ip' in this_nic_override:
6014 ip = this_nic_override['ip']
6017 if 'mac' in this_nic_override:
6018 mac = this_nic_override['mac']
6021 if idx in self.nic_pnew:
6022 nicparams = self.nic_pnew[idx]
6024 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6025 mode = nicparams[constants.NIC_MODE]
6026 link = nicparams[constants.NIC_LINK]
6027 args['nics'].append((ip, mac, mode, link))
6028 if constants.DDM_ADD in nic_override:
6029 ip = nic_override[constants.DDM_ADD].get('ip', None)
6030 mac = nic_override[constants.DDM_ADD]['mac']
6031 nicparams = self.nic_pnew[constants.DDM_ADD]
6032 mode = nicparams[constants.NIC_MODE]
6033 link = nicparams[constants.NIC_LINK]
6034 args['nics'].append((ip, mac, mode, link))
6035 elif constants.DDM_REMOVE in nic_override:
6036 del args['nics'][-1]
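# Illustrative content of args['nics'] at this point (hypothetical values):
# a list of (ip, mac, mode, link) tuples, e.g.
# [("198.51.100.10", "aa:00:00:11:22:33", constants.NIC_MODE_BRIDGED, "xen-br0")]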
6038 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6039 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6042 def _GetUpdatedParams(self, old_params, update_dict,
6043 default_values, parameter_types):
6044 """Return the new params dict for the given params.
6046 @type old_params: dict
6047 @param old_params: old parameters
6048 @type update_dict: dict
6049 @param update_dict: dict containing new parameter values,
6050 or constants.VALUE_DEFAULT to reset the
6051 parameter to its default value
6052 @type default_values: dict
6053 @param default_values: default values for the filled parameters
6054 @type parameter_types: dict
6055 @param parameter_types: dict mapping target dict keys to types
6056 in constants.ENFORCEABLE_TYPES
6057 @rtype: (dict, dict)
6058 @return: (new_parameters, filled_parameters)
6061 params_copy = copy.deepcopy(old_params)
6062 for key, val in update_dict.iteritems():
6063 if val == constants.VALUE_DEFAULT:
6065 del params_copy[key]
6069 params_copy[key] = val
6070 utils.ForceDictType(params_copy, parameter_types)
6071 params_filled = objects.FillDict(default_values, params_copy)
6072 return (params_copy, params_filled)
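# Illustrative behaviour of _GetUpdatedParams (hypothetical values): with
# old_params = {"memory": 512, "vcpus": 2} and
# update_dict = {"memory": constants.VALUE_DEFAULT, "vcpus": 4},
# "memory" is removed from the copy and "vcpus" is overridden, giving
# ({"vcpus": 4}, objects.FillDict(default_values, {"vcpus": 4})).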
6074 def CheckPrereq(self):
6075 """Check prerequisites.
6077 This only checks the instance list against the existing names.
6080 force = self.force = self.op.force
6082 # checking the new params on the primary/secondary nodes
6084 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6085 cluster = self.cluster = self.cfg.GetClusterInfo()
6086 assert self.instance is not None, \
6087 "Cannot retrieve locked instance %s" % self.op.instance_name
6088 pnode = instance.primary_node
6089 nodelist = list(instance.all_nodes)
6091 # hvparams processing
6092 if self.op.hvparams:
6093 i_hvdict, hv_new = self._GetUpdatedParams(
6094 instance.hvparams, self.op.hvparams,
6095 cluster.hvparams[instance.hypervisor],
6096 constants.HVS_PARAMETER_TYPES)
6098 hypervisor.GetHypervisor(
6099 instance.hypervisor).CheckParameterSyntax(hv_new)
6100 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6101 self.hv_new = hv_new # the new actual values
6102 self.hv_inst = i_hvdict # the new dict (without defaults)
6104 self.hv_new = self.hv_inst = {}
6106 # beparams processing
6107 if self.op.beparams:
6108 i_bedict, be_new = self._GetUpdatedParams(
6109 instance.beparams, self.op.beparams,
6110 cluster.beparams[constants.PP_DEFAULT],
6111 constants.BES_PARAMETER_TYPES)
6112 self.be_new = be_new # the new actual values
6113 self.be_inst = i_bedict # the new dict (without defaults)
6115 self.be_new = self.be_inst = {}
6119 if constants.BE_MEMORY in self.op.beparams and not self.force:
6120 mem_check_list = [pnode]
6121 if be_new[constants.BE_AUTO_BALANCE]:
6122 # either we changed auto_balance to yes or it was from before
6123 mem_check_list.extend(instance.secondary_nodes)
6124 instance_info = self.rpc.call_instance_info(pnode, instance.name,
6125 instance.hypervisor)
6126 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6127 instance.hypervisor)
6128 pninfo = nodeinfo[pnode]
6129 msg = pninfo.fail_msg
6131 # Assume the primary node is unreachable and go ahead
6132 self.warn.append("Can't get info from primary node %s: %s" %
6134 elif not isinstance(pninfo.payload.get('memory_free', None), int):
6135 self.warn.append("Node data from primary node %s doesn't contain"
6136 " free memory information" % pnode)
6137 elif instance_info.fail_msg:
6138 self.warn.append("Can't get instance runtime information: %s" %
6139 instance_info.fail_msg)
6141 if instance_info.payload:
6142 current_mem = int(instance_info.payload['memory'])
6144 # Assume instance not running
6145 # (there is a slight race condition here, but it's not very probable,
6146 # and we have no other way to check)
6148 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6149 pninfo.payload['memory_free'])
6151 raise errors.OpPrereqError("This change will prevent the instance"
6152 " from starting, due to %d MB of memory"
6153 " missing on its primary node" % miss_mem)
6155 if be_new[constants.BE_AUTO_BALANCE]:
6156 for node, nres in nodeinfo.items():
6157 if node not in instance.secondary_nodes:
6161 self.warn.append("Can't get info from secondary node %s: %s" %
6163 elif not isinstance(nres.payload.get('memory_free', None), int):
6164 self.warn.append("Secondary node %s didn't return free"
6165 " memory information" % node)
6166 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6167 self.warn.append("Not enough memory to failover instance to"
6168 " secondary node %s" % node)
6173 for nic_op, nic_dict in self.op.nics:
6174 if nic_op == constants.DDM_REMOVE:
6175 if not instance.nics:
6176 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6178 if nic_op != constants.DDM_ADD:
6180 if nic_op < 0 or nic_op >= len(instance.nics):
6181 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6183 (nic_op, len(instance.nics)))
6184 old_nic_params = instance.nics[nic_op].nicparams
6185 old_nic_ip = instance.nics[nic_op].ip
6190 update_params_dict = dict([(key, nic_dict[key])
6191 for key in constants.NICS_PARAMETERS
6192 if key in nic_dict])
6194 if 'bridge' in nic_dict:
6195 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6197 new_nic_params, new_filled_nic_params = \
6198 self._GetUpdatedParams(old_nic_params, update_params_dict,
6199 cluster.nicparams[constants.PP_DEFAULT],
6200 constants.NICS_PARAMETER_TYPES)
6201 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6202 self.nic_pinst[nic_op] = new_nic_params
6203 self.nic_pnew[nic_op] = new_filled_nic_params
6204 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6206 if new_nic_mode == constants.NIC_MODE_BRIDGED:
6207 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6208 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6210 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6212 self.warn.append(msg)
6214 raise errors.OpPrereqError(msg)
6215 if new_nic_mode == constants.NIC_MODE_ROUTED:
6216 if 'ip' in nic_dict:
6217 nic_ip = nic_dict['ip']
6221 raise errors.OpPrereqError('Cannot set the nic ip to None'
6223 if 'mac' in nic_dict:
6224 nic_mac = nic_dict['mac']
6226 raise errors.OpPrereqError('Cannot set the nic mac to None')
6227 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6228 # otherwise generate the mac
6229 nic_dict['mac'] = self.cfg.GenerateMAC()
6231 # or validate/reserve the current one
6232 if self.cfg.IsMacInUse(nic_mac):
6233 raise errors.OpPrereqError("MAC address %s already in use"
6234 " in cluster" % nic_mac)
6237 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6238 raise errors.OpPrereqError("Disk operations not supported for"
6239 " diskless instances")
6240 for disk_op, disk_dict in self.op.disks:
6241 if disk_op == constants.DDM_REMOVE:
6242 if len(instance.disks) == 1:
6243 raise errors.OpPrereqError("Cannot remove the last disk of"
6245 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6246 ins_l = ins_l[pnode]
6247 msg = ins_l.fail_msg
6249 raise errors.OpPrereqError("Can't contact node %s: %s" %
6251 if instance.name in ins_l.payload:
6252 raise errors.OpPrereqError("Instance is running, can't remove"
6255 if (disk_op == constants.DDM_ADD and
6256 len(instance.disks) >= constants.MAX_DISKS):
6257 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6258 " add more" % constants.MAX_DISKS)
6259 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6261 if disk_op < 0 or disk_op >= len(instance.disks):
6262 raise errors.OpPrereqError("Invalid disk index %s, valid values"
6264 (disk_op, len(instance.disks)))
6268 def Exec(self, feedback_fn):
6269 """Modifies an instance.
6271 All parameters take effect only at the next restart of the instance.
6274 # Process the warnings from CheckPrereq here, as we don't have a
6275 # feedback_fn there.
6276 for warn in self.warn:
6277 feedback_fn("WARNING: %s" % warn)
6280 instance = self.instance
6281 cluster = self.cluster
6283 for disk_op, disk_dict in self.op.disks:
6284 if disk_op == constants.DDM_REMOVE:
6285 # remove the last disk
6286 device = instance.disks.pop()
6287 device_idx = len(instance.disks)
6288 for node, disk in device.ComputeNodeTree(instance.primary_node):
6289 self.cfg.SetDiskID(disk, node)
6290 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6292 self.LogWarning("Could not remove disk/%d on node %s: %s,"
6293 " continuing anyway", device_idx, node, msg)
6294 result.append(("disk/%d" % device_idx, "remove"))
6295 elif disk_op == constants.DDM_ADD:
6297 if instance.disk_template == constants.DT_FILE:
6298 file_driver, file_path = instance.disks[0].logical_id
6299 file_path = os.path.dirname(file_path)
6301 file_driver = file_path = None
6302 disk_idx_base = len(instance.disks)
6303 new_disk = _GenerateDiskTemplate(self,
6304 instance.disk_template,
6305 instance.name, instance.primary_node,
6306 instance.secondary_nodes,
6311 instance.disks.append(new_disk)
6312 info = _GetInstanceInfoText(instance)
6314 logging.info("Creating volume %s for instance %s",
6315 new_disk.iv_name, instance.name)
6316 # Note: this needs to be kept in sync with _CreateDisks
6318 for node in instance.all_nodes:
6319 f_create = node == instance.primary_node
6321 _CreateBlockDev(self, node, instance, new_disk,
6322 f_create, info, f_create)
6323 except errors.OpExecError, err:
6324 self.LogWarning("Failed to create volume %s (%s) on"
6326 new_disk.iv_name, new_disk, node, err)
6327 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6328 (new_disk.size, new_disk.mode)))
6330 # change a given disk
6331 instance.disks[disk_op].mode = disk_dict['mode']
6332 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6334 for nic_op, nic_dict in self.op.nics:
6335 if nic_op == constants.DDM_REMOVE:
6336 # remove the last nic
6337 del instance.nics[-1]
6338 result.append(("nic.%d" % len(instance.nics), "remove"))
6339 elif nic_op == constants.DDM_ADD:
6340 # mac and bridge should be set by now
6341 mac = nic_dict['mac']
6342 ip = nic_dict.get('ip', None)
6343 nicparams = self.nic_pinst[constants.DDM_ADD]
6344 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6345 instance.nics.append(new_nic)
6346 result.append(("nic.%d" % (len(instance.nics) - 1),
6347 "add:mac=%s,ip=%s,mode=%s,link=%s" %
6348 (new_nic.mac, new_nic.ip,
6349 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6350 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6353 for key in 'mac', 'ip':
6355 setattr(instance.nics[nic_op], key, nic_dict[key])
6356 if nic_op in self.nic_pnew:
6357 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6358 for key, val in nic_dict.iteritems():
6359 result.append(("nic.%s/%d" % (key, nic_op), val))
6362 if self.op.hvparams:
6363 instance.hvparams = self.hv_inst
6364 for key, val in self.op.hvparams.iteritems():
6365 result.append(("hv/%s" % key, val))
6368 if self.op.beparams:
6369 instance.beparams = self.be_inst
6370 for key, val in self.op.beparams.iteritems():
6371 result.append(("be/%s" % key, val))
6373 self.cfg.Update(instance)
6378 class LUQueryExports(NoHooksLU):
6379 """Query the exports list
6382 _OP_REQP = ['nodes']
6385 def ExpandNames(self):
6386 self.needed_locks = {}
6387 self.share_locks[locking.LEVEL_NODE] = 1
6388 if not self.op.nodes:
6389 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6391 self.needed_locks[locking.LEVEL_NODE] = \
6392 _GetWantedNodes(self, self.op.nodes)
6394 def CheckPrereq(self):
6395 """Check prerequisites.
6398 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6400 def Exec(self, feedback_fn):
6401 """Compute the list of all the exported system images.
6404 @return: a dictionary with the structure node->(export-list)
6405 where export-list is a list of the instances exported on that node
6409 rpcresult = self.rpc.call_export_list(self.nodes)
6411 for node in rpcresult:
6412 if rpcresult[node].fail_msg:
6413 result[node] = False
6415 result[node] = rpcresult[node].payload
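# Illustrative return value (hypothetical names): nodes that answered map to
# their export list, nodes whose RPC failed map to False, e.g.
#   {"node1.example.com": ["inst1.example.com"], "node2.example.com": False}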
6420 class LUExportInstance(LogicalUnit):
6421 """Export an instance to an image in the cluster.
6424 HPATH = "instance-export"
6425 HTYPE = constants.HTYPE_INSTANCE
6426 _OP_REQP = ["instance_name", "target_node", "shutdown"]
6429 def ExpandNames(self):
6430 self._ExpandAndLockInstance()
6431 # FIXME: lock only instance primary and destination node
6433 # Sad but true, for now we have to lock all nodes, as we don't know where
6434 # the previous export might be, and in this LU we search for it and
6435 # remove it from its current node. In the future we could fix this by:
6436 # - making a tasklet to search (share-lock all), then create the new one,
6437 # then one to remove, after
6438 # - removing the removal operation altogether
6439 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6441 def DeclareLocks(self, level):
6442 """Last minute lock declaration."""
6443 # All nodes are locked anyway, so nothing to do here.
6445 def BuildHooksEnv(self):
6448 This will run on the master, primary node and target node.
6452 "EXPORT_NODE": self.op.target_node,
6453 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6455 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6456 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6457 self.op.target_node]
6460 def CheckPrereq(self):
6461 """Check prerequisites.
6463 This checks that the instance and node names are valid.
6466 instance_name = self.op.instance_name
6467 self.instance = self.cfg.GetInstanceInfo(instance_name)
6468 assert self.instance is not None, \
6469 "Cannot retrieve locked instance %s" % self.op.instance_name
6470 _CheckNodeOnline(self, self.instance.primary_node)
6472 self.dst_node = self.cfg.GetNodeInfo(
6473 self.cfg.ExpandNodeName(self.op.target_node))
6475 if self.dst_node is None:
6476 # This is a wrong node name, not a non-locked node
6477 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6478 _CheckNodeOnline(self, self.dst_node.name)
6479 _CheckNodeNotDrained(self, self.dst_node.name)
6481 # instance disk type verification
6482 for disk in self.instance.disks:
6483 if disk.dev_type == constants.LD_FILE:
6484 raise errors.OpPrereqError("Export not supported for instances with"
6485 " file-based disks")
6487 def Exec(self, feedback_fn):
6488 """Export an instance to an image in the cluster.
6491 instance = self.instance
6492 dst_node = self.dst_node
6493 src_node = instance.primary_node
6494 if self.op.shutdown:
6495 # shutdown the instance, but not the disks
6496 result = self.rpc.call_instance_shutdown(src_node, instance)
6497 result.Raise("Could not shutdown instance %s on"
6498 " node %s" % (instance.name, src_node))
6500 vgname = self.cfg.GetVGName()
6504 # set the disks ID correctly since call_instance_start needs the
6505 # correct drbd minor to create the symlinks
6506 for disk in instance.disks:
6507 self.cfg.SetDiskID(disk, src_node)
6510 for idx, disk in enumerate(instance.disks):
6511 # result.payload will be a snapshot of an lvm leaf of the one we passed
6512 result = self.rpc.call_blockdev_snapshot(src_node, disk)
6513 msg = result.fail_msg
6515 self.LogWarning("Could not snapshot disk/%s on node %s: %s",
6517 snap_disks.append(False)
6519 disk_id = (vgname, result.payload)
6520 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6521 logical_id=disk_id, physical_id=disk_id,
6522 iv_name=disk.iv_name)
6523 snap_disks.append(new_dev)
6526 if self.op.shutdown and instance.admin_up:
6527 result = self.rpc.call_instance_start(src_node, instance, None, None)
6528 msg = result.fail_msg
6530 _ShutdownInstanceDisks(self, instance)
6531 raise errors.OpExecError("Could not start instance: %s" % msg)
6533 # TODO: check for size
6535 cluster_name = self.cfg.GetClusterName()
6536 for idx, dev in enumerate(snap_disks):
6538 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6539 instance, cluster_name, idx)
6540 msg = result.fail_msg
6542 self.LogWarning("Could not export disk/%s from node %s to"
6543 " node %s: %s", idx, src_node, dst_node.name, msg)
6544 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
6546 self.LogWarning("Could not remove snapshot for disk/%d from node"
6547 " %s: %s", idx, src_node, msg)
6549 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6550 msg = result.fail_msg
6552 self.LogWarning("Could not finalize export for instance %s"
6553 " on node %s: %s", instance.name, dst_node.name, msg)
6555 nodelist = self.cfg.GetNodeList()
6556 nodelist.remove(dst_node.name)
6558 # on one-node clusters nodelist will be empty after the removal;
6559 # if we proceed, the backup would be removed because OpQueryExports
6560 # substitutes an empty list with the full cluster node list.
6561 iname = instance.name
6563 exportlist = self.rpc.call_export_list(nodelist)
6564 for node in exportlist:
6565 if exportlist[node].fail_msg:
6567 if iname in exportlist[node].payload:
6568 msg = self.rpc.call_export_remove(node, iname).fail_msg
6570 self.LogWarning("Could not remove older export for instance %s"
6571 " on node %s: %s", iname, node, msg)
6574 class LURemoveExport(NoHooksLU):
6575 """Remove exports related to the named instance.
6578 _OP_REQP = ["instance_name"]
6581 def ExpandNames(self):
6582 self.needed_locks = {}
6583 # We need all nodes to be locked in order for RemoveExport to work, but we
6584 # don't need to lock the instance itself, as nothing will happen to it (and
6585 # we can also remove exports for a removed instance)
6586 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6588 def CheckPrereq(self):
6589 """Check prerequisites.
6593 def Exec(self, feedback_fn):
6594 """Remove any export.
6597 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6598 # If the instance was not found we'll try with the name that was passed in.
6599 # This will only work if it was an FQDN, though.
6601 if not instance_name:
6603 instance_name = self.op.instance_name
6605 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6606 exportlist = self.rpc.call_export_list(locked_nodes)
6608 for node in exportlist:
6609 msg = exportlist[node].fail_msg
6611 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6613 if instance_name in exportlist[node].payload:
6615 result = self.rpc.call_export_remove(node, instance_name)
6616 msg = result.fail_msg
6618 logging.error("Could not remove export for instance %s"
6619 " on node %s: %s", instance_name, node, msg)
6621 if fqdn_warn and not found:
6622 feedback_fn("Export not found. If trying to remove an export belonging"
6623 " to a deleted instance please use its Fully Qualified"
6627 class TagsLU(NoHooksLU):
6630 This is an abstract class which is the parent of all the other tags LUs.
6634 def ExpandNames(self):
6635 self.needed_locks = {}
6636 if self.op.kind == constants.TAG_NODE:
6637 name = self.cfg.ExpandNodeName(self.op.name)
6639 raise errors.OpPrereqError("Invalid node name (%s)" %
6642 self.needed_locks[locking.LEVEL_NODE] = name
6643 elif self.op.kind == constants.TAG_INSTANCE:
6644 name = self.cfg.ExpandInstanceName(self.op.name)
6646 raise errors.OpPrereqError("Invalid instance name (%s)" %
6649 self.needed_locks[locking.LEVEL_INSTANCE] = name
6651 def CheckPrereq(self):
6652 """Check prerequisites.
6655 if self.op.kind == constants.TAG_CLUSTER:
6656 self.target = self.cfg.GetClusterInfo()
6657 elif self.op.kind == constants.TAG_NODE:
6658 self.target = self.cfg.GetNodeInfo(self.op.name)
6659 elif self.op.kind == constants.TAG_INSTANCE:
6660 self.target = self.cfg.GetInstanceInfo(self.op.name)
6662 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6666 class LUGetTags(TagsLU):
6667 """Returns the tags of a given object.
6670 _OP_REQP = ["kind", "name"]
6673 def Exec(self, feedback_fn):
6674 """Returns the tag list.
6677 return list(self.target.GetTags())
6680 class LUSearchTags(NoHooksLU):
6681 """Searches the tags for a given pattern.
6684 _OP_REQP = ["pattern"]
6687 def ExpandNames(self):
6688 self.needed_locks = {}
6690 def CheckPrereq(self):
6691 """Check prerequisites.
6693 This checks the pattern passed for validity by compiling it.
6697 self.re = re.compile(self.op.pattern)
6698 except re.error, err:
6699 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6700 (self.op.pattern, err))
6702 def Exec(self, feedback_fn):
6703 """Returns the tag list.
6707 tgts = [("/cluster", cfg.GetClusterInfo())]
6708 ilist = cfg.GetAllInstancesInfo().values()
6709 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6710 nlist = cfg.GetAllNodesInfo().values()
6711 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6713 for path, target in tgts:
6714 for tag in target.GetTags():
6715 if self.re.search(tag):
6716 results.append((path, tag))
6720 class LUAddTags(TagsLU):
6721 """Sets a tag on a given object.
6724 _OP_REQP = ["kind", "name", "tags"]
6727 def CheckPrereq(self):
6728 """Check prerequisites.
6730 This checks the type and length of the tag name and value.
6733 TagsLU.CheckPrereq(self)
6734 for tag in self.op.tags:
6735 objects.TaggableObject.ValidateTag(tag)
6737 def Exec(self, feedback_fn):
6742 for tag in self.op.tags:
6743 self.target.AddTag(tag)
6744 except errors.TagError, err:
6745 raise errors.OpExecError("Error while setting tag: %s" % str(err))
6747 self.cfg.Update(self.target)
6748 except errors.ConfigurationError:
6749 raise errors.OpRetryError("There has been a modification to the"
6750 " config file and the operation has been"
6751 " aborted. Please retry.")
6754 class LUDelTags(TagsLU):
6755 """Delete a list of tags from a given object.
6758 _OP_REQP = ["kind", "name", "tags"]
6761 def CheckPrereq(self):
6762 """Check prerequisites.
6764 This checks that we have the given tag.
6767 TagsLU.CheckPrereq(self)
6768 for tag in self.op.tags:
6769 objects.TaggableObject.ValidateTag(tag)
6770 del_tags = frozenset(self.op.tags)
6771 cur_tags = self.target.GetTags()
6772 if not del_tags <= cur_tags:
6773 diff_tags = del_tags - cur_tags
6774 diff_names = ["'%s'" % tag for tag in diff_tags]
6776 raise errors.OpPrereqError("Tag(s) %s not found" %
6777 (",".join(diff_names)))
6779 def Exec(self, feedback_fn):
6780 """Remove the tag from the object.
6783 for tag in self.op.tags:
6784 self.target.RemoveTag(tag)
6786 self.cfg.Update(self.target)
6787 except errors.ConfigurationError:
6788 raise errors.OpRetryError("There has been a modification to the"
6789 " config file and the operation has been"
6790 " aborted. Please retry.")
6793 class LUTestDelay(NoHooksLU):
6794 """Sleep for a specified amount of time.
6796 This LU sleeps on the master and/or nodes for a specified amount of time.
6800 _OP_REQP = ["duration", "on_master", "on_nodes"]
6803 def ExpandNames(self):
6804 """Expand names and set required locks.
6806 This expands the node list, if any.
6809 self.needed_locks = {}
6810 if self.op.on_nodes:
6811 # _GetWantedNodes can be used here, but is not always appropriate to use
6812 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for more details.
6814 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6815 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6817 def CheckPrereq(self):
6818 """Check prerequisites.
6822 def Exec(self, feedback_fn):
6823 """Do the actual sleep.
6826 if self.op.on_master:
6827 if not utils.TestDelay(self.op.duration):
6828 raise errors.OpExecError("Error during master delay test")
6829 if self.op.on_nodes:
6830 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6831 for node, node_result in result.items():
6832 node_result.Raise("Failure during rpc call to node %s" % node)
6835 class IAllocator(object):
6836 """IAllocator framework.
6838 An IAllocator instance has several sets of attributes:
6839 - cfg that is needed to query the cluster
6840 - input data (all members of the _KEYS class attribute are required)
6841 - four buffer attributes (in|out)_(data|text), that represent the
6842 input (to the external script) in text and data structure format,
6843 and the output from it, again in two formats
6844 - the result variables from the script (success, info, nodes) for easy usage
6849 "mem_size", "disks", "disk_template",
6850 "os", "tags", "nics", "vcpus", "hypervisor",
6856 def __init__(self, lu, mode, name, **kwargs):
6858 # init buffer variables
6859 self.in_text = self.out_text = self.in_data = self.out_data = None
6860 # init all input fields so that pylint is happy
6863 self.mem_size = self.disks = self.disk_template = None
6864 self.os = self.tags = self.nics = self.vcpus = None
6865 self.hypervisor = None
6866 self.relocate_from = None
6868 self.required_nodes = None
6869 # init result fields
6870 self.success = self.info = self.nodes = None
6871 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6872 keyset = self._ALLO_KEYS
6873 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6874 keyset = self._RELO_KEYS
6876 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6877 " IAllocator" % self.mode)
6879 if key not in keyset:
6880 raise errors.ProgrammerError("Invalid input parameter '%s' to"
6881 " IAllocator" % key)
6882 setattr(self, key, kwargs[key])
6884 if key not in kwargs:
6885 raise errors.ProgrammerError("Missing input parameter '%s' to"
6886 " IAllocator" % key)
6887 self._BuildInputData()
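# Illustrative construction in allocation mode (hypothetical values),
# mirroring the keyword set required by _ALLO_KEYS:
#   ial = IAllocator(lu, constants.IALLOCATOR_MODE_ALLOC, "inst1.example.com",
#                    mem_size=512, disks=[{"size": 1024, "mode": "w"}],
#                    disk_template=constants.DT_DRBD8, os="debian-etch",
#                    tags=[], nics=[{"mac": constants.VALUE_AUTO}], vcpus=1,
#                    hypervisor=None)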
6889 def _ComputeClusterData(self):
6890 """Compute the generic allocator input data.
6892 This is the data that is independent of the actual operation.
6896 cluster_info = cfg.GetClusterInfo()
6899 "version": constants.IALLOCATOR_VERSION,
6900 "cluster_name": cfg.GetClusterName(),
6901 "cluster_tags": list(cluster_info.GetTags()),
6902 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6903 # we don't have job IDs
6905 iinfo = cfg.GetAllInstancesInfo().values()
6906 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6910 node_list = cfg.GetNodeList()
6912 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6913 hypervisor_name = self.hypervisor
6914 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6915 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6917 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6919 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6920 cluster_info.enabled_hypervisors)
6921 for nname, nresult in node_data.items():
6922 # first fill in static (config-based) values
6923 ninfo = cfg.GetNodeInfo(nname)
6925 "tags": list(ninfo.GetTags()),
6926 "primary_ip": ninfo.primary_ip,
6927 "secondary_ip": ninfo.secondary_ip,
6928 "offline": ninfo.offline,
6929 "drained": ninfo.drained,
6930 "master_candidate": ninfo.master_candidate,
6933 if not ninfo.offline:
6934 nresult.Raise("Can't get data for node %s" % nname)
6935 node_iinfo[nname].Raise("Can't get node instance info from node %s" %
6937 remote_info = nresult.payload
6938 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6939 'vg_size', 'vg_free', 'cpu_total']:
6940 if attr not in remote_info:
6941 raise errors.OpExecError("Node '%s' didn't return attribute"
6942 " '%s'" % (nname, attr))
6943 if not isinstance(remote_info[attr], int):
6944 raise errors.OpExecError("Node '%s' returned invalid value"
6946 (nname, attr, remote_info[attr]))
6947 # compute memory used by primary instances
6948 i_p_mem = i_p_up_mem = 0
6949 for iinfo, beinfo in i_list:
6950 if iinfo.primary_node == nname:
6951 i_p_mem += beinfo[constants.BE_MEMORY]
6952 if iinfo.name not in node_iinfo[nname].payload:
6955 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6956 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6957 remote_info['memory_free'] -= max(0, i_mem_diff)
6960 i_p_up_mem += beinfo[constants.BE_MEMORY]
6962 # compute memory used by instances
6964 "total_memory": remote_info['memory_total'],
6965 "reserved_memory": remote_info['memory_dom0'],
6966 "free_memory": remote_info['memory_free'],
6967 "total_disk": remote_info['vg_size'],
6968 "free_disk": remote_info['vg_free'],
6969 "total_cpus": remote_info['cpu_total'],
6970 "i_pri_memory": i_p_mem,
6971 "i_pri_up_memory": i_p_up_mem,
6975 node_results[nname] = pnr
6976 data["nodes"] = node_results
6980 for iinfo, beinfo in i_list:
6982 for nic in iinfo.nics:
6983 filled_params = objects.FillDict(
6984 cluster_info.nicparams[constants.PP_DEFAULT],
6986 nic_dict = {"mac": nic.mac,
6988 "mode": filled_params[constants.NIC_MODE],
6989 "link": filled_params[constants.NIC_LINK],
6991 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
6992 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
6993 nic_data.append(nic_dict)
6995 "tags": list(iinfo.GetTags()),
6996 "admin_up": iinfo.admin_up,
6997 "vcpus": beinfo[constants.BE_VCPUS],
6998 "memory": beinfo[constants.BE_MEMORY],
7000 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7002 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7003 "disk_template": iinfo.disk_template,
7004 "hypervisor": iinfo.hypervisor,
7006 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7008 instance_data[iinfo.name] = pir
7010 data["instances"] = instance_data
7014 def _AddNewInstance(self):
7015 """Add new instance data to allocator structure.
7017 This in combination with _ComputeClusterData will create the
7018 correct structure needed as input for the allocator.
7020 The checks for the completeness of the opcode must have already been done.
7026 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7028 if self.disk_template in constants.DTS_NET_MIRROR:
7029 self.required_nodes = 2
7031 self.required_nodes = 1
7035 "disk_template": self.disk_template,
7038 "vcpus": self.vcpus,
7039 "memory": self.mem_size,
7040 "disks": self.disks,
7041 "disk_space_total": disk_space,
7043 "required_nodes": self.required_nodes,
7045 data["request"] = request
7047 def _AddRelocateInstance(self):
7048 """Add relocate instance data to allocator structure.
7050 This in combination with _ComputeClusterData will create the
7051 correct structure needed as input for the allocator.
7053 The checks for the completeness of the opcode must have already been done.
7057 instance = self.lu.cfg.GetInstanceInfo(self.name)
7058 if instance is None:
7059 raise errors.ProgrammerError("Unknown instance '%s' passed to"
7060 " IAllocator" % self.name)
7062 if instance.disk_template not in constants.DTS_NET_MIRROR:
7063 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7065 if len(instance.secondary_nodes) != 1:
7066 raise errors.OpPrereqError("Instance does not have exactly one secondary node")
7068 self.required_nodes = 1
7069 disk_sizes = [{'size': disk.size} for disk in instance.disks]
7070 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7075 "disk_space_total": disk_space,
7076 "required_nodes": self.required_nodes,
7077 "relocate_from": self.relocate_from,
7079 self.in_data["request"] = request
7081 def _BuildInputData(self):
7082 """Build input data structures.
7085 self._ComputeClusterData()
7087 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7088 self._AddNewInstance()
7090 self._AddRelocateInstance()
7092 self.in_text = serializer.Dump(self.in_data)
7094 def Run(self, name, validate=True, call_fn=None):
7095 """Run an instance allocator and return the results.
7099 call_fn = self.lu.rpc.call_iallocator_runner
7102 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7103 result.Raise("Failure while running the iallocator script")
7105 self.out_text = result.payload
7107 self._ValidateResult()
7109 def _ValidateResult(self):
7110 """Process the allocator results.
7112 This will process and if successful save the result in
7113 self.out_data and the other parameters.
7117 rdict = serializer.Load(self.out_text)
7118 except Exception, err:
7119 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7121 if not isinstance(rdict, dict):
7122 raise errors.OpExecError("Can't parse iallocator results: not a dict")
7124 for key in "success", "info", "nodes":
7125 if key not in rdict:
7126 raise errors.OpExecError("Can't parse iallocator results:"
7127 " missing key '%s'" % key)
7128 setattr(self, key, rdict[key])
7130 if not isinstance(rdict["nodes"], list):
7131 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7133 self.out_data = rdict
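# Illustrative allocator reply (hypothetical values) that would pass the
# checks above:
#   {"success": True, "info": "allocation successful",
#    "nodes": ["node2.example.com", "node3.example.com"]}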
7136 class LUTestAllocator(NoHooksLU):
7137 """Run allocator tests.
7139 This LU runs the allocator tests.
7142 _OP_REQP = ["direction", "mode", "name"]
7144 def CheckPrereq(self):
7145 """Check prerequisites.
7147 This checks the opcode parameters depending on the direction and mode of the test.
7150 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7151 for attr in ["name", "mem_size", "disks", "disk_template",
7152 "os", "tags", "nics", "vcpus"]:
7153 if not hasattr(self.op, attr):
7154 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7156 iname = self.cfg.ExpandInstanceName(self.op.name)
7157 if iname is not None:
7158 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7160 if not isinstance(self.op.nics, list):
7161 raise errors.OpPrereqError("Invalid parameter 'nics'")
7162 for row in self.op.nics:
7163 if (not isinstance(row, dict) or
7166 "bridge" not in row):
7167 raise errors.OpPrereqError("Invalid contents of the"
7168 " 'nics' parameter")
7169 if not isinstance(self.op.disks, list):
7170 raise errors.OpPrereqError("Invalid parameter 'disks'")
7171 for row in self.op.disks:
7172 if (not isinstance(row, dict) or
7173 "size" not in row or
7174 not isinstance(row["size"], int) or
7175 "mode" not in row or
7176 row["mode"] not in ['r', 'w']):
7177 raise errors.OpPrereqError("Invalid contents of the"
7178 " 'disks' parameter")
7179 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7180 self.op.hypervisor = self.cfg.GetHypervisorType()
7181 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7182 if not hasattr(self.op, "name"):
7183 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7184 fname = self.cfg.ExpandInstanceName(self.op.name)
7186 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7188 self.op.name = fname
7189 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7191 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7194 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7195 if not hasattr(self.op, "allocator") or self.op.allocator is None:
7196 raise errors.OpPrereqError("Missing allocator name")
7197 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7198 raise errors.OpPrereqError("Wrong allocator test '%s'" %
7201 def Exec(self, feedback_fn):
7202 """Run the allocator test.
7205 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7206 ial = IAllocator(self,
7209 mem_size=self.op.mem_size,
7210 disks=self.op.disks,
7211 disk_template=self.op.disk_template,
7215 vcpus=self.op.vcpus,
7216 hypervisor=self.op.hypervisor,
7219 ial = IAllocator(self,
7222 relocate_from=list(self.relocate_from),
7225 if self.op.direction == constants.IALLOCATOR_DIR_IN:
7226 result = ial.in_text
7228 ial.Run(self.op.allocator, validate=False)
7229 result = ial.out_text