4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
45 class LogicalUnit(object):
46 """Logical Unit base class.
48 Subclasses must follow these rules:
49 - implement ExpandNames
50 - implement CheckPrereq
52 - implement BuildHooksEnv
53 - redefine HPATH and HTYPE
54 - optionally redefine their run requirements:
55 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
57 Note that all commands require root permissions.
59 @ivar dry_run_result: the value (if any) that will be returned to the caller
60 in dry-run mode (signalled by opcode dry_run parameter)
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overridden in derived classes in order to check op
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
92 self.LogStep = processor.LogStep
94 self.dry_run_result = None
96 for attr_name in self._OP_REQP:
97 attr_val = getattr(op, attr_name, None)
99 raise errors.OpPrereqError("Required parameter '%s' missing" %
101 self.CheckArguments()
104 """Returns the SshRunner object
108 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
111 ssh = property(fget=__GetSSH)
113 def CheckArguments(self):
114 """Check syntactic validity for the opcode arguments.
116 This method is for doing a simple syntactic check and ensuring the
117 validity of opcode parameters, without any cluster-related
118 checks. While the same can be accomplished in ExpandNames and/or
119 CheckPrereq, doing these separately is better because:
121 - ExpandNames is left as purely a lock-related function
122 - CheckPrereq is run after we have acquired locks (and possible
125 The function is allowed to change the self.op attribute so that
126 later methods need not worry about missing parameters.
131 def ExpandNames(self):
132 """Expand names for this LU.
134 This method is called before starting to execute the opcode, and it should
135 update all the parameters of the opcode to their canonical form (e.g. a
136 short node name must be fully expanded after this method has successfully
137 completed). This way locking, hooks, logging, etc. can work correctly.
139 LUs which implement this method must also populate the self.needed_locks
140 member, as a dict with lock levels as keys, and a list of needed lock names
143 - use an empty dict if you don't need any lock
144 - if you don't need any lock at a particular level omit that level
145 - don't put anything for the BGL level
146 - if you want all locks at a level use locking.ALL_SET as a value
148 If you need to share locks (rather than acquire them exclusively) at one
149 level you can modify self.share_locks, setting a true value (usually 1) for
150 that level. By default locks are not shared.
154 # Acquire all nodes and one instance
155 self.needed_locks = {
156 locking.LEVEL_NODE: locking.ALL_SET,
157 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
159 # Acquire just two nodes
160 self.needed_locks = {
161 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
164 self.needed_locks = {} # No, you can't leave it to the default value None
167 # The implementation of this method is mandatory only if the new LU is
168 # concurrent, so that old LUs don't need to be changed all at the same
171 self.needed_locks = {} # Exclusive LUs don't need locks.
173 raise NotImplementedError
175 def DeclareLocks(self, level):
176 """Declare LU locking needs for a level
178 While most LUs can just declare their locking needs at ExpandNames time,
179 sometimes there's the need to calculate some locks after having acquired
180 the ones before. This function is called just before acquiring locks at a
181 particular level, but after acquiring the ones at lower levels, and permits
182 such calculations. It can be used to modify self.needed_locks, and by
183 default it does nothing.
185 This function is only called if you have something already set in
186 self.needed_locks for the level.
188 @param level: Locking level which is going to be locked
189 @type level: member of ganeti.locking.LEVELS
193 def CheckPrereq(self):
194 """Check prerequisites for this LU.
196 This method should check that the prerequisites for the execution
197 of this LU are fulfilled. It can do internode communication, but
198 it should be idempotent - no cluster or system changes are
201 The method should raise errors.OpPrereqError in case something is
202 not fulfilled. Its return value is ignored.
204 This method should also update all the parameters of the opcode to
205 their canonical form if it hasn't been done by ExpandNames before.
208 raise NotImplementedError
210 def Exec(self, feedback_fn):
213 This method should implement the actual work. It should raise
214 errors.OpExecError for failures that are somewhat dealt with in
218 raise NotImplementedError
220 def BuildHooksEnv(self):
221 """Build hooks environment for this LU.
223 This method should return a three-element tuple consisting of: a dict
224 containing the environment that will be used for running the
225 specific hook for this LU, a list of node names on which the hook
226 should run before the execution, and a list of node names on which
227 the hook should run after the execution.
229 The keys of the dict must not have the 'GANETI_' prefix, as this will
230 be handled by the hooks runner. Also note that additional keys will be
231 added by the hooks runner. If the LU doesn't define any
232 environment, an empty dict (and not None) should be returned.
234 If there are no nodes, an empty list (and not None) should be returned.
236 Note that if the HPATH for a LU class is None, this function will
240 raise NotImplementedError
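# Illustrative sketch, not part of the original file: what a concrete
# BuildHooksEnv might look like in a hypothetical instance-level LU. The
# self.instance attribute is an assumption; the helper
# _BuildInstanceHookEnvByObject and the (env, pre-nodes, post-nodes) result
# shape come from this module and the docstring above.
#
#   def BuildHooksEnv(self):
#     env = _BuildInstanceHookEnvByObject(self, self.instance)
#     nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
#     # run both the pre- and the post-hooks on the same node list
#     return env, nl, nl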
242 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
243 """Notify the LU about the results of its hooks.
245 This method is called every time a hooks phase is executed, and notifies
246 the Logical Unit about the hooks' result. The LU can then use it to alter
247 its result based on the hooks. By default the method does nothing and the
248 previous result is passed back unchanged but any LU can define it if it
249 wants to use the local cluster hook-scripts somehow.
251 @param phase: one of L{constants.HOOKS_PHASE_POST} or
252 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
253 @param hook_results: the results of the multi-node hooks rpc call
254 @param feedback_fn: function used to send feedback back to the caller
255 @param lu_result: the previous Exec result this LU had, or None
257 @return: the new Exec result, based on the previous result
263 def _ExpandAndLockInstance(self):
264 """Helper function to expand and lock an instance.
266 Many LUs that work on an instance take its name in self.op.instance_name
267 and need to expand it and then declare the expanded name for locking. This
268 function does it, and then updates self.op.instance_name to the expanded
269 name. It also initializes needed_locks as a dict, if this hasn't been done
273 if self.needed_locks is None:
274 self.needed_locks = {}
276 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
277 "_ExpandAndLockInstance called with instance-level locks set"
278 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
279 if expanded_name is None:
280 raise errors.OpPrereqError("Instance '%s' not known" %
281 self.op.instance_name)
282 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
283 self.op.instance_name = expanded_name
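# Illustrative sketch, not part of the original file: a hypothetical LU
# using the helper above from its ExpandNames; the LOCKS_REPLACE value and
# the recalculate_locks convention are the ones used by _LockInstancesNodes
# below.
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     # node locks are computed later, in DeclareLocks
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE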
285 def _LockInstancesNodes(self, primary_only=False):
286 """Helper function to declare instances' nodes for locking.
288 This function should be called after locking one or more instances to lock
289 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
290 with all primary or secondary nodes for instances already locked and
291 present in self.needed_locks[locking.LEVEL_INSTANCE].
293 It should be called from DeclareLocks, and for safety only works if
294 self.recalculate_locks[locking.LEVEL_NODE] is set.
296 In the future it may grow parameters to just lock some instance's nodes, or
297 to just lock primary or secondary nodes, if needed.
299 It should be called in DeclareLocks in a way similar to::
301 if level == locking.LEVEL_NODE:
302 self._LockInstancesNodes()
304 @type primary_only: boolean
305 @param primary_only: only lock primary nodes of locked instances
308 assert locking.LEVEL_NODE in self.recalculate_locks, \
309 "_LockInstancesNodes helper function called with no nodes to recalculate"
311 # TODO: check if we've really been called with the instance locks held
313 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
314 # future we might want to have different behaviors depending on the value
315 # of self.recalculate_locks[locking.LEVEL_NODE]
317 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
318 instance = self.context.cfg.GetInstanceInfo(instance_name)
319 wanted_nodes.append(instance.primary_node)
321 wanted_nodes.extend(instance.secondary_nodes)
323 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
324 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
325 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
326 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
328 del self.recalculate_locks[locking.LEVEL_NODE]
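# Illustrative sketch, not part of the original file: the DeclareLocks
# counterpart to the ExpandNames pattern above, following the example given
# in the _LockInstancesNodes docstring.
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()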
331 class NoHooksLU(LogicalUnit):
332 """Simple LU which runs no hooks.
334 This LU is intended as a parent for other LogicalUnits which will
335 run no hooks, in order to reduce duplicate code.
342 def _GetWantedNodes(lu, nodes):
343 """Returns list of checked and expanded node names.
345 @type lu: L{LogicalUnit}
346 @param lu: the logical unit on whose behalf we execute
348 @param nodes: list of node names or None for all nodes
350 @return: the list of nodes, sorted
351 @raise errors.OpPrereqError: if the nodes parameter is wrong type
354 if not isinstance(nodes, list):
355 raise errors.OpPrereqError("Invalid argument type 'nodes'")
358 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
359 " non-empty list of nodes whose name is to be expanded.")
363 node = lu.cfg.ExpandNodeName(name)
365 raise errors.OpPrereqError("No such node name '%s'" % name)
368 return utils.NiceSort(wanted)
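# Illustrative sketch, not part of the original file: typical use of
# _GetWantedNodes from an LU's ExpandNames, assuming the opcode carries a
# "names" field that is either a list of node names or empty for "all
# nodes" (the same pattern LUQueryNodes uses further down).
#
#   if self.op.names:
#     self.wanted = _GetWantedNodes(self, self.op.names)
#   else:
#     self.wanted = locking.ALL_SET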
371 def _GetWantedInstances(lu, instances):
372 """Returns list of checked and expanded instance names.
374 @type lu: L{LogicalUnit}
375 @param lu: the logical unit on whose behalf we execute
376 @type instances: list
377 @param instances: list of instance names or None for all instances
379 @return: the list of instances, sorted
380 @raise errors.OpPrereqError: if the instances parameter is wrong type
381 @raise errors.OpPrereqError: if any of the passed instances is not found
384 if not isinstance(instances, list):
385 raise errors.OpPrereqError("Invalid argument type 'instances'")
390 for name in instances:
391 instance = lu.cfg.ExpandInstanceName(name)
393 raise errors.OpPrereqError("No such instance name '%s'" % name)
394 wanted.append(instance)
397 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
401 def _CheckOutputFields(static, dynamic, selected):
402 """Checks whether all selected fields are valid.
404 @type static: L{utils.FieldSet}
405 @param static: static fields set
406 @type dynamic: L{utils.FieldSet}
407 @param dynamic: dynamic fields set
414 delta = f.NonMatching(selected)
416 raise errors.OpPrereqError("Unknown output fields selected: %s"
420 def _CheckBooleanOpField(op, name):
421 """Validates boolean opcode parameters.
423 This will ensure that an opcode parameter is either a boolean value,
424 or None (but that it always exists).
427 val = getattr(op, name, None)
428 if not (val is None or isinstance(val, bool)):
429 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
431 setattr(op, name, val)
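# Illustrative sketch, not part of the original file: how an LU's
# CheckArguments could use the helper above to normalize an optional
# boolean opcode flag; the "force" field name is only an example.
#
#   def CheckArguments(self):
#     _CheckBooleanOpField(self.op, "force")
#     self.force = self.op.force or False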
434 def _CheckNodeOnline(lu, node):
435 """Ensure that a given node is online.
437 @param lu: the LU on behalf of which we make the check
438 @param node: the node to check
439 @raise errors.OpPrereqError: if the node is offline
442 if lu.cfg.GetNodeInfo(node).offline:
443 raise errors.OpPrereqError("Can't use offline node %s" % node)
446 def _CheckNodeNotDrained(lu, node):
447 """Ensure that a given node is not drained.
449 @param lu: the LU on behalf of which we make the check
450 @param node: the node to check
451 @raise errors.OpPrereqError: if the node is drained
454 if lu.cfg.GetNodeInfo(node).drained:
455 raise errors.OpPrereqError("Can't use drained node %s" % node)
458 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
459 memory, vcpus, nics, disk_template, disks,
460 bep, hvp, hypervisor_name):
461 """Builds instance related env variables for hooks
463 This builds the hook environment from individual variables.
466 @param name: the name of the instance
467 @type primary_node: string
468 @param primary_node: the name of the instance's primary node
469 @type secondary_nodes: list
470 @param secondary_nodes: list of secondary nodes as strings
471 @type os_type: string
472 @param os_type: the name of the instance's OS
473 @type status: boolean
474 @param status: the should_run status of the instance
476 @param memory: the memory size of the instance
478 @param vcpus: the count of VCPUs the instance has
480 @param nics: list of tuples (ip, mac, mode, link) representing
481 the NICs the instance has
482 @type disk_template: string
483 @param disk_template: the disk template of the instance
485 @param disks: the list of (size, mode) pairs
487 @param bep: the backend parameters for the instance
489 @param hvp: the hypervisor parameters for the instance
490 @type hypervisor_name: string
491 @param hypervisor_name: the hypervisor for the instance
493 @return: the hook environment for this instance
502 "INSTANCE_NAME": name,
503 "INSTANCE_PRIMARY": primary_node,
504 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
505 "INSTANCE_OS_TYPE": os_type,
506 "INSTANCE_STATUS": str_status,
507 "INSTANCE_MEMORY": memory,
508 "INSTANCE_VCPUS": vcpus,
509 "INSTANCE_DISK_TEMPLATE": disk_template,
510 "INSTANCE_HYPERVISOR": hypervisor_name,
514 nic_count = len(nics)
515 for idx, (ip, mac, mode, link) in enumerate(nics):
518 env["INSTANCE_NIC%d_IP" % idx] = ip
519 env["INSTANCE_NIC%d_MAC" % idx] = mac
520 env["INSTANCE_NIC%d_MODE" % idx] = mode
521 env["INSTANCE_NIC%d_LINK" % idx] = link
522 if mode == constants.NIC_MODE_BRIDGED:
523 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
527 env["INSTANCE_NIC_COUNT"] = nic_count
530 disk_count = len(disks)
531 for idx, (size, mode) in enumerate(disks):
532 env["INSTANCE_DISK%d_SIZE" % idx] = size
533 env["INSTANCE_DISK%d_MODE" % idx] = mode
537 env["INSTANCE_DISK_COUNT"] = disk_count
539 for source, kind in [(bep, "BE"), (hvp, "HV")]:
540 for key, value in source.items():
541 env["INSTANCE_%s_%s" % (kind, key)] = value
545 def _NICListToTuple(lu, nics):
546 """Build a list of nic information tuples.
548 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
549 value in LUQueryInstanceData.
551 @type lu: L{LogicalUnit}
552 @param lu: the logical unit on whose behalf we execute
553 @type nics: list of L{objects.NIC}
554 @param nics: list of nics to convert to hooks tuples
558 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
562 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
563 mode = filled_params[constants.NIC_MODE]
564 link = filled_params[constants.NIC_LINK]
565 hooks_nics.append((ip, mac, mode, link))
568 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
569 """Builds instance related env variables for hooks from an object.
571 @type lu: L{LogicalUnit}
572 @param lu: the logical unit on whose behalf we execute
573 @type instance: L{objects.Instance}
574 @param instance: the instance for which we should build the
577 @param override: dictionary with key/values that will override
580 @return: the hook environment dictionary
583 cluster = lu.cfg.GetClusterInfo()
584 bep = cluster.FillBE(instance)
585 hvp = cluster.FillHV(instance)
587 'name': instance.name,
588 'primary_node': instance.primary_node,
589 'secondary_nodes': instance.secondary_nodes,
590 'os_type': instance.os,
591 'status': instance.admin_up,
592 'memory': bep[constants.BE_MEMORY],
593 'vcpus': bep[constants.BE_VCPUS],
594 'nics': _NICListToTuple(lu, instance.nics),
595 'disk_template': instance.disk_template,
596 'disks': [(disk.size, disk.mode) for disk in instance.disks],
599 'hypervisor_name': instance.hypervisor,
602 args.update(override)
603 return _BuildInstanceHookEnv(**args)
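# Illustrative sketch, not part of the original file: callers can override
# any of the computed values through the "override" mapping, e.g. to build
# the hook environment with the status the instance will have after the
# current operation:
#
#   env = _BuildInstanceHookEnvByObject(self, instance,
#                                       override={"status": False})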
606 def _AdjustCandidatePool(lu):
607 """Adjust the candidate pool after node operations.
610 mod_list = lu.cfg.MaintainCandidatePool()
612 lu.LogInfo("Promoted nodes to master candidate role: %s",
613 ", ".join(node.name for node in mod_list))
614 for name in mod_list:
615 lu.context.ReaddNode(name)
616 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
618 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
622 def _CheckNicsBridgesExist(lu, target_nics, target_node,
623 profile=constants.PP_DEFAULT):
624 """Check that the brigdes needed by a list of nics exist.
627 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
628 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
629 for nic in target_nics]
630 brlist = [params[constants.NIC_LINK] for params in paramslist
631 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
633 result = lu.rpc.call_bridges_exist(target_node, brlist)
634 result.Raise("Error checking bridges on destination node '%s'" %
635 target_node, prereq=True)
638 def _CheckInstanceBridgesExist(lu, instance, node=None):
639 """Check that the brigdes needed by an instance exist.
643 node = instance.primary_node
644 _CheckNicsBridgesExist(lu, instance.nics, node)
647 class LUDestroyCluster(NoHooksLU):
648 """Logical unit for destroying the cluster.
653 def CheckPrereq(self):
654 """Check prerequisites.
656 This checks whether the cluster is empty.
658 Any errors are signaled by raising errors.OpPrereqError.
661 master = self.cfg.GetMasterNode()
663 nodelist = self.cfg.GetNodeList()
664 if len(nodelist) != 1 or nodelist[0] != master:
665 raise errors.OpPrereqError("There are still %d node(s) in"
666 " this cluster." % (len(nodelist) - 1))
667 instancelist = self.cfg.GetInstanceList()
669 raise errors.OpPrereqError("There are still %d instance(s) in"
670 " this cluster." % len(instancelist))
672 def Exec(self, feedback_fn):
673 """Destroys the cluster.
676 master = self.cfg.GetMasterNode()
677 result = self.rpc.call_node_stop_master(master, False)
678 result.Raise("Could not disable the master role")
679 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
680 utils.CreateBackup(priv_key)
681 utils.CreateBackup(pub_key)
685 class LUVerifyCluster(LogicalUnit):
686 """Verifies the cluster status.
689 HPATH = "cluster-verify"
690 HTYPE = constants.HTYPE_CLUSTER
691 _OP_REQP = ["skip_checks"]
694 def ExpandNames(self):
695 self.needed_locks = {
696 locking.LEVEL_NODE: locking.ALL_SET,
697 locking.LEVEL_INSTANCE: locking.ALL_SET,
699 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
701 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
702 node_result, feedback_fn, master_files,
704 """Run multiple tests against a node.
708 - compares ganeti version
709 - checks vg existence and size > 20G
710 - checks config file checksum
711 - checks ssh to other nodes
713 @type nodeinfo: L{objects.Node}
714 @param nodeinfo: the node to check
715 @param file_list: required list of files
716 @param local_cksum: dictionary of local files and their checksums
717 @param node_result: the results from the node
718 @param feedback_fn: function used to accumulate results
719 @param master_files: list of files that only masters should have
720 @param drbd_map: the used drbd minors for this node, in
721 form of minor: (instance, must_exist) which correspond to instances
722 and their running status
723 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
728 # main result, node_result should be a non-empty dict
729 if not node_result or not isinstance(node_result, dict):
730 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
733 # compares ganeti version
734 local_version = constants.PROTOCOL_VERSION
735 remote_version = node_result.get('version', None)
736 if not (remote_version and isinstance(remote_version, (list, tuple)) and
737 len(remote_version) == 2):
738 feedback_fn(" - ERROR: connection to %s failed" % (node))
741 if local_version != remote_version[0]:
742 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
743 " node %s %s" % (local_version, node, remote_version[0]))
746 # node seems compatible, we can actually try to look into its results
750 # full package version
751 if constants.RELEASE_VERSION != remote_version[1]:
752 feedback_fn(" - WARNING: software version mismatch: master %s,"
754 (constants.RELEASE_VERSION, node, remote_version[1]))
756 # checks vg existence and size > 20G
757 if vg_name is not None:
758 vglist = node_result.get(constants.NV_VGLIST, None)
760 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
764 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
765 constants.MIN_VG_SIZE)
767 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
770 # checks config file checksum
772 remote_cksum = node_result.get(constants.NV_FILELIST, None)
773 if not isinstance(remote_cksum, dict):
775 feedback_fn(" - ERROR: node hasn't returned file checksum data")
777 for file_name in file_list:
778 node_is_mc = nodeinfo.master_candidate
779 must_have_file = file_name not in master_files
780 if file_name not in remote_cksum:
781 if node_is_mc or must_have_file:
783 feedback_fn(" - ERROR: file '%s' missing" % file_name)
784 elif remote_cksum[file_name] != local_cksum[file_name]:
785 if node_is_mc or must_have_file:
787 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
789 # not candidate and this is not a must-have file
791 feedback_fn(" - ERROR: file '%s' should not exist on non master"
792 " candidates (and the file is outdated)" % file_name)
794 # all good, except non-master/non-must have combination
795 if not node_is_mc and not must_have_file:
796 feedback_fn(" - ERROR: file '%s' should not exist on non master"
797 " candidates" % file_name)
801 if constants.NV_NODELIST not in node_result:
803 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
805 if node_result[constants.NV_NODELIST]:
807 for node in node_result[constants.NV_NODELIST]:
808 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
809 (node, node_result[constants.NV_NODELIST][node]))
811 if constants.NV_NODENETTEST not in node_result:
813 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
815 if node_result[constants.NV_NODENETTEST]:
817 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
819 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
820 (node, node_result[constants.NV_NODENETTEST][node]))
822 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
823 if isinstance(hyp_result, dict):
824 for hv_name, hv_result in hyp_result.iteritems():
825 if hv_result is not None:
826 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
827 (hv_name, hv_result))
829 # check used drbd list
830 if vg_name is not None:
831 used_minors = node_result.get(constants.NV_DRBDLIST, [])
832 if not isinstance(used_minors, (tuple, list)):
833 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
836 for minor, (iname, must_exist) in drbd_map.items():
837 if minor not in used_minors and must_exist:
838 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
839 " not active" % (minor, iname))
841 for minor in used_minors:
842 if minor not in drbd_map:
843 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
849 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
850 node_instance, feedback_fn, n_offline):
851 """Verify an instance.
853 This function checks to see if the required block devices are
854 available on the instance's node.
859 node_current = instanceconfig.primary_node
862 instanceconfig.MapLVsByNode(node_vol_should)
864 for node in node_vol_should:
865 if node in n_offline:
866 # ignore missing volumes on offline nodes
868 for volume in node_vol_should[node]:
869 if node not in node_vol_is or volume not in node_vol_is[node]:
870 feedback_fn(" - ERROR: volume %s missing on node %s" %
874 if instanceconfig.admin_up:
875 if ((node_current not in node_instance or
876 not instance in node_instance[node_current]) and
877 node_current not in n_offline):
878 feedback_fn(" - ERROR: instance %s not running on node %s" %
879 (instance, node_current))
882 for node in node_instance:
883 if (not node == node_current):
884 if instance in node_instance[node]:
885 feedback_fn(" - ERROR: instance %s should not run on node %s" %
891 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
892 """Verify if there are any unknown volumes in the cluster.
894 The .os, .swap and backup volumes are ignored. All other volumes are
900 for node in node_vol_is:
901 for volume in node_vol_is[node]:
902 if node not in node_vol_should or volume not in node_vol_should[node]:
903 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
908 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
909 """Verify the list of running instances.
911 This checks what instances are running but unknown to the cluster.
915 for node in node_instance:
916 for runninginstance in node_instance[node]:
917 if runninginstance not in instancelist:
918 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
919 (runninginstance, node))
923 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
924 """Verify N+1 Memory Resilience.
926 Check that if one single node dies we can still start all the instances it
932 for node, nodeinfo in node_info.iteritems():
933 # This code checks that every node which is now listed as secondary has
934 # enough memory to host all instances it is supposed to, should a single
935 # other node in the cluster fail.
936 # FIXME: not ready for failover to an arbitrary node
937 # FIXME: does not support file-backed instances
938 # WARNING: we currently take into account down instances as well as up
939 # ones, considering that even if they're down someone might want to start
940 # them even in the event of a node failure.
941 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
943 for instance in instances:
944 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
945 if bep[constants.BE_AUTO_BALANCE]:
946 needed_mem += bep[constants.BE_MEMORY]
947 if nodeinfo['mfree'] < needed_mem:
948 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
949 " failovers should node %s fail" % (node, prinode))
953 def CheckPrereq(self):
954 """Check prerequisites.
956 Transform the list of checks we're going to skip into a set and check that
957 all its members are valid.
960 self.skip_set = frozenset(self.op.skip_checks)
961 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
962 raise errors.OpPrereqError("Invalid checks to be skipped specified")
964 def BuildHooksEnv(self):
967 Cluster-Verify hooks are run only in the post phase; if they fail, their
968 output is logged in the verify output and the verification fails.
971 all_nodes = self.cfg.GetNodeList()
973 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
975 for node in self.cfg.GetAllNodesInfo().values():
976 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
978 return env, [], all_nodes
980 def Exec(self, feedback_fn):
981 """Verify integrity of cluster, performing various test on nodes.
985 feedback_fn("* Verifying global settings")
986 for msg in self.cfg.VerifyConfig():
987 feedback_fn(" - ERROR: %s" % msg)
989 vg_name = self.cfg.GetVGName()
990 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
991 nodelist = utils.NiceSort(self.cfg.GetNodeList())
992 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
993 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
994 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
995 for iname in instancelist)
996 i_non_redundant = [] # Non redundant instances
997 i_non_a_balanced = [] # Non auto-balanced instances
998 n_offline = [] # List of offline nodes
999 n_drained = [] # List of nodes being drained
1005 # FIXME: verify OS list
1006 # do local checksums
1007 master_files = [constants.CLUSTER_CONF_FILE]
1009 file_names = ssconf.SimpleStore().GetFileList()
1010 file_names.append(constants.SSL_CERT_FILE)
1011 file_names.append(constants.RAPI_CERT_FILE)
1012 file_names.extend(master_files)
1014 local_checksums = utils.FingerprintFiles(file_names)
1016 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1017 node_verify_param = {
1018 constants.NV_FILELIST: file_names,
1019 constants.NV_NODELIST: [node.name for node in nodeinfo
1020 if not node.offline],
1021 constants.NV_HYPERVISOR: hypervisors,
1022 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1023 node.secondary_ip) for node in nodeinfo
1024 if not node.offline],
1025 constants.NV_INSTANCELIST: hypervisors,
1026 constants.NV_VERSION: None,
1027 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1029 if vg_name is not None:
1030 node_verify_param[constants.NV_VGLIST] = None
1031 node_verify_param[constants.NV_LVLIST] = vg_name
1032 node_verify_param[constants.NV_DRBDLIST] = None
1033 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1034 self.cfg.GetClusterName())
1036 cluster = self.cfg.GetClusterInfo()
1037 master_node = self.cfg.GetMasterNode()
1038 all_drbd_map = self.cfg.ComputeDRBDMap()
1040 for node_i in nodeinfo:
1044 feedback_fn("* Skipping offline node %s" % (node,))
1045 n_offline.append(node)
1048 if node == master_node:
1050 elif node_i.master_candidate:
1051 ntype = "master candidate"
1052 elif node_i.drained:
1054 n_drained.append(node)
1057 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1059 msg = all_nvinfo[node].fail_msg
1061 feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg))
1065 nresult = all_nvinfo[node].payload
1067 for minor, instance in all_drbd_map[node].items():
1068 if instance not in instanceinfo:
1069 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1071 # ghost instance should not be running, but otherwise we
1072 # don't give double warnings (both ghost instance and
1073 # unallocated minor in use)
1074 node_drbd[minor] = (instance, False)
1076 instance = instanceinfo[instance]
1077 node_drbd[minor] = (instance.name, instance.admin_up)
1078 result = self._VerifyNode(node_i, file_names, local_checksums,
1079 nresult, feedback_fn, master_files,
1083 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1085 node_volume[node] = {}
1086 elif isinstance(lvdata, basestring):
1087 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1088 (node, utils.SafeEncode(lvdata)))
1090 node_volume[node] = {}
1091 elif not isinstance(lvdata, dict):
1092 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1096 node_volume[node] = lvdata
1099 idata = nresult.get(constants.NV_INSTANCELIST, None)
1100 if not isinstance(idata, list):
1101 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1106 node_instance[node] = idata
1109 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1110 if not isinstance(nodeinfo, dict):
1111 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1117 "mfree": int(nodeinfo['memory_free']),
1120 # dictionary holding all instances this node is secondary for,
1121 # grouped by their primary node. Each key is a cluster node, and each
1122 # value is a list of instances which have the key as primary and the
1123 # current node as secondary. This is handy to calculate N+1 memory
1124 # availability if you can only failover from a primary to its
1126 "sinst-by-pnode": {},
1128 # FIXME: devise a free space model for file based instances as well
1129 if vg_name is not None:
1130 if (constants.NV_VGLIST not in nresult or
1131 vg_name not in nresult[constants.NV_VGLIST]):
1132 feedback_fn(" - ERROR: node %s didn't return data for the"
1133 " volume group '%s' - it is either missing or broken" %
1137 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1138 except (ValueError, KeyError):
1139 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1140 " from node %s" % (node,))
1144 node_vol_should = {}
1146 for instance in instancelist:
1147 feedback_fn("* Verifying instance %s" % instance)
1148 inst_config = instanceinfo[instance]
1149 result = self._VerifyInstance(instance, inst_config, node_volume,
1150 node_instance, feedback_fn, n_offline)
1152 inst_nodes_offline = []
1154 inst_config.MapLVsByNode(node_vol_should)
1156 instance_cfg[instance] = inst_config
1158 pnode = inst_config.primary_node
1159 if pnode in node_info:
1160 node_info[pnode]['pinst'].append(instance)
1161 elif pnode not in n_offline:
1162 feedback_fn(" - ERROR: instance %s, connection to primary node"
1163 " %s failed" % (instance, pnode))
1166 if pnode in n_offline:
1167 inst_nodes_offline.append(pnode)
1169 # If the instance is non-redundant we cannot survive losing its primary
1170 # node, so we are not N+1 compliant. On the other hand we have no disk
1171 # templates with more than one secondary so that situation is not well
1173 # FIXME: does not support file-backed instances
1174 if len(inst_config.secondary_nodes) == 0:
1175 i_non_redundant.append(instance)
1176 elif len(inst_config.secondary_nodes) > 1:
1177 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1180 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1181 i_non_a_balanced.append(instance)
1183 for snode in inst_config.secondary_nodes:
1184 if snode in node_info:
1185 node_info[snode]['sinst'].append(instance)
1186 if pnode not in node_info[snode]['sinst-by-pnode']:
1187 node_info[snode]['sinst-by-pnode'][pnode] = []
1188 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1189 elif snode not in n_offline:
1190 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1191 " %s failed" % (instance, snode))
1193 if snode in n_offline:
1194 inst_nodes_offline.append(snode)
1196 if inst_nodes_offline:
1197 # warn that the instance lives on offline nodes, and set bad=True
1198 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1199 ", ".join(inst_nodes_offline))
1202 feedback_fn("* Verifying orphan volumes")
1203 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1207 feedback_fn("* Verifying remaining instances")
1208 result = self._VerifyOrphanInstances(instancelist, node_instance,
1212 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1213 feedback_fn("* Verifying N+1 Memory redundancy")
1214 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1217 feedback_fn("* Other Notes")
1219 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1220 % len(i_non_redundant))
1222 if i_non_a_balanced:
1223 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1224 % len(i_non_a_balanced))
1227 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1230 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1234 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1235 """Analyze the post-hooks' result
1237 This method analyses the hook result, handles it, and sends some
1238 nicely-formatted feedback back to the user.
1240 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1241 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1242 @param hooks_results: the results of the multi-node hooks rpc call
1243 @param feedback_fn: function used to send feedback back to the caller
1244 @param lu_result: previous Exec result
1245 @return: the new Exec result, based on the previous result
1249 # We only really run POST phase hooks, and are only interested in
1251 if phase == constants.HOOKS_PHASE_POST:
1252 # Used to change hooks' output to proper indentation
1253 indent_re = re.compile('^', re.M)
1254 feedback_fn("* Hooks Results")
1255 if not hooks_results:
1256 feedback_fn(" - ERROR: general communication failure")
1259 for node_name in hooks_results:
1260 show_node_header = True
1261 res = hooks_results[node_name]
1265 # no need to warn or set fail return value
1267 feedback_fn(" Communication failure in hooks execution: %s" %
1271 for script, hkr, output in res.payload:
1272 if hkr == constants.HKR_FAIL:
1273 # The node header is only shown once, if there are
1274 # failing hooks on that node
1275 if show_node_header:
1276 feedback_fn(" Node %s:" % node_name)
1277 show_node_header = False
1278 feedback_fn(" ERROR: Script %s failed, output:" % script)
1279 output = indent_re.sub(' ', output)
1280 feedback_fn("%s" % output)
1286 class LUVerifyDisks(NoHooksLU):
1287 """Verifies the cluster disks status.
1293 def ExpandNames(self):
1294 self.needed_locks = {
1295 locking.LEVEL_NODE: locking.ALL_SET,
1296 locking.LEVEL_INSTANCE: locking.ALL_SET,
1298 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1300 def CheckPrereq(self):
1301 """Check prerequisites.
1303 This has no prerequisites.
1308 def Exec(self, feedback_fn):
1309 """Verify integrity of cluster disks.
1311 @rtype: tuple of three items
1312 @return: a tuple of (dict of node-to-node_error, list of instances
1313 which need activate-disks, dict of instance: (node, volume) for
1317 result = res_nodes, res_instances, res_missing = {}, [], {}
1319 vg_name = self.cfg.GetVGName()
1320 nodes = utils.NiceSort(self.cfg.GetNodeList())
1321 instances = [self.cfg.GetInstanceInfo(name)
1322 for name in self.cfg.GetInstanceList()]
1325 for inst in instances:
1327 if (not inst.admin_up or
1328 inst.disk_template not in constants.DTS_NET_MIRROR):
1330 inst.MapLVsByNode(inst_lvs)
1331 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1332 for node, vol_list in inst_lvs.iteritems():
1333 for vol in vol_list:
1334 nv_dict[(node, vol)] = inst
1339 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1343 node_res = node_lvs[node]
1344 if node_res.offline:
1346 msg = node_res.fail_msg
1348 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1349 res_nodes[node] = msg
1352 lvs = node_res.payload
1353 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1354 inst = nv_dict.pop((node, lv_name), None)
1355 if (not lv_online and inst is not None
1356 and inst.name not in res_instances):
1357 res_instances.append(inst.name)
1359 # any leftover items in nv_dict are missing LVs, let's arrange the
1361 for key, inst in nv_dict.iteritems():
1362 if inst.name not in res_missing:
1363 res_missing[inst.name] = []
1364 res_missing[inst.name].append(key)
1369 class LURenameCluster(LogicalUnit):
1370 """Rename the cluster.
1373 HPATH = "cluster-rename"
1374 HTYPE = constants.HTYPE_CLUSTER
1377 def BuildHooksEnv(self):
1382 "OP_TARGET": self.cfg.GetClusterName(),
1383 "NEW_NAME": self.op.name,
1385 mn = self.cfg.GetMasterNode()
1386 return env, [mn], [mn]
1388 def CheckPrereq(self):
1389 """Verify that the passed name is a valid one.
1392 hostname = utils.HostInfo(self.op.name)
1394 new_name = hostname.name
1395 self.ip = new_ip = hostname.ip
1396 old_name = self.cfg.GetClusterName()
1397 old_ip = self.cfg.GetMasterIP()
1398 if new_name == old_name and new_ip == old_ip:
1399 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1400 " cluster has changed")
1401 if new_ip != old_ip:
1402 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1403 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1404 " reachable on the network. Aborting." %
1407 self.op.name = new_name
1409 def Exec(self, feedback_fn):
1410 """Rename the cluster.
1413 clustername = self.op.name
1416 # shutdown the master IP
1417 master = self.cfg.GetMasterNode()
1418 result = self.rpc.call_node_stop_master(master, False)
1419 result.Raise("Could not disable the master role")
1422 cluster = self.cfg.GetClusterInfo()
1423 cluster.cluster_name = clustername
1424 cluster.master_ip = ip
1425 self.cfg.Update(cluster)
1427 # update the known hosts file
1428 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1429 node_list = self.cfg.GetNodeList()
1431 node_list.remove(master)
1434 result = self.rpc.call_upload_file(node_list,
1435 constants.SSH_KNOWN_HOSTS_FILE)
1436 for to_node, to_result in result.iteritems():
1437 msg = to_result.fail_msg
1439 msg = ("Copy of file %s to node %s failed: %s" %
1440 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1441 self.proc.LogWarning(msg)
1444 result = self.rpc.call_node_start_master(master, False, False)
1445 msg = result.fail_msg
1447 self.LogWarning("Could not re-enable the master role on"
1448 " the master, please restart manually: %s", msg)
1451 def _RecursiveCheckIfLVMBased(disk):
1452 """Check if the given disk or its children are lvm-based.
1454 @type disk: L{objects.Disk}
1455 @param disk: the disk to check
1457 @return: boolean indicating whether a LD_LV dev_type was found or not
1461 for chdisk in disk.children:
1462 if _RecursiveCheckIfLVMBased(chdisk):
1464 return disk.dev_type == constants.LD_LV
1467 class LUSetClusterParams(LogicalUnit):
1468 """Change the parameters of the cluster.
1471 HPATH = "cluster-modify"
1472 HTYPE = constants.HTYPE_CLUSTER
1476 def CheckArguments(self):
1480 if not hasattr(self.op, "candidate_pool_size"):
1481 self.op.candidate_pool_size = None
1482 if self.op.candidate_pool_size is not None:
1484 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1485 except (ValueError, TypeError), err:
1486 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1488 if self.op.candidate_pool_size < 1:
1489 raise errors.OpPrereqError("At least one master candidate needed")
1491 def ExpandNames(self):
1492 # FIXME: in the future maybe other cluster params won't require checking on
1493 # all nodes to be modified.
1494 self.needed_locks = {
1495 locking.LEVEL_NODE: locking.ALL_SET,
1497 self.share_locks[locking.LEVEL_NODE] = 1
1499 def BuildHooksEnv(self):
1504 "OP_TARGET": self.cfg.GetClusterName(),
1505 "NEW_VG_NAME": self.op.vg_name,
1507 mn = self.cfg.GetMasterNode()
1508 return env, [mn], [mn]
1510 def CheckPrereq(self):
1511 """Check prerequisites.
1513 This checks whether the given params don't conflict and
1514 if the given volume group is valid.
1517 if self.op.vg_name is not None and not self.op.vg_name:
1518 instances = self.cfg.GetAllInstancesInfo().values()
1519 for inst in instances:
1520 for disk in inst.disks:
1521 if _RecursiveCheckIfLVMBased(disk):
1522 raise errors.OpPrereqError("Cannot disable lvm storage while"
1523 " lvm-based instances exist")
1525 node_list = self.acquired_locks[locking.LEVEL_NODE]
1527 # if vg_name not None, checks given volume group on all nodes
1529 vglist = self.rpc.call_vg_list(node_list)
1530 for node in node_list:
1531 msg = vglist[node].fail_msg
1533 # ignoring down node
1534 self.LogWarning("Error while gathering data on node %s"
1535 " (ignoring node): %s", node, msg)
1537 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1539 constants.MIN_VG_SIZE)
1541 raise errors.OpPrereqError("Error on node '%s': %s" %
1544 self.cluster = cluster = self.cfg.GetClusterInfo()
1545 # validate params changes
1546 if self.op.beparams:
1547 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1548 self.new_beparams = objects.FillDict(
1549 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1551 if self.op.nicparams:
1552 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1553 self.new_nicparams = objects.FillDict(
1554 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1555 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1557 # hypervisor list/parameters
1558 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1559 if self.op.hvparams:
1560 if not isinstance(self.op.hvparams, dict):
1561 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1562 for hv_name, hv_dict in self.op.hvparams.items():
1563 if hv_name not in self.new_hvparams:
1564 self.new_hvparams[hv_name] = hv_dict
1566 self.new_hvparams[hv_name].update(hv_dict)
1568 if self.op.enabled_hypervisors is not None:
1569 self.hv_list = self.op.enabled_hypervisors
1570 if not self.hv_list:
1571 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1572 " least one member")
1573 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1575 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1576 " entries: %s" % invalid_hvs)
1578 self.hv_list = cluster.enabled_hypervisors
1580 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1581 # either the enabled list has changed, or the parameters have, validate
1582 for hv_name, hv_params in self.new_hvparams.items():
1583 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1584 (self.op.enabled_hypervisors and
1585 hv_name in self.op.enabled_hypervisors)):
1586 # either this is a new hypervisor, or its parameters have changed
1587 hv_class = hypervisor.GetHypervisor(hv_name)
1588 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1589 hv_class.CheckParameterSyntax(hv_params)
1590 _CheckHVParams(self, node_list, hv_name, hv_params)
1592 def Exec(self, feedback_fn):
1593 """Change the parameters of the cluster.
1596 if self.op.vg_name is not None:
1597 new_volume = self.op.vg_name
1600 if new_volume != self.cfg.GetVGName():
1601 self.cfg.SetVGName(new_volume)
1603 feedback_fn("Cluster LVM configuration already in desired"
1604 " state, not changing")
1605 if self.op.hvparams:
1606 self.cluster.hvparams = self.new_hvparams
1607 if self.op.enabled_hypervisors is not None:
1608 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1609 if self.op.beparams:
1610 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1611 if self.op.nicparams:
1612 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1614 if self.op.candidate_pool_size is not None:
1615 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1616 # we need to update the pool size here, otherwise the save will fail
1617 _AdjustCandidatePool(self)
1619 self.cfg.Update(self.cluster)
1622 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1623 """Distribute additional files which are part of the cluster configuration.
1625 ConfigWriter takes care of distributing the config and ssconf files, but
1626 there are more files which should be distributed to all nodes. This function
1627 makes sure those are copied.
1629 @param lu: calling logical unit
1630 @param additional_nodes: list of nodes not in the config to distribute to
1633 # 1. Gather target nodes
1634 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1635 dist_nodes = lu.cfg.GetNodeList()
1636 if additional_nodes is not None:
1637 dist_nodes.extend(additional_nodes)
1638 if myself.name in dist_nodes:
1639 dist_nodes.remove(myself.name)
1640 # 2. Gather files to distribute
1641 dist_files = set([constants.ETC_HOSTS,
1642 constants.SSH_KNOWN_HOSTS_FILE,
1643 constants.RAPI_CERT_FILE,
1644 constants.RAPI_USERS_FILE,
1645 constants.HMAC_CLUSTER_KEY,
1648 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1649 for hv_name in enabled_hypervisors:
1650 hv_class = hypervisor.GetHypervisor(hv_name)
1651 dist_files.update(hv_class.GetAncillaryFiles())
1653 # 3. Perform the files upload
1654 for fname in dist_files:
1655 if os.path.exists(fname):
1656 result = lu.rpc.call_upload_file(dist_nodes, fname)
1657 for to_node, to_result in result.items():
1658 msg = to_result.fail_msg
1660 msg = ("Copy of file %s to node %s failed: %s" %
1661 (fname, to_node, msg))
1662 lu.proc.LogWarning(msg)
1665 class LURedistributeConfig(NoHooksLU):
1666 """Force the redistribution of cluster configuration.
1668 This is a very simple LU.
1674 def ExpandNames(self):
1675 self.needed_locks = {
1676 locking.LEVEL_NODE: locking.ALL_SET,
1678 self.share_locks[locking.LEVEL_NODE] = 1
1680 def CheckPrereq(self):
1681 """Check prerequisites.
1685 def Exec(self, feedback_fn):
1686 """Redistribute the configuration.
1689 self.cfg.Update(self.cfg.GetClusterInfo())
1690 _RedistributeAncillaryFiles(self)
1693 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1694 """Sleep and poll for an instance's disk to sync.
1697 if not instance.disks:
1701 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1703 node = instance.primary_node
1705 for dev in instance.disks:
1706 lu.cfg.SetDiskID(dev, node)
1709 degr_retries = 10 # in seconds, as we sleep 1 second each time
1713 cumul_degraded = False
1714 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1715 msg = rstats.fail_msg
1717 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1720 raise errors.RemoteError("Can't contact node %s for mirror data,"
1721 " aborting." % node)
1724 rstats = rstats.payload
1726 for i, mstat in enumerate(rstats):
1728 lu.LogWarning("Can't compute data for node %s/%s",
1729 node, instance.disks[i].iv_name)
1731 # we ignore the ldisk parameter
1732 perc_done, est_time, is_degraded, _ = mstat
1733 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1734 if perc_done is not None:
1736 if est_time is not None:
1737 rem_time = "%d estimated seconds remaining" % est_time
1740 rem_time = "no time estimate"
1741 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1742 (instance.disks[i].iv_name, perc_done, rem_time))
1744 # if we're done but degraded, let's do a few small retries, to
1745 # make sure we see a stable and not transient situation; therefore
1746 # we force restart of the loop
1747 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1748 logging.info("Degraded disks found, %d retries left", degr_retries)
1756 time.sleep(min(60, max_time))
1759 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1760 return not cumul_degraded
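# Illustrative sketch, not part of the original file: LUs that create or
# attach mirrored disks would typically poll with _WaitForSync from their
# Exec and treat a degraded result as an execution error; the exact error
# message is only an example.
#
#   disk_ok = _WaitForSync(self, instance)
#   if not disk_ok:
#     raise errors.OpExecError("Disk sync-ing has not returned a good status")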
1763 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1764 """Check that mirrors are not degraded.
1766 The ldisk parameter, if True, will change the test from the
1767 is_degraded attribute (which represents overall non-ok status for
1768 the device(s)) to the ldisk (representing the local storage status).
1771 lu.cfg.SetDiskID(dev, node)
1778 if on_primary or dev.AssembleOnSecondary():
1779 rstats = lu.rpc.call_blockdev_find(node, dev)
1780 msg = rstats.fail_msg
1782 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1784 elif not rstats.payload:
1785 lu.LogWarning("Can't find disk on node %s", node)
1788 result = result and (not rstats.payload[idx])
1790 for child in dev.children:
1791 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1796 class LUDiagnoseOS(NoHooksLU):
1797 """Logical unit for OS diagnose/query.
1800 _OP_REQP = ["output_fields", "names"]
1802 _FIELDS_STATIC = utils.FieldSet()
1803 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1805 def ExpandNames(self):
1807 raise errors.OpPrereqError("Selective OS query not supported")
1809 _CheckOutputFields(static=self._FIELDS_STATIC,
1810 dynamic=self._FIELDS_DYNAMIC,
1811 selected=self.op.output_fields)
1813 # Lock all nodes, in shared mode
1814 # Temporary removal of locks, should be reverted later
1815 # TODO: reintroduce locks when they are lighter-weight
1816 self.needed_locks = {}
1817 #self.share_locks[locking.LEVEL_NODE] = 1
1818 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1820 def CheckPrereq(self):
1821 """Check prerequisites.
1826 def _DiagnoseByOS(node_list, rlist):
1827 """Remaps a per-node return list into an a per-os per-node dictionary
1829 @param node_list: a list with the names of all nodes
1830 @param rlist: a map with node names as keys and OS objects as values
1833 @return: a dictionary with osnames as keys and as value another map, with
1834 nodes as keys and tuples of (path, status, diagnose) as values, eg::
1836 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1837 (/srv/..., False, "invalid api")],
1838 "node2": [(/srv/..., True, "")]}
1843 # we build here the list of nodes that didn't fail the RPC (at RPC
1844 # level), so that nodes with a non-responding node daemon don't
1845 # make all OSes invalid
1846 good_nodes = [node_name for node_name in rlist
1847 if not rlist[node_name].fail_msg]
1848 for node_name, nr in rlist.items():
1849 if nr.fail_msg or not nr.payload:
1851 for name, path, status, diagnose in nr.payload:
1852 if name not in all_os:
1853 # build a list of nodes for this os containing empty lists
1854 # for each node in node_list
1856 for nname in good_nodes:
1857 all_os[name][nname] = []
1858 all_os[name][node_name].append((path, status, diagnose))
1861 def Exec(self, feedback_fn):
1862 """Compute the list of OSes.
1865 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1866 node_data = self.rpc.call_os_diagnose(valid_nodes)
1867 pol = self._DiagnoseByOS(valid_nodes, node_data)
1869 for os_name, os_data in pol.items():
1871 for field in self.op.output_fields:
1874 elif field == "valid":
1875 val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1876 elif field == "node_status":
1877 # this is just a copy of the dict
1879 for node_name, nos_list in os_data.items():
1880 val[node_name] = nos_list
1882 raise errors.ParameterError(field)
1889 class LURemoveNode(LogicalUnit):
1890 """Logical unit for removing a node.
1893 HPATH = "node-remove"
1894 HTYPE = constants.HTYPE_NODE
1895 _OP_REQP = ["node_name"]
1897 def BuildHooksEnv(self):
1900 This doesn't run on the target node in the pre phase as a failed
1901 node would then be impossible to remove.
1905 "OP_TARGET": self.op.node_name,
1906 "NODE_NAME": self.op.node_name,
1908 all_nodes = self.cfg.GetNodeList()
1909 all_nodes.remove(self.op.node_name)
1910 return env, all_nodes, all_nodes
1912 def CheckPrereq(self):
1913 """Check prerequisites.
1916 - the node exists in the configuration
1917 - it does not have primary or secondary instances
1918 - it's not the master
1920 Any errors are signaled by raising errors.OpPrereqError.
1923 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1925 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1927 instance_list = self.cfg.GetInstanceList()
1929 masternode = self.cfg.GetMasterNode()
1930 if node.name == masternode:
1931 raise errors.OpPrereqError("Node is the master node,"
1932 " you need to failover first.")
1934 for instance_name in instance_list:
1935 instance = self.cfg.GetInstanceInfo(instance_name)
1936 if node.name in instance.all_nodes:
1937 raise errors.OpPrereqError("Instance %s is still running on the node,"
1938 " please remove first." % instance_name)
1939 self.op.node_name = node.name
1942 def Exec(self, feedback_fn):
1943 """Removes the node from the cluster.
1947 logging.info("Stopping the node daemon and removing configs from node %s",
1950 self.context.RemoveNode(node.name)
1952 result = self.rpc.call_node_leave_cluster(node.name)
1953 msg = result.fail_msg
1955 self.LogWarning("Errors encountered on the remote node while leaving"
1956 " the cluster: %s", msg)
1958 # Promote nodes to master candidate as needed
1959 _AdjustCandidatePool(self)
1962 class LUQueryNodes(NoHooksLU):
1963 """Logical unit for querying nodes.
1966 _OP_REQP = ["output_fields", "names", "use_locking"]
1968 _FIELDS_DYNAMIC = utils.FieldSet(
1970 "mtotal", "mnode", "mfree",
1972 "ctotal", "cnodes", "csockets",
1975 _FIELDS_STATIC = utils.FieldSet(
1976 "name", "pinst_cnt", "sinst_cnt",
1977 "pinst_list", "sinst_list",
1978 "pip", "sip", "tags",
1987 def ExpandNames(self):
1988 _CheckOutputFields(static=self._FIELDS_STATIC,
1989 dynamic=self._FIELDS_DYNAMIC,
1990 selected=self.op.output_fields)
1992 self.needed_locks = {}
1993 self.share_locks[locking.LEVEL_NODE] = 1
1996 self.wanted = _GetWantedNodes(self, self.op.names)
1998 self.wanted = locking.ALL_SET
2000 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2001 self.do_locking = self.do_node_query and self.op.use_locking
2003 # if we don't request only static fields, we need to lock the nodes
2004 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2007 def CheckPrereq(self):
2008 """Check prerequisites.
2011 # The validation of the node list is done in _GetWantedNodes if the
2012 # list is not empty; if it is empty, there is nothing to validate
2015 def Exec(self, feedback_fn):
2016 """Computes the list of nodes and their attributes.
2019 all_info = self.cfg.GetAllNodesInfo()
2020 if self.do_locking:
2021 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2022 elif self.wanted != locking.ALL_SET:
2023 nodenames = self.wanted
2024 missing = set(nodenames).difference(all_info.keys())
2025 if missing:
2026 raise errors.OpExecError(
2027 "Some nodes were removed before retrieving their data: %s" % missing)
2028 else:
2029 nodenames = all_info.keys()
2031 nodenames = utils.NiceSort(nodenames)
2032 nodelist = [all_info[name] for name in nodenames]
2034 # begin data gathering
2036 if self.do_node_query:
2038 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2039 self.cfg.GetHypervisorType())
2040 for name in nodenames:
2041 nodeinfo = node_data[name]
2042 if not nodeinfo.fail_msg and nodeinfo.payload:
2043 nodeinfo = nodeinfo.payload
2044 fn = utils.TryConvert
2046 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2047 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2048 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2049 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2050 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2051 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2052 "bootid": nodeinfo.get('bootid', None),
2053 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2054 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2055 }
2056 else:
2057 live_data[name] = {}
2058 else:
2059 live_data = dict.fromkeys(nodenames, {})
2061 node_to_primary = dict([(name, set()) for name in nodenames])
2062 node_to_secondary = dict([(name, set()) for name in nodenames])
2064 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2065 "sinst_cnt", "sinst_list"))
2066 if inst_fields & frozenset(self.op.output_fields):
2067 instancelist = self.cfg.GetInstanceList()
2069 for instance_name in instancelist:
2070 inst = self.cfg.GetInstanceInfo(instance_name)
2071 if inst.primary_node in node_to_primary:
2072 node_to_primary[inst.primary_node].add(inst.name)
2073 for secnode in inst.secondary_nodes:
2074 if secnode in node_to_secondary:
2075 node_to_secondary[secnode].add(inst.name)
2077 master_node = self.cfg.GetMasterNode()
2079 # end data gathering
2082 for node in nodelist:
2084 for field in self.op.output_fields:
2085 if field == "name":
2086 val = node.name
2087 elif field == "pinst_list":
2088 val = list(node_to_primary[node.name])
2089 elif field == "sinst_list":
2090 val = list(node_to_secondary[node.name])
2091 elif field == "pinst_cnt":
2092 val = len(node_to_primary[node.name])
2093 elif field == "sinst_cnt":
2094 val = len(node_to_secondary[node.name])
2095 elif field == "pip":
2096 val = node.primary_ip
2097 elif field == "sip":
2098 val = node.secondary_ip
2099 elif field == "tags":
2100 val = list(node.GetTags())
2101 elif field == "serial_no":
2102 val = node.serial_no
2103 elif field == "master_candidate":
2104 val = node.master_candidate
2105 elif field == "master":
2106 val = node.name == master_node
2107 elif field == "offline":
2109 elif field == "drained":
2111 elif self._FIELDS_DYNAMIC.Matches(field):
2112 val = live_data[node.name].get(field, None)
2113 elif field == "role":
2114 if node.name == master_node:
2115 val = "M"
2116 elif node.master_candidate:
2117 val = "C"
2118 elif node.drained:
2119 val = "D"
2120 elif node.offline:
2121 val = "O"
2122 else:
2123 val = "R"
2124 else:
2125 raise errors.ParameterError(field)
2126 node_output.append(val)
2127 output.append(node_output)
2132 class LUQueryNodeVolumes(NoHooksLU):
2133 """Logical unit for getting volumes on node(s).
2136 _OP_REQP = ["nodes", "output_fields"]
2138 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2139 _FIELDS_STATIC = utils.FieldSet("node")
2141 def ExpandNames(self):
2142 _CheckOutputFields(static=self._FIELDS_STATIC,
2143 dynamic=self._FIELDS_DYNAMIC,
2144 selected=self.op.output_fields)
2146 self.needed_locks = {}
2147 self.share_locks[locking.LEVEL_NODE] = 1
2148 if not self.op.nodes:
2149 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2151 self.needed_locks[locking.LEVEL_NODE] = \
2152 _GetWantedNodes(self, self.op.nodes)
2154 def CheckPrereq(self):
2155 """Check prerequisites.
2157 This checks that the fields required are valid output fields.
2160 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2162 def Exec(self, feedback_fn):
2163 """Computes the list of nodes and their attributes.
2166 nodenames = self.nodes
2167 volumes = self.rpc.call_node_volumes(nodenames)
2169 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2170 in self.cfg.GetInstanceList()]
2172 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2175 for node in nodenames:
2176 nresult = volumes[node]
2179 msg = nresult.fail_msg
2181 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2184 node_vols = nresult.payload[:]
2185 node_vols.sort(key=lambda vol: vol['dev'])
2187 for vol in node_vols:
2189 for field in self.op.output_fields:
2192 elif field == "phys":
2196 elif field == "name":
2198 elif field == "size":
2199 val = int(float(vol['size']))
2200 elif field == "instance":
2202 if node not in lv_by_node[inst]:
2204 if vol['name'] in lv_by_node[inst][node]:
2210 raise errors.ParameterError(field)
2211 node_output.append(str(val))
2213 output.append(node_output)
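# Illustrative shape of one output row for output_fields =
# ["node", "phys", "vg", "name", "size", "instance"] (all values made up):
#   ["node1.example.com", "/dev/sda3", "xenvg", "4f2e...disk0", "10240", "web1"]
# Every value is stringified above, so sizes come back as strings as well.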
2218 class LUAddNode(LogicalUnit):
2219 """Logical unit for adding node to the cluster.
2223 HTYPE = constants.HTYPE_NODE
2224 _OP_REQP = ["node_name"]
2226 def BuildHooksEnv(self):
2229 This will run on all nodes before, and on all nodes + the new node after.
2233 "OP_TARGET": self.op.node_name,
2234 "NODE_NAME": self.op.node_name,
2235 "NODE_PIP": self.op.primary_ip,
2236 "NODE_SIP": self.op.secondary_ip,
2238 nodes_0 = self.cfg.GetNodeList()
2239 nodes_1 = nodes_0 + [self.op.node_name, ]
2240 return env, nodes_0, nodes_1
2242 def CheckPrereq(self):
2243 """Check prerequisites.
2246 - the new node is not already in the config
2248 - its parameters (single/dual homed) match the cluster
2250 Any errors are signaled by raising errors.OpPrereqError.
2253 node_name = self.op.node_name
2256 dns_data = utils.HostInfo(node_name)
2258 node = dns_data.name
2259 primary_ip = self.op.primary_ip = dns_data.ip
2260 secondary_ip = getattr(self.op, "secondary_ip", None)
2261 if secondary_ip is None:
2262 secondary_ip = primary_ip
2263 if not utils.IsValidIP(secondary_ip):
2264 raise errors.OpPrereqError("Invalid secondary IP given")
2265 self.op.secondary_ip = secondary_ip
2267 node_list = cfg.GetNodeList()
2268 if not self.op.readd and node in node_list:
2269 raise errors.OpPrereqError("Node %s is already in the configuration" %
2271 elif self.op.readd and node not in node_list:
2272 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2274 for existing_node_name in node_list:
2275 existing_node = cfg.GetNodeInfo(existing_node_name)
2277 if self.op.readd and node == existing_node_name:
2278 if (existing_node.primary_ip != primary_ip or
2279 existing_node.secondary_ip != secondary_ip):
2280 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2281 " address configuration as before")
2284 if (existing_node.primary_ip == primary_ip or
2285 existing_node.secondary_ip == primary_ip or
2286 existing_node.primary_ip == secondary_ip or
2287 existing_node.secondary_ip == secondary_ip):
2288 raise errors.OpPrereqError("New node ip address(es) conflict with"
2289 " existing node %s" % existing_node.name)
2291 # check that the type of the node (single versus dual homed) is the
2292 # same as for the master
2293 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2294 master_singlehomed = myself.secondary_ip == myself.primary_ip
2295 newbie_singlehomed = secondary_ip == primary_ip
2296 if master_singlehomed != newbie_singlehomed:
2297 if master_singlehomed:
2298 raise errors.OpPrereqError("The master has no private ip but the"
2299 " new node has one")
2301 raise errors.OpPrereqError("The master has a private ip but the"
2302 " new node doesn't have one")
2304 # checks reachability
2305 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2306 raise errors.OpPrereqError("Node not reachable by ping")
2308 if not newbie_singlehomed:
2309 # check reachability from my secondary ip to newbie's secondary ip
2310 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2311 source=myself.secondary_ip):
2312 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2313 " based ping to noded port")
2315 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2320 mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2321 # the new node will increase mc_max by one, so:
2322 mc_max = min(mc_max + 1, cp_size)
2323 self.master_candidate = mc_now < mc_max
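# Worked example (illustrative figures only): if GetMasterCandidateStats
# returned mc_now=3, mc_max=3 and candidate_pool_size is 10, then
# mc_max = min(3 + 1, 10) = 4 and 3 < 4, so the new node will join as a
# master candidate; with candidate_pool_size=3 instead, mc_max = min(4, 3) = 3
# and 3 < 3 is false, so the node would join as a regular node.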
2326 self.new_node = self.cfg.GetNodeInfo(node)
2327 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2329 self.new_node = objects.Node(name=node,
2330 primary_ip=primary_ip,
2331 secondary_ip=secondary_ip,
2332 master_candidate=self.master_candidate,
2333 offline=False, drained=False)
2335 def Exec(self, feedback_fn):
2336 """Adds the new node to the cluster.
2339 new_node = self.new_node
2340 node = new_node.name
2342 # for re-adds, reset the offline/drained/master-candidate flags;
2343 # we need to reset here, otherwise offline would prevent RPC calls
2344 # later in the procedure; this also means that if the re-add
2345 # fails, we are left with a non-offlined, broken node
2346 if self.op.readd:
2347 new_node.drained = new_node.offline = False
2348 self.LogInfo("Readding a node, the offline/drained flags were reset")
2349 # if we demote the node, we do cleanup later in the procedure
2350 new_node.master_candidate = self.master_candidate
2352 # notify the user about any possible mc promotion
2353 if new_node.master_candidate:
2354 self.LogInfo("Node will be a master candidate")
2356 # check connectivity
2357 result = self.rpc.call_version([node])[node]
2358 result.Raise("Can't get version information from node %s" % node)
2359 if constants.PROTOCOL_VERSION == result.payload:
2360 logging.info("Communication to node %s fine, sw version %s match",
2361 node, result.payload)
2363 raise errors.OpExecError("Version mismatch master version %s,"
2364 " node version %s" %
2365 (constants.PROTOCOL_VERSION, result.payload))
2368 logging.info("Copy ssh key to node %s", node)
2369 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2371 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2372 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2378 keyarray.append(f.read())
2382 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2383 keyarray[2],
2384 keyarray[3], keyarray[4], keyarray[5])
2385 result.Raise("Cannot transfer ssh keys to the new node")
2387 # Add node to our /etc/hosts, and add key to known_hosts
2388 if self.cfg.GetClusterInfo().modify_etc_hosts:
2389 utils.AddHostToEtcHosts(new_node.name)
2391 if new_node.secondary_ip != new_node.primary_ip:
2392 result = self.rpc.call_node_has_ip_address(new_node.name,
2393 new_node.secondary_ip)
2394 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2396 if not result.payload:
2397 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2398 " you gave (%s). Please fix and re-run this"
2399 " command." % new_node.secondary_ip)
2401 node_verify_list = [self.cfg.GetMasterNode()]
2402 node_verify_param = {
2404 # TODO: do a node-net-test as well?
2407 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2408 self.cfg.GetClusterName())
2409 for verifier in node_verify_list:
2410 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2411 nl_payload = result[verifier].payload['nodelist']
2412 if nl_payload:
2413 for failed in nl_payload:
2414 feedback_fn("ssh/hostname verification failed %s -> %s" %
2415 (verifier, nl_payload[failed]))
2416 raise errors.OpExecError("ssh/hostname verification failed.")
2418 if self.op.readd:
2419 _RedistributeAncillaryFiles(self)
2420 self.context.ReaddNode(new_node)
2421 # make sure we redistribute the config
2422 self.cfg.Update(new_node)
2423 # and make sure the new node will not have old files around
2424 if not new_node.master_candidate:
2425 result = self.rpc.call_node_demote_from_mc(new_node.name)
2426 msg = result.RemoteFailMsg()
2428 self.LogWarning("Node failed to demote itself from master"
2429 " candidate status: %s" % msg)
2430 else:
2431 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2432 self.context.AddNode(new_node)
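# Hedged CLI-level note (assumption, not defined in this module): this LU is
# what backs a command along the lines of
#   gnt-node add [--readd] [-s SECONDARY_IP] node4.example.com
# where --readd maps to self.op.readd and -s to self.op.secondary_ip.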
2435 class LUSetNodeParams(LogicalUnit):
2436 """Modifies the parameters of a node.
2439 HPATH = "node-modify"
2440 HTYPE = constants.HTYPE_NODE
2441 _OP_REQP = ["node_name"]
2444 def CheckArguments(self):
2445 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2446 if node_name is None:
2447 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2448 self.op.node_name = node_name
2449 _CheckBooleanOpField(self.op, 'master_candidate')
2450 _CheckBooleanOpField(self.op, 'offline')
2451 _CheckBooleanOpField(self.op, 'drained')
2452 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2453 if all_mods.count(None) == 3:
2454 raise errors.OpPrereqError("Please pass at least one modification")
2455 if all_mods.count(True) > 1:
2456 raise errors.OpPrereqError("Can't set the node into more than one"
2457 " state at the same time")
2459 def ExpandNames(self):
2460 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2462 def BuildHooksEnv(self):
2465 This runs on the master node.
2469 "OP_TARGET": self.op.node_name,
2470 "MASTER_CANDIDATE": str(self.op.master_candidate),
2471 "OFFLINE": str(self.op.offline),
2472 "DRAINED": str(self.op.drained),
2474 nl = [self.cfg.GetMasterNode(),
2478 def CheckPrereq(self):
2479 """Check prerequisites.
2481 This only checks the instance list against the existing names.
2484 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2486 if ((self.op.master_candidate == False or self.op.offline == True or
2487 self.op.drained == True) and node.master_candidate):
2488 # we will demote the node from master_candidate
2489 if self.op.node_name == self.cfg.GetMasterNode():
2490 raise errors.OpPrereqError("The master node has to be a"
2491 " master candidate, online and not drained")
2492 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2493 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2494 if num_candidates <= cp_size:
2495 msg = ("Not enough master candidates (desired"
2496 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2497 if self.op.force:
2498 self.LogWarning(msg)
2499 else:
2500 raise errors.OpPrereqError(msg)
2502 if (self.op.master_candidate == True and
2503 ((node.offline and not self.op.offline == False) or
2504 (node.drained and not self.op.drained == False))):
2505 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2506 " to master_candidate" % node.name)
2510 def Exec(self, feedback_fn):
2519 if self.op.offline is not None:
2520 node.offline = self.op.offline
2521 result.append(("offline", str(self.op.offline)))
2522 if self.op.offline == True:
2523 if node.master_candidate:
2524 node.master_candidate = False
2526 result.append(("master_candidate", "auto-demotion due to offline"))
2528 node.drained = False
2529 result.append(("drained", "clear drained status due to offline"))
2531 if self.op.master_candidate is not None:
2532 node.master_candidate = self.op.master_candidate
2534 result.append(("master_candidate", str(self.op.master_candidate)))
2535 if self.op.master_candidate == False:
2536 rrc = self.rpc.call_node_demote_from_mc(node.name)
2539 self.LogWarning("Node failed to demote itself: %s" % msg)
2541 if self.op.drained is not None:
2542 node.drained = self.op.drained
2543 result.append(("drained", str(self.op.drained)))
2544 if self.op.drained == True:
2545 if node.master_candidate:
2546 node.master_candidate = False
2548 result.append(("master_candidate", "auto-demotion due to drain"))
2549 rrc = self.rpc.call_node_demote_from_mc(node.name)
2550 msg = rrc.RemoteFailMsg()
2552 self.LogWarning("Node failed to demote itself: %s" % msg)
2554 node.offline = False
2555 result.append(("offline", "clear offline status due to drain"))
2557 # this will trigger configuration file update, if needed
2558 self.cfg.Update(node)
2559 # this will trigger job queue propagation or cleanup
2561 self.context.ReaddNode(node)
2566 class LUPowercycleNode(NoHooksLU):
2567 """Powercycles a node.
2570 _OP_REQP = ["node_name", "force"]
2573 def CheckArguments(self):
2574 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2575 if node_name is None:
2576 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2577 self.op.node_name = node_name
2578 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2579 raise errors.OpPrereqError("The node is the master and the force"
2580 " parameter was not set")
2582 def ExpandNames(self):
2583 """Locking for PowercycleNode.
2585 This is a last-resort option and shouldn't block on other
2586 jobs. Therefore, we grab no locks.
2589 self.needed_locks = {}
2591 def CheckPrereq(self):
2592 """Check prerequisites.
2594 This LU has no prereqs.
2599 def Exec(self, feedback_fn):
2603 result = self.rpc.call_node_powercycle(self.op.node_name,
2604 self.cfg.GetHypervisorType())
2605 result.Raise("Failed to schedule the reboot")
2606 return result.payload
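# Hedged note (assumption about the node daemon's reply): the payload returned
# here is expected to be a short status string confirming the scheduled
# powercycle, which the caller simply relays to the user.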
2609 class LUQueryClusterInfo(NoHooksLU):
2610 """Query cluster configuration.
2616 def ExpandNames(self):
2617 self.needed_locks = {}
2619 def CheckPrereq(self):
2620 """No prerequsites needed for this LU.
2625 def Exec(self, feedback_fn):
2626 """Return cluster config.
2629 cluster = self.cfg.GetClusterInfo()
2631 "software_version": constants.RELEASE_VERSION,
2632 "protocol_version": constants.PROTOCOL_VERSION,
2633 "config_version": constants.CONFIG_VERSION,
2634 "os_api_version": max(constants.OS_API_VERSIONS),
2635 "export_version": constants.EXPORT_VERSION,
2636 "architecture": (platform.architecture()[0], platform.machine()),
2637 "name": cluster.cluster_name,
2638 "master": cluster.master_node,
2639 "default_hypervisor": cluster.enabled_hypervisors[0],
2640 "enabled_hypervisors": cluster.enabled_hypervisors,
2641 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2642 for hypervisor_name in cluster.enabled_hypervisors]),
2643 "beparams": cluster.beparams,
2644 "nicparams": cluster.nicparams,
2645 "candidate_pool_size": cluster.candidate_pool_size,
2646 "master_netdev": cluster.master_netdev,
2647 "volume_group_name": cluster.volume_group_name,
2648 "file_storage_dir": cluster.file_storage_dir,
2654 class LUQueryConfigValues(NoHooksLU):
2655 """Return configuration values.
2660 _FIELDS_DYNAMIC = utils.FieldSet()
2661 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2663 def ExpandNames(self):
2664 self.needed_locks = {}
2666 _CheckOutputFields(static=self._FIELDS_STATIC,
2667 dynamic=self._FIELDS_DYNAMIC,
2668 selected=self.op.output_fields)
2670 def CheckPrereq(self):
2671 """No prerequisites.
2676 def Exec(self, feedback_fn):
2677 """Dump a representation of the cluster config to the standard output.
2680 values = []
2681 for field in self.op.output_fields:
2682 if field == "cluster_name":
2683 entry = self.cfg.GetClusterName()
2684 elif field == "master_node":
2685 entry = self.cfg.GetMasterNode()
2686 elif field == "drain_flag":
2687 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2689 raise errors.ParameterError(field)
2690 values.append(entry)
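# Illustrative result (values made up): with
# output_fields=["cluster_name", "drain_flag"] the returned list would look
# like ["cluster.example.com", False].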
2694 class LUActivateInstanceDisks(NoHooksLU):
2695 """Bring up an instance's disks.
2698 _OP_REQP = ["instance_name"]
2701 def ExpandNames(self):
2702 self._ExpandAndLockInstance()
2703 self.needed_locks[locking.LEVEL_NODE] = []
2704 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2706 def DeclareLocks(self, level):
2707 if level == locking.LEVEL_NODE:
2708 self._LockInstancesNodes()
2710 def CheckPrereq(self):
2711 """Check prerequisites.
2713 This checks that the instance is in the cluster.
2716 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2717 assert self.instance is not None, \
2718 "Cannot retrieve locked instance %s" % self.op.instance_name
2719 _CheckNodeOnline(self, self.instance.primary_node)
2721 def Exec(self, feedback_fn):
2722 """Activate the disks.
2725 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2727 raise errors.OpExecError("Cannot activate block devices")
2732 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2733 """Prepare the block devices for an instance.
2735 This sets up the block devices on all nodes.
2737 @type lu: L{LogicalUnit}
2738 @param lu: the logical unit on whose behalf we execute
2739 @type instance: L{objects.Instance}
2740 @param instance: the instance for whose disks we assemble
2741 @type ignore_secondaries: boolean
2742 @param ignore_secondaries: if true, errors on secondary nodes
2743 won't result in an error return from the function
2744 @return: False if the operation failed, otherwise a list of
2745 (host, instance_visible_name, node_visible_name)
2746 with the mapping from node devices to instance devices
2751 iname = instance.name
2752 # With the two-pass mechanism we try to reduce the window of
2753 # opportunity for the race condition of switching DRBD to primary
2754 # before the handshake has occurred, but we do not eliminate it
2756 # The proper fix would be to wait (with some limits) until the
2757 # connection has been made and drbd transitions from WFConnection
2758 # into any other network-connected state (Connected, SyncTarget,
2761 # 1st pass, assemble on all nodes in secondary mode
2762 for inst_disk in instance.disks:
2763 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2764 lu.cfg.SetDiskID(node_disk, node)
2765 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2766 msg = result.fail_msg
2768 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2769 " (is_primary=False, pass=1): %s",
2770 inst_disk.iv_name, node, msg)
2771 if not ignore_secondaries:
2774 # FIXME: race condition on drbd migration to primary
2776 # 2nd pass, do only the primary node
2777 for inst_disk in instance.disks:
2778 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2779 if node != instance.primary_node:
2781 lu.cfg.SetDiskID(node_disk, node)
2782 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2783 msg = result.fail_msg
2785 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2786 " (is_primary=True, pass=2): %s",
2787 inst_disk.iv_name, node, msg)
2789 device_info.append((instance.primary_node, inst_disk.iv_name,
2792 # leave the disks configured for the primary node
2793 # this is a workaround that would be fixed better by
2794 # improving the logical/physical id handling
2795 for disk in instance.disks:
2796 lu.cfg.SetDiskID(disk, instance.primary_node)
2798 return disks_ok, device_info
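# Illustrative shape of the returned values, following the docstring above
# (host, instance-visible name, node-visible device; values made up):
#   disks_ok == True
#   device_info == [("node1.example.com", "disk/0", "/dev/drbd0")]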
2801 def _StartInstanceDisks(lu, instance, force):
2802 """Start the disks of an instance.
2805 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2806 ignore_secondaries=force)
2807 if not disks_ok:
2808 _ShutdownInstanceDisks(lu, instance)
2809 if force is not None and not force:
2810 lu.proc.LogWarning("", hint="If the message above refers to a"
2812 " you can retry the operation using '--force'.")
2813 raise errors.OpExecError("Disk consistency error")
2816 class LUDeactivateInstanceDisks(NoHooksLU):
2817 """Shutdown an instance's disks.
2820 _OP_REQP = ["instance_name"]
2823 def ExpandNames(self):
2824 self._ExpandAndLockInstance()
2825 self.needed_locks[locking.LEVEL_NODE] = []
2826 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2828 def DeclareLocks(self, level):
2829 if level == locking.LEVEL_NODE:
2830 self._LockInstancesNodes()
2832 def CheckPrereq(self):
2833 """Check prerequisites.
2835 This checks that the instance is in the cluster.
2838 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2839 assert self.instance is not None, \
2840 "Cannot retrieve locked instance %s" % self.op.instance_name
2842 def Exec(self, feedback_fn):
2843 """Deactivate the disks
2846 instance = self.instance
2847 _SafeShutdownInstanceDisks(self, instance)
2850 def _SafeShutdownInstanceDisks(lu, instance):
2851 """Shutdown block devices of an instance.
2853 This function checks if an instance is running, before calling
2854 _ShutdownInstanceDisks.
2857 pnode = instance.primary_node
2858 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2859 ins_l.Raise("Can't contact node %s" % pnode)
2861 if instance.name in ins_l.payload:
2862 raise errors.OpExecError("Instance is running, can't shutdown"
2865 _ShutdownInstanceDisks(lu, instance)
2868 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2869 """Shutdown block devices of an instance.
2871 This does the shutdown on all nodes of the instance.
2873 If ignore_primary is false, errors on the primary node are not
2874 ignored and make the function report failure.
2878 for disk in instance.disks:
2879 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2880 lu.cfg.SetDiskID(top_disk, node)
2881 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2882 msg = result.fail_msg
2884 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2885 disk.iv_name, node, msg)
2886 if not ignore_primary or node != instance.primary_node:
2891 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2892 """Checks if a node has enough free memory.
2894 This function checks if a given node has the needed amount of free
2895 memory. In case the node has less memory or we cannot get the
2896 information from the node, this function raises an OpPrereqError
2899 @type lu: C{LogicalUnit}
2900 @param lu: a logical unit from which we get configuration data
2902 @param node: the node to check
2903 @type reason: C{str}
2904 @param reason: string to use in the error message
2905 @type requested: C{int}
2906 @param requested: the amount of memory in MiB to check for
2907 @type hypervisor_name: C{str}
2908 @param hypervisor_name: the hypervisor to ask for memory stats
2909 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2910 we cannot check the node
2913 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2914 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2915 free_mem = nodeinfo[node].payload.get('memory_free', None)
2916 if not isinstance(free_mem, int):
2917 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2918 " was '%s'" % (node, free_mem))
2919 if requested > free_mem:
2920 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2921 " needed %s MiB, available %s MiB" %
2922 (node, reason, requested, free_mem))
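# Worked example (illustrative figures): requesting 2048 MiB on a node whose
# reported memory_free is 1536 raises OpPrereqError ("needed 2048 MiB,
# available 1536 MiB"); with memory_free=4096 the check passes silently.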
2925 class LUStartupInstance(LogicalUnit):
2926 """Starts an instance.
2929 HPATH = "instance-start"
2930 HTYPE = constants.HTYPE_INSTANCE
2931 _OP_REQP = ["instance_name", "force"]
2934 def ExpandNames(self):
2935 self._ExpandAndLockInstance()
2937 def BuildHooksEnv(self):
2940 This runs on master, primary and secondary nodes of the instance.
2944 "FORCE": self.op.force,
2946 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2947 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2950 def CheckPrereq(self):
2951 """Check prerequisites.
2953 This checks that the instance is in the cluster.
2956 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2957 assert self.instance is not None, \
2958 "Cannot retrieve locked instance %s" % self.op.instance_name
2961 self.beparams = getattr(self.op, "beparams", {})
2963 if not isinstance(self.beparams, dict):
2964 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2965 " dict" % (type(self.beparams), ))
2966 # fill the beparams dict
2967 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2968 self.op.beparams = self.beparams
2971 self.hvparams = getattr(self.op, "hvparams", {})
2973 if not isinstance(self.hvparams, dict):
2974 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2975 " dict" % (type(self.hvparams), ))
2977 # check hypervisor parameter syntax (locally)
2978 cluster = self.cfg.GetClusterInfo()
2979 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2980 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2981 instance.hvparams)
2982 filled_hvp.update(self.hvparams)
2983 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2984 hv_type.CheckParameterSyntax(filled_hvp)
2985 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2986 self.op.hvparams = self.hvparams
2988 _CheckNodeOnline(self, instance.primary_node)
2990 bep = self.cfg.GetClusterInfo().FillBE(instance)
2991 # check bridges existence
2992 _CheckInstanceBridgesExist(self, instance)
2994 remote_info = self.rpc.call_instance_info(instance.primary_node,
2995 instance.name,
2996 instance.hypervisor)
2997 remote_info.Raise("Error checking node %s" % instance.primary_node,
2999 if not remote_info.payload: # not running already
3000 _CheckNodeFreeMemory(self, instance.primary_node,
3001 "starting instance %s" % instance.name,
3002 bep[constants.BE_MEMORY], instance.hypervisor)
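# Hedged example of the temporary overrides accepted above (parameter names
# must exist in constants.BES_PARAMETER_TYPES / HVS_PARAMETER_TYPES; the
# concrete names and values shown are illustrative assumptions only):
#   beparams={"memory": 512}, hvparams={"boot_order": "cd"}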
3004 def Exec(self, feedback_fn):
3005 """Start the instance.
3008 instance = self.instance
3009 force = self.op.force
3011 self.cfg.MarkInstanceUp(instance.name)
3013 node_current = instance.primary_node
3015 _StartInstanceDisks(self, instance, force)
3017 result = self.rpc.call_instance_start(node_current, instance,
3018 self.hvparams, self.beparams)
3019 msg = result.fail_msg
3020 if msg:
3021 _ShutdownInstanceDisks(self, instance)
3022 raise errors.OpExecError("Could not start instance: %s" % msg)
3025 class LURebootInstance(LogicalUnit):
3026 """Reboot an instance.
3029 HPATH = "instance-reboot"
3030 HTYPE = constants.HTYPE_INSTANCE
3031 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3034 def ExpandNames(self):
3035 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3036 constants.INSTANCE_REBOOT_HARD,
3037 constants.INSTANCE_REBOOT_FULL]:
3038 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3039 (constants.INSTANCE_REBOOT_SOFT,
3040 constants.INSTANCE_REBOOT_HARD,
3041 constants.INSTANCE_REBOOT_FULL))
3042 self._ExpandAndLockInstance()
3044 def BuildHooksEnv(self):
3047 This runs on master, primary and secondary nodes of the instance.
3051 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3052 "REBOOT_TYPE": self.op.reboot_type,
3054 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3055 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3058 def CheckPrereq(self):
3059 """Check prerequisites.
3061 This checks that the instance is in the cluster.
3064 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3065 assert self.instance is not None, \
3066 "Cannot retrieve locked instance %s" % self.op.instance_name
3068 _CheckNodeOnline(self, instance.primary_node)
3070 # check bridges existence
3071 _CheckInstanceBridgesExist(self, instance)
3073 def Exec(self, feedback_fn):
3074 """Reboot the instance.
3077 instance = self.instance
3078 ignore_secondaries = self.op.ignore_secondaries
3079 reboot_type = self.op.reboot_type
3081 node_current = instance.primary_node
3083 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3084 constants.INSTANCE_REBOOT_HARD]:
3085 for disk in instance.disks:
3086 self.cfg.SetDiskID(disk, node_current)
3087 result = self.rpc.call_instance_reboot(node_current, instance,
3089 result.Raise("Could not reboot instance")
3091 result = self.rpc.call_instance_shutdown(node_current, instance)
3092 result.Raise("Could not shutdown instance for full reboot")
3093 _ShutdownInstanceDisks(self, instance)
3094 _StartInstanceDisks(self, instance, ignore_secondaries)
3095 result = self.rpc.call_instance_start(node_current, instance, None, None)
3096 msg = result.fail_msg
3097 if msg:
3098 _ShutdownInstanceDisks(self, instance)
3099 raise errors.OpExecError("Could not start instance for"
3100 " full reboot: %s" % msg)
3102 self.cfg.MarkInstanceUp(instance.name)
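# Summary of the three paths above: SOFT and HARD reboots are delegated to the
# hypervisor via call_instance_reboot, while FULL shuts the instance down,
# recycles its block devices and starts it again from the master side.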
3105 class LUShutdownInstance(LogicalUnit):
3106 """Shutdown an instance.
3109 HPATH = "instance-stop"
3110 HTYPE = constants.HTYPE_INSTANCE
3111 _OP_REQP = ["instance_name"]
3114 def ExpandNames(self):
3115 self._ExpandAndLockInstance()
3117 def BuildHooksEnv(self):
3120 This runs on master, primary and secondary nodes of the instance.
3123 env = _BuildInstanceHookEnvByObject(self, self.instance)
3124 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3127 def CheckPrereq(self):
3128 """Check prerequisites.
3130 This checks that the instance is in the cluster.
3133 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3134 assert self.instance is not None, \
3135 "Cannot retrieve locked instance %s" % self.op.instance_name
3136 _CheckNodeOnline(self, self.instance.primary_node)
3138 def Exec(self, feedback_fn):
3139 """Shutdown the instance.
3142 instance = self.instance
3143 node_current = instance.primary_node
3144 self.cfg.MarkInstanceDown(instance.name)
3145 result = self.rpc.call_instance_shutdown(node_current, instance)
3146 msg = result.fail_msg
3148 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3150 _ShutdownInstanceDisks(self, instance)
3153 class LUReinstallInstance(LogicalUnit):
3154 """Reinstall an instance.
3157 HPATH = "instance-reinstall"
3158 HTYPE = constants.HTYPE_INSTANCE
3159 _OP_REQP = ["instance_name"]
3162 def ExpandNames(self):
3163 self._ExpandAndLockInstance()
3165 def BuildHooksEnv(self):
3168 This runs on master, primary and secondary nodes of the instance.
3171 env = _BuildInstanceHookEnvByObject(self, self.instance)
3172 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3175 def CheckPrereq(self):
3176 """Check prerequisites.
3178 This checks that the instance is in the cluster and is not running.
3181 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3182 assert instance is not None, \
3183 "Cannot retrieve locked instance %s" % self.op.instance_name
3184 _CheckNodeOnline(self, instance.primary_node)
3186 if instance.disk_template == constants.DT_DISKLESS:
3187 raise errors.OpPrereqError("Instance '%s' has no disks" %
3188 self.op.instance_name)
3189 if instance.admin_up:
3190 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3191 self.op.instance_name)
3192 remote_info = self.rpc.call_instance_info(instance.primary_node,
3193 instance.name,
3194 instance.hypervisor)
3195 remote_info.Raise("Error checking node %s" % instance.primary_node,
3197 if remote_info.payload:
3198 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3199 (self.op.instance_name,
3200 instance.primary_node))
3202 self.op.os_type = getattr(self.op, "os_type", None)
3203 if self.op.os_type is not None:
3205 pnode = self.cfg.GetNodeInfo(
3206 self.cfg.ExpandNodeName(instance.primary_node))
3208 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3210 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3211 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3212 (self.op.os_type, pnode.name), prereq=True)
3214 self.instance = instance
3216 def Exec(self, feedback_fn):
3217 """Reinstall the instance.
3220 inst = self.instance
3222 if self.op.os_type is not None:
3223 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3224 inst.os = self.op.os_type
3225 self.cfg.Update(inst)
3227 _StartInstanceDisks(self, inst, None)
3229 feedback_fn("Running the instance OS create scripts...")
3230 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3231 result.Raise("Could not install OS for instance %s on node %s" %
3232 (inst.name, inst.primary_node))
3234 _ShutdownInstanceDisks(self, inst)
3237 class LURenameInstance(LogicalUnit):
3238 """Rename an instance.
3241 HPATH = "instance-rename"
3242 HTYPE = constants.HTYPE_INSTANCE
3243 _OP_REQP = ["instance_name", "new_name"]
3245 def BuildHooksEnv(self):
3248 This runs on master, primary and secondary nodes of the instance.
3251 env = _BuildInstanceHookEnvByObject(self, self.instance)
3252 env["INSTANCE_NEW_NAME"] = self.op.new_name
3253 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3256 def CheckPrereq(self):
3257 """Check prerequisites.
3259 This checks that the instance is in the cluster and is not running.
3262 instance = self.cfg.GetInstanceInfo(
3263 self.cfg.ExpandInstanceName(self.op.instance_name))
3264 if instance is None:
3265 raise errors.OpPrereqError("Instance '%s' not known" %
3266 self.op.instance_name)
3267 _CheckNodeOnline(self, instance.primary_node)
3269 if instance.admin_up:
3270 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3271 self.op.instance_name)
3272 remote_info = self.rpc.call_instance_info(instance.primary_node,
3273 instance.name,
3274 instance.hypervisor)
3275 remote_info.Raise("Error checking node %s" % instance.primary_node,
3277 if remote_info.payload:
3278 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3279 (self.op.instance_name,
3280 instance.primary_node))
3281 self.instance = instance
3283 # new name verification
3284 name_info = utils.HostInfo(self.op.new_name)
3286 self.op.new_name = new_name = name_info.name
3287 instance_list = self.cfg.GetInstanceList()
3288 if new_name in instance_list:
3289 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3292 if not getattr(self.op, "ignore_ip", False):
3293 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3294 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3295 (name_info.ip, new_name))
3298 def Exec(self, feedback_fn):
3299 """Reinstall the instance.
3302 inst = self.instance
3303 old_name = inst.name
3305 if inst.disk_template == constants.DT_FILE:
3306 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3308 self.cfg.RenameInstance(inst.name, self.op.new_name)
3309 # Change the instance lock. This is definitely safe while we hold the BGL
3310 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3311 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3313 # re-read the instance from the configuration after rename
3314 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3316 if inst.disk_template == constants.DT_FILE:
3317 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3318 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3319 old_file_storage_dir,
3320 new_file_storage_dir)
3321 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3322 " (but the instance has been renamed in Ganeti)" %
3323 (inst.primary_node, old_file_storage_dir,
3324 new_file_storage_dir))
3326 _StartInstanceDisks(self, inst, None)
3328 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3329 old_name)
3330 msg = result.fail_msg
3332 msg = ("Could not run OS rename script for instance %s on node %s"
3333 " (but the instance has been renamed in Ganeti): %s" %
3334 (inst.name, inst.primary_node, msg))
3335 self.proc.LogWarning(msg)
3337 _ShutdownInstanceDisks(self, inst)
3340 class LURemoveInstance(LogicalUnit):
3341 """Remove an instance.
3344 HPATH = "instance-remove"
3345 HTYPE = constants.HTYPE_INSTANCE
3346 _OP_REQP = ["instance_name", "ignore_failures"]
3349 def ExpandNames(self):
3350 self._ExpandAndLockInstance()
3351 self.needed_locks[locking.LEVEL_NODE] = []
3352 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3354 def DeclareLocks(self, level):
3355 if level == locking.LEVEL_NODE:
3356 self._LockInstancesNodes()
3358 def BuildHooksEnv(self):
3361 This runs on master, primary and secondary nodes of the instance.
3364 env = _BuildInstanceHookEnvByObject(self, self.instance)
3365 nl = [self.cfg.GetMasterNode()]
3368 def CheckPrereq(self):
3369 """Check prerequisites.
3371 This checks that the instance is in the cluster.
3374 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3375 assert self.instance is not None, \
3376 "Cannot retrieve locked instance %s" % self.op.instance_name
3378 def Exec(self, feedback_fn):
3379 """Remove the instance.
3382 instance = self.instance
3383 logging.info("Shutting down instance %s on node %s",
3384 instance.name, instance.primary_node)
3386 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3387 msg = result.fail_msg
3388 if msg:
3389 if self.op.ignore_failures:
3390 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3392 raise errors.OpExecError("Could not shutdown instance %s on"
3394 (instance.name, instance.primary_node, msg))
3396 logging.info("Removing block devices for instance %s", instance.name)
3398 if not _RemoveDisks(self, instance):
3399 if self.op.ignore_failures:
3400 feedback_fn("Warning: can't remove instance's disks")
3402 raise errors.OpExecError("Can't remove instance's disks")
3404 logging.info("Removing instance %s out of cluster config", instance.name)
3406 self.cfg.RemoveInstance(instance.name)
3407 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3410 class LUQueryInstances(NoHooksLU):
3411 """Logical unit for querying instances.
3414 _OP_REQP = ["output_fields", "names", "use_locking"]
3416 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3418 "disk_template", "ip", "mac", "bridge",
3419 "nic_mode", "nic_link",
3420 "sda_size", "sdb_size", "vcpus", "tags",
3421 "network_port", "beparams",
3422 r"(disk)\.(size)/([0-9]+)",
3423 r"(disk)\.(sizes)", "disk_usage",
3424 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3425 r"(nic)\.(bridge)/([0-9]+)",
3426 r"(nic)\.(macs|ips|modes|links|bridges)",
3427 r"(disk|nic)\.(count)",
3428 "serial_no", "hypervisor", "hvparams",] +
3430 for name in constants.HVS_PARAMETERS] +
3432 for name in constants.BES_PARAMETERS])
3433 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3436 def ExpandNames(self):
3437 _CheckOutputFields(static=self._FIELDS_STATIC,
3438 dynamic=self._FIELDS_DYNAMIC,
3439 selected=self.op.output_fields)
3441 self.needed_locks = {}
3442 self.share_locks[locking.LEVEL_INSTANCE] = 1
3443 self.share_locks[locking.LEVEL_NODE] = 1
3446 self.wanted = _GetWantedInstances(self, self.op.names)
3448 self.wanted = locking.ALL_SET
3450 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3451 self.do_locking = self.do_node_query and self.op.use_locking
3453 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3454 self.needed_locks[locking.LEVEL_NODE] = []
3455 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3457 def DeclareLocks(self, level):
3458 if level == locking.LEVEL_NODE and self.do_locking:
3459 self._LockInstancesNodes()
3461 def CheckPrereq(self):
3462 """Check prerequisites.
3467 def Exec(self, feedback_fn):
3468 """Computes the list of nodes and their attributes.
3471 all_info = self.cfg.GetAllInstancesInfo()
3472 if self.wanted == locking.ALL_SET:
3473 # caller didn't specify instance names, so ordering is not important
3475 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3477 instance_names = all_info.keys()
3478 instance_names = utils.NiceSort(instance_names)
3480 # caller did specify names, so we must keep the ordering
3482 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3484 tgt_set = all_info.keys()
3485 missing = set(self.wanted).difference(tgt_set)
3487 raise errors.OpExecError("Some instances were removed before"
3488 " retrieving their data: %s" % missing)
3489 instance_names = self.wanted
3491 instance_list = [all_info[iname] for iname in instance_names]
3493 # begin data gathering
3495 nodes = frozenset([inst.primary_node for inst in instance_list])
3496 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3500 if self.do_node_query:
3502 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3504 result = node_data[name]
3506 # offline nodes will be in both lists
3507 off_nodes.append(name)
3508 if result.failed or result.fail_msg:
3509 bad_nodes.append(name)
3512 live_data.update(result.payload)
3513 # else no instance is alive
3515 live_data = dict([(name, {}) for name in instance_names])
3517 # end data gathering
3522 cluster = self.cfg.GetClusterInfo()
3523 for instance in instance_list:
3525 i_hv = cluster.FillHV(instance)
3526 i_be = cluster.FillBE(instance)
3527 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
3528 nic.nicparams) for nic in instance.nics]
3529 for field in self.op.output_fields:
3530 st_match = self._FIELDS_STATIC.Matches(field)
3535 elif field == "pnode":
3536 val = instance.primary_node
3537 elif field == "snodes":
3538 val = list(instance.secondary_nodes)
3539 elif field == "admin_state":
3540 val = instance.admin_up
3541 elif field == "oper_state":
3542 if instance.primary_node in bad_nodes:
3545 val = bool(live_data.get(instance.name))
3546 elif field == "status":
3547 if instance.primary_node in off_nodes:
3548 val = "ERROR_nodeoffline"
3549 elif instance.primary_node in bad_nodes:
3550 val = "ERROR_nodedown"
3552 running = bool(live_data.get(instance.name))
3554 if instance.admin_up:
3559 if instance.admin_up:
3563 elif field == "oper_ram":
3564 if instance.primary_node in bad_nodes:
3566 elif instance.name in live_data:
3567 val = live_data[instance.name].get("memory", "?")
3570 elif field == "vcpus":
3571 val = i_be[constants.BE_VCPUS]
3572 elif field == "disk_template":
3573 val = instance.disk_template
3576 val = instance.nics[0].ip
3579 elif field == "nic_mode":
3581 val = i_nicp[0][constants.NIC_MODE]
3584 elif field == "nic_link":
3586 val = i_nicp[0][constants.NIC_LINK]
3589 elif field == "bridge":
3590 if (instance.nics and
3591 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
3592 val = i_nicp[0][constants.NIC_LINK]
3595 elif field == "mac":
3597 val = instance.nics[0].mac
3600 elif field == "sda_size" or field == "sdb_size":
3601 idx = ord(field[2]) - ord('a')
3603 val = instance.FindDisk(idx).size
3604 except errors.OpPrereqError:
3606 elif field == "disk_usage": # total disk usage per node
3607 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3608 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3609 elif field == "tags":
3610 val = list(instance.GetTags())
3611 elif field == "serial_no":
3612 val = instance.serial_no
3613 elif field == "network_port":
3614 val = instance.network_port
3615 elif field == "hypervisor":
3616 val = instance.hypervisor
3617 elif field == "hvparams":
3618 val = i_hv
3619 elif (field.startswith(HVPREFIX) and
3620 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3621 val = i_hv.get(field[len(HVPREFIX):], None)
3622 elif field == "beparams":
3623 val = i_be
3624 elif (field.startswith(BEPREFIX) and
3625 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3626 val = i_be.get(field[len(BEPREFIX):], None)
3627 elif st_match and st_match.groups():
3628 # matches a variable list
3629 st_groups = st_match.groups()
3630 if st_groups and st_groups[0] == "disk":
3631 if st_groups[1] == "count":
3632 val = len(instance.disks)
3633 elif st_groups[1] == "sizes":
3634 val = [disk.size for disk in instance.disks]
3635 elif st_groups[1] == "size":
3637 val = instance.FindDisk(st_groups[2]).size
3638 except errors.OpPrereqError:
3641 assert False, "Unhandled disk parameter"
3642 elif st_groups[0] == "nic":
3643 if st_groups[1] == "count":
3644 val = len(instance.nics)
3645 elif st_groups[1] == "macs":
3646 val = [nic.mac for nic in instance.nics]
3647 elif st_groups[1] == "ips":
3648 val = [nic.ip for nic in instance.nics]
3649 elif st_groups[1] == "modes":
3650 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
3651 elif st_groups[1] == "links":
3652 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
3653 elif st_groups[1] == "bridges":
3656 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3657 val.append(nicp[constants.NIC_LINK])
3662 nic_idx = int(st_groups[2])
3663 if nic_idx >= len(instance.nics):
3666 if st_groups[1] == "mac":
3667 val = instance.nics[nic_idx].mac
3668 elif st_groups[1] == "ip":
3669 val = instance.nics[nic_idx].ip
3670 elif st_groups[1] == "mode":
3671 val = i_nicp[nic_idx][constants.NIC_MODE]
3672 elif st_groups[1] == "link":
3673 val = i_nicp[nic_idx][constants.NIC_LINK]
3674 elif st_groups[1] == "bridge":
3675 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
3676 if nic_mode == constants.NIC_MODE_BRIDGED:
3677 val = i_nicp[nic_idx][constants.NIC_LINK]
3681 assert False, "Unhandled NIC parameter"
3683 assert False, ("Declared but unhandled variable parameter '%s'" %
3686 assert False, "Declared but unhandled parameter '%s'" % field
3693 class LUFailoverInstance(LogicalUnit):
3694 """Failover an instance.
3697 HPATH = "instance-failover"
3698 HTYPE = constants.HTYPE_INSTANCE
3699 _OP_REQP = ["instance_name", "ignore_consistency"]
3702 def ExpandNames(self):
3703 self._ExpandAndLockInstance()
3704 self.needed_locks[locking.LEVEL_NODE] = []
3705 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3707 def DeclareLocks(self, level):
3708 if level == locking.LEVEL_NODE:
3709 self._LockInstancesNodes()
3711 def BuildHooksEnv(self):
3714 This runs on master, primary and secondary nodes of the instance.
3718 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3720 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3721 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3724 def CheckPrereq(self):
3725 """Check prerequisites.
3727 This checks that the instance is in the cluster.
3730 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3731 assert self.instance is not None, \
3732 "Cannot retrieve locked instance %s" % self.op.instance_name
3734 bep = self.cfg.GetClusterInfo().FillBE(instance)
3735 if instance.disk_template not in constants.DTS_NET_MIRROR:
3736 raise errors.OpPrereqError("Instance's disk layout is not"
3737 " network mirrored, cannot failover.")
3739 secondary_nodes = instance.secondary_nodes
3740 if not secondary_nodes:
3741 raise errors.ProgrammerError("no secondary node but using "
3742 "a mirrored disk template")
3744 target_node = secondary_nodes[0]
3745 _CheckNodeOnline(self, target_node)
3746 _CheckNodeNotDrained(self, target_node)
3747 if instance.admin_up:
3748 # check memory requirements on the secondary node
3749 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3750 instance.name, bep[constants.BE_MEMORY],
3751 instance.hypervisor)
3753 self.LogInfo("Not checking memory on the secondary node as"
3754 " instance will not be started")
3756 # check bridge existence
3757 _CheckInstanceBridgesExist(self, instance, node=target_node)
3759 def Exec(self, feedback_fn):
3760 """Failover an instance.
3762 The failover is done by shutting it down on its present node and
3763 starting it on the secondary.
3766 instance = self.instance
3768 source_node = instance.primary_node
3769 target_node = instance.secondary_nodes[0]
3771 feedback_fn("* checking disk consistency between source and target")
3772 for dev in instance.disks:
3773 # for drbd, these are drbd over lvm
3774 if not _CheckDiskConsistency(self, dev, target_node, False):
3775 if instance.admin_up and not self.op.ignore_consistency:
3776 raise errors.OpExecError("Disk %s is degraded on target node,"
3777 " aborting failover." % dev.iv_name)
3779 feedback_fn("* shutting down instance on source node")
3780 logging.info("Shutting down instance %s on node %s",
3781 instance.name, source_node)
3783 result = self.rpc.call_instance_shutdown(source_node, instance)
3784 msg = result.fail_msg
3785 if msg:
3786 if self.op.ignore_consistency:
3787 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3788 " Proceeding anyway. Please make sure node"
3789 " %s is down. Error details: %s",
3790 instance.name, source_node, source_node, msg)
3792 raise errors.OpExecError("Could not shutdown instance %s on"
3794 (instance.name, source_node, msg))
3796 feedback_fn("* deactivating the instance's disks on source node")
3797 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3798 raise errors.OpExecError("Can't shut down the instance's disks.")
3800 instance.primary_node = target_node
3801 # distribute new instance config to the other nodes
3802 self.cfg.Update(instance)
3804 # Only start the instance if it's marked as up
3805 if instance.admin_up:
3806 feedback_fn("* activating the instance's disks on target node")
3807 logging.info("Starting instance %s on node %s",
3808 instance.name, target_node)
3810 disks_ok, _ = _AssembleInstanceDisks(self, instance,
3811 ignore_secondaries=True)
3813 _ShutdownInstanceDisks(self, instance)
3814 raise errors.OpExecError("Can't activate the instance's disks")
3816 feedback_fn("* starting the instance on the target node")
3817 result = self.rpc.call_instance_start(target_node, instance, None, None)
3818 msg = result.fail_msg
3820 _ShutdownInstanceDisks(self, instance)
3821 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3822 (instance.name, target_node, msg))
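# Recap of the failover sequence implemented above: verify disk consistency on
# the target, shut the instance down on the source node, deactivate its disks,
# flip primary_node in the configuration, then (only if the instance was marked
# up) re-assemble the disks on the new primary and start it there.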
3825 class LUMigrateInstance(LogicalUnit):
3826 """Migrate an instance.
3828 This performs the migration without shutting the instance down, unlike
3829 failover, which requires a shutdown.
3832 HPATH = "instance-migrate"
3833 HTYPE = constants.HTYPE_INSTANCE
3834 _OP_REQP = ["instance_name", "live", "cleanup"]
3838 def ExpandNames(self):
3839 self._ExpandAndLockInstance()
3840 self.needed_locks[locking.LEVEL_NODE] = []
3841 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3843 def DeclareLocks(self, level):
3844 if level == locking.LEVEL_NODE:
3845 self._LockInstancesNodes()
3847 def BuildHooksEnv(self):
3850 This runs on master, primary and secondary nodes of the instance.
3853 env = _BuildInstanceHookEnvByObject(self, self.instance)
3854 env["MIGRATE_LIVE"] = self.op.live
3855 env["MIGRATE_CLEANUP"] = self.op.cleanup
3856 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3859 def CheckPrereq(self):
3860 """Check prerequisites.
3862 This checks that the instance is in the cluster.
3865 instance = self.cfg.GetInstanceInfo(
3866 self.cfg.ExpandInstanceName(self.op.instance_name))
3867 if instance is None:
3868 raise errors.OpPrereqError("Instance '%s' not known" %
3869 self.op.instance_name)
3871 if instance.disk_template != constants.DT_DRBD8:
3872 raise errors.OpPrereqError("Instance's disk layout is not"
3873 " drbd8, cannot migrate.")
3875 secondary_nodes = instance.secondary_nodes
3876 if not secondary_nodes:
3877 raise errors.ConfigurationError("No secondary node but using"
3878 " drbd8 disk template")
3880 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3882 target_node = secondary_nodes[0]
3883 # check memory requirements on the secondary node
3884 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3885 instance.name, i_be[constants.BE_MEMORY],
3886 instance.hypervisor)
3888 # check bridge existence
3889 _CheckInstanceBridgesExist(self, instance, node=target_node)
3891 if not self.op.cleanup:
3892 _CheckNodeNotDrained(self, target_node)
3893 result = self.rpc.call_instance_migratable(instance.primary_node,
3895 result.Raise("Can't migrate, please use failover", prereq=True)
3897 self.instance = instance
3899 def _WaitUntilSync(self):
3900 """Poll with custom rpc for disk sync.
3902 This uses our own step-based rpc call.
3905 self.feedback_fn("* wait until resync is done")
3909 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3910 self.nodes_ip,
3911 self.instance.disks)
3912 min_percent = 100
3913 for node, nres in result.items():
3914 nres.Raise("Cannot resync disks on node %s" % node)
3915 node_done, node_percent = nres.payload
3916 all_done = all_done and node_done
3917 if node_percent is not None:
3918 min_percent = min(min_percent, node_percent)
3920 if min_percent < 100:
3921 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3924 def _EnsureSecondary(self, node):
3925 """Demote a node to secondary.
3928 self.feedback_fn("* switching node %s to secondary mode" % node)
3930 for dev in self.instance.disks:
3931 self.cfg.SetDiskID(dev, node)
3933 result = self.rpc.call_blockdev_close(node, self.instance.name,
3934 self.instance.disks)
3935 result.Raise("Cannot change disk to secondary on node %s" % node)
3937 def _GoStandalone(self):
3938 """Disconnect from the network.
3941 self.feedback_fn("* changing into standalone mode")
3942 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3943 self.instance.disks)
3944 for node, nres in result.items():
3945 nres.Raise("Cannot disconnect disks on node %s" % node)
3947 def _GoReconnect(self, multimaster):
3948 """Reconnect to the network.
3954 msg = "single-master"
3955 self.feedback_fn("* changing disks into %s mode" % msg)
3956 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3957 self.instance.disks,
3958 self.instance.name, multimaster)
3959 for node, nres in result.items():
3960 nres.Raise("Cannot change disks config on node %s" % node)
3962 def _ExecCleanup(self):
3963 """Try to clean up after a failed migration.
3965 The cleanup is done by:
3966 - check that the instance is running only on one node
3967 (and update the config if needed)
3968 - change disks on its secondary node to secondary
3969 - wait until disks are fully synchronized
3970 - disconnect from the network
3971 - change disks into single-master mode
3972 - wait again until disks are fully synchronized
3975 instance = self.instance
3976 target_node = self.target_node
3977 source_node = self.source_node
3979 # check running on only one node
3980 self.feedback_fn("* checking where the instance actually runs"
3981 " (if this hangs, the hypervisor might be in"
3983 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3984 for node, result in ins_l.items():
3985 result.Raise("Can't contact node %s" % node)
3987 runningon_source = instance.name in ins_l[source_node].payload
3988 runningon_target = instance.name in ins_l[target_node].payload
3990 if runningon_source and runningon_target:
3991 raise errors.OpExecError("Instance seems to be running on two nodes,"
3992 " or the hypervisor is confused. You will have"
3993 " to ensure manually that it runs only on one"
3994 " and restart this operation.")
3996 if not (runningon_source or runningon_target):
3997 raise errors.OpExecError("Instance does not seem to be running at all."
3998 " In this case, it's safer to repair by"
3999 " running 'gnt-instance stop' to ensure disk"
4000 " shutdown, and then restarting it.")
4002 if runningon_target:
4003 # the migration has actually succeeded, we need to update the config
4004 self.feedback_fn("* instance running on secondary node (%s),"
4005 " updating config" % target_node)
4006 instance.primary_node = target_node
4007 self.cfg.Update(instance)
4008 demoted_node = source_node
4010 self.feedback_fn("* instance confirmed to be running on its"
4011 " primary node (%s)" % source_node)
4012 demoted_node = target_node
4014 self._EnsureSecondary(demoted_node)
4016 self._WaitUntilSync()
4017 except errors.OpExecError:
4018 # we ignore errors here, since if the device is standalone, it
4019 # won't be able to sync
4021 self._GoStandalone()
4022 self._GoReconnect(False)
4023 self._WaitUntilSync()
4025 self.feedback_fn("* done")
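# Illustrative note (a sketch, not part of this LU): the cleanup path above
# is normally reached by re-submitting the migration opcode with the cleanup
# flag set, e.g. "gnt-instance migrate --cleanup <instance>" from the
# command line or, assuming the usual opcode class and the field names from
# _OP_REQP, roughly:
#
#   op = opcodes.OpMigrateInstance(instance_name="inst1.example.com",
#                                  live=False, cleanup=True)
#
# The opcode spelling is an assumption made for illustration only.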
4027 def _RevertDiskStatus(self):
4028 """Try to revert the disk status after a failed migration.
4031 target_node = self.target_node
4033 self._EnsureSecondary(target_node)
4034 self._GoStandalone()
4035 self._GoReconnect(False)
4036 self._WaitUntilSync()
4037 except errors.OpExecError, err:
4038 self.LogWarning("Migration failed and I can't reconnect the"
4039 " drives: error '%s'\n"
4040 "Please check manually and recover the instance status" %
4043 def _AbortMigration(self):
4044 """Call the hypervisor code to abort a started migration.
4047 instance = self.instance
4048 target_node = self.target_node
4049 migration_info = self.migration_info
4051 abort_result = self.rpc.call_finalize_migration(target_node,
4055 abort_msg = abort_result.fail_msg
4057 logging.error("Aborting migration failed on target node %s: %s" %
4058 (target_node, abort_msg))
4059 # Don't raise an exception here, as we still have to try to revert the
4060 # disk status, even if this step failed.
4062 def _ExecMigration(self):
4063 """Migrate an instance.
4065 The migration is done by:
4066 - change the disks into dual-master mode
4067 - wait until disks are fully synchronized again
4068 - migrate the instance
4069 - change disks on the new secondary node (the old primary) to secondary
4070 - wait until disks are fully synchronized
4071 - change disks into single-master mode
4074 instance = self.instance
4075 target_node = self.target_node
4076 source_node = self.source_node
4078 self.feedback_fn("* checking disk consistency between source and target")
4079 for dev in instance.disks:
4080 if not _CheckDiskConsistency(self, dev, target_node, False):
4081 raise errors.OpExecError("Disk %s is degraded or not fully"
4082 " synchronized on target node,"
4083 " aborting migration." % dev.iv_name)
4085 # First get the migration information from the remote node
4086 result = self.rpc.call_migration_info(source_node, instance)
4087 msg = result.fail_msg
4089 log_err = ("Failed fetching source migration information from %s: %s" %
4091 logging.error(log_err)
4092 raise errors.OpExecError(log_err)
4094 self.migration_info = migration_info = result.payload
4096 # Then switch the disks to master/master mode
4097 self._EnsureSecondary(target_node)
4098 self._GoStandalone()
4099 self._GoReconnect(True)
4100 self._WaitUntilSync()
4102 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4103 result = self.rpc.call_accept_instance(target_node,
4106 self.nodes_ip[target_node])
4108 msg = result.fail_msg
4110 logging.error("Instance pre-migration failed, trying to revert"
4111 " disk status: %s", msg)
4112 self._AbortMigration()
4113 self._RevertDiskStatus()
4114 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4115 (instance.name, msg))
4117 self.feedback_fn("* migrating instance to %s" % target_node)
4119 result = self.rpc.call_instance_migrate(source_node, instance,
4120 self.nodes_ip[target_node],
4122 msg = result.fail_msg
4124 logging.error("Instance migration failed, trying to revert"
4125 " disk status: %s", msg)
4126 self._AbortMigration()
4127 self._RevertDiskStatus()
4128 raise errors.OpExecError("Could not migrate instance %s: %s" %
4129 (instance.name, msg))
4132 instance.primary_node = target_node
4133 # distribute new instance config to the other nodes
4134 self.cfg.Update(instance)
4136 result = self.rpc.call_finalize_migration(target_node,
4140 msg = result.fail_msg
4142 logging.error("Instance migration succeeded, but finalization failed:"
4144 raise errors.OpExecError("Could not finalize instance migration: %s" %
4147 self._EnsureSecondary(source_node)
4148 self._WaitUntilSync()
4149 self._GoStandalone()
4150 self._GoReconnect(False)
4151 self._WaitUntilSync()
4153 self.feedback_fn("* done")
4155 def Exec(self, feedback_fn):
4156 """Perform the migration.
4159 self.feedback_fn = feedback_fn
4161 self.source_node = self.instance.primary_node
4162 self.target_node = self.instance.secondary_nodes[0]
4163 self.all_nodes = [self.source_node, self.target_node]
4165 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4166 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4169 return self._ExecCleanup()
4171 return self._ExecMigration()
4174 def _CreateBlockDev(lu, node, instance, device, force_create,
4176 """Create a tree of block devices on a given node.
4178 If this device type has to be created on secondaries, create it and
4181 If not, just recurse to children keeping the same 'force' value.
4183 @param lu: the lu on whose behalf we execute
4184 @param node: the node on which to create the device
4185 @type instance: L{objects.Instance}
4186 @param instance: the instance which owns the device
4187 @type device: L{objects.Disk}
4188 @param device: the device to create
4189 @type force_create: boolean
4190 @param force_create: whether to force creation of this device; this
4191 will be changed to True whenever we find a device for which
4192 CreateOnSecondary() returns True
4193 @param info: the extra 'metadata' we should attach to the device
4194 (this will be represented as an LVM tag)
4195 @type force_open: boolean
4196 @param force_open: this parameter will be passed to the
4197 L{backend.BlockdevCreate} function where it specifies
4198 whether we run on the primary or not, and it affects both
4199 the child assembly and the device's own Open() execution
4202 if device.CreateOnSecondary():
4206 for child in device.children:
4207 _CreateBlockDev(lu, node, instance, child, force_create,
4210 if not force_create:
4213 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4216 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4217 """Create a single block device on a given node.
4219 This will not recurse over children of the device, so they must be
4222 @param lu: the lu on whose behalf we execute
4223 @param node: the node on which to create the device
4224 @type instance: L{objects.Instance}
4225 @param instance: the instance which owns the device
4226 @type device: L{objects.Disk}
4227 @param device: the device to create
4228 @param info: the extra 'metadata' we should attach to the device
4229 (this will be represented as an LVM tag)
4230 @type force_open: boolean
4231 @param force_open: this parameter will be passed to the
4232 L{backend.BlockdevCreate} function where it specifies
4233 whether we run on the primary or not, and it affects both
4234 the child assembly and the device's own Open() execution
4237 lu.cfg.SetDiskID(device, node)
4238 result = lu.rpc.call_blockdev_create(node, device, device.size,
4239 instance.name, force_open, info)
4240 result.Raise("Can't create block device %s on"
4241 " node %s for instance %s" % (device, node, instance.name))
4242 if device.physical_id is None:
4243 device.physical_id = result.payload
4246 def _GenerateUniqueNames(lu, exts):
4247 """Generate suitable LV names.
4249 This will generate a unique logical volume name for each of the given suffixes.
4254 new_id = lu.cfg.GenerateUniqueID()
4255 results.append("%s%s" % (new_id, val))
4259 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4261 """Generate a drbd8 device complete with its children.
4264 port = lu.cfg.AllocatePort()
4265 vgname = lu.cfg.GetVGName()
4266 shared_secret = lu.cfg.GenerateDRBDSecret()
4267 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4268 logical_id=(vgname, names[0]))
4269 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4270 logical_id=(vgname, names[1]))
4271 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4272 logical_id=(primary, secondary, port,
4275 children=[dev_data, dev_meta],
4280 def _GenerateDiskTemplate(lu, template_name,
4281 instance_name, primary_node,
4282 secondary_nodes, disk_info,
4283 file_storage_dir, file_driver,
4285 """Generate the entire disk layout for a given template type.
4288 #TODO: compute space requirements
4290 vgname = lu.cfg.GetVGName()
4291 disk_count = len(disk_info)
4293 if template_name == constants.DT_DISKLESS:
4295 elif template_name == constants.DT_PLAIN:
4296 if len(secondary_nodes) != 0:
4297 raise errors.ProgrammerError("Wrong template configuration")
4299 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4300 for i in range(disk_count)])
4301 for idx, disk in enumerate(disk_info):
4302 disk_index = idx + base_index
4303 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4304 logical_id=(vgname, names[idx]),
4305 iv_name="disk/%d" % disk_index,
4307 disks.append(disk_dev)
4308 elif template_name == constants.DT_DRBD8:
4309 if len(secondary_nodes) != 1:
4310 raise errors.ProgrammerError("Wrong template configuration")
4311 remote_node = secondary_nodes[0]
4312 minors = lu.cfg.AllocateDRBDMinor(
4313 [primary_node, remote_node] * len(disk_info), instance_name)
4316 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4317 for i in range(disk_count)]):
4318 names.append(lv_prefix + "_data")
4319 names.append(lv_prefix + "_meta")
4320 for idx, disk in enumerate(disk_info):
4321 disk_index = idx + base_index
4322 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4323 disk["size"], names[idx*2:idx*2+2],
4324 "disk/%d" % disk_index,
4325 minors[idx*2], minors[idx*2+1])
4326 disk_dev.mode = disk["mode"]
4327 disks.append(disk_dev)
4328 elif template_name == constants.DT_FILE:
4329 if len(secondary_nodes) != 0:
4330 raise errors.ProgrammerError("Wrong template configuration")
4332 for idx, disk in enumerate(disk_info):
4333 disk_index = idx + base_index
4334 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4335 iv_name="disk/%d" % disk_index,
4336 logical_id=(file_driver,
4337 "%s/disk%d" % (file_storage_dir,
4340 disks.append(disk_dev)
4342 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4346 def _GetInstanceInfoText(instance):
4347 """Compute the text that should be added to the disk's metadata.
4350 return "originstname+%s" % instance.name
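# Illustrative example of the text computed above: for an instance named
# "inst1.example.com" the string attached to its volumes (as an LVM tag)
# would be "originstname+inst1.example.com".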
4353 def _CreateDisks(lu, instance):
4354 """Create all disks for an instance.
4356 This abstracts away some work from AddInstance.
4358 @type lu: L{LogicalUnit}
4359 @param lu: the logical unit on whose behalf we execute
4360 @type instance: L{objects.Instance}
4361 @param instance: the instance whose disks we should create
4363 @return: the success of the creation
4366 info = _GetInstanceInfoText(instance)
4367 pnode = instance.primary_node
4369 if instance.disk_template == constants.DT_FILE:
4370 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4371 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4373 result.Raise("Failed to create directory '%s' on"
4374 " node %s" % (file_storage_dir, pnode))
4376 # Note: this needs to be kept in sync with adding of disks in
4377 # LUSetInstanceParams
4378 for device in instance.disks:
4379 logging.info("Creating volume %s for instance %s",
4380 device.iv_name, instance.name)
4382 for node in instance.all_nodes:
4383 f_create = node == pnode
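# On the primary node we force both the creation and the opening of the
# device (f_create is True there); on secondary nodes creation is only
# forced further down the device tree, where _CreateBlockDev encounters a
# device for which CreateOnSecondary() returns True.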
4384 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4387 def _RemoveDisks(lu, instance):
4388 """Remove all disks for an instance.
4390 This abstracts away some work from `AddInstance()` and
4391 `RemoveInstance()`. Note that in case some of the devices couldn't
4392 be removed, the removal will continue with the other ones (compare
4393 with `_CreateDisks()`).
4395 @type lu: L{LogicalUnit}
4396 @param lu: the logical unit on whose behalf we execute
4397 @type instance: L{objects.Instance}
4398 @param instance: the instance whose disks we should remove
4400 @return: the success of the removal
4403 logging.info("Removing block devices for instance %s", instance.name)
4406 for device in instance.disks:
4407 for node, disk in device.ComputeNodeTree(instance.primary_node):
4408 lu.cfg.SetDiskID(disk, node)
4409 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4411 lu.LogWarning("Could not remove block device %s on node %s,"
4412 " continuing anyway: %s", device.iv_name, node, msg)
4415 if instance.disk_template == constants.DT_FILE:
4416 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4417 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4419 msg = result.fail_msg
4421 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4422 file_storage_dir, instance.primary_node, msg)
4428 def _ComputeDiskSize(disk_template, disks):
4429 """Compute disk size requirements in the volume group
4432 # Required free disk space as a function of the disk template and disk sizes
4434 constants.DT_DISKLESS: None,
4435 constants.DT_PLAIN: sum(d["size"] for d in disks),
4436 # 128 MB are added for drbd metadata for each disk
4437 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4438 constants.DT_FILE: None,
4441 if disk_template not in req_size_dict:
4442 raise errors.ProgrammerError("Disk template '%s' size requirement"
4443 " is unknown" % disk_template)
4445 return req_size_dict[disk_template]
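# Worked example of the computation above (illustrative sizes): for two
# disks of 1024 MB each, DT_PLAIN needs 1024 + 1024 = 2048 MB of free VG
# space, DT_DRBD8 needs (1024 + 128) + (1024 + 128) = 2304 MB because of
# the per-disk DRBD metadata, and DT_DISKLESS/DT_FILE need no VG space
# (None).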
4448 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4449 """Hypervisor parameter validation.
4451 This function abstracts the hypervisor parameter validation to be
4452 used in both instance create and instance modify.
4454 @type lu: L{LogicalUnit}
4455 @param lu: the logical unit for which we check
4456 @type nodenames: list
4457 @param nodenames: the list of nodes on which we should check
4458 @type hvname: string
4459 @param hvname: the name of the hypervisor we should use
4460 @type hvparams: dict
4461 @param hvparams: the parameters which we need to check
4462 @raise errors.OpPrereqError: if the parameters are not valid
4465 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4468 for node in nodenames:
4472 info.Raise("Hypervisor parameter validation failed on node %s" % node)
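# Typical usage, as done below in LUCreateInstance.CheckPrereq: validate
# the hypervisor parameters on all nodes that will host the instance, e.g.
#   _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
# where nodenames is the primary node plus any secondaries.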
4475 class LUCreateInstance(LogicalUnit):
4476 """Create an instance.
4479 HPATH = "instance-add"
4480 HTYPE = constants.HTYPE_INSTANCE
4481 _OP_REQP = ["instance_name", "disks", "disk_template",
4483 "wait_for_sync", "ip_check", "nics",
4484 "hvparams", "beparams"]
4487 def _ExpandNode(self, node):
4488 """Expands and checks one node name.
4491 node_full = self.cfg.ExpandNodeName(node)
4492 if node_full is None:
4493 raise errors.OpPrereqError("Unknown node %s" % node)
4496 def ExpandNames(self):
4497 """ExpandNames for CreateInstance.
4499 Figure out the right locks for instance creation.
4502 self.needed_locks = {}
4504 # set optional parameters to none if they don't exist
4505 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4506 if not hasattr(self.op, attr):
4507 setattr(self.op, attr, None)
4509 # cheap checks, mostly valid constants given
4511 # verify creation mode
4512 if self.op.mode not in (constants.INSTANCE_CREATE,
4513 constants.INSTANCE_IMPORT):
4514 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4517 # disk template and mirror node verification
4518 if self.op.disk_template not in constants.DISK_TEMPLATES:
4519 raise errors.OpPrereqError("Invalid disk template name")
4521 if self.op.hypervisor is None:
4522 self.op.hypervisor = self.cfg.GetHypervisorType()
4524 cluster = self.cfg.GetClusterInfo()
4525 enabled_hvs = cluster.enabled_hypervisors
4526 if self.op.hypervisor not in enabled_hvs:
4527 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4528 " cluster (%s)" % (self.op.hypervisor,
4529 ",".join(enabled_hvs)))
4531 # check hypervisor parameter syntax (locally)
4532 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4533 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4535 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4536 hv_type.CheckParameterSyntax(filled_hvp)
4537 self.hv_full = filled_hvp
4539 # fill and remember the beparams dict
4540 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4541 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4544 #### instance parameters check
4546 # instance name verification
4547 hostname1 = utils.HostInfo(self.op.instance_name)
4548 self.op.instance_name = instance_name = hostname1.name
4550 # this is just a preventive check, but someone might still add this
4551 # instance in the meantime, and creation will fail at lock-add time
4552 if instance_name in self.cfg.GetInstanceList():
4553 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4556 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4560 for idx, nic in enumerate(self.op.nics):
4561 nic_mode_req = nic.get("mode", None)
4562 nic_mode = nic_mode_req
4563 if nic_mode is None:
4564 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4566 # in routed mode, for the first nic, the default ip is 'auto'
4567 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4568 default_ip_mode = constants.VALUE_AUTO
4570 default_ip_mode = constants.VALUE_NONE
4572 # ip validity checks
4573 ip = nic.get("ip", default_ip_mode)
4574 if ip is None or ip.lower() == constants.VALUE_NONE:
4576 elif ip.lower() == constants.VALUE_AUTO:
4577 nic_ip = hostname1.ip
4579 if not utils.IsValidIP(ip):
4580 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4581 " like a valid IP" % ip)
4584 # TODO: check the ip for uniqueness !!
4585 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4586 raise errors.OpPrereqError("Routed nic mode requires an ip address")
4588 # MAC address verification
4589 mac = nic.get("mac", constants.VALUE_AUTO)
4590 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4591 if not utils.IsValidMac(mac.lower()):
4592 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4594 # bridge verification
4595 bridge = nic.get("bridge", None)
4596 link = nic.get("link", None)
4598 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
4599 " at the same time")
4600 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4601 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4607 nicparams[constants.NIC_MODE] = nic_mode_req
4609 nicparams[constants.NIC_LINK] = link
4611 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4613 objects.NIC.CheckParameterSyntax(check_params)
4614 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4616 # disk checks/pre-build
4618 for disk in self.op.disks:
4619 mode = disk.get("mode", constants.DISK_RDWR)
4620 if mode not in constants.DISK_ACCESS_SET:
4621 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4623 size = disk.get("size", None)
4625 raise errors.OpPrereqError("Missing disk size")
4629 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4630 self.disks.append({"size": size, "mode": mode})
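# For illustration, the nics/disks opcode parameters consumed above are
# lists of dicts; a minimal example (values are assumptions, not enforced
# defaults) would be:
#   nics=[{"mode": "bridged", "mac": "auto"}]
#   disks=[{"size": 1024, "mode": "rw"}]
# Each disk ends up in self.disks as {"size": <int, in MB>, "mode": <mode>}.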
4632 # used in CheckPrereq for ip ping check
4633 self.check_ip = hostname1.ip
4635 # file storage checks
4636 if (self.op.file_driver and
4637 self.op.file_driver not in constants.FILE_DRIVER):
4638 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4639 self.op.file_driver)
4641 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4642 raise errors.OpPrereqError("File storage directory path cannot be absolute")
4644 ### Node/iallocator related checks
4645 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4646 raise errors.OpPrereqError("One and only one of iallocator and primary"
4647 " node must be given")
4649 if self.op.iallocator:
4650 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4652 self.op.pnode = self._ExpandNode(self.op.pnode)
4653 nodelist = [self.op.pnode]
4654 if self.op.snode is not None:
4655 self.op.snode = self._ExpandNode(self.op.snode)
4656 nodelist.append(self.op.snode)
4657 self.needed_locks[locking.LEVEL_NODE] = nodelist
4659 # in case of import lock the source node too
4660 if self.op.mode == constants.INSTANCE_IMPORT:
4661 src_node = getattr(self.op, "src_node", None)
4662 src_path = getattr(self.op, "src_path", None)
4664 if src_path is None:
4665 self.op.src_path = src_path = self.op.instance_name
4667 if src_node is None:
4668 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4669 self.op.src_node = None
4670 if os.path.isabs(src_path):
4671 raise errors.OpPrereqError("Importing an instance from an absolute"
4672 " path requires a source node option.")
4674 self.op.src_node = src_node = self._ExpandNode(src_node)
4675 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4676 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4677 if not os.path.isabs(src_path):
4678 self.op.src_path = src_path = \
4679 os.path.join(constants.EXPORT_DIR, src_path)
4681 else: # INSTANCE_CREATE
4682 if getattr(self.op, "os_type", None) is None:
4683 raise errors.OpPrereqError("No guest OS specified")
4685 def _RunAllocator(self):
4686 """Run the allocator based on input opcode.
4689 nics = [n.ToDict() for n in self.nics]
4690 ial = IAllocator(self.cfg, self.rpc,
4691 mode=constants.IALLOCATOR_MODE_ALLOC,
4692 name=self.op.instance_name,
4693 disk_template=self.op.disk_template,
4696 vcpus=self.be_full[constants.BE_VCPUS],
4697 mem_size=self.be_full[constants.BE_MEMORY],
4700 hypervisor=self.op.hypervisor,
4703 ial.Run(self.op.iallocator)
4706 raise errors.OpPrereqError("Can't compute nodes using"
4707 " iallocator '%s': %s" % (self.op.iallocator,
4709 if len(ial.nodes) != ial.required_nodes:
4710 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4711 " of nodes (%s), required %s" %
4712 (self.op.iallocator, len(ial.nodes),
4713 ial.required_nodes))
4714 self.op.pnode = ial.nodes[0]
4715 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4716 self.op.instance_name, self.op.iallocator,
4717 ", ".join(ial.nodes))
4718 if ial.required_nodes == 2:
4719 self.op.snode = ial.nodes[1]
4721 def BuildHooksEnv(self):
4724 This runs on master, primary and secondary nodes of the instance.
4728 "ADD_MODE": self.op.mode,
4730 if self.op.mode == constants.INSTANCE_IMPORT:
4731 env["SRC_NODE"] = self.op.src_node
4732 env["SRC_PATH"] = self.op.src_path
4733 env["SRC_IMAGES"] = self.src_images
4735 env.update(_BuildInstanceHookEnv(
4736 name=self.op.instance_name,
4737 primary_node=self.op.pnode,
4738 secondary_nodes=self.secondaries,
4739 status=self.op.start,
4740 os_type=self.op.os_type,
4741 memory=self.be_full[constants.BE_MEMORY],
4742 vcpus=self.be_full[constants.BE_VCPUS],
4743 nics=_NICListToTuple(self, self.nics),
4744 disk_template=self.op.disk_template,
4745 disks=[(d["size"], d["mode"]) for d in self.disks],
4748 hypervisor_name=self.op.hypervisor,
4751 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4756 def CheckPrereq(self):
4757 """Check prerequisites.
4760 if (not self.cfg.GetVGName() and
4761 self.op.disk_template not in constants.DTS_NOT_LVM):
4762 raise errors.OpPrereqError("Cluster does not support lvm-based"
4765 if self.op.mode == constants.INSTANCE_IMPORT:
4766 src_node = self.op.src_node
4767 src_path = self.op.src_path
4769 if src_node is None:
4770 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4771 exp_list = self.rpc.call_export_list(locked_nodes)
4773 for node in exp_list:
4774 if exp_list[node].fail_msg:
4776 if src_path in exp_list[node].payload:
4778 self.op.src_node = src_node = node
4779 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4783 raise errors.OpPrereqError("No export found for relative path %s" %
4786 _CheckNodeOnline(self, src_node)
4787 result = self.rpc.call_export_info(src_node, src_path)
4788 result.Raise("No export or invalid export found in dir %s" % src_path)
4790 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4791 if not export_info.has_section(constants.INISECT_EXP):
4792 raise errors.ProgrammerError("Corrupted export config")
4794 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4795 if (int(ei_version) != constants.EXPORT_VERSION):
4796 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4797 (ei_version, constants.EXPORT_VERSION))
4799 # Check that the new instance doesn't have fewer disks than the export
4800 instance_disks = len(self.disks)
4801 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4802 if instance_disks < export_disks:
4803 raise errors.OpPrereqError("Not enough disks to import."
4804 " (instance: %d, export: %d)" %
4805 (instance_disks, export_disks))
4807 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4809 for idx in range(export_disks):
4810 option = 'disk%d_dump' % idx
4811 if export_info.has_option(constants.INISECT_INS, option):
4812 # FIXME: are the old os-es, disk sizes, etc. useful?
4813 export_name = export_info.get(constants.INISECT_INS, option)
4814 image = os.path.join(src_path, export_name)
4815 disk_images.append(image)
4817 disk_images.append(False)
4819 self.src_images = disk_images
4821 old_name = export_info.get(constants.INISECT_INS, 'name')
4822 # FIXME: int() here could throw a ValueError on broken exports
4823 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4824 if self.op.instance_name == old_name:
4825 for idx, nic in enumerate(self.nics):
4826 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4827 nic_mac_ini = 'nic%d_mac' % idx
4828 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4830 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4831 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4832 if self.op.start and not self.op.ip_check:
4833 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4834 " adding an instance in start mode")
4836 if self.op.ip_check:
4837 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4838 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4839 (self.check_ip, self.op.instance_name))
4841 #### mac address generation
4842 # By generating the MAC address here, both the allocator and the hooks get
4843 # the real final mac address rather than the 'auto' or 'generate' value.
4844 # There is a race condition between the generation and the instance object
4845 # creation, which means that we know the mac is valid now, but we're not
4846 sure it will be when we actually add the instance. If things go badly,
4847 # adding the instance will abort because of a duplicate mac, and the
4848 # creation job will fail.
4849 for nic in self.nics:
4850 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4851 nic.mac = self.cfg.GenerateMAC()
4855 if self.op.iallocator is not None:
4856 self._RunAllocator()
4858 #### node related checks
4860 # check primary node
4861 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4862 assert self.pnode is not None, \
4863 "Cannot retrieve locked node %s" % self.op.pnode
4865 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4868 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4871 self.secondaries = []
4873 # mirror node verification
4874 if self.op.disk_template in constants.DTS_NET_MIRROR:
4875 if self.op.snode is None:
4876 raise errors.OpPrereqError("The networked disk templates need"
4878 if self.op.snode == pnode.name:
4879 raise errors.OpPrereqError("The secondary node cannot be"
4880 " the primary node.")
4881 _CheckNodeOnline(self, self.op.snode)
4882 _CheckNodeNotDrained(self, self.op.snode)
4883 self.secondaries.append(self.op.snode)
4885 nodenames = [pnode.name] + self.secondaries
4887 req_size = _ComputeDiskSize(self.op.disk_template,
4890 # Check lv size requirements
4891 if req_size is not None:
4892 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4894 for node in nodenames:
4895 info = nodeinfo[node]
4896 info.Raise("Cannot get current information from node %s" % node)
4898 vg_free = info.get('vg_free', None)
4899 if not isinstance(vg_free, int):
4900 raise errors.OpPrereqError("Can't compute free disk space on"
4902 if req_size > vg_free:
4903 raise errors.OpPrereqError("Not enough disk space on target node %s."
4904 " %d MB available, %d MB required" %
4905 (node, vg_free, req_size))
4907 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4910 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4911 result.Raise("OS '%s' not in supported os list for primary node %s" %
4912 (self.op.os_type, pnode.name), prereq=True)
4914 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4916 # memory check on primary node
4918 _CheckNodeFreeMemory(self, self.pnode.name,
4919 "creating instance %s" % self.op.instance_name,
4920 self.be_full[constants.BE_MEMORY],
4923 self.dry_run_result = list(nodenames)
4925 def Exec(self, feedback_fn):
4926 """Create and add the instance to the cluster.
4929 instance = self.op.instance_name
4930 pnode_name = self.pnode.name
4932 ht_kind = self.op.hypervisor
4933 if ht_kind in constants.HTS_REQ_PORT:
4934 network_port = self.cfg.AllocatePort()
4938 ##if self.op.vnc_bind_address is None:
4939 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4941 # this is needed because os.path.join does not accept None arguments
4942 if self.op.file_storage_dir is None:
4943 string_file_storage_dir = ""
4945 string_file_storage_dir = self.op.file_storage_dir
4947 # build the full file storage dir path
4948 file_storage_dir = os.path.normpath(os.path.join(
4949 self.cfg.GetFileStorageDir(),
4950 string_file_storage_dir, instance))
4953 disks = _GenerateDiskTemplate(self,
4954 self.op.disk_template,
4955 instance, pnode_name,
4959 self.op.file_driver,
4962 iobj = objects.Instance(name=instance, os=self.op.os_type,
4963 primary_node=pnode_name,
4964 nics=self.nics, disks=disks,
4965 disk_template=self.op.disk_template,
4967 network_port=network_port,
4968 beparams=self.op.beparams,
4969 hvparams=self.op.hvparams,
4970 hypervisor=self.op.hypervisor,
4973 feedback_fn("* creating instance disks...")
4975 _CreateDisks(self, iobj)
4976 except errors.OpExecError:
4977 self.LogWarning("Device creation failed, reverting...")
4979 _RemoveDisks(self, iobj)
4981 self.cfg.ReleaseDRBDMinors(instance)
4984 feedback_fn("adding instance %s to cluster config" % instance)
4986 self.cfg.AddInstance(iobj)
4987 # Declare that we don't want to remove the instance lock anymore, as we've
4988 # added the instance to the config
4989 del self.remove_locks[locking.LEVEL_INSTANCE]
4990 # Unlock all the nodes
4991 if self.op.mode == constants.INSTANCE_IMPORT:
4992 nodes_keep = [self.op.src_node]
4993 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4994 if node != self.op.src_node]
4995 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4996 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4998 self.context.glm.release(locking.LEVEL_NODE)
4999 del self.acquired_locks[locking.LEVEL_NODE]
5001 if self.op.wait_for_sync:
5002 disk_abort = not _WaitForSync(self, iobj)
5003 elif iobj.disk_template in constants.DTS_NET_MIRROR:
5004 # make sure the disks are not degraded (still sync-ing is ok)
5006 feedback_fn("* checking mirrors status")
5007 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5012 _RemoveDisks(self, iobj)
5013 self.cfg.RemoveInstance(iobj.name)
5014 # Make sure the instance lock gets removed
5015 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5016 raise errors.OpExecError("There are some degraded disks for"
5019 feedback_fn("creating os for instance %s on node %s" %
5020 (instance, pnode_name))
5022 if iobj.disk_template != constants.DT_DISKLESS:
5023 if self.op.mode == constants.INSTANCE_CREATE:
5024 feedback_fn("* running the instance OS create scripts...")
5025 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5026 result.Raise("Could not add os for instance %s"
5027 " on node %s" % (instance, pnode_name))
5029 elif self.op.mode == constants.INSTANCE_IMPORT:
5030 feedback_fn("* running the instance OS import scripts...")
5031 src_node = self.op.src_node
5032 src_images = self.src_images
5033 cluster_name = self.cfg.GetClusterName()
5034 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5035 src_node, src_images,
5037 msg = import_result.fail_msg
5039 self.LogWarning("Error while importing the disk images for instance"
5040 " %s on node %s: %s" % (instance, pnode_name, msg))
5042 # also checked in the prereq part
5043 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5047 iobj.admin_up = True
5048 self.cfg.Update(iobj)
5049 logging.info("Starting instance %s on node %s", instance, pnode_name)
5050 feedback_fn("* starting instance...")
5051 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5052 result.Raise("Could not start instance")
5054 return list(iobj.all_nodes)
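# Illustrative sketch (not part of this LU): an instance creation is
# normally submitted as a single opcode carrying the parameters checked
# above; assuming the usual opcode class name and the fields listed in
# _OP_REQP, it looks roughly like:
#
#   op = opcodes.OpCreateInstance(instance_name="inst1.example.com",
#                                 mode=constants.INSTANCE_CREATE,
#                                 disk_template=constants.DT_DRBD8,
#                                 disks=[{"size": 1024}], nics=[{}],
#                                 os_type="debian-etch", pnode="node1",
#                                 snode="node2", start=True, ip_check=True,
#                                 wait_for_sync=True, hvparams={},
#                                 beparams={})
#
# The exact opcode signature is an assumption made for illustration only.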
5057 class LUConnectConsole(NoHooksLU):
5058 """Connect to an instance's console.
5060 This is somewhat special in that it returns the command line that
5061 you need to run on the master node in order to connect to the
5065 _OP_REQP = ["instance_name"]
5068 def ExpandNames(self):
5069 self._ExpandAndLockInstance()
5071 def CheckPrereq(self):
5072 """Check prerequisites.
5074 This checks that the instance is in the cluster.
5077 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5078 assert self.instance is not None, \
5079 "Cannot retrieve locked instance %s" % self.op.instance_name
5080 _CheckNodeOnline(self, self.instance.primary_node)
5082 def Exec(self, feedback_fn):
5083 """Connect to the console of an instance
5086 instance = self.instance
5087 node = instance.primary_node
5089 node_insts = self.rpc.call_instance_list([node],
5090 [instance.hypervisor])[node]
5091 node_insts.Raise("Can't get node information from %s" % node)
5093 if instance.name not in node_insts.payload:
5094 raise errors.OpExecError("Instance %s is not running." % instance.name)
5096 logging.debug("Connecting to console of %s on %s", instance.name, node)
5098 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5099 cluster = self.cfg.GetClusterInfo()
5100 # beparams and hvparams are passed separately, to avoid editing the
5101 # instance and then saving the defaults in the instance itself.
5102 hvparams = cluster.FillHV(instance)
5103 beparams = cluster.FillBE(instance)
5104 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5107 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
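# For illustration: the command string returned above is executed by the
# caller (the "gnt-instance console" client), not by the master daemon;
# for a Xen-based instance it typically expands to an ssh invocation of
# the hypervisor's console command on the primary node (an assumption
# about the usual hypervisor implementations, not something this module
# enforces).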
5110 class LUReplaceDisks(LogicalUnit):
5111 """Replace the disks of an instance.
5114 HPATH = "mirrors-replace"
5115 HTYPE = constants.HTYPE_INSTANCE
5116 _OP_REQP = ["instance_name", "mode", "disks"]
5119 def CheckArguments(self):
5120 if not hasattr(self.op, "remote_node"):
5121 self.op.remote_node = None
5122 if not hasattr(self.op, "iallocator"):
5123 self.op.iallocator = None
5125 _DiskReplacer.CheckArguments(self.op.mode, self.op.remote_node,
5128 def ExpandNames(self):
5129 self._ExpandAndLockInstance()
5131 if self.op.iallocator is not None:
5132 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5134 elif self.op.remote_node is not None:
5135 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5136 if remote_node is None:
5137 raise errors.OpPrereqError("Node '%s' not known" %
5138 self.op.remote_node)
5140 self.op.remote_node = remote_node
5142 # Warning: do not remove the locking of the new secondary here
5143 # unless DRBD8.AddChildren is changed to work in parallel;
5144 # currently it doesn't since parallel invocations of
5145 # FindUnusedMinor will conflict
5146 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5147 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5150 self.needed_locks[locking.LEVEL_NODE] = []
5151 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5153 self.replacer = _DiskReplacer(self, self.op.instance_name, self.op.mode,
5154 self.op.iallocator, self.op.remote_node,
5157 def DeclareLocks(self, level):
5158 # If we're not already locking all nodes in the set we have to declare the
5159 # instance's primary/secondary nodes.
5160 if (level == locking.LEVEL_NODE and
5161 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5162 self._LockInstancesNodes()
5164 def BuildHooksEnv(self):
5167 This runs on the master, the primary and all the secondaries.
5170 instance = self.replacer.instance
5172 "MODE": self.op.mode,
5173 "NEW_SECONDARY": self.op.remote_node,
5174 "OLD_SECONDARY": instance.secondary_nodes[0],
5176 env.update(_BuildInstanceHookEnvByObject(self, instance))
5178 self.cfg.GetMasterNode(),
5179 instance.primary_node,
5181 if self.op.remote_node is not None:
5182 nl.append(self.op.remote_node)
5185 def CheckPrereq(self):
5186 """Check prerequisites.
5188 This checks that the instance is in the cluster.
5191 self.replacer.CheckPrereq()
5193 def Exec(self, feedback_fn):
5194 """Execute disk replacement.
5196 This dispatches the disk replacement to the appropriate handler.
5199 self.replacer.Exec()
5202 class _DiskReplacer:
5203 """Replaces disks for an instance.
5205 Note: Locking is not within the scope of this class.
5208 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
5210 """Initializes this class.
5215 self.instance_name = instance_name
5217 self.iallocator_name = iallocator_name
5218 self.remote_node = remote_node
5226 self.instance = None
5227 self.new_node = None
5228 self.target_node = None
5229 self.other_node = None
5230 self.remote_node_info = None
5231 self.node_secondary_ip = None
5234 def CheckArguments(mode, remote_node, iallocator):
5235 # check for valid parameter combination
5236 cnt = [remote_node, iallocator].count(None)
5237 if mode == constants.REPLACE_DISK_CHG:
5239 raise errors.OpPrereqError("When changing the secondary either an"
5240 " iallocator script must be used or the"
5243 raise errors.OpPrereqError("Give either the iallocator or the new"
5244 " secondary, not both")
5245 else: # not replacing the secondary
5247 raise errors.OpPrereqError("The iallocator and new node options can"
5248 " be used only when changing the"
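# Summary of the valid combinations checked above (illustrative):
#   REPLACE_DISK_CHG -> exactly one of remote_node / iallocator must be given
#   any other mode   -> neither remote_node nor iallocator may be given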
5252 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
5253 """Compute a new secondary node using an IAllocator.
5256 ial = IAllocator(lu.cfg, lu.rpc,
5257 mode=constants.IALLOCATOR_MODE_RELOC,
5259 relocate_from=relocate_from)
5261 ial.Run(iallocator_name)
5264 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
5265 " %s" % (iallocator_name, ial.info))
5267 if len(ial.nodes) != ial.required_nodes:
5268 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5269 " of nodes (%s), required %s" %
5270 (iallocator_name, len(ial.nodes), ial.required_nodes))
5272 remote_node_name = ial.nodes[0]
5274 lu.LogInfo("Selected new secondary for instance '%s': %s",
5275 instance_name, remote_node_name)
5277 return remote_node_name
5279 def CheckPrereq(self):
5280 """Check prerequisites.
5282 This checks that the instance is in the cluster.
5285 self.instance = self.cfg.GetInstanceInfo(self.instance_name)
5286 assert self.instance is not None, \
5287 "Cannot retrieve locked instance %s" % self.instance_name
5289 if self.instance.disk_template != constants.DT_DRBD8:
5290 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5293 if len(self.instance.secondary_nodes) != 1:
5294 raise errors.OpPrereqError("The instance has a strange layout,"
5295 " expected one secondary but found %d" %
5296 len(self.instance.secondary_nodes))
5298 secondary_node = self.instance.secondary_nodes[0]
5300 if self.iallocator_name is None:
5301 remote_node = self.remote_node
5303 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
5304 self.instance.name, secondary_node)
5306 if remote_node is not None:
5307 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5308 assert self.remote_node_info is not None, \
5309 "Cannot retrieve locked node %s" % remote_node
5311 self.remote_node_info = None
5313 if remote_node == self.instance.primary_node:
5314 raise errors.OpPrereqError("The specified node is the primary node of"
5317 if remote_node == secondary_node:
5318 raise errors.OpPrereqError("The specified node is already the"
5319 " secondary node of the instance.")
5321 if self.mode == constants.REPLACE_DISK_PRI:
5322 self.target_node = self.instance.primary_node
5323 self.other_node = secondary_node
5324 check_nodes = [self.target_node, self.other_node]
5326 elif self.mode == constants.REPLACE_DISK_SEC:
5327 self.target_node = secondary_node
5328 self.other_node = self.instance.primary_node
5329 check_nodes = [self.target_node, self.other_node]
5331 elif self.mode == constants.REPLACE_DISK_CHG:
5332 self.new_node = remote_node
5333 self.other_node = self.instance.primary_node
5334 self.target_node = secondary_node
5335 check_nodes = [self.new_node, self.other_node]
5337 _CheckNodeNotDrained(self.lu, remote_node)
5340 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
5343 for node in check_nodes:
5344 _CheckNodeOnline(self.lu, node)
5346 # If not specified all disks should be replaced
5348 self.disks = range(len(self.instance.disks))
5350 # Check whether disks are valid
5351 for disk_idx in self.disks:
5352 self.instance.FindDisk(disk_idx)
5354 # Get secondary node IP addresses
5357 for node_name in [self.target_node, self.other_node, self.new_node]:
5358 if node_name is not None:
5359 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
5361 self.node_secondary_ip = node_2nd_ip
5364 """Execute disk replacement.
5366 This dispatches the disk replacement to the appropriate handler.
5369 activate_disks = (not self.instance.admin_up)
5371 # Activate the instance disks if we're replacing them on a down instance
5373 _StartInstanceDisks(self.lu, self.instance, True)
5376 if self.mode == constants.REPLACE_DISK_CHG:
5377 return self._ExecDrbd8Secondary()
5379 return self._ExecDrbd8DiskOnly()
5382 # Deactivate the instance disks if we're replacing them on a down instance
5384 _SafeShutdownInstanceDisks(self.lu, self.instance)
5386 def _CheckVolumeGroup(self, nodes):
5387 self.lu.LogInfo("Checking volume groups")
5389 vgname = self.cfg.GetVGName()
5391 # Make sure volume group exists on all involved nodes
5392 results = self.rpc.call_vg_list(nodes)
5394 raise errors.OpExecError("Can't list volume groups on the nodes")
5398 res.Raise("Error checking node %s" % node)
5399 if vgname not in res.payload:
5400 raise errors.OpExecError("Volume group '%s' not found on node %s" %
5403 def _CheckDisksExistence(self, nodes):
5404 # Check disk existence
5405 for idx, dev in enumerate(self.instance.disks):
5406 if idx not in self.disks:
5410 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
5411 self.cfg.SetDiskID(dev, node)
5413 result = self.rpc.call_blockdev_find(node, dev)
5415 msg = result.fail_msg
5416 if msg or not result.payload:
5418 msg = "disk not found"
5419 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5422 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
5423 for idx, dev in enumerate(self.instance.disks):
5424 if idx not in self.disks:
5427 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
5430 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
5432 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
5433 " replace disks for instance %s" %
5434 (node_name, self.instance.name))
5436 def _CreateNewStorage(self, node_name):
5437 vgname = self.cfg.GetVGName()
5440 for idx, dev in enumerate(self.instance.disks):
5441 if idx not in self.disks:
5444 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
5446 self.cfg.SetDiskID(dev, node_name)
5448 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
5449 names = _GenerateUniqueNames(self.lu, lv_names)
5451 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
5452 logical_id=(vgname, names[0]))
5453 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5454 logical_id=(vgname, names[1]))
5456 new_lvs = [lv_data, lv_meta]
5457 old_lvs = dev.children
5458 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5460 # we pass force_create=True to force the LVM creation
5461 for new_lv in new_lvs:
5462 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
5463 _GetInstanceInfoText(self.instance), False)
5467 def _CheckDevices(self, node_name, iv_names):
5468 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5469 self.cfg.SetDiskID(dev, node_name)
5471 result = self.rpc.call_blockdev_find(node_name, dev)
5473 msg = result.fail_msg
5474 if msg or not result.payload:
5476 msg = "disk not found"
5477 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5480 if result.payload[5]:
5481 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5483 def _RemoveOldStorage(self, node_name, iv_names):
5484 for name, (dev, old_lvs, _) in iv_names.iteritems():
5485 self.lu.LogInfo("Remove logical volumes for %s" % name)
5488 self.cfg.SetDiskID(lv, node_name)
5490 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
5492 self.lu.LogWarning("Can't remove old LV: %s" % msg,
5493 hint="remove unused LVs manually")
5495 def _ExecDrbd8DiskOnly(self):
5496 """Replace a disk on the primary or secondary for DRBD 8.
5498 The algorithm for replace is quite complicated:
5500 1. for each disk to be replaced:
5502 1. create new LVs on the target node with unique names
5503 1. detach old LVs from the drbd device
5504 1. rename old LVs to name_replaced.<time_t>
5505 1. rename new LVs to old LVs
5506 1. attach the new LVs (with the old names now) to the drbd device
5508 1. wait for sync across all devices
5510 1. for each modified disk:
5512 1. remove old LVs (which have the name name_replaced.<time_t>)
5514 Failures are not very well handled.
5519 # Step: check device activation
5520 self.lu.LogStep(1, steps_total, "Check device existence")
5521 self._CheckDisksExistence([self.other_node, self.target_node])
5522 self._CheckVolumeGroup([self.target_node, self.other_node])
5524 # Step: check other node consistency
5525 self.lu.LogStep(2, steps_total, "Check peer consistency")
5526 self._CheckDisksConsistency(self.other_node,
5527 self.other_node == self.instance.primary_node,
5530 # Step: create new storage
5531 self.lu.LogStep(3, steps_total, "Allocate new storage")
5532 iv_names = self._CreateNewStorage(self.target_node)
5534 # Step: for each lv, detach+rename*2+attach
5535 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
5536 for dev, old_lvs, new_lvs in iv_names.itervalues():
5537 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
5539 result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs)
5540 result.Raise("Can't detach drbd from local storage on node"
5541 " %s for device %s" % (self.target_node, dev.iv_name))
5543 #cfg.Update(instance)
5545 # ok, we created the new LVs, so now we know we have the needed
5546 # storage; as such, we proceed on the target node to rename
5547 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5548 # using the assumption that logical_id == physical_id (which in
5549 # turn is the unique_id on that node)
5551 # FIXME(iustin): use a better name for the replaced LVs
5552 temp_suffix = int(time.time())
5553 ren_fn = lambda d, suff: (d.physical_id[0],
5554 d.physical_id[1] + "_replaced-%s" % suff)
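# For example (illustrative values): with temp_suffix 1234567890 and an old
# LV whose physical_id is ("xenvg", "disk0_data"), ren_fn yields
# ("xenvg", "disk0_data_replaced-1234567890").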
5556 # Build the rename list based on what LVs exist on the node
5557 rename_old_to_new = []
5558 for to_ren in old_lvs:
5559 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
5560 if not result.fail_msg and result.payload:
5562 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
5564 self.lu.LogInfo("Renaming the old LVs on the target node")
5565 result = self.rpc.call_blockdev_rename(self.target_node, rename_old_to_new)
5566 result.Raise("Can't rename old LVs on node %s" % self.target_node)
5568 # Now we rename the new LVs to the old LVs
5569 self.lu.LogInfo("Renaming the new LVs on the target node")
5570 rename_new_to_old = [(new, old.physical_id)
5571 for old, new in zip(old_lvs, new_lvs)]
5572 result = self.rpc.call_blockdev_rename(self.target_node, rename_new_to_old)
5573 result.Raise("Can't rename new LVs on node %s" % self.target_node)
5575 for old, new in zip(old_lvs, new_lvs):
5576 new.logical_id = old.logical_id
5577 self.cfg.SetDiskID(new, self.target_node)
5579 for disk in old_lvs:
5580 disk.logical_id = ren_fn(disk, temp_suffix)
5581 self.cfg.SetDiskID(disk, self.target_node)
5583 # Now that the new lvs have the old name, we can add them to the device
5584 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
5585 result = self.rpc.call_blockdev_addchildren(self.target_node, dev, new_lvs)
5586 msg = result.fail_msg
5588 for new_lv in new_lvs:
5589 msg2 = self.rpc.call_blockdev_remove(self.target_node, new_lv).fail_msg
5591 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
5592 hint=("cleanup manually the unused logical"
5594 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5596 dev.children = new_lvs
5598 self.cfg.Update(self.instance)
5601 # This can fail as the old devices are degraded and _WaitForSync
5602 # does a combined result over all disks, so we don't check its return value
5603 self.lu.LogStep(5, steps_total, "Sync devices")
5604 _WaitForSync(self.lu, self.instance, unlock=True)
5606 # Check all devices manually
5607 self._CheckDevices(self.instance.primary_node, iv_names)
5609 # Step: remove old storage
5610 self.lu.LogStep(6, steps_total, "Removing old storage")
5611 self._RemoveOldStorage(self.target_node, iv_names)
5613 def _ExecDrbd8Secondary(self):
5614 """Replace the secondary node for DRBD 8.
5616 The algorithm for replace is quite complicated:
5617 - for all disks of the instance:
5618 - create new LVs on the new node with same names
5619 - shutdown the drbd device on the old secondary
5620 - disconnect the drbd network on the primary
5621 - create the drbd device on the new secondary
5622 - network attach the drbd on the primary, using an artifice:
5623 the drbd code for Attach() will connect to the network if it
5624 finds a device which is connected to the good local disks but
5626 - wait for sync across all devices
5627 - remove all disks from the old secondary
5629 Failures are not very well handled.
5634 # Step: check device activation
5635 self.lu.LogStep(1, steps_total, "Check device existence")
5636 self._CheckDisksExistence([self.instance.primary_node])
5637 self._CheckVolumeGroup([self.instance.primary_node])
5639 # Step: check other node consistency
5640 self.lu.LogStep(2, steps_total, "Check peer consistency")
5641 self._CheckDisksConsistency(self.instance.primary_node, True, True)
5643 # Step: create new storage
5644 self.lu.LogStep(3, steps_total, "Allocate new storage")
5645 for idx, dev in enumerate(self.instance.disks):
5646 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
5647 (self.new_node, idx))
5648 # we pass force_create=True to force LVM creation
5649 for new_lv in dev.children:
5650 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
5651 _GetInstanceInfoText(self.instance), False)
5653 # Step 4: drbd minors and drbd setup changes
5654 # after this, we must manually remove the drbd minors on both the
5655 # error and the success paths
5656 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
5657 minors = self.cfg.AllocateDRBDMinor([self.new_node for dev in self.instance.disks],
5659 logging.debug("Allocated minors %r" % (minors,))
5662 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
5663 self.lu.LogInfo("activating a new drbd on %s for disk/%d" % (self.new_node, idx))
5664 # create new devices on new_node; note that we create two IDs:
5665 # one without port, so the drbd will be activated without
5666 # networking information on the new node at this stage, and one
5667 # with network, for the latter activation in step 4
5668 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5669 if self.instance.primary_node == o_node1:
5674 new_alone_id = (self.instance.primary_node, self.new_node, None, p_minor, new_minor, o_secret)
5675 new_net_id = (self.instance.primary_node, self.new_node, o_port, p_minor, new_minor, o_secret)
5677 iv_names[idx] = (dev, dev.children, new_net_id)
5678 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5680 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5681 logical_id=new_alone_id,
5682 children=dev.children,
5685 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
5686 _GetInstanceInfoText(self.instance), False)
5687 except errors.GenericError:
5688 self.cfg.ReleaseDRBDMinors(self.instance.name)
5691 # We have new devices, shutdown the drbd on the old secondary
5692 for idx, dev in enumerate(self.instance.disks):
5693 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
5694 self.cfg.SetDiskID(dev, self.target_node)
5695 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
5697 self.lu.LogWarning("Failed to shut down drbd for disk/%d on the old"
5698 " node: %s" % (idx, msg),
5699 hint=("Please cleanup this device manually as"
5700 " soon as possible"))
5702 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
5703 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], self.node_secondary_ip,
5704 self.instance.disks)[self.instance.primary_node]
5706 msg = result.fail_msg
5708 # detaches didn't succeed (unlikely)
5709 self.cfg.ReleaseDRBDMinors(self.instance.name)
5710 raise errors.OpExecError("Can't detach the disks from the network on"
5711 " old node: %s" % (msg,))
5713 # if we managed to detach at least one, we update all the disks of
5714 # the instance to point to the new secondary
5715 self.lu.LogInfo("Updating instance configuration")
5716 for dev, _, new_logical_id in iv_names.itervalues():
5717 dev.logical_id = new_logical_id
5718 self.cfg.SetDiskID(dev, self.instance.primary_node)
5720 self.cfg.Update(self.instance)
5722 # and now perform the drbd attach
5723 self.lu.LogInfo("Attaching primary drbds to new secondary"
5724 " (standalone => connected)")
5725 result = self.rpc.call_drbd_attach_net([self.instance.primary_node, self.new_node], self.node_secondary_ip,
5726 self.instance.disks, self.instance.name,
5728 for to_node, to_result in result.items():
5729 msg = to_result.fail_msg
5731 self.lu.LogWarning("Can't attach drbd disks on node %s: %s", to_node, msg,
5732 hint=("please do a gnt-instance info to see the"
5733 " status of disks"))
5736 # This can fail as the old devices are degraded and _WaitForSync
5737 # reports a combined result over all disks, so we don't check its return value
5738 self.lu.LogStep(5, steps_total, "Sync devices")
5739 _WaitForSync(self.lu, self.instance, unlock=True)
5741 # Check all devices manually
5742 self._CheckDevices(self.instance.primary_node, iv_names)
5744 # Step: remove old storage
5745 self.lu.LogStep(6, steps_total, "Removing old storage")
5746 self._RemoveOldStorage(self.target_node, iv_names)
5749 class LUGrowDisk(LogicalUnit):
5750 """Grow a disk of an instance.
5754 HTYPE = constants.HTYPE_INSTANCE
5755 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5758 def ExpandNames(self):
5759 self._ExpandAndLockInstance()
5760 self.needed_locks[locking.LEVEL_NODE] = []
5761 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5763 def DeclareLocks(self, level):
5764 if level == locking.LEVEL_NODE:
5765 self._LockInstancesNodes()
5767 def BuildHooksEnv(self):
5770 This runs on the master, the primary and all the secondaries.
5774 "DISK": self.op.disk,
5775 "AMOUNT": self.op.amount,
5777 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5779 self.cfg.GetMasterNode(),
5780 self.instance.primary_node,
5784 def CheckPrereq(self):
5785 """Check prerequisites.
5787 This checks that the instance is in the cluster.
5790 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5791 assert instance is not None, \
5792 "Cannot retrieve locked instance %s" % self.op.instance_name
5793 nodenames = list(instance.all_nodes)
5794 for node in nodenames:
5795 _CheckNodeOnline(self, node)
5798 self.instance = instance
5800 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5801 raise errors.OpPrereqError("Instance's disk layout does not support"
5804 self.disk = instance.FindDisk(self.op.disk)
5806 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5807 instance.hypervisor)
5808 for node in nodenames:
5809 info = nodeinfo[node]
5810 info.Raise("Cannot get current information from node %s" % node)
5811 vg_free = info.payload.get('vg_free', None)
5812 if not isinstance(vg_free, int):
5813 raise errors.OpPrereqError("Can't compute free disk space on"
5815 if self.op.amount > vg_free:
5816 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5817 " %d MiB available, %d MiB required" %
5818 (node, vg_free, self.op.amount))
5820 def Exec(self, feedback_fn):
5821 """Execute disk grow.
5824 instance = self.instance
5826 for node in instance.all_nodes:
5827 self.cfg.SetDiskID(disk, node)
5828 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5829 result.Raise("Grow request failed to node %s" % node)
5830 disk.RecordGrow(self.op.amount)
5831 self.cfg.Update(instance)
5832 if self.op.wait_for_sync:
5833 disk_abort = not _WaitForSync(self, instance)
5835 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5836 " status.\nPlease check the instance.")
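# A minimal usage sketch (the OpGrowDisk opcode name and its submission
# path are assumed, they are not defined in this module); amounts are in MiB:
#
#   from ganeti import opcodes
#   op = opcodes.OpGrowDisk(instance_name="inst1.example.com", disk=0,
#                           amount=1024, wait_for_sync=True)
#   # submitting the opcode to the master daemon eventually dispatches it
#   # to LUGrowDisk above, which calls call_blockdev_grow on each node.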
5839 class LUQueryInstanceData(NoHooksLU):
5840 """Query runtime instance data.
5843 _OP_REQP = ["instances", "static"]
5846 def ExpandNames(self):
5847 self.needed_locks = {}
5848 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
5850 if not isinstance(self.op.instances, list):
5851 raise errors.OpPrereqError("Invalid argument type 'instances'")
5853 if self.op.instances:
5854 self.wanted_names = []
5855 for name in self.op.instances:
5856 full_name = self.cfg.ExpandInstanceName(name)
5857 if full_name is None:
5858 raise errors.OpPrereqError("Instance '%s' not known" % name)
5859 self.wanted_names.append(full_name)
5860 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5862 self.wanted_names = None
5863 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5865 self.needed_locks[locking.LEVEL_NODE] = []
5866 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5868 def DeclareLocks(self, level):
5869 if level == locking.LEVEL_NODE:
5870 self._LockInstancesNodes()
5872 def CheckPrereq(self):
5873 """Check prerequisites.
5875 This only checks the optional instance list against the existing names.
5878 if self.wanted_names is None:
5879 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5881 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5882 in self.wanted_names]
5885 def _ComputeDiskStatus(self, instance, snode, dev):
5886 """Compute block device status.
5889 static = self.op.static
5891 self.cfg.SetDiskID(dev, instance.primary_node)
5892 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5893 if dev_pstatus.offline:
5896 dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
5897 dev_pstatus = dev_pstatus.payload
5901 if dev.dev_type in constants.LDS_DRBD:
5902 # we change the snode then (otherwise we use the one passed in)
5903 if dev.logical_id[0] == instance.primary_node:
5904 snode = dev.logical_id[1]
5906 snode = dev.logical_id[0]
5908 if snode and not static:
5909 self.cfg.SetDiskID(dev, snode)
5910 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5911 if dev_sstatus.offline:
5914 dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
5915 dev_sstatus = dev_sstatus.payload
5920 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5921 for child in dev.children]
5926 "iv_name": dev.iv_name,
5927 "dev_type": dev.dev_type,
5928 "logical_id": dev.logical_id,
5929 "physical_id": dev.physical_id,
5930 "pstatus": dev_pstatus,
5931 "sstatus": dev_sstatus,
5932 "children": dev_children,
5939 def Exec(self, feedback_fn):
5940 """Gather and return data"""
5943 cluster = self.cfg.GetClusterInfo()
5945 for instance in self.wanted_instances:
5946 if not self.op.static:
5947 remote_info = self.rpc.call_instance_info(instance.primary_node,
5949 instance.hypervisor)
5950 remote_info.Raise("Error checking node %s" % instance.primary_node)
5951 remote_info = remote_info.payload
5952 if remote_info and "state" in remote_info:
5955 remote_state = "down"
5958 if instance.admin_up:
5961 config_state = "down"
5963 disks = [self._ComputeDiskStatus(instance, None, device)
5964 for device in instance.disks]
5967 "name": instance.name,
5968 "config_state": config_state,
5969 "run_state": remote_state,
5970 "pnode": instance.primary_node,
5971 "snodes": instance.secondary_nodes,
5973 # this happens to be the same format used for hooks
5974 "nics": _NICListToTuple(self, instance.nics),
5976 "hypervisor": instance.hypervisor,
5977 "network_port": instance.network_port,
5978 "hv_instance": instance.hvparams,
5979 "hv_actual": cluster.FillHV(instance),
5980 "be_instance": instance.beparams,
5981 "be_actual": cluster.FillBE(instance),
5984 result[instance.name] = idict
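# Sketch of the per-instance dict assembled above (values made up; only
# keys visible in this method are shown):
#
#   {"name": "inst1.example.com",
#    "config_state": "up",
#    "run_state": "down",
#    "pnode": "node1.example.com",
#    "snodes": ["node2.example.com"],
#    "nics": [...],      # same tuple format used for hooks
#    "disks": [...],     # output of _ComputeDiskStatus per disk
#    "hypervisor": "xen-pvm",
#    "network_port": None,
#    "hv_instance": {}, "hv_actual": {...},
#    "be_instance": {}, "be_actual": {...}}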
5989 class LUSetInstanceParams(LogicalUnit):
5990 """Modifies an instance's parameters.
5993 HPATH = "instance-modify"
5994 HTYPE = constants.HTYPE_INSTANCE
5995 _OP_REQP = ["instance_name"]
5998 def CheckArguments(self):
5999 if not hasattr(self.op, 'nics'):
6000 self.op.nics = []
6001 if not hasattr(self.op, 'disks'):
6002 self.op.disks = []
6003 if not hasattr(self.op, 'beparams'):
6004 self.op.beparams = {}
6005 if not hasattr(self.op, 'hvparams'):
6006 self.op.hvparams = {}
6007 self.op.force = getattr(self.op, "force", False)
6008 if not (self.op.nics or self.op.disks or
6009 self.op.hvparams or self.op.beparams):
6010 raise errors.OpPrereqError("No changes submitted")
6014 for disk_op, disk_dict in self.op.disks:
6015 if disk_op == constants.DDM_REMOVE:
6018 elif disk_op == constants.DDM_ADD:
6021 if not isinstance(disk_op, int):
6022 raise errors.OpPrereqError("Invalid disk index")
6023 if not isinstance(disk_dict, dict):
6024 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
6025 raise errors.OpPrereqError(msg)
6027 if disk_op == constants.DDM_ADD:
6028 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
6029 if mode not in constants.DISK_ACCESS_SET:
6030 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
6031 size = disk_dict.get('size', None)
6033 raise errors.OpPrereqError("Required disk parameter size missing")
6036 except ValueError, err:
6037 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
6039 disk_dict['size'] = size
6041 # modification of disk
6042 if 'size' in disk_dict:
6043 raise errors.OpPrereqError("Disk size change not possible, use"
6046 if disk_addremove > 1:
6047 raise errors.OpPrereqError("Only one disk add or remove operation"
6048 " supported at a time")
6052 for nic_op, nic_dict in self.op.nics:
6053 if nic_op == constants.DDM_REMOVE:
6056 elif nic_op == constants.DDM_ADD:
6059 if not isinstance(nic_op, int):
6060 raise errors.OpPrereqError("Invalid nic index")
6061 if not isinstance(nic_dict, dict):
6062 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
6063 raise errors.OpPrereqError(msg)
6065 # nic_dict should be a dict
6066 nic_ip = nic_dict.get('ip', None)
6067 if nic_ip is not None:
6068 if nic_ip.lower() == constants.VALUE_NONE:
6069 nic_dict['ip'] = None
6071 if not utils.IsValidIP(nic_ip):
6072 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
6074 nic_bridge = nic_dict.get('bridge', None)
6075 nic_link = nic_dict.get('link', None)
6076 if nic_bridge and nic_link:
6077 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
6078 " at the same time")
6079 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
6080 nic_dict['bridge'] = None
6081 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
6082 nic_dict['link'] = None
6084 if nic_op == constants.DDM_ADD:
6085 nic_mac = nic_dict.get('mac', None)
6087 nic_dict['mac'] = constants.VALUE_AUTO
6089 if 'mac' in nic_dict:
6090 nic_mac = nic_dict['mac']
6091 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6092 if not utils.IsValidMac(nic_mac):
6093 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6094 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6095 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6096 " modifying an existing nic")
6098 if nic_addremove > 1:
6099 raise errors.OpPrereqError("Only one NIC add or remove operation"
6100 " supported at a time")
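# For illustration, the modification lists validated above are sequences
# of (operation, parameters) pairs, where the operation is
# constants.DDM_ADD, constants.DDM_REMOVE or an integer index, e.g.
# (values made up):
#
#   self.op.disks = [(constants.DDM_ADD,
#                     {"size": 2048, "mode": constants.DISK_RDWR})]
#   self.op.nics = [(0, {"ip": "none", "bridge": "br0"})]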
6102 def ExpandNames(self):
6103 self._ExpandAndLockInstance()
6104 self.needed_locks[locking.LEVEL_NODE] = []
6105 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6107 def DeclareLocks(self, level):
6108 if level == locking.LEVEL_NODE:
6109 self._LockInstancesNodes()
6111 def BuildHooksEnv(self):
6114 This runs on the master, primary and secondaries.
6118 if constants.BE_MEMORY in self.be_new:
6119 args['memory'] = self.be_new[constants.BE_MEMORY]
6120 if constants.BE_VCPUS in self.be_new:
6121 args['vcpus'] = self.be_new[constants.BE_VCPUS]
6122 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6123 # information at all.
6126 nic_override = dict(self.op.nics)
6127 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6128 for idx, nic in enumerate(self.instance.nics):
6129 if idx in nic_override:
6130 this_nic_override = nic_override[idx]
6132 this_nic_override = {}
6133 if 'ip' in this_nic_override:
6134 ip = this_nic_override['ip']
6137 if 'mac' in this_nic_override:
6138 mac = this_nic_override['mac']
6141 if idx in self.nic_pnew:
6142 nicparams = self.nic_pnew[idx]
6144 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6145 mode = nicparams[constants.NIC_MODE]
6146 link = nicparams[constants.NIC_LINK]
6147 args['nics'].append((ip, mac, mode, link))
6148 if constants.DDM_ADD in nic_override:
6149 ip = nic_override[constants.DDM_ADD].get('ip', None)
6150 mac = nic_override[constants.DDM_ADD]['mac']
6151 nicparams = self.nic_pnew[constants.DDM_ADD]
6152 mode = nicparams[constants.NIC_MODE]
6153 link = nicparams[constants.NIC_LINK]
6154 args['nics'].append((ip, mac, mode, link))
6155 elif constants.DDM_REMOVE in nic_override:
6156 del args['nics'][-1]
6158 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6159 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6162 def _GetUpdatedParams(self, old_params, update_dict,
6163 default_values, parameter_types):
6164 """Return the new params dict for the given params.
6166 @type old_params: dict
6167 @param old_params: old parameters
6168 @type update_dict: dict
6169 @param update_dict: dict containing new parameter values,
6170 or constants.VALUE_DEFAULT to reset the
6171 parameter to its default value
6172 @type default_values: dict
6173 @param default_values: default values for the filled parameters
6174 @type parameter_types: dict
6175 @param parameter_types: dict mapping target dict keys to types
6176 in constants.ENFORCEABLE_TYPES
6177 @rtype: (dict, dict)
6178 @return: (new_parameters, filled_parameters)
6181 params_copy = copy.deepcopy(old_params)
6182 for key, val in update_dict.iteritems():
6183 if val == constants.VALUE_DEFAULT:
6185 del params_copy[key]
6189 params_copy[key] = val
6190 utils.ForceDictType(params_copy, parameter_types)
6191 params_filled = objects.FillDict(default_values, params_copy)
6192 return (params_copy, params_filled)
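# Worked example (made-up values): with
#   old_params     = {"memory": 512}
#   update_dict    = {"memory": constants.VALUE_DEFAULT, "vcpus": 2}
#   default_values = {"memory": 128, "vcpus": 1}
# the "memory" override is dropped and "vcpus" is set, so the method
# returns roughly ({"vcpus": 2}, {"memory": 128, "vcpus": 2}) after type
# enforcement via utils.ForceDictType.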
6194 def CheckPrereq(self):
6195 """Check prerequisites.
6197 This checks the instance and validates the requested parameter changes.
6200 self.force = self.op.force
6202 # checking the new params on the primary/secondary nodes
6204 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6205 cluster = self.cluster = self.cfg.GetClusterInfo()
6206 assert self.instance is not None, \
6207 "Cannot retrieve locked instance %s" % self.op.instance_name
6208 pnode = instance.primary_node
6209 nodelist = list(instance.all_nodes)
6211 # hvparams processing
6212 if self.op.hvparams:
6213 i_hvdict, hv_new = self._GetUpdatedParams(
6214 instance.hvparams, self.op.hvparams,
6215 cluster.hvparams[instance.hypervisor],
6216 constants.HVS_PARAMETER_TYPES)
6218 hypervisor.GetHypervisor(
6219 instance.hypervisor).CheckParameterSyntax(hv_new)
6220 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6221 self.hv_new = hv_new # the new actual values
6222 self.hv_inst = i_hvdict # the new dict (without defaults)
6224 self.hv_new = self.hv_inst = {}
6226 # beparams processing
6227 if self.op.beparams:
6228 i_bedict, be_new = self._GetUpdatedParams(
6229 instance.beparams, self.op.beparams,
6230 cluster.beparams[constants.PP_DEFAULT],
6231 constants.BES_PARAMETER_TYPES)
6232 self.be_new = be_new # the new actual values
6233 self.be_inst = i_bedict # the new dict (without defaults)
6235 self.be_new = self.be_inst = {}
6239 if constants.BE_MEMORY in self.op.beparams and not self.force:
6240 mem_check_list = [pnode]
6241 if be_new[constants.BE_AUTO_BALANCE]:
6242 # either we changed auto_balance to yes or it was from before
6243 mem_check_list.extend(instance.secondary_nodes)
6244 instance_info = self.rpc.call_instance_info(pnode, instance.name,
6245 instance.hypervisor)
6246 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6247 instance.hypervisor)
6248 pninfo = nodeinfo[pnode]
6249 msg = pninfo.fail_msg
6251 # Assume the primary node is unreachable and go ahead
6252 self.warn.append("Can't get info from primary node %s: %s" %
6254 elif not isinstance(pninfo.payload.get('memory_free', None), int):
6255 self.warn.append("Node data from primary node %s doesn't contain"
6256 " free memory information" % pnode)
6257 elif instance_info.fail_msg:
6258 self.warn.append("Can't get instance runtime information: %s" %
6259 instance_info.fail_msg)
6261 if instance_info.payload:
6262 current_mem = int(instance_info.payload['memory'])
6264 # Assume instance not running
6265 # (there is a slight race condition here, but it's not very probable,
6266 # and we have no other way to check)
6268 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6269 pninfo.payload['memory_free'])
6271 raise errors.OpPrereqError("This change will prevent the instance"
6272 " from starting, due to %d MB of memory"
6273 " missing on its primary node" % miss_mem)
6275 if be_new[constants.BE_AUTO_BALANCE]:
6276 for node, nres in nodeinfo.items():
6277 if node not in instance.secondary_nodes:
6281 self.warn.append("Can't get info from secondary node %s: %s" %
6283 elif not isinstance(nres.payload.get('memory_free', None), int):
6284 self.warn.append("Secondary node %s didn't return free"
6285 " memory information" % node)
6286 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6287 self.warn.append("Not enough memory to failover instance to"
6288 " secondary node %s" % node)
6293 for nic_op, nic_dict in self.op.nics:
6294 if nic_op == constants.DDM_REMOVE:
6295 if not instance.nics:
6296 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6298 if nic_op != constants.DDM_ADD:
6300 if nic_op < 0 or nic_op >= len(instance.nics):
6301 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6303 (nic_op, len(instance.nics)))
6304 old_nic_params = instance.nics[nic_op].nicparams
6305 old_nic_ip = instance.nics[nic_op].ip
6310 update_params_dict = dict([(key, nic_dict[key])
6311 for key in constants.NICS_PARAMETERS
6312 if key in nic_dict])
6314 if 'bridge' in nic_dict:
6315 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6317 new_nic_params, new_filled_nic_params = \
6318 self._GetUpdatedParams(old_nic_params, update_params_dict,
6319 cluster.nicparams[constants.PP_DEFAULT],
6320 constants.NICS_PARAMETER_TYPES)
6321 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6322 self.nic_pinst[nic_op] = new_nic_params
6323 self.nic_pnew[nic_op] = new_filled_nic_params
6324 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6326 if new_nic_mode == constants.NIC_MODE_BRIDGED:
6327 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6328 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6330 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6332 self.warn.append(msg)
6334 raise errors.OpPrereqError(msg)
6335 if new_nic_mode == constants.NIC_MODE_ROUTED:
6336 if 'ip' in nic_dict:
6337 nic_ip = nic_dict['ip']
6341 raise errors.OpPrereqError('Cannot set the nic ip to None'
6343 if 'mac' in nic_dict:
6344 nic_mac = nic_dict['mac']
6346 raise errors.OpPrereqError('Cannot set the nic mac to None')
6347 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6348 # otherwise generate the mac
6349 nic_dict['mac'] = self.cfg.GenerateMAC()
6351 # or validate/reserve the current one
6352 if self.cfg.IsMacInUse(nic_mac):
6353 raise errors.OpPrereqError("MAC address %s already in use"
6354 " in cluster" % nic_mac)
6357 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6358 raise errors.OpPrereqError("Disk operations not supported for"
6359 " diskless instances")
6360 for disk_op, disk_dict in self.op.disks:
6361 if disk_op == constants.DDM_REMOVE:
6362 if len(instance.disks) == 1:
6363 raise errors.OpPrereqError("Cannot remove the last disk of"
6365 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6366 ins_l = ins_l[pnode]
6367 msg = ins_l.fail_msg
6369 raise errors.OpPrereqError("Can't contact node %s: %s" %
6371 if instance.name in ins_l.payload:
6372 raise errors.OpPrereqError("Instance is running, can't remove"
6375 if (disk_op == constants.DDM_ADD and
6376 len(instance.disks) >= constants.MAX_DISKS):
6377 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6378 " add more" % constants.MAX_DISKS)
6379 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6381 if disk_op < 0 or disk_op >= len(instance.disks):
6382 raise errors.OpPrereqError("Invalid disk index %s, valid values"
6384 (disk_op, len(instance.disks)))
6388 def Exec(self, feedback_fn):
6389 """Modifies an instance.
6391 All parameters take effect only at the next restart of the instance.
6394 # Process here the warnings from CheckPrereq, as we don't have a
6395 # feedback_fn there.
6396 for warn in self.warn:
6397 feedback_fn("WARNING: %s" % warn)
6400 instance = self.instance
6401 cluster = self.cluster
6403 for disk_op, disk_dict in self.op.disks:
6404 if disk_op == constants.DDM_REMOVE:
6405 # remove the last disk
6406 device = instance.disks.pop()
6407 device_idx = len(instance.disks)
6408 for node, disk in device.ComputeNodeTree(instance.primary_node):
6409 self.cfg.SetDiskID(disk, node)
6410 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6412 self.LogWarning("Could not remove disk/%d on node %s: %s,"
6413 " continuing anyway", device_idx, node, msg)
6414 result.append(("disk/%d" % device_idx, "remove"))
6415 elif disk_op == constants.DDM_ADD:
6417 if instance.disk_template == constants.DT_FILE:
6418 file_driver, file_path = instance.disks[0].logical_id
6419 file_path = os.path.dirname(file_path)
6421 file_driver = file_path = None
6422 disk_idx_base = len(instance.disks)
6423 new_disk = _GenerateDiskTemplate(self,
6424 instance.disk_template,
6425 instance.name, instance.primary_node,
6426 instance.secondary_nodes,
6431 instance.disks.append(new_disk)
6432 info = _GetInstanceInfoText(instance)
6434 logging.info("Creating volume %s for instance %s",
6435 new_disk.iv_name, instance.name)
6436 # Note: this needs to be kept in sync with _CreateDisks
6438 for node in instance.all_nodes:
6439 f_create = node == instance.primary_node
6441 _CreateBlockDev(self, node, instance, new_disk,
6442 f_create, info, f_create)
6443 except errors.OpExecError, err:
6444 self.LogWarning("Failed to create volume %s (%s) on"
6446 new_disk.iv_name, new_disk, node, err)
6447 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6448 (new_disk.size, new_disk.mode)))
6450 # change a given disk
6451 instance.disks[disk_op].mode = disk_dict['mode']
6452 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6454 for nic_op, nic_dict in self.op.nics:
6455 if nic_op == constants.DDM_REMOVE:
6456 # remove the last nic
6457 del instance.nics[-1]
6458 result.append(("nic.%d" % len(instance.nics), "remove"))
6459 elif nic_op == constants.DDM_ADD:
6461 # mac and bridge should be set by now
6461 mac = nic_dict['mac']
6462 ip = nic_dict.get('ip', None)
6463 nicparams = self.nic_pinst[constants.DDM_ADD]
6464 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6465 instance.nics.append(new_nic)
6466 result.append(("nic.%d" % (len(instance.nics) - 1),
6467 "add:mac=%s,ip=%s,mode=%s,link=%s" %
6468 (new_nic.mac, new_nic.ip,
6469 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6470 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6473 for key in 'mac', 'ip':
6475 setattr(instance.nics[nic_op], key, nic_dict[key])
6476 if nic_op in self.nic_pnew:
6477 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6478 for key, val in nic_dict.iteritems():
6479 result.append(("nic.%s/%d" % (key, nic_op), val))
6482 if self.op.hvparams:
6483 instance.hvparams = self.hv_inst
6484 for key, val in self.op.hvparams.iteritems():
6485 result.append(("hv/%s" % key, val))
6488 if self.op.beparams:
6489 instance.beparams = self.be_inst
6490 for key, val in self.op.beparams.iteritems():
6491 result.append(("be/%s" % key, val))
6493 self.cfg.Update(instance)
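# The result assembled above is a list of (parameter, new value) pairs,
# e.g. (made-up values):
#   [("disk/1", "remove"), ("nic.ip/0", "198.51.100.10"), ("be/memory", 512)]
# which the caller can use to report what was actually changed.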
6498 class LUQueryExports(NoHooksLU):
6499 """Query the exports list
6502 _OP_REQP = ['nodes']
6505 def ExpandNames(self):
6506 self.needed_locks = {}
6507 self.share_locks[locking.LEVEL_NODE] = 1
6508 if not self.op.nodes:
6509 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6511 self.needed_locks[locking.LEVEL_NODE] = \
6512 _GetWantedNodes(self, self.op.nodes)
6514 def CheckPrereq(self):
6515 """Check prerequisites.
6518 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6520 def Exec(self, feedback_fn):
6521 """Compute the list of all the exported system images.
6524 @return: a dictionary with the structure node->(export-list)
6525 where export-list is a list of the instances exported on
6529 rpcresult = self.rpc.call_export_list(self.nodes)
6531 for node in rpcresult:
6532 if rpcresult[node].fail_msg:
6533 result[node] = False
6535 result[node] = rpcresult[node].payload
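# Example return value (hostnames made up): a dict mapping each queried
# node either to its export list or to False when the RPC failed:
#   {"node1.example.com": ["inst1.example.com"], "node2.example.com": False}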
6540 class LUExportInstance(LogicalUnit):
6541 """Export an instance to an image in the cluster.
6544 HPATH = "instance-export"
6545 HTYPE = constants.HTYPE_INSTANCE
6546 _OP_REQP = ["instance_name", "target_node", "shutdown"]
6549 def ExpandNames(self):
6550 self._ExpandAndLockInstance()
6551 # FIXME: lock only instance primary and destination node
6553 # Sad but true, for now we have to lock all nodes, as we don't know where
6554 # the previous export might be, and in this LU we search for it and
6555 # remove it from its current node. In the future we could fix this by:
6556 # - making a tasklet to search (share-lock all), then create the new one,
6557 # then one to remove it afterwards
6558 # - removing the removal operation altogether
6559 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6561 def DeclareLocks(self, level):
6562 """Last minute lock declaration."""
6563 # All nodes are locked anyway, so nothing to do here.
6565 def BuildHooksEnv(self):
6568 This will run on the master, primary node and target node.
6572 "EXPORT_NODE": self.op.target_node,
6573 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6575 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6576 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6577 self.op.target_node]
6580 def CheckPrereq(self):
6581 """Check prerequisites.
6583 This checks that the instance and node names are valid.
6586 instance_name = self.op.instance_name
6587 self.instance = self.cfg.GetInstanceInfo(instance_name)
6588 assert self.instance is not None, \
6589 "Cannot retrieve locked instance %s" % self.op.instance_name
6590 _CheckNodeOnline(self, self.instance.primary_node)
6592 self.dst_node = self.cfg.GetNodeInfo(
6593 self.cfg.ExpandNodeName(self.op.target_node))
6595 if self.dst_node is None:
6596 # This is a wrong node name, not a non-locked node
6597 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6598 _CheckNodeOnline(self, self.dst_node.name)
6599 _CheckNodeNotDrained(self, self.dst_node.name)
6601 # instance disk type verification
6602 for disk in self.instance.disks:
6603 if disk.dev_type == constants.LD_FILE:
6604 raise errors.OpPrereqError("Export not supported for instances with"
6605 " file-based disks")
6607 def Exec(self, feedback_fn):
6608 """Export an instance to an image in the cluster.
6611 instance = self.instance
6612 dst_node = self.dst_node
6613 src_node = instance.primary_node
6614 if self.op.shutdown:
6615 # shutdown the instance, but not the disks
6616 result = self.rpc.call_instance_shutdown(src_node, instance)
6617 result.Raise("Could not shutdown instance %s on"
6618 " node %s" % (instance.name, src_node))
6620 vgname = self.cfg.GetVGName()
6624 # set the disks ID correctly since call_instance_start needs the
6625 # correct drbd minor to create the symlinks
6626 for disk in instance.disks:
6627 self.cfg.SetDiskID(disk, src_node)
6630 for idx, disk in enumerate(instance.disks):
6631 # result.payload will be a snapshot of an lvm leaf of the disk we passed in
6632 result = self.rpc.call_blockdev_snapshot(src_node, disk)
6633 msg = result.fail_msg
6635 self.LogWarning("Could not snapshot disk/%s on node %s: %s",
6637 snap_disks.append(False)
6639 disk_id = (vgname, result.payload)
6640 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6641 logical_id=disk_id, physical_id=disk_id,
6642 iv_name=disk.iv_name)
6643 snap_disks.append(new_dev)
6646 if self.op.shutdown and instance.admin_up:
6647 result = self.rpc.call_instance_start(src_node, instance, None, None)
6648 msg = result.fail_msg
6650 _ShutdownInstanceDisks(self, instance)
6651 raise errors.OpExecError("Could not start instance: %s" % msg)
6653 # TODO: check for size
6655 cluster_name = self.cfg.GetClusterName()
6656 for idx, dev in enumerate(snap_disks):
6658 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6659 instance, cluster_name, idx)
6660 msg = result.fail_msg
6662 self.LogWarning("Could not export disk/%s from node %s to"
6663 " node %s: %s", idx, src_node, dst_node.name, msg)
6664 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
6666 self.LogWarning("Could not remove snapshot for disk/%d from node"
6667 " %s: %s", idx, src_node, msg)
6669 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6670 msg = result.fail_msg
6672 self.LogWarning("Could not finalize export for instance %s"
6673 " on node %s: %s", instance.name, dst_node.name, msg)
6675 nodelist = self.cfg.GetNodeList()
6676 nodelist.remove(dst_node.name)
6678 # on one-node clusters nodelist will be empty after the removal
6679 # if we proceed, the backup would be removed because OpQueryExports
6680 # substitutes an empty list with the full cluster node list.
6681 iname = instance.name
6683 exportlist = self.rpc.call_export_list(nodelist)
6684 for node in exportlist:
6685 if exportlist[node].fail_msg:
6687 if iname in exportlist[node].payload:
6688 msg = self.rpc.call_export_remove(node, iname).fail_msg
6690 self.LogWarning("Could not remove older export for instance %s"
6691 " on node %s: %s", iname, node, msg)
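# A minimal usage sketch (the OpExportInstance opcode name is assumed,
# it is not defined in this module):
#
#   from ganeti import opcodes
#   op = opcodes.OpExportInstance(instance_name="inst1.example.com",
#                                 target_node="node3.example.com",
#                                 shutdown=True)
#   # when processed, the LU above snapshots each disk on the primary
#   # node, copies the snapshots to the target node and removes any
#   # older export of the same instance from the other nodes.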
6694 class LURemoveExport(NoHooksLU):
6695 """Remove exports related to the named instance.
6698 _OP_REQP = ["instance_name"]
6701 def ExpandNames(self):
6702 self.needed_locks = {}
6703 # We need all nodes to be locked in order for RemoveExport to work, but we
6704 # don't need to lock the instance itself, as nothing will happen to it (and
6705 # we can remove exports also for a removed instance)
6706 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6708 def CheckPrereq(self):
6709 """Check prerequisites.
6713 def Exec(self, feedback_fn):
6714 """Remove any export.
6717 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6718 # If the instance was not found we'll try with the name that was passed in.
6719 # This will only work if it was an FQDN, though.
6721 if not instance_name:
6723 instance_name = self.op.instance_name
6725 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6726 exportlist = self.rpc.call_export_list(locked_nodes)
6728 for node in exportlist:
6729 msg = exportlist[node].fail_msg
6731 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6733 if instance_name in exportlist[node].payload:
6735 result = self.rpc.call_export_remove(node, instance_name)
6736 msg = result.fail_msg
6738 logging.error("Could not remove export for instance %s"
6739 " on node %s: %s", instance_name, node, msg)
6741 if fqdn_warn and not found:
6742 feedback_fn("Export not found. If trying to remove an export belonging"
6743 " to a deleted instance please use its Fully Qualified"
6747 class TagsLU(NoHooksLU):
6750 This is an abstract class which is the parent of all the other tag LUs.
6754 def ExpandNames(self):
6755 self.needed_locks = {}
6756 if self.op.kind == constants.TAG_NODE:
6757 name = self.cfg.ExpandNodeName(self.op.name)
6759 raise errors.OpPrereqError("Invalid node name (%s)" %
6762 self.needed_locks[locking.LEVEL_NODE] = name
6763 elif self.op.kind == constants.TAG_INSTANCE:
6764 name = self.cfg.ExpandInstanceName(self.op.name)
6766 raise errors.OpPrereqError("Invalid instance name (%s)" %
6769 self.needed_locks[locking.LEVEL_INSTANCE] = name
6771 def CheckPrereq(self):
6772 """Check prerequisites.
6775 if self.op.kind == constants.TAG_CLUSTER:
6776 self.target = self.cfg.GetClusterInfo()
6777 elif self.op.kind == constants.TAG_NODE:
6778 self.target = self.cfg.GetNodeInfo(self.op.name)
6779 elif self.op.kind == constants.TAG_INSTANCE:
6780 self.target = self.cfg.GetInstanceInfo(self.op.name)
6782 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6786 class LUGetTags(TagsLU):
6787 """Returns the tags of a given object.
6790 _OP_REQP = ["kind", "name"]
6793 def Exec(self, feedback_fn):
6794 """Returns the tag list.
6797 return list(self.target.GetTags())
6800 class LUSearchTags(NoHooksLU):
6801 """Searches the tags for a given pattern.
6804 _OP_REQP = ["pattern"]
6807 def ExpandNames(self):
6808 self.needed_locks = {}
6810 def CheckPrereq(self):
6811 """Check prerequisites.
6813 This checks the pattern passed for validity by compiling it.
6817 self.re = re.compile(self.op.pattern)
6818 except re.error, err:
6819 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6820 (self.op.pattern, err))
6822 def Exec(self, feedback_fn):
6823 """Returns the tag list.
6827 tgts = [("/cluster", cfg.GetClusterInfo())]
6828 ilist = cfg.GetAllInstancesInfo().values()
6829 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6830 nlist = cfg.GetAllNodesInfo().values()
6831 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6833 for path, target in tgts:
6834 for tag in target.GetTags():
6835 if self.re.search(tag):
6836 results.append((path, tag))
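# Example (made-up data): with pattern "^web", an instance tagged
# "webserver" would yield [("/instances/inst1.example.com", "webserver")]
# in the results list built above.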
6840 class LUAddTags(TagsLU):
6841 """Sets a tag on a given object.
6844 _OP_REQP = ["kind", "name", "tags"]
6847 def CheckPrereq(self):
6848 """Check prerequisites.
6850 This checks the type and length of the tag name and value.
6853 TagsLU.CheckPrereq(self)
6854 for tag in self.op.tags:
6855 objects.TaggableObject.ValidateTag(tag)
6857 def Exec(self, feedback_fn):
6862 for tag in self.op.tags:
6863 self.target.AddTag(tag)
6864 except errors.TagError, err:
6865 raise errors.OpExecError("Error while setting tag: %s" % str(err))
6867 self.cfg.Update(self.target)
6868 except errors.ConfigurationError:
6869 raise errors.OpRetryError("There has been a modification to the"
6870 " config file and the operation has been"
6871 " aborted. Please retry.")
6874 class LUDelTags(TagsLU):
6875 """Delete a list of tags from a given object.
6878 _OP_REQP = ["kind", "name", "tags"]
6881 def CheckPrereq(self):
6882 """Check prerequisites.
6884 This checks that we have the given tag.
6887 TagsLU.CheckPrereq(self)
6888 for tag in self.op.tags:
6889 objects.TaggableObject.ValidateTag(tag)
6890 del_tags = frozenset(self.op.tags)
6891 cur_tags = self.target.GetTags()
6892 if not del_tags <= cur_tags:
6893 diff_tags = del_tags - cur_tags
6894 diff_names = ["'%s'" % tag for tag in diff_tags]
6896 raise errors.OpPrereqError("Tag(s) %s not found" %
6897 (",".join(diff_names)))
6899 def Exec(self, feedback_fn):
6900 """Remove the tag from the object.
6903 for tag in self.op.tags:
6904 self.target.RemoveTag(tag)
6906 self.cfg.Update(self.target)
6907 except errors.ConfigurationError:
6908 raise errors.OpRetryError("There has been a modification to the"
6909 " config file and the operation has been"
6910 " aborted. Please retry.")
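# A minimal usage sketch for the tag LUs (the OpAddTags/OpDelTags opcode
# names are assumed, they are not defined in this module):
#
#   from ganeti import opcodes, constants
#   op = opcodes.OpAddTags(kind=constants.TAG_INSTANCE,
#                          name="inst1.example.com",
#                          tags=["webserver", "production"])
#   # OpDelTags takes the same fields and removes the listed tags instead.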
6913 class LUTestDelay(NoHooksLU):
6914 """Sleep for a specified amount of time.
6916 This LU sleeps on the master and/or nodes for a specified amount of
6920 _OP_REQP = ["duration", "on_master", "on_nodes"]
6923 def ExpandNames(self):
6924 """Expand names and set required locks.
6926 This expands the node list, if any.
6929 self.needed_locks = {}
6930 if self.op.on_nodes:
6931 # _GetWantedNodes can be used here, but is not always appropriate to use
6932 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6934 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6935 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6937 def CheckPrereq(self):
6938 """Check prerequisites.
6942 def Exec(self, feedback_fn):
6943 """Do the actual sleep.
6946 if self.op.on_master:
6947 if not utils.TestDelay(self.op.duration):
6948 raise errors.OpExecError("Error during master delay test")
6949 if self.op.on_nodes:
6950 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6951 for node, node_result in result.items():
6952 node_result.Raise("Failure during rpc call to node %s" % node)
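# A minimal usage sketch (the OpTestDelay opcode name is assumed):
#
#   from ganeti import opcodes
#   op = opcodes.OpTestDelay(duration=5.0, on_master=True, on_nodes=[])
#   # sleeps five seconds on the master only; listing node names in
#   # on_nodes would additionally run the delay RPC on those nodes.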
6955 class IAllocator(object):
6956 """IAllocator framework.
6958 An IAllocator instance has four sets of attributes:
6959 - cfg that is needed to query the cluster
6960 - input data (all members of the _KEYS class attribute are required)
6961 - four buffer attributes (in_text, in_data, out_text, out_data), that represent the
6962 input (to the external script) in text and data structure format,
6963 and the output from it, again in two formats
6964 - the result variables from the script (success, info, nodes) for
6969 "mem_size", "disks", "disk_template",
6970 "os", "tags", "nics", "vcpus", "hypervisor",
6976 def __init__(self, cfg, rpc, mode, name, **kwargs):
6979 # init buffer variables
6980 self.in_text = self.out_text = self.in_data = self.out_data = None
6981 # init all input fields so that pylint is happy
6984 self.mem_size = self.disks = self.disk_template = None
6985 self.os = self.tags = self.nics = self.vcpus = None
6986 self.hypervisor = None
6987 self.relocate_from = None
6989 self.required_nodes = None
6990 # init result fields
6991 self.success = self.info = self.nodes = None
6992 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6993 keyset = self._ALLO_KEYS
6994 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6995 keyset = self._RELO_KEYS
6997 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6998 " IAllocator" % self.mode)
7000 if key not in keyset:
7001 raise errors.ProgrammerError("Invalid input parameter '%s' to"
7002 " IAllocator" % key)
7003 setattr(self, key, kwargs[key])
7005 if key not in kwargs:
7006 raise errors.ProgrammerError("Missing input parameter '%s' to"
7007 " IAllocator" % key)
7008 self._BuildInputData()
7010 def _ComputeClusterData(self):
7011 """Compute the generic allocator input data.
7013 This is the data that is independent of the actual operation.
7017 cluster_info = cfg.GetClusterInfo()
7020 "version": constants.IALLOCATOR_VERSION,
7021 "cluster_name": cfg.GetClusterName(),
7022 "cluster_tags": list(cluster_info.GetTags()),
7023 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
7024 # we don't have job IDs
7026 iinfo = cfg.GetAllInstancesInfo().values()
7027 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
7031 node_list = cfg.GetNodeList()
7033 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7034 hypervisor_name = self.hypervisor
7035 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
7036 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
7038 node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
7041 self.rpc.call_all_instances_info(node_list,
7042 cluster_info.enabled_hypervisors)
7043 for nname, nresult in node_data.items():
7044 # first fill in static (config-based) values
7045 ninfo = cfg.GetNodeInfo(nname)
7047 "tags": list(ninfo.GetTags()),
7048 "primary_ip": ninfo.primary_ip,
7049 "secondary_ip": ninfo.secondary_ip,
7050 "offline": ninfo.offline,
7051 "drained": ninfo.drained,
7052 "master_candidate": ninfo.master_candidate,
7055 if not ninfo.offline:
7056 nresult.Raise("Can't get data for node %s" % nname)
7057 node_iinfo[nname].Raise("Can't get node instance info from node %s" %
7059 remote_info = nresult.payload
7060 for attr in ['memory_total', 'memory_free', 'memory_dom0',
7061 'vg_size', 'vg_free', 'cpu_total']:
7062 if attr not in remote_info:
7063 raise errors.OpExecError("Node '%s' didn't return attribute"
7064 " '%s'" % (nname, attr))
7065 if not isinstance(remote_info[attr], int):
7066 raise errors.OpExecError("Node '%s' returned invalid value"
7068 (nname, attr, remote_info[attr]))
7069 # compute memory used by primary instances
7070 i_p_mem = i_p_up_mem = 0
7071 for iinfo, beinfo in i_list:
7072 if iinfo.primary_node == nname:
7073 i_p_mem += beinfo[constants.BE_MEMORY]
7074 if iinfo.name not in node_iinfo[nname].payload:
7077 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
7078 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
7079 remote_info['memory_free'] -= max(0, i_mem_diff)
7082 i_p_up_mem += beinfo[constants.BE_MEMORY]
7084 # compute memory used by instances
7086 "total_memory": remote_info['memory_total'],
7087 "reserved_memory": remote_info['memory_dom0'],
7088 "free_memory": remote_info['memory_free'],
7089 "total_disk": remote_info['vg_size'],
7090 "free_disk": remote_info['vg_free'],
7091 "total_cpus": remote_info['cpu_total'],
7092 "i_pri_memory": i_p_mem,
7093 "i_pri_up_memory": i_p_up_mem,
7097 node_results[nname] = pnr
7098 data["nodes"] = node_results
7102 for iinfo, beinfo in i_list:
7104 for nic in iinfo.nics:
7105 filled_params = objects.FillDict(
7106 cluster_info.nicparams[constants.PP_DEFAULT],
7108 nic_dict = {"mac": nic.mac,
7110 "mode": filled_params[constants.NIC_MODE],
7111 "link": filled_params[constants.NIC_LINK],
7113 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7114 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7115 nic_data.append(nic_dict)
7117 "tags": list(iinfo.GetTags()),
7118 "admin_up": iinfo.admin_up,
7119 "vcpus": beinfo[constants.BE_VCPUS],
7120 "memory": beinfo[constants.BE_MEMORY],
7122 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7124 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7125 "disk_template": iinfo.disk_template,
7126 "hypervisor": iinfo.hypervisor,
7128 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7130 instance_data[iinfo.name] = pir
7132 data["instances"] = instance_data
7136 def _AddNewInstance(self):
7137 """Add new instance data to allocator structure.
7139 This in combination with _ComputeClusterData will create the
7140 correct structure needed as input for the allocator.
7142 The checks for the completeness of the opcode must have already been
7148 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7150 if self.disk_template in constants.DTS_NET_MIRROR:
7151 self.required_nodes = 2
7153 self.required_nodes = 1
7157 "disk_template": self.disk_template,
7160 "vcpus": self.vcpus,
7161 "memory": self.mem_size,
7162 "disks": self.disks,
7163 "disk_space_total": disk_space,
7165 "required_nodes": self.required_nodes,
7167 data["request"] = request
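# Sketch of the "request" dict built above for an allocation (values made
# up; only keys visible in this method are shown):
#
#   {"disk_template": "drbd", "vcpus": 1, "memory": 512,
#    "disks": [{"size": 10240, "mode": "rw"}],
#    "disk_space_total": 10368,   # as computed by _ComputeDiskSize
#    "required_nodes": 2}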
7169 def _AddRelocateInstance(self):
7170 """Add relocate instance data to allocator structure.
7172 This in combination with _ComputeClusterData will create the
7173 correct structure needed as input for the allocator.
7175 The checks for the completeness of the opcode must have already been
7179 instance = self.cfg.GetInstanceInfo(self.name)
7180 if instance is None:
7181 raise errors.ProgrammerError("Unknown instance '%s' passed to"
7182 " IAllocator" % self.name)
7184 if instance.disk_template not in constants.DTS_NET_MIRROR:
7185 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7187 if len(instance.secondary_nodes) != 1:
7188 raise errors.OpPrereqError("Instance does not have exactly one secondary node")
7190 self.required_nodes = 1
7191 disk_sizes = [{'size': disk.size} for disk in instance.disks]
7192 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7197 "disk_space_total": disk_space,
7198 "required_nodes": self.required_nodes,
7199 "relocate_from": self.relocate_from,
7201 self.in_data["request"] = request
7203 def _BuildInputData(self):
7204 """Build input data structures.
7207 self._ComputeClusterData()
7209 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7210 self._AddNewInstance()
7212 self._AddRelocateInstance()
7214 self.in_text = serializer.Dump(self.in_data)
7216 def Run(self, name, validate=True, call_fn=None):
7217 """Run an instance allocator and return the results.
7221 call_fn = self.rpc.call_iallocator_runner
7223 result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
7224 result.Raise("Failure while running the iallocator script")
7226 self.out_text = result.payload
7228 self._ValidateResult()
7230 def _ValidateResult(self):
7231 """Process the allocator results.
7233 This will process the result and, if successful, save it in
7234 self.out_data and the other parameters.
7238 rdict = serializer.Load(self.out_text)
7239 except Exception, err:
7240 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7242 if not isinstance(rdict, dict):
7243 raise errors.OpExecError("Can't parse iallocator results: not a dict")
7245 for key in "success", "info", "nodes":
7246 if key not in rdict:
7247 raise errors.OpExecError("Can't parse iallocator results:"
7248 " missing key '%s'" % key)
7249 setattr(self, key, rdict[key])
7251 if not isinstance(rdict["nodes"], list):
7252 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7254 self.out_data = rdict
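# Minimal well-formed reply expected from the external allocator script
# (node names made up):
#
#   {"success": True,
#    "info": "allocation successful",
#    "nodes": ["node2.example.com", "node5.example.com"]}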
7257 class LUTestAllocator(NoHooksLU):
7258 """Run allocator tests.
7260 This LU runs the allocator tests
7263 _OP_REQP = ["direction", "mode", "name"]
7265 def CheckPrereq(self):
7266 """Check prerequisites.
7268 This checks the opcode parameters depending on the direction and mode of the test.
7271 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7272 for attr in ["name", "mem_size", "disks", "disk_template",
7273 "os", "tags", "nics", "vcpus"]:
7274 if not hasattr(self.op, attr):
7275 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7277 iname = self.cfg.ExpandInstanceName(self.op.name)
7278 if iname is not None:
7279 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7281 if not isinstance(self.op.nics, list):
7282 raise errors.OpPrereqError("Invalid parameter 'nics'")
7283 for row in self.op.nics:
7284 if (not isinstance(row, dict) or
7287 "bridge" not in row):
7288 raise errors.OpPrereqError("Invalid contents of the"
7289 " 'nics' parameter")
7290 if not isinstance(self.op.disks, list):
7291 raise errors.OpPrereqError("Invalid parameter 'disks'")
7292 for row in self.op.disks:
7293 if (not isinstance(row, dict) or
7294 "size" not in row or
7295 not isinstance(row["size"], int) or
7296 "mode" not in row or
7297 row["mode"] not in ['r', 'w']):
7298 raise errors.OpPrereqError("Invalid contents of the"
7299 " 'disks' parameter")
7300 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7301 self.op.hypervisor = self.cfg.GetHypervisorType()
7302 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7303 if not hasattr(self.op, "name"):
7304 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7305 fname = self.cfg.ExpandInstanceName(self.op.name)
7307 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7309 self.op.name = fname
7310 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7312 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7315 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7316 if not hasattr(self.op, "allocator") or self.op.allocator is None:
7317 raise errors.OpPrereqError("Missing allocator name")
7318 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7319 raise errors.OpPrereqError("Wrong allocator test '%s'" %
7322 def Exec(self, feedback_fn):
7323 """Run the allocator test.
7326 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7327 ial = IAllocator(self.cfg, self.rpc,
7330 mem_size=self.op.mem_size,
7331 disks=self.op.disks,
7332 disk_template=self.op.disk_template,
7336 vcpus=self.op.vcpus,
7337 hypervisor=self.op.hypervisor,
7340 ial = IAllocator(self.cfg, self.rpc,
7343 relocate_from=list(self.relocate_from),
7346 if self.op.direction == constants.IALLOCATOR_DIR_IN:
7347 result = ial.in_text
7349 ial.Run(self.op.allocator, validate=False)
7350 result = ial.out_text