4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
45 class LogicalUnit(object):
46 """Logical Unit base class.
48 Subclasses must follow these rules:
49 - implement ExpandNames
50 - implement CheckPrereq (except when tasklets are used)
51 - implement Exec (except when tasklets are used)
52 - implement BuildHooksEnv
53 - redefine HPATH and HTYPE
54 - optionally redefine their run requirements:
55 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
57 Note that all commands require root permissions.
59 @ivar dry_run_result: the value (if any) that will be returned to the caller
60 in dry-run mode (signalled by opcode dry_run parameter)
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overridden in derived classes in order to check op
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
92 self.LogStep = processor.LogStep
94 self.dry_run_result = None
99 for attr_name in self._OP_REQP:
100 attr_val = getattr(op, attr_name, None)
102 raise errors.OpPrereqError("Required parameter '%s' missing" %
105 self.CheckArguments()
108 """Returns the SshRunner object
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115 ssh = property(fget=__GetSSH)
117 def CheckArguments(self):
118 """Check syntactic validity for the opcode arguments.
120 This method is for doing a simple syntactic check and to ensure
121 validity of opcode parameters, without any cluster-related
122 checks. While the same can be accomplished in ExpandNames and/or
123 CheckPrereq, doing these separately is better because:
125 - ExpandNames is left as purely a lock-related function
126 - CheckPrereq is run after we have acquired locks (and possible
129 The function is allowed to change the self.op attribute so that
130 later methods no longer have to worry about missing parameters.
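A minimal sketch of an override (the "ignore_failures" field is used here
only for illustration; it is not a parameter required by any particular
opcode)::

  def CheckArguments(self):
    if not hasattr(self.op, "ignore_failures"):
      self.op.ignore_failures = False
    if not isinstance(self.op.ignore_failures, bool):
      raise errors.OpPrereqError("Invalid 'ignore_failures' value")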
135 def ExpandNames(self):
136 """Expand names for this LU.
138 This method is called before starting to execute the opcode, and it should
139 update all the parameters of the opcode to their canonical form (e.g. a
140 short node name must be fully expanded after this method has successfully
141 completed). This way locking, hooks, logging, etc. can work correctly.
143 LUs which implement this method must also populate the self.needed_locks
144 member, as a dict with lock levels as keys, and a list of needed lock names
147 - use an empty dict if you don't need any lock
148 - if you don't need any lock at a particular level omit that level
149 - don't put anything for the BGL level
150 - if you want all locks at a level use locking.ALL_SET as a value
152 If you need to share locks (rather than acquire them exclusively) at one
153 level you can modify self.share_locks, setting a true value (usually 1) for
154 that level. By default locks are not shared.
156 This function can also define a list of tasklets, which then will be
157 executed in order instead of the usual LU-level CheckPrereq and Exec
158 functions, if those are not defined by the LU.
162 # Acquire all nodes and one instance
163 self.needed_locks = {
164 locking.LEVEL_NODE: locking.ALL_SET,
165 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
167 # Acquire just two nodes
168 self.needed_locks = {
169 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
172 self.needed_locks = {} # No, you can't leave it to the default value None
175 # The implementation of this method is mandatory only if the new LU is
176 # concurrent, so that old LUs don't need to be changed all at the same
179 self.needed_locks = {} # Exclusive LUs don't need locks.
181 raise NotImplementedError
183 def DeclareLocks(self, level):
184 """Declare LU locking needs for a level
186 While most LUs can just declare their locking needs at ExpandNames time,
187 sometimes there's the need to calculate some locks after having acquired
188 the ones before. This function is called just before acquiring locks at a
189 particular level, but after acquiring the ones at lower levels, and permits
190 such calculations. It can be used to modify self.needed_locks, and by
191 default it does nothing.
193 This function is only called if you have something already set in
194 self.needed_locks for the level.
196 @param level: Locking level which is going to be locked
197 @type level: member of ganeti.locking.LEVELS
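A sketch of the usual pattern, for a LU that locked instances in ExpandNames
and set self.recalculate_locks (see also _LockInstancesNodes below)::

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()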
201 def CheckPrereq(self):
202 """Check prerequisites for this LU.
204 This method should check that the prerequisites for the execution
205 of this LU are fulfilled. It can do internode communication, but
206 it should be idempotent - no cluster or system changes are
209 The method should raise errors.OpPrereqError in case something is
210 not fulfilled. Its return value is ignored.
212 This method should also update all the parameters of the opcode to
213 their canonical form if it hasn't been done by ExpandNames before.
216 if self.tasklets is not None:
217 for (idx, tl) in enumerate(self.tasklets):
218 logging.debug("Checking prerequisites for tasklet %s/%s",
219 idx + 1, len(self.tasklets))
222 raise NotImplementedError
224 def Exec(self, feedback_fn):
227 This method should implement the actual work. It should raise
228 errors.OpExecError for failures that are somewhat dealt with in
232 if self.tasklets is not None:
233 for (idx, tl) in enumerate(self.tasklets):
234 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
237 raise NotImplementedError
239 def BuildHooksEnv(self):
240 """Build hooks environment for this LU.
242 This method should return a three-element tuple consisting of: a dict
243 containing the environment that will be used for running the
244 specific hook for this LU, a list of node names on which the hook
245 should run before the execution, and a list of node names on which
246 the hook should run after the execution.
248 The keys of the dict must not be prefixed with 'GANETI_', as this is
249 handled by the hooks runner. Also note additional keys will be
250 added by the hooks runner. If the LU doesn't define any
251 environment, an empty dict (and not None) should be returned.
253 If there are no nodes, an empty list (and not None) should be returned.
255 Note that if the HPATH for a LU class is None, this function will
259 raise NotImplementedError
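  # For illustration only (a sketch, not taken verbatim from any LU): a
  # cluster-wide LU could build its hooks tuple along these lines, running
  # the hooks on the master node only:
  #   env = {"OP_TARGET": self.cfg.GetClusterName()}
  #   mn = self.cfg.GetMasterNode()
  #   return env, [mn], [mn]
  # LURenameCluster.BuildHooksEnv below follows this pattern.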
261 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
262 """Notify the LU about the results of its hooks.
264 This method is called every time a hooks phase is executed, and notifies
265 the Logical Unit about the hooks' result. The LU can then use it to alter
266 its result based on the hooks. By default the method does nothing and the
267 previous result is passed back unchanged but any LU can define it if it
268 wants to use the local cluster hook-scripts somehow.
270 @param phase: one of L{constants.HOOKS_PHASE_POST} or
271 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
272 @param hook_results: the results of the multi-node hooks rpc call
273 @param feedback_fn: function used to send feedback back to the caller
274 @param lu_result: the previous Exec result this LU had, or None
276 @return: the new Exec result, based on the previous result
282 def _ExpandAndLockInstance(self):
283 """Helper function to expand and lock an instance.
285 Many LUs that work on an instance take its name in self.op.instance_name
286 and need to expand it and then declare the expanded name for locking. This
287 function does it, and then updates self.op.instance_name to the expanded
288 name. It also initializes needed_locks as a dict, if this hasn't been done
292 if self.needed_locks is None:
293 self.needed_locks = {}
295 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
296 "_ExpandAndLockInstance called with instance-level locks set"
297 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
298 if expanded_name is None:
299 raise errors.OpPrereqError("Instance '%s' not known" %
300 self.op.instance_name)
301 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
302 self.op.instance_name = expanded_name
304 def _LockInstancesNodes(self, primary_only=False):
305 """Helper function to declare instances' nodes for locking.
307 This function should be called after locking one or more instances to lock
308 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
309 with all primary or secondary nodes for instances already locked and
310 present in self.needed_locks[locking.LEVEL_INSTANCE].
312 It should be called from DeclareLocks, and for safety only works if
313 self.recalculate_locks[locking.LEVEL_NODE] is set.
315 In the future it may grow parameters to just lock some instance's nodes, or
316 to just lock primaries or secondary nodes, if needed.
318 It should be called in DeclareLocks in a way similar to::
320 if level == locking.LEVEL_NODE:
321 self._LockInstancesNodes()
323 @type primary_only: boolean
324 @param primary_only: only lock primary nodes of locked instances
327 assert locking.LEVEL_NODE in self.recalculate_locks, \
328 "_LockInstancesNodes helper function called with no nodes to recalculate"
330 # TODO: check if we've really been called with the instance locks held
332 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
333 # future we might want to have different behaviors depending on the value
334 # of self.recalculate_locks[locking.LEVEL_NODE]
336 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
337 instance = self.context.cfg.GetInstanceInfo(instance_name)
338 wanted_nodes.append(instance.primary_node)
340 wanted_nodes.extend(instance.secondary_nodes)
342 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
343 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
344 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
345 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
347 del self.recalculate_locks[locking.LEVEL_NODE]
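# Illustrative sketch only: how ExpandNames, DeclareLocks, CheckPrereq and
# Exec, together with the locking helpers above, fit together for a
# hypothetical instance-level LU. It is not wired to any opcode, and the
# hooks-related parts (HPATH/HTYPE/BuildHooksEnv) are omitted here; see
# BuildHooksEnv above and LURenameCluster below for that side.
class _LUExampleInstanceNoop(LogicalUnit):
  _OP_REQP = ["instance_name"]

  def ExpandNames(self):
    # expands self.op.instance_name and acquires the instance lock
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # replaces the (empty) node lock list with the instance's nodes
      self._LockInstancesNodes()

  def CheckPrereq(self):
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

  def Exec(self, feedback_fn):
    feedback_fn("Nothing to do for instance %s" % self.instance.name)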
350 class NoHooksLU(LogicalUnit):
351 """Simple LU which runs no hooks.
353 This LU is intended as a parent for other LogicalUnits which will
354 run no hooks, in order to reduce duplicate code.
362 """Tasklet base class.
364 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365 they can mix legacy code with tasklets. Locking needs to be done in the LU,
366 tasklets know nothing about locks.
368 Subclasses must follow these rules:
369 - Implement CheckPrereq
373 def __init__(self, lu):
380 def CheckPrereq(self):
381 """Check prerequisites for this tasklets.
383 This method should check whether the prerequisites for the execution of
384 this tasklet are fulfilled. It can do internode communication, but it
385 should be idempotent - no cluster or system changes are allowed.
387 The method should raise errors.OpPrereqError in case something is not
388 fulfilled. Its return value is ignored.
390 This method should also update all parameters to their canonical form if it
391 hasn't been done before.
394 raise NotImplementedError
396 def Exec(self, feedback_fn):
397 """Execute the tasklet.
399 This method should implement the actual work. It should raise
400 errors.OpExecError for failures that are somewhat dealt with in code, or
404 raise NotImplementedError
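# A minimal sketch of a tasklet, for illustration only; it assumes the base
# class constructor signature shown above (taking the owning LU) and does a
# trivial config lookup as a stand-in for real per-object work.
class _ExampleTasklet(Tasklet):
  def __init__(self, lu, instance_name):
    Tasklet.__init__(self, lu)
    self.lu = lu
    self.instance_name = instance_name

  def CheckPrereq(self):
    self.instance = self.lu.cfg.GetInstanceInfo(self.instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.instance_name)

  def Exec(self, feedback_fn):
    feedback_fn("Nothing to do for instance %s" % self.instance.name)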
407 def _GetWantedNodes(lu, nodes):
408 """Returns list of checked and expanded node names.
410 @type lu: L{LogicalUnit}
411 @param lu: the logical unit on whose behalf we execute
413 @param nodes: list of node names or None for all nodes
415 @return: the list of nodes, sorted
416 @raise errors.OpPrereqError: if the nodes parameter is wrong type
419 if not isinstance(nodes, list):
420 raise errors.OpPrereqError("Invalid argument type 'nodes'")
423 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
424 " non-empty list of nodes whose name is to be expanded.")
428 node = lu.cfg.ExpandNodeName(name)
430 raise errors.OpPrereqError("No such node name '%s'" % name)
433 return utils.NiceSort(wanted)
436 def _GetWantedInstances(lu, instances):
437 """Returns list of checked and expanded instance names.
439 @type lu: L{LogicalUnit}
440 @param lu: the logical unit on whose behalf we execute
441 @type instances: list
442 @param instances: list of instance names or None for all instances
444 @return: the list of instances, sorted
445 @raise errors.OpPrereqError: if the instances parameter is wrong type
446 @raise errors.OpPrereqError: if any of the passed instances is not found
449 if not isinstance(instances, list):
450 raise errors.OpPrereqError("Invalid argument type 'instances'")
455 for name in instances:
456 instance = lu.cfg.ExpandInstanceName(name)
458 raise errors.OpPrereqError("No such instance name '%s'" % name)
459 wanted.append(instance)
462 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
466 def _CheckOutputFields(static, dynamic, selected):
467 """Checks whether all selected fields are valid.
469 @type static: L{utils.FieldSet}
470 @param static: static fields set
471 @type dynamic: L{utils.FieldSet}
472 @param dynamic: dynamic fields set
479 delta = f.NonMatching(selected)
481 raise errors.OpPrereqError("Unknown output fields selected: %s"
485 def _CheckBooleanOpField(op, name):
486 """Validates boolean opcode parameters.
488 This will ensure that an opcode parameter is either a boolean value,
489 or None (but that it always exists).
492 val = getattr(op, name, None)
493 if not (val is None or isinstance(val, bool)):
494 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
496 setattr(op, name, val)
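# Typical usage from a LU's CheckArguments, for illustration (the field name
# is just an example):
#   _CheckBooleanOpField(self.op, "force")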
499 def _CheckNodeOnline(lu, node):
500 """Ensure that a given node is online.
502 @param lu: the LU on behalf of which we make the check
503 @param node: the node to check
504 @raise errors.OpPrereqError: if the node is offline
507 if lu.cfg.GetNodeInfo(node).offline:
508 raise errors.OpPrereqError("Can't use offline node %s" % node)
511 def _CheckNodeNotDrained(lu, node):
512 """Ensure that a given node is not drained.
514 @param lu: the LU on behalf of which we make the check
515 @param node: the node to check
516 @raise errors.OpPrereqError: if the node is drained
519 if lu.cfg.GetNodeInfo(node).drained:
520 raise errors.OpPrereqError("Can't use drained node %s" % node)
523 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
524 memory, vcpus, nics, disk_template, disks,
525 bep, hvp, hypervisor_name):
526 """Builds instance related env variables for hooks
528 This builds the hook environment from individual variables.
531 @param name: the name of the instance
532 @type primary_node: string
533 @param primary_node: the name of the instance's primary node
534 @type secondary_nodes: list
535 @param secondary_nodes: list of secondary nodes as strings
536 @type os_type: string
537 @param os_type: the name of the instance's OS
538 @type status: boolean
539 @param status: the should_run status of the instance
541 @param memory: the memory size of the instance
543 @param vcpus: the count of VCPUs the instance has
545 @param nics: list of tuples (ip, mac, mode, link) representing
546 the NICs the instance has
547 @type disk_template: string
548 @param disk_template: the disk template of the instance
550 @param disks: the list of (size, mode) pairs
552 @param bep: the backend parameters for the instance
554 @param hvp: the hypervisor parameters for the instance
555 @type hypervisor_name: string
556 @param hypervisor_name: the hypervisor for the instance
558 @return: the hook environment for this instance
567 "INSTANCE_NAME": name,
568 "INSTANCE_PRIMARY": primary_node,
569 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
570 "INSTANCE_OS_TYPE": os_type,
571 "INSTANCE_STATUS": str_status,
572 "INSTANCE_MEMORY": memory,
573 "INSTANCE_VCPUS": vcpus,
574 "INSTANCE_DISK_TEMPLATE": disk_template,
575 "INSTANCE_HYPERVISOR": hypervisor_name,
579 nic_count = len(nics)
580 for idx, (ip, mac, mode, link) in enumerate(nics):
583 env["INSTANCE_NIC%d_IP" % idx] = ip
584 env["INSTANCE_NIC%d_MAC" % idx] = mac
585 env["INSTANCE_NIC%d_MODE" % idx] = mode
586 env["INSTANCE_NIC%d_LINK" % idx] = link
587 if mode == constants.NIC_MODE_BRIDGED:
588 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
592 env["INSTANCE_NIC_COUNT"] = nic_count
595 disk_count = len(disks)
596 for idx, (size, mode) in enumerate(disks):
597 env["INSTANCE_DISK%d_SIZE" % idx] = size
598 env["INSTANCE_DISK%d_MODE" % idx] = mode
602 env["INSTANCE_DISK_COUNT"] = disk_count
604 for source, kind in [(bep, "BE"), (hvp, "HV")]:
605 for key, value in source.items():
606 env["INSTANCE_%s_%s" % (kind, key)] = value
610 def _NICListToTuple(lu, nics):
611 """Build a list of nic information tuples.
613 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
614 value in LUQueryInstanceData.
616 @type lu: L{LogicalUnit}
617 @param lu: the logical unit on whose behalf we execute
618 @type nics: list of L{objects.NIC}
619 @param nics: list of nics to convert to hooks tuples
623 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
627 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
628 mode = filled_params[constants.NIC_MODE]
629 link = filled_params[constants.NIC_LINK]
630 hooks_nics.append((ip, mac, mode, link))
633 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
634 """Builds instance related env variables for hooks from an object.
636 @type lu: L{LogicalUnit}
637 @param lu: the logical unit on whose behalf we execute
638 @type instance: L{objects.Instance}
639 @param instance: the instance for which we should build the
642 @param override: dictionary with key/values that will override
645 @return: the hook environment dictionary
648 cluster = lu.cfg.GetClusterInfo()
649 bep = cluster.FillBE(instance)
650 hvp = cluster.FillHV(instance)
652 'name': instance.name,
653 'primary_node': instance.primary_node,
654 'secondary_nodes': instance.secondary_nodes,
655 'os_type': instance.os,
656 'status': instance.admin_up,
657 'memory': bep[constants.BE_MEMORY],
658 'vcpus': bep[constants.BE_VCPUS],
659 'nics': _NICListToTuple(lu, instance.nics),
660 'disk_template': instance.disk_template,
661 'disks': [(disk.size, disk.mode) for disk in instance.disks],
664 'hypervisor_name': instance.hypervisor,
667 args.update(override)
668 return _BuildInstanceHookEnv(**args)
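# Typically called from an instance LU's BuildHooksEnv; a sketch of the
# common pattern (not taken verbatim from any particular LU):
#   env = _BuildInstanceHookEnvByObject(self, self.instance)
#   nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
#   return env, nl, nl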
671 def _AdjustCandidatePool(lu):
672 """Adjust the candidate pool after node operations.
675 mod_list = lu.cfg.MaintainCandidatePool()
677 lu.LogInfo("Promoted nodes to master candidate role: %s",
678 ", ".join(node.name for node in mod_list))
679 for name in mod_list:
680 lu.context.ReaddNode(name)
681 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
683 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
687 def _CheckNicsBridgesExist(lu, target_nics, target_node,
688 profile=constants.PP_DEFAULT):
689 """Check that the brigdes needed by a list of nics exist.
692 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
693 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
694 for nic in target_nics]
695 brlist = [params[constants.NIC_LINK] for params in paramslist
696 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
698 result = lu.rpc.call_bridges_exist(target_node, brlist)
699 result.Raise("Error checking bridges on destination node '%s'" %
700 target_node, prereq=True)
703 def _CheckInstanceBridgesExist(lu, instance, node=None):
704 """Check that the brigdes needed by an instance exist.
708 node = instance.primary_node
709 _CheckNicsBridgesExist(lu, instance.nics, node)
712 def _GetNodePrimaryInstances(cfg, node_name):
713 """Returns primary instances on a node.
718 for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
719 if node_name == inst.primary_node:
720 instances.append(inst)
725 def _GetNodeSecondaryInstances(cfg, node_name):
726 """Returns secondary instances on a node.
731 for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
732 if node_name in inst.secondary_nodes:
733 instances.append(inst)
738 class LUDestroyCluster(NoHooksLU):
739 """Logical unit for destroying the cluster.
744 def CheckPrereq(self):
745 """Check prerequisites.
747 This checks whether the cluster is empty.
749 Any errors are signaled by raising errors.OpPrereqError.
752 master = self.cfg.GetMasterNode()
754 nodelist = self.cfg.GetNodeList()
755 if len(nodelist) != 1 or nodelist[0] != master:
756 raise errors.OpPrereqError("There are still %d node(s) in"
757 " this cluster." % (len(nodelist) - 1))
758 instancelist = self.cfg.GetInstanceList()
760 raise errors.OpPrereqError("There are still %d instance(s) in"
761 " this cluster." % len(instancelist))
763 def Exec(self, feedback_fn):
764 """Destroys the cluster.
767 master = self.cfg.GetMasterNode()
768 result = self.rpc.call_node_stop_master(master, False)
769 result.Raise("Could not disable the master role")
770 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
771 utils.CreateBackup(priv_key)
772 utils.CreateBackup(pub_key)
776 class LUVerifyCluster(LogicalUnit):
777 """Verifies the cluster status.
780 HPATH = "cluster-verify"
781 HTYPE = constants.HTYPE_CLUSTER
782 _OP_REQP = ["skip_checks"]
785 def ExpandNames(self):
786 self.needed_locks = {
787 locking.LEVEL_NODE: locking.ALL_SET,
788 locking.LEVEL_INSTANCE: locking.ALL_SET,
790 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
792 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
793 node_result, feedback_fn, master_files,
795 """Run multiple tests against a node.
799 - compares ganeti version
800 - checks vg existence and size > 20G
801 - checks config file checksum
802 - checks ssh to other nodes
804 @type nodeinfo: L{objects.Node}
805 @param nodeinfo: the node to check
806 @param file_list: required list of files
807 @param local_cksum: dictionary of local files and their checksums
808 @param node_result: the results from the node
809 @param feedback_fn: function used to accumulate results
810 @param master_files: list of files that only masters should have
811 @param drbd_map: the used drbd minors for this node, in the
812 form of minor: (instance, must_exist), which correspond to instances
813 and their running status
814 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
819 # main result, node_result should be a non-empty dict
820 if not node_result or not isinstance(node_result, dict):
821 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
824 # compares ganeti version
825 local_version = constants.PROTOCOL_VERSION
826 remote_version = node_result.get('version', None)
827 if not (remote_version and isinstance(remote_version, (list, tuple)) and
828 len(remote_version) == 2):
829 feedback_fn(" - ERROR: connection to %s failed" % (node))
832 if local_version != remote_version[0]:
833 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
834 " node %s %s" % (local_version, node, remote_version[0]))
837 # node seems compatible, we can actually try to look into its results
841 # full package version
842 if constants.RELEASE_VERSION != remote_version[1]:
843 feedback_fn(" - WARNING: software version mismatch: master %s,"
845 (constants.RELEASE_VERSION, node, remote_version[1]))
847 # checks vg existence and size > 20G
848 if vg_name is not None:
849 vglist = node_result.get(constants.NV_VGLIST, None)
851 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
855 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
856 constants.MIN_VG_SIZE)
858 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
861 # checks config file checksum
863 remote_cksum = node_result.get(constants.NV_FILELIST, None)
864 if not isinstance(remote_cksum, dict):
866 feedback_fn(" - ERROR: node hasn't returned file checksum data")
868 for file_name in file_list:
869 node_is_mc = nodeinfo.master_candidate
870 must_have_file = file_name not in master_files
871 if file_name not in remote_cksum:
872 if node_is_mc or must_have_file:
874 feedback_fn(" - ERROR: file '%s' missing" % file_name)
875 elif remote_cksum[file_name] != local_cksum[file_name]:
876 if node_is_mc or must_have_file:
878 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
880 # not candidate and this is not a must-have file
882 feedback_fn(" - ERROR: file '%s' should not exist on non master"
883 " candidates (and the file is outdated)" % file_name)
885 # all good, except non-master/non-must have combination
886 if not node_is_mc and not must_have_file:
887 feedback_fn(" - ERROR: file '%s' should not exist on non master"
888 " candidates" % file_name)
892 if constants.NV_NODELIST not in node_result:
894 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
896 if node_result[constants.NV_NODELIST]:
898 for node in node_result[constants.NV_NODELIST]:
899 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
900 (node, node_result[constants.NV_NODELIST][node]))
902 if constants.NV_NODENETTEST not in node_result:
904 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
906 if node_result[constants.NV_NODENETTEST]:
908 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
910 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
911 (node, node_result[constants.NV_NODENETTEST][node]))
913 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
914 if isinstance(hyp_result, dict):
915 for hv_name, hv_result in hyp_result.iteritems():
916 if hv_result is not None:
917 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
918 (hv_name, hv_result))
920 # check used drbd list
921 if vg_name is not None:
922 used_minors = node_result.get(constants.NV_DRBDLIST, [])
923 if not isinstance(used_minors, (tuple, list)):
924 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
927 for minor, (iname, must_exist) in drbd_map.items():
928 if minor not in used_minors and must_exist:
929 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
930 " not active" % (minor, iname))
932 for minor in used_minors:
933 if minor not in drbd_map:
934 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
940 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
941 node_instance, feedback_fn, n_offline):
942 """Verify an instance.
944 This function checks to see if the required block devices are
945 available on the instance's node.
950 node_current = instanceconfig.primary_node
953 instanceconfig.MapLVsByNode(node_vol_should)
955 for node in node_vol_should:
956 if node in n_offline:
957 # ignore missing volumes on offline nodes
959 for volume in node_vol_should[node]:
960 if node not in node_vol_is or volume not in node_vol_is[node]:
961 feedback_fn(" - ERROR: volume %s missing on node %s" %
965 if instanceconfig.admin_up:
966 if ((node_current not in node_instance or
967 not instance in node_instance[node_current]) and
968 node_current not in n_offline):
969 feedback_fn(" - ERROR: instance %s not running on node %s" %
970 (instance, node_current))
973 for node in node_instance:
974 if (not node == node_current):
975 if instance in node_instance[node]:
976 feedback_fn(" - ERROR: instance %s should not run on node %s" %
982 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
983 """Verify if there are any unknown volumes in the cluster.
985 The .os, .swap and backup volumes are ignored. All other volumes are
991 for node in node_vol_is:
992 for volume in node_vol_is[node]:
993 if node not in node_vol_should or volume not in node_vol_should[node]:
994 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
999 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
1000 """Verify the list of running instances.
1002 This checks what instances are running but unknown to the cluster.
1006 for node in node_instance:
1007 for runninginstance in node_instance[node]:
1008 if runninginstance not in instancelist:
1009 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
1010 (runninginstance, node))
1014 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
1015 """Verify N+1 Memory Resilience.
1017 Check that if one single node dies we can still start all the instances it
1023 for node, nodeinfo in node_info.iteritems():
1024 # This code checks that every node which is now listed as secondary has
1025 # enough memory to host all instances it is supposed to, should a single
1026 # other node in the cluster fail.
1027 # FIXME: not ready for failover to an arbitrary node
1028 # FIXME: does not support file-backed instances
1029 # WARNING: we currently take into account down instances as well as up
1030 # ones, considering that even if they're down someone might want to start
1031 # them even in the event of a node failure.
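      # Worked example (numbers are illustrative): if this node is secondary
      # for instances A (1024 MB) and B (2048 MB), both with primary node P
      # and auto_balance enabled, then needed_mem for P is 3072 MB; if this
      # node's mfree is below that, the failover of P's instances could not
      # be accommodated and we report an error below.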
1032 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1034 for instance in instances:
1035 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1036 if bep[constants.BE_AUTO_BALANCE]:
1037 needed_mem += bep[constants.BE_MEMORY]
1038 if nodeinfo['mfree'] < needed_mem:
1039 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
1040 " failovers should node %s fail" % (node, prinode))
1044 def CheckPrereq(self):
1045 """Check prerequisites.
1047 Transform the list of checks we're going to skip into a set and check that
1048 all its members are valid.
1051 self.skip_set = frozenset(self.op.skip_checks)
1052 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1053 raise errors.OpPrereqError("Invalid checks to be skipped specified")
1055 def BuildHooksEnv(self):
1058 Cluster-Verify hooks are run only in the post phase; if they fail, their
1059 output is logged in the verify output and the verification fails.
1062 all_nodes = self.cfg.GetNodeList()
1064 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1066 for node in self.cfg.GetAllNodesInfo().values():
1067 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1069 return env, [], all_nodes
1071 def Exec(self, feedback_fn):
1072 """Verify integrity of cluster, performing various test on nodes.
1076 feedback_fn("* Verifying global settings")
1077 for msg in self.cfg.VerifyConfig():
1078 feedback_fn(" - ERROR: %s" % msg)
1080 vg_name = self.cfg.GetVGName()
1081 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1082 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1083 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1084 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1085 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1086 for iname in instancelist)
1087 i_non_redundant = [] # Non redundant instances
1088 i_non_a_balanced = [] # Non auto-balanced instances
1089 n_offline = [] # List of offline nodes
1090 n_drained = [] # List of nodes being drained
1096 # FIXME: verify OS list
1097 # do local checksums
1098 master_files = [constants.CLUSTER_CONF_FILE]
1100 file_names = ssconf.SimpleStore().GetFileList()
1101 file_names.append(constants.SSL_CERT_FILE)
1102 file_names.append(constants.RAPI_CERT_FILE)
1103 file_names.extend(master_files)
1105 local_checksums = utils.FingerprintFiles(file_names)
1107 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1108 node_verify_param = {
1109 constants.NV_FILELIST: file_names,
1110 constants.NV_NODELIST: [node.name for node in nodeinfo
1111 if not node.offline],
1112 constants.NV_HYPERVISOR: hypervisors,
1113 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1114 node.secondary_ip) for node in nodeinfo
1115 if not node.offline],
1116 constants.NV_INSTANCELIST: hypervisors,
1117 constants.NV_VERSION: None,
1118 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1120 if vg_name is not None:
1121 node_verify_param[constants.NV_VGLIST] = None
1122 node_verify_param[constants.NV_LVLIST] = vg_name
1123 node_verify_param[constants.NV_DRBDLIST] = None
1124 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1125 self.cfg.GetClusterName())
1127 cluster = self.cfg.GetClusterInfo()
1128 master_node = self.cfg.GetMasterNode()
1129 all_drbd_map = self.cfg.ComputeDRBDMap()
1131 for node_i in nodeinfo:
1135 feedback_fn("* Skipping offline node %s" % (node,))
1136 n_offline.append(node)
1139 if node == master_node:
1141 elif node_i.master_candidate:
1142 ntype = "master candidate"
1143 elif node_i.drained:
1145 n_drained.append(node)
1148 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1150 msg = all_nvinfo[node].fail_msg
1152 feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg))
1156 nresult = all_nvinfo[node].payload
1158 for minor, instance in all_drbd_map[node].items():
1159 if instance not in instanceinfo:
1160 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1162 # ghost instance should not be running, but otherwise we
1163 # don't give double warnings (both ghost instance and
1164 # unallocated minor in use)
1165 node_drbd[minor] = (instance, False)
1167 instance = instanceinfo[instance]
1168 node_drbd[minor] = (instance.name, instance.admin_up)
1169 result = self._VerifyNode(node_i, file_names, local_checksums,
1170 nresult, feedback_fn, master_files,
1174 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1176 node_volume[node] = {}
1177 elif isinstance(lvdata, basestring):
1178 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1179 (node, utils.SafeEncode(lvdata)))
1181 node_volume[node] = {}
1182 elif not isinstance(lvdata, dict):
1183 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1187 node_volume[node] = lvdata
1190 idata = nresult.get(constants.NV_INSTANCELIST, None)
1191 if not isinstance(idata, list):
1192 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1197 node_instance[node] = idata
1200 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1201 if not isinstance(nodeinfo, dict):
1202 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1208 "mfree": int(nodeinfo['memory_free']),
1211 # dictionary holding all instances this node is secondary for,
1212 # grouped by their primary node. Each key is a cluster node, and each
1213 # value is a list of instances which have the key as primary and the
1214 # current node as secondary. This is handy to calculate N+1 memory
1215 # availability if you can only failover from a primary to its
1217 "sinst-by-pnode": {},
1219 # FIXME: devise a free space model for file based instances as well
1220 if vg_name is not None:
1221 if (constants.NV_VGLIST not in nresult or
1222 vg_name not in nresult[constants.NV_VGLIST]):
1223 feedback_fn(" - ERROR: node %s didn't return data for the"
1224 " volume group '%s' - it is either missing or broken" %
1228 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1229 except (ValueError, KeyError):
1230 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1231 " from node %s" % (node,))
1235 node_vol_should = {}
1237 for instance in instancelist:
1238 feedback_fn("* Verifying instance %s" % instance)
1239 inst_config = instanceinfo[instance]
1240 result = self._VerifyInstance(instance, inst_config, node_volume,
1241 node_instance, feedback_fn, n_offline)
1243 inst_nodes_offline = []
1245 inst_config.MapLVsByNode(node_vol_should)
1247 instance_cfg[instance] = inst_config
1249 pnode = inst_config.primary_node
1250 if pnode in node_info:
1251 node_info[pnode]['pinst'].append(instance)
1252 elif pnode not in n_offline:
1253 feedback_fn(" - ERROR: instance %s, connection to primary node"
1254 " %s failed" % (instance, pnode))
1257 if pnode in n_offline:
1258 inst_nodes_offline.append(pnode)
1260 # If the instance is non-redundant we cannot survive losing its primary
1261 # node, so we are not N+1 compliant. On the other hand we have no disk
1262 # templates with more than one secondary so that situation is not well
1264 # FIXME: does not support file-backed instances
1265 if len(inst_config.secondary_nodes) == 0:
1266 i_non_redundant.append(instance)
1267 elif len(inst_config.secondary_nodes) > 1:
1268 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1271 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1272 i_non_a_balanced.append(instance)
1274 for snode in inst_config.secondary_nodes:
1275 if snode in node_info:
1276 node_info[snode]['sinst'].append(instance)
1277 if pnode not in node_info[snode]['sinst-by-pnode']:
1278 node_info[snode]['sinst-by-pnode'][pnode] = []
1279 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1280 elif snode not in n_offline:
1281 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1282 " %s failed" % (instance, snode))
1284 if snode in n_offline:
1285 inst_nodes_offline.append(snode)
1287 if inst_nodes_offline:
1288 # warn that the instance lives on offline nodes, and set bad=True
1289 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1290 ", ".join(inst_nodes_offline))
1293 feedback_fn("* Verifying orphan volumes")
1294 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1298 feedback_fn("* Verifying remaining instances")
1299 result = self._VerifyOrphanInstances(instancelist, node_instance,
1303 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1304 feedback_fn("* Verifying N+1 Memory redundancy")
1305 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1308 feedback_fn("* Other Notes")
1310 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1311 % len(i_non_redundant))
1313 if i_non_a_balanced:
1314 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1315 % len(i_non_a_balanced))
1318 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1321 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1325 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1326 """Analyze the post-hooks' result
1328 This method analyses the hook result, handles it, and sends some
1329 nicely-formatted feedback back to the user.
1331 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1332 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1333 @param hooks_results: the results of the multi-node hooks rpc call
1334 @param feedback_fn: function used to send feedback back to the caller
1335 @param lu_result: previous Exec result
1336 @return: the new Exec result, based on the previous result
1340 # We only really run POST phase hooks, and are only interested in
1342 if phase == constants.HOOKS_PHASE_POST:
1343 # Used to change hooks' output to proper indentation
1344 indent_re = re.compile('^', re.M)
1345 feedback_fn("* Hooks Results")
1346 if not hooks_results:
1347 feedback_fn(" - ERROR: general communication failure")
1350 for node_name in hooks_results:
1351 show_node_header = True
1352 res = hooks_results[node_name]
1356 # no need to warn or set fail return value
1358 feedback_fn(" Communication failure in hooks execution: %s" %
1362 for script, hkr, output in res.payload:
1363 if hkr == constants.HKR_FAIL:
1364 # The node header is only shown once, if there are
1365 # failing hooks on that node
1366 if show_node_header:
1367 feedback_fn(" Node %s:" % node_name)
1368 show_node_header = False
1369 feedback_fn(" ERROR: Script %s failed, output:" % script)
1370 output = indent_re.sub(' ', output)
1371 feedback_fn("%s" % output)
1377 class LUVerifyDisks(NoHooksLU):
1378 """Verifies the cluster disks status.
1384 def ExpandNames(self):
1385 self.needed_locks = {
1386 locking.LEVEL_NODE: locking.ALL_SET,
1387 locking.LEVEL_INSTANCE: locking.ALL_SET,
1389 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1391 def CheckPrereq(self):
1392 """Check prerequisites.
1394 This has no prerequisites.
1399 def Exec(self, feedback_fn):
1400 """Verify integrity of cluster disks.
1402 @rtype: tuple of three items
1403 @return: a tuple of (dict of node-to-node_error, list of instances
1404 which need activate-disks, dict of instance: (node, volume) for
1408 result = res_nodes, res_instances, res_missing = {}, [], {}
1410 vg_name = self.cfg.GetVGName()
1411 nodes = utils.NiceSort(self.cfg.GetNodeList())
1412 instances = [self.cfg.GetInstanceInfo(name)
1413 for name in self.cfg.GetInstanceList()]
1416 for inst in instances:
1418 if (not inst.admin_up or
1419 inst.disk_template not in constants.DTS_NET_MIRROR):
1421 inst.MapLVsByNode(inst_lvs)
1422 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1423 for node, vol_list in inst_lvs.iteritems():
1424 for vol in vol_list:
1425 nv_dict[(node, vol)] = inst
1430 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1434 node_res = node_lvs[node]
1435 if node_res.offline:
1437 msg = node_res.fail_msg
1439 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1440 res_nodes[node] = msg
1443 lvs = node_res.payload
1444 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1445 inst = nv_dict.pop((node, lv_name), None)
1446 if (not lv_online and inst is not None
1447 and inst.name not in res_instances):
1448 res_instances.append(inst.name)
1450 # any leftover items in nv_dict are missing LVs, let's arrange the
1452 for key, inst in nv_dict.iteritems():
1453 if inst.name not in res_missing:
1454 res_missing[inst.name] = []
1455 res_missing[inst.name].append(key)
1460 class LURenameCluster(LogicalUnit):
1461 """Rename the cluster.
1464 HPATH = "cluster-rename"
1465 HTYPE = constants.HTYPE_CLUSTER
1468 def BuildHooksEnv(self):
1473 "OP_TARGET": self.cfg.GetClusterName(),
1474 "NEW_NAME": self.op.name,
1476 mn = self.cfg.GetMasterNode()
1477 return env, [mn], [mn]
1479 def CheckPrereq(self):
1480 """Verify that the passed name is a valid one.
1483 hostname = utils.HostInfo(self.op.name)
1485 new_name = hostname.name
1486 self.ip = new_ip = hostname.ip
1487 old_name = self.cfg.GetClusterName()
1488 old_ip = self.cfg.GetMasterIP()
1489 if new_name == old_name and new_ip == old_ip:
1490 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1491 " cluster has changed")
1492 if new_ip != old_ip:
1493 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1494 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1495 " reachable on the network. Aborting." %
1498 self.op.name = new_name
1500 def Exec(self, feedback_fn):
1501 """Rename the cluster.
1504 clustername = self.op.name
1507 # shutdown the master IP
1508 master = self.cfg.GetMasterNode()
1509 result = self.rpc.call_node_stop_master(master, False)
1510 result.Raise("Could not disable the master role")
1513 cluster = self.cfg.GetClusterInfo()
1514 cluster.cluster_name = clustername
1515 cluster.master_ip = ip
1516 self.cfg.Update(cluster)
1518 # update the known hosts file
1519 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1520 node_list = self.cfg.GetNodeList()
1522 node_list.remove(master)
1525 result = self.rpc.call_upload_file(node_list,
1526 constants.SSH_KNOWN_HOSTS_FILE)
1527 for to_node, to_result in result.iteritems():
1528 msg = to_result.fail_msg
1530 msg = ("Copy of file %s to node %s failed: %s" %
1531 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1532 self.proc.LogWarning(msg)
1535 result = self.rpc.call_node_start_master(master, False, False)
1536 msg = result.fail_msg
1538 self.LogWarning("Could not re-enable the master role on"
1539 " the master, please restart manually: %s", msg)
1542 def _RecursiveCheckIfLVMBased(disk):
1543 """Check if the given disk or its children are lvm-based.
1545 @type disk: L{objects.Disk}
1546 @param disk: the disk to check
1548 @return: boolean indicating whether a LD_LV dev_type was found or not
1552 for chdisk in disk.children:
1553 if _RecursiveCheckIfLVMBased(chdisk):
1555 return disk.dev_type == constants.LD_LV
1558 class LUSetClusterParams(LogicalUnit):
1559 """Change the parameters of the cluster.
1562 HPATH = "cluster-modify"
1563 HTYPE = constants.HTYPE_CLUSTER
1567 def CheckArguments(self):
1571 if not hasattr(self.op, "candidate_pool_size"):
1572 self.op.candidate_pool_size = None
1573 if self.op.candidate_pool_size is not None:
1575 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1576 except (ValueError, TypeError), err:
1577 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1579 if self.op.candidate_pool_size < 1:
1580 raise errors.OpPrereqError("At least one master candidate needed")
1582 def ExpandNames(self):
1583 # FIXME: in the future maybe other cluster params won't require checking on
1584 # all nodes to be modified.
1585 self.needed_locks = {
1586 locking.LEVEL_NODE: locking.ALL_SET,
1588 self.share_locks[locking.LEVEL_NODE] = 1
1590 def BuildHooksEnv(self):
1595 "OP_TARGET": self.cfg.GetClusterName(),
1596 "NEW_VG_NAME": self.op.vg_name,
1598 mn = self.cfg.GetMasterNode()
1599 return env, [mn], [mn]
1601 def CheckPrereq(self):
1602 """Check prerequisites.
1604 This checks whether the given params don't conflict and
1605 if the given volume group is valid.
1608 if self.op.vg_name is not None and not self.op.vg_name:
1609 instances = self.cfg.GetAllInstancesInfo().values()
1610 for inst in instances:
1611 for disk in inst.disks:
1612 if _RecursiveCheckIfLVMBased(disk):
1613 raise errors.OpPrereqError("Cannot disable lvm storage while"
1614 " lvm-based instances exist")
1616 node_list = self.acquired_locks[locking.LEVEL_NODE]
1618 # if vg_name is not None, check the given volume group on all nodes
1620 vglist = self.rpc.call_vg_list(node_list)
1621 for node in node_list:
1622 msg = vglist[node].fail_msg
1624 # ignoring down node
1625 self.LogWarning("Error while gathering data on node %s"
1626 " (ignoring node): %s", node, msg)
1628 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1630 constants.MIN_VG_SIZE)
1632 raise errors.OpPrereqError("Error on node '%s': %s" %
1635 self.cluster = cluster = self.cfg.GetClusterInfo()
1636 # validate params changes
1637 if self.op.beparams:
1638 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1639 self.new_beparams = objects.FillDict(
1640 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1642 if self.op.nicparams:
1643 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1644 self.new_nicparams = objects.FillDict(
1645 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1646 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1648 # hypervisor list/parameters
1649 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1650 if self.op.hvparams:
1651 if not isinstance(self.op.hvparams, dict):
1652 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1653 for hv_name, hv_dict in self.op.hvparams.items():
1654 if hv_name not in self.new_hvparams:
1655 self.new_hvparams[hv_name] = hv_dict
1657 self.new_hvparams[hv_name].update(hv_dict)
1659 if self.op.enabled_hypervisors is not None:
1660 self.hv_list = self.op.enabled_hypervisors
1661 if not self.hv_list:
1662 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1663 " least one member")
1664 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1666 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1667 " entries: %s" % invalid_hvs)
1669 self.hv_list = cluster.enabled_hypervisors
1671 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1672 # either the enabled list has changed, or the parameters have, validate
1673 for hv_name, hv_params in self.new_hvparams.items():
1674 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1675 (self.op.enabled_hypervisors and
1676 hv_name in self.op.enabled_hypervisors)):
1677 # either this is a new hypervisor, or its parameters have changed
1678 hv_class = hypervisor.GetHypervisor(hv_name)
1679 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1680 hv_class.CheckParameterSyntax(hv_params)
1681 _CheckHVParams(self, node_list, hv_name, hv_params)
1683 def Exec(self, feedback_fn):
1684 """Change the parameters of the cluster.
1687 if self.op.vg_name is not None:
1688 new_volume = self.op.vg_name
1691 if new_volume != self.cfg.GetVGName():
1692 self.cfg.SetVGName(new_volume)
1694 feedback_fn("Cluster LVM configuration already in desired"
1695 " state, not changing")
1696 if self.op.hvparams:
1697 self.cluster.hvparams = self.new_hvparams
1698 if self.op.enabled_hypervisors is not None:
1699 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1700 if self.op.beparams:
1701 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1702 if self.op.nicparams:
1703 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1705 if self.op.candidate_pool_size is not None:
1706 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1707 # we need to update the pool size here, otherwise the save will fail
1708 _AdjustCandidatePool(self)
1710 self.cfg.Update(self.cluster)
1713 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1714 """Distribute additional files which are part of the cluster configuration.
1716 ConfigWriter takes care of distributing the config and ssconf files, but
1717 there are more files which should be distributed to all nodes. This function
1718 makes sure those are copied.
1720 @param lu: calling logical unit
1721 @param additional_nodes: list of nodes not in the config to distribute to
1724 # 1. Gather target nodes
1725 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1726 dist_nodes = lu.cfg.GetNodeList()
1727 if additional_nodes is not None:
1728 dist_nodes.extend(additional_nodes)
1729 if myself.name in dist_nodes:
1730 dist_nodes.remove(myself.name)
1731 # 2. Gather files to distribute
1732 dist_files = set([constants.ETC_HOSTS,
1733 constants.SSH_KNOWN_HOSTS_FILE,
1734 constants.RAPI_CERT_FILE,
1735 constants.RAPI_USERS_FILE,
1736 constants.HMAC_CLUSTER_KEY,
1739 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1740 for hv_name in enabled_hypervisors:
1741 hv_class = hypervisor.GetHypervisor(hv_name)
1742 dist_files.update(hv_class.GetAncillaryFiles())
1744 # 3. Perform the files upload
1745 for fname in dist_files:
1746 if os.path.exists(fname):
1747 result = lu.rpc.call_upload_file(dist_nodes, fname)
1748 for to_node, to_result in result.items():
1749 msg = to_result.fail_msg
1751 msg = ("Copy of file %s to node %s failed: %s" %
1752 (fname, to_node, msg))
1753 lu.proc.LogWarning(msg)
1756 class LURedistributeConfig(NoHooksLU):
1757 """Force the redistribution of cluster configuration.
1759 This is a very simple LU.
1765 def ExpandNames(self):
1766 self.needed_locks = {
1767 locking.LEVEL_NODE: locking.ALL_SET,
1769 self.share_locks[locking.LEVEL_NODE] = 1
1771 def CheckPrereq(self):
1772 """Check prerequisites.
1776 def Exec(self, feedback_fn):
1777 """Redistribute the configuration.
1780 self.cfg.Update(self.cfg.GetClusterInfo())
1781 _RedistributeAncillaryFiles(self)
1784 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1785 """Sleep and poll for an instance's disk to sync.
1788 if not instance.disks:
1792 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1794 node = instance.primary_node
1796 for dev in instance.disks:
1797 lu.cfg.SetDiskID(dev, node)
1800 degr_retries = 10 # in seconds, as we sleep 1 second each time
1804 cumul_degraded = False
1805 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1806 msg = rstats.fail_msg
1808 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1811 raise errors.RemoteError("Can't contact node %s for mirror data,"
1812 " aborting." % node)
1815 rstats = rstats.payload
1817 for i, mstat in enumerate(rstats):
1819 lu.LogWarning("Can't compute data for node %s/%s",
1820 node, instance.disks[i].iv_name)
1822 # we ignore the ldisk parameter
1823 perc_done, est_time, is_degraded, _ = mstat
1824 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1825 if perc_done is not None:
1827 if est_time is not None:
1828 rem_time = "%d estimated seconds remaining" % est_time
1831 rem_time = "no time estimate"
1832 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1833 (instance.disks[i].iv_name, perc_done, rem_time))
1835 # if we're done but degraded, let's do a few small retries, to
1836 # make sure we see a stable and not transient situation; therefore
1837 # we force restart of the loop
1838 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1839 logging.info("Degraded disks found, %d retries left", degr_retries)
1847 time.sleep(min(60, max_time))
1850 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1851 return not cumul_degraded
1854 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1855 """Check that mirrors are not degraded.
1857 The ldisk parameter, if True, will change the test from the
1858 is_degraded attribute (which represents overall non-ok status for
1859 the device(s)) to the ldisk (representing the local storage status).
1862 lu.cfg.SetDiskID(dev, node)
1869 if on_primary or dev.AssembleOnSecondary():
1870 rstats = lu.rpc.call_blockdev_find(node, dev)
1871 msg = rstats.fail_msg
1873 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1875 elif not rstats.payload:
1876 lu.LogWarning("Can't find disk on node %s", node)
1879 result = result and (not rstats.payload[idx])
1881 for child in dev.children:
1882 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1887 class LUDiagnoseOS(NoHooksLU):
1888 """Logical unit for OS diagnose/query.
1891 _OP_REQP = ["output_fields", "names"]
1893 _FIELDS_STATIC = utils.FieldSet()
1894 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1896 def ExpandNames(self):
1898 raise errors.OpPrereqError("Selective OS query not supported")
1900 _CheckOutputFields(static=self._FIELDS_STATIC,
1901 dynamic=self._FIELDS_DYNAMIC,
1902 selected=self.op.output_fields)
1904 # Lock all nodes, in shared mode
1905 # Temporary removal of locks, should be reverted later
1906 # TODO: reintroduce locks when they are lighter-weight
1907 self.needed_locks = {}
1908 #self.share_locks[locking.LEVEL_NODE] = 1
1909 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1911 def CheckPrereq(self):
1912 """Check prerequisites.
1917 def _DiagnoseByOS(node_list, rlist):
1918 """Remaps a per-node return list into an a per-os per-node dictionary
1920 @param node_list: a list with the names of all nodes
1921 @param rlist: a map with node names as keys and OS objects as values
1924 @return: a dictionary with osnames as keys and as value another map, with
1925 nodes as keys and tuples of (path, status, diagnose) as values, eg::
1927 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1928 (/srv/..., False, "invalid api")],
1929 "node2": [(/srv/..., True, "")]}
1934 # we build here the list of nodes that didn't fail the RPC (at RPC
1935 # level), so that nodes with a non-responding node daemon don't
1936 # make all OSes invalid
1937 good_nodes = [node_name for node_name in rlist
1938 if not rlist[node_name].fail_msg]
1939 for node_name, nr in rlist.items():
1940 if nr.fail_msg or not nr.payload:
1942 for name, path, status, diagnose in nr.payload:
1943 if name not in all_os:
1944 # build a list of nodes for this os containing empty lists
1945 # for each node in node_list
1947 for nname in good_nodes:
1948 all_os[name][nname] = []
1949 all_os[name][node_name].append((path, status, diagnose))
1952 def Exec(self, feedback_fn):
1953 """Compute the list of OSes.
1956 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1957 node_data = self.rpc.call_os_diagnose(valid_nodes)
1958 pol = self._DiagnoseByOS(valid_nodes, node_data)
1960 for os_name, os_data in pol.items():
1962 for field in self.op.output_fields:
1965 elif field == "valid":
1966 val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1967 elif field == "node_status":
1968 # this is just a copy of the dict
1970 for node_name, nos_list in os_data.items():
1971 val[node_name] = nos_list
1973 raise errors.ParameterError(field)
1980 class LURemoveNode(LogicalUnit):
1981 """Logical unit for removing a node.
1984 HPATH = "node-remove"
1985 HTYPE = constants.HTYPE_NODE
1986 _OP_REQP = ["node_name"]
1988 def BuildHooksEnv(self):
1991 This doesn't run on the target node in the pre phase as a failed
1992 node would then be impossible to remove.
1996 "OP_TARGET": self.op.node_name,
1997 "NODE_NAME": self.op.node_name,
1999 all_nodes = self.cfg.GetNodeList()
2000 all_nodes.remove(self.op.node_name)
2001 return env, all_nodes, all_nodes
2003 def CheckPrereq(self):
2004 """Check prerequisites.
2007 - the node exists in the configuration
2008 - it does not have primary or secondary instances
2009 - it's not the master
2011 Any errors are signaled by raising errors.OpPrereqError.
2014 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2016 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2018 instance_list = self.cfg.GetInstanceList()
2020 masternode = self.cfg.GetMasterNode()
2021 if node.name == masternode:
2022 raise errors.OpPrereqError("Node is the master node,"
2023 " you need to failover first.")
2025 for instance_name in instance_list:
2026 instance = self.cfg.GetInstanceInfo(instance_name)
2027 if node.name in instance.all_nodes:
2028 raise errors.OpPrereqError("Instance %s is still running on the node,"
2029 " please remove first." % instance_name)
2030 self.op.node_name = node.name
2033 def Exec(self, feedback_fn):
2034 """Removes the node from the cluster.
2038 logging.info("Stopping the node daemon and removing configs from node %s",
2041 self.context.RemoveNode(node.name)
2043 result = self.rpc.call_node_leave_cluster(node.name)
2044 msg = result.fail_msg
2046 self.LogWarning("Errors encountered on the remote node while leaving"
2047 " the cluster: %s", msg)
2049 # Promote nodes to master candidate as needed
2050 _AdjustCandidatePool(self)
2053 class LUQueryNodes(NoHooksLU):
2054 """Logical unit for querying nodes.
2057 _OP_REQP = ["output_fields", "names", "use_locking"]
2059 _FIELDS_DYNAMIC = utils.FieldSet(
2061 "mtotal", "mnode", "mfree",
2063 "ctotal", "cnodes", "csockets",
2066 _FIELDS_STATIC = utils.FieldSet(
2067 "name", "pinst_cnt", "sinst_cnt",
2068 "pinst_list", "sinst_list",
2069 "pip", "sip", "tags",
2078 def ExpandNames(self):
2079 _CheckOutputFields(static=self._FIELDS_STATIC,
2080 dynamic=self._FIELDS_DYNAMIC,
2081 selected=self.op.output_fields)
2083 self.needed_locks = {}
2084 self.share_locks[locking.LEVEL_NODE] = 1
2087 self.wanted = _GetWantedNodes(self, self.op.names)
2089 self.wanted = locking.ALL_SET
2091 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2092 self.do_locking = self.do_node_query and self.op.use_locking
2094 # if we don't request only static fields, we need to lock the nodes
2095 self.needed_locks[locking.LEVEL_NODE] = self.wanted
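# Illustrative example (hypothetical request): with
# output_fields = ["name", "mfree"], the dynamic field "mfree" makes
# do_node_query True; if use_locking is also set, do_locking is True and
# the node locks declared above are actually acquired.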
2098 def CheckPrereq(self):
2099 """Check prerequisites.
2102 # The validation of the node list is done in _GetWantedNodes if the
2103 # list is not empty; if it is empty, there is no validation to do
2106 def Exec(self, feedback_fn):
2107 """Computes the list of nodes and their attributes.
2110 all_info = self.cfg.GetAllNodesInfo()
2112 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2113 elif self.wanted != locking.ALL_SET:
2114 nodenames = self.wanted
2115 missing = set(nodenames).difference(all_info.keys())
2117 raise errors.OpExecError(
2118 "Some nodes were removed before retrieving their data: %s" % missing)
2120 nodenames = all_info.keys()
2122 nodenames = utils.NiceSort(nodenames)
2123 nodelist = [all_info[name] for name in nodenames]
2125 # begin data gathering
2127 if self.do_node_query:
2129 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2130 self.cfg.GetHypervisorType())
2131 for name in nodenames:
2132 nodeinfo = node_data[name]
2133 if not nodeinfo.fail_msg and nodeinfo.payload:
2134 nodeinfo = nodeinfo.payload
2135 fn = utils.TryConvert
2137 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2138 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2139 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2140 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2141 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2142 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2143 "bootid": nodeinfo.get('bootid', None),
2144 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2145 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2148 live_data[name] = {}
2150 live_data = dict.fromkeys(nodenames, {})
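# Hedged example of the structure built above (values are hypothetical):
#   live_data = {"node1.example.com": {"mtotal": 4096, "mnode": 512,
#                                      "mfree": 2048, "ctotal": 4}}
# When live data was not requested, every node simply maps to {}.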
2152 node_to_primary = dict([(name, set()) for name in nodenames])
2153 node_to_secondary = dict([(name, set()) for name in nodenames])
2155 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2156 "sinst_cnt", "sinst_list"))
2157 if inst_fields & frozenset(self.op.output_fields):
2158 instancelist = self.cfg.GetInstanceList()
2160 for instance_name in instancelist:
2161 inst = self.cfg.GetInstanceInfo(instance_name)
2162 if inst.primary_node in node_to_primary:
2163 node_to_primary[inst.primary_node].add(inst.name)
2164 for secnode in inst.secondary_nodes:
2165 if secnode in node_to_secondary:
2166 node_to_secondary[secnode].add(inst.name)
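# Hedged example of the maps built above (names are hypothetical):
#   node_to_primary   = {"node1": set(["inst1"]), "node2": set()}
#   node_to_secondary = {"node1": set(), "node2": set(["inst1"])}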
2168 master_node = self.cfg.GetMasterNode()
2170 # end data gathering
2173 for node in nodelist:
2175 for field in self.op.output_fields:
2178 elif field == "pinst_list":
2179 val = list(node_to_primary[node.name])
2180 elif field == "sinst_list":
2181 val = list(node_to_secondary[node.name])
2182 elif field == "pinst_cnt":
2183 val = len(node_to_primary[node.name])
2184 elif field == "sinst_cnt":
2185 val = len(node_to_secondary[node.name])
2186 elif field == "pip":
2187 val = node.primary_ip
2188 elif field == "sip":
2189 val = node.secondary_ip
2190 elif field == "tags":
2191 val = list(node.GetTags())
2192 elif field == "serial_no":
2193 val = node.serial_no
2194 elif field == "master_candidate":
2195 val = node.master_candidate
2196 elif field == "master":
2197 val = node.name == master_node
2198 elif field == "offline":
2200 elif field == "drained":
2202 elif self._FIELDS_DYNAMIC.Matches(field):
2203 val = live_data[node.name].get(field, None)
2204 elif field == "role":
2205 if node.name == master_node:
2207 elif node.master_candidate:
2216 raise errors.ParameterError(field)
2217 node_output.append(val)
2218 output.append(node_output)
2223 class LUQueryNodeVolumes(NoHooksLU):
2224 """Logical unit for getting volumes on node(s).
2227 _OP_REQP = ["nodes", "output_fields"]
2229 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2230 _FIELDS_STATIC = utils.FieldSet("node")
2232 def ExpandNames(self):
2233 _CheckOutputFields(static=self._FIELDS_STATIC,
2234 dynamic=self._FIELDS_DYNAMIC,
2235 selected=self.op.output_fields)
2237 self.needed_locks = {}
2238 self.share_locks[locking.LEVEL_NODE] = 1
2239 if not self.op.nodes:
2240 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2242 self.needed_locks[locking.LEVEL_NODE] = \
2243 _GetWantedNodes(self, self.op.nodes)
2245 def CheckPrereq(self):
2246 """Check prerequisites.
2248 This checks that the fields required are valid output fields.
2251 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2253 def Exec(self, feedback_fn):
2254 """Computes the list of nodes and their attributes.
2257 nodenames = self.nodes
2258 volumes = self.rpc.call_node_volumes(nodenames)
2260 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2261 in self.cfg.GetInstanceList()]
2263 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2266 for node in nodenames:
2267 nresult = volumes[node]
2270 msg = nresult.fail_msg
2272 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2275 node_vols = nresult.payload[:]
2276 node_vols.sort(key=lambda vol: vol['dev'])
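# Hedged sketch of a payload entry as consumed below (only the keys visible
# in this code are shown; the values are hypothetical):
#   {'dev': '/dev/xenvg/lv-0123', 'name': 'lv-0123', 'size': '1024.00'}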
2278 for vol in node_vols:
2280 for field in self.op.output_fields:
2283 elif field == "phys":
2287 elif field == "name":
2289 elif field == "size":
2290 val = int(float(vol['size']))
2291 elif field == "instance":
2293 if node not in lv_by_node[inst]:
2295 if vol['name'] in lv_by_node[inst][node]:
2301 raise errors.ParameterError(field)
2302 node_output.append(str(val))
2304 output.append(node_output)
2309 class LUQueryNodeStorage(NoHooksLU):
2310 """Logical unit for getting information on storage units on node(s).
2313 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2315 _FIELDS_STATIC = utils.FieldSet("node")
2317 def ExpandNames(self):
2318 storage_type = self.op.storage_type
2320 if storage_type not in constants.VALID_STORAGE_FIELDS:
2321 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2323 dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2325 _CheckOutputFields(static=self._FIELDS_STATIC,
2326 dynamic=utils.FieldSet(*dynamic_fields),
2327 selected=self.op.output_fields)
2329 self.needed_locks = {}
2330 self.share_locks[locking.LEVEL_NODE] = 1
2333 self.needed_locks[locking.LEVEL_NODE] = \
2334 _GetWantedNodes(self, self.op.nodes)
2336 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2338 def CheckPrereq(self):
2339 """Check prerequisites.
2341 This checks that the fields required are valid output fields.
2344 self.op.name = getattr(self.op, "name", None)
2346 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2348 def Exec(self, feedback_fn):
2349 """Computes the list of nodes and their attributes.
2352 # Special case for file storage
2353 if self.op.storage_type == constants.ST_FILE:
2354 st_args = [self.cfg.GetFileStorageDir()]
2358 # Always get name to sort by
2359 if constants.SF_NAME in self.op.output_fields:
2360 fields = self.op.output_fields[:]
2362 fields = [constants.SF_NAME] + self.op.output_fields
2364 # Never ask for node as it's only known to the LU
2365 while "node" in fields:
2366 fields.remove("node")
2368 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2369 name_idx = field_idx[constants.SF_NAME]
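# Illustrative example (hypothetical fields, assuming constants.SF_NAME is
# "name"): with fields = ["name", "size", "used"], field_idx becomes
# {"name": 0, "size": 1, "used": 2} and name_idx is 0; name_idx is later
# used to key the rows returned by each node.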
2371 data = self.rpc.call_storage_list(self.nodes,
2372 self.op.storage_type, st_args,
2373 self.op.name, fields)
2377 for node in utils.NiceSort(self.nodes):
2378 nresult = data[node]
2382 msg = nresult.fail_msg
2384 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2387 rows = dict([(row[name_idx], row) for row in nresult.payload])
2389 for name in utils.NiceSort(rows.keys()):
2394 for field in self.op.output_fields:
2397 elif field in field_idx:
2398 val = row[field_idx[field]]
2400 raise errors.ParameterError(field)
2409 class LUAddNode(LogicalUnit):
2410 """Logical unit for adding node to the cluster.
2414 HTYPE = constants.HTYPE_NODE
2415 _OP_REQP = ["node_name"]
2417 def BuildHooksEnv(self):
2420 This will run on all nodes before, and on all nodes + the new node after.
2424 "OP_TARGET": self.op.node_name,
2425 "NODE_NAME": self.op.node_name,
2426 "NODE_PIP": self.op.primary_ip,
2427 "NODE_SIP": self.op.secondary_ip,
2429 nodes_0 = self.cfg.GetNodeList()
2430 nodes_1 = nodes_0 + [self.op.node_name, ]
2431 return env, nodes_0, nodes_1
2433 def CheckPrereq(self):
2434 """Check prerequisites.
2437 - the new node is not already in the config
2439 - its parameters (single/dual homed) match the cluster
2441 Any errors are signaled by raising errors.OpPrereqError.
2444 node_name = self.op.node_name
2447 dns_data = utils.HostInfo(node_name)
2449 node = dns_data.name
2450 primary_ip = self.op.primary_ip = dns_data.ip
2451 secondary_ip = getattr(self.op, "secondary_ip", None)
2452 if secondary_ip is None:
2453 secondary_ip = primary_ip
2454 if not utils.IsValidIP(secondary_ip):
2455 raise errors.OpPrereqError("Invalid secondary IP given")
2456 self.op.secondary_ip = secondary_ip
2458 node_list = cfg.GetNodeList()
2459 if not self.op.readd and node in node_list:
2460 raise errors.OpPrereqError("Node %s is already in the configuration" %
2462 elif self.op.readd and node not in node_list:
2463 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2465 for existing_node_name in node_list:
2466 existing_node = cfg.GetNodeInfo(existing_node_name)
2468 if self.op.readd and node == existing_node_name:
2469 if (existing_node.primary_ip != primary_ip or
2470 existing_node.secondary_ip != secondary_ip):
2471 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2472 " address configuration as before")
2475 if (existing_node.primary_ip == primary_ip or
2476 existing_node.secondary_ip == primary_ip or
2477 existing_node.primary_ip == secondary_ip or
2478 existing_node.secondary_ip == secondary_ip):
2479 raise errors.OpPrereqError("New node ip address(es) conflict with"
2480 " existing node %s" % existing_node.name)
2482 # check that the type of the node (single versus dual homed) is the
2483 # same as for the master
2484 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2485 master_singlehomed = myself.secondary_ip == myself.primary_ip
2486 newbie_singlehomed = secondary_ip == primary_ip
2487 if master_singlehomed != newbie_singlehomed:
2488 if master_singlehomed:
2489 raise errors.OpPrereqError("The master has no private ip but the"
2490 " new node has one")
2492 raise errors.OpPrereqError("The master has a private ip but the"
2493 " new node doesn't have one")
2495 # checks reachability
2496 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2497 raise errors.OpPrereqError("Node not reachable by ping")
2499 if not newbie_singlehomed:
2500 # check reachability from my secondary ip to newbie's secondary ip
2501 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2502 source=myself.secondary_ip):
2503 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2504 " based ping to noded port")
2506 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2511 mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2512 # the new node will increase mc_max by one, so:
2513 mc_max = min(mc_max + 1, cp_size)
2514 self.master_candidate = mc_now < mc_max
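# Worked example (hypothetical numbers): with candidate_pool_size = 10,
# mc_now = 3 and mc_max = 3, adding the node gives
# mc_max = min(3 + 1, 10) = 4, so mc_now < mc_max and the new node will be
# a master candidate.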
2517 self.new_node = self.cfg.GetNodeInfo(node)
2518 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2520 self.new_node = objects.Node(name=node,
2521 primary_ip=primary_ip,
2522 secondary_ip=secondary_ip,
2523 master_candidate=self.master_candidate,
2524 offline=False, drained=False)
2526 def Exec(self, feedback_fn):
2527 """Adds the new node to the cluster.
2530 new_node = self.new_node
2531 node = new_node.name
2533 # for re-adds, reset the offline/drained/master-candidate flags;
2534 # we need to reset here, otherwise offline would prevent RPC calls
2535 # later in the procedure; this also means that if the re-add
2536 # fails, we are left with a non-offlined, broken node
2538 new_node.drained = new_node.offline = False
2539 self.LogInfo("Readding a node, the offline/drained flags were reset")
2540 # if we demote the node, we do cleanup later in the procedure
2541 new_node.master_candidate = self.master_candidate
2543 # notify the user about any possible mc promotion
2544 if new_node.master_candidate:
2545 self.LogInfo("Node will be a master candidate")
2547 # check connectivity
2548 result = self.rpc.call_version([node])[node]
2549 result.Raise("Can't get version information from node %s" % node)
2550 if constants.PROTOCOL_VERSION == result.payload:
2551 logging.info("Communication to node %s fine, sw version %s match",
2552 node, result.payload)
2554 raise errors.OpExecError("Version mismatch: master version %s,"
2555 " node version %s" %
2556 (constants.PROTOCOL_VERSION, result.payload))
2559 logging.info("Copy ssh key to node %s", node)
2560 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2562 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2563 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2569 keyarray.append(f.read())
2573 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2575 keyarray[3], keyarray[4], keyarray[5])
2576 result.Raise("Cannot transfer ssh keys to the new node")
2578 # Add node to our /etc/hosts, and add key to known_hosts
2579 if self.cfg.GetClusterInfo().modify_etc_hosts:
2580 utils.AddHostToEtcHosts(new_node.name)
2582 if new_node.secondary_ip != new_node.primary_ip:
2583 result = self.rpc.call_node_has_ip_address(new_node.name,
2584 new_node.secondary_ip)
2585 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2587 if not result.payload:
2588 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2589 " you gave (%s). Please fix and re-run this"
2590 " command." % new_node.secondary_ip)
2592 node_verify_list = [self.cfg.GetMasterNode()]
2593 node_verify_param = {
2595 # TODO: do a node-net-test as well?
2598 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2599 self.cfg.GetClusterName())
2600 for verifier in node_verify_list:
2601 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2602 nl_payload = result[verifier].payload['nodelist']
2604 for failed in nl_payload:
2605 feedback_fn("ssh/hostname verification failed %s -> %s" %
2606 (verifier, nl_payload[failed]))
2607 raise errors.OpExecError("ssh/hostname verification failed.")
2610 _RedistributeAncillaryFiles(self)
2611 self.context.ReaddNode(new_node)
2612 # make sure we redistribute the config
2613 self.cfg.Update(new_node)
2614 # and make sure the new node will not have old files around
2615 if not new_node.master_candidate:
2616 result = self.rpc.call_node_demote_from_mc(new_node.name)
2617 msg = result.RemoteFailMsg()
2619 self.LogWarning("Node failed to demote itself from master"
2620 " candidate status: %s" % msg)
2622 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2623 self.context.AddNode(new_node)
2626 class LUSetNodeParams(LogicalUnit):
2627 """Modifies the parameters of a node.
2630 HPATH = "node-modify"
2631 HTYPE = constants.HTYPE_NODE
2632 _OP_REQP = ["node_name"]
2635 def CheckArguments(self):
2636 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2637 if node_name is None:
2638 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2639 self.op.node_name = node_name
2640 _CheckBooleanOpField(self.op, 'master_candidate')
2641 _CheckBooleanOpField(self.op, 'offline')
2642 _CheckBooleanOpField(self.op, 'drained')
2643 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2644 if all_mods.count(None) == 3:
2645 raise errors.OpPrereqError("Please pass at least one modification")
2646 if all_mods.count(True) > 1:
2647 raise errors.OpPrereqError("Can't set the node into more than one"
2648 " state at the same time")
2650 def ExpandNames(self):
2651 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2653 def BuildHooksEnv(self):
2656 This runs on the master node.
2660 "OP_TARGET": self.op.node_name,
2661 "MASTER_CANDIDATE": str(self.op.master_candidate),
2662 "OFFLINE": str(self.op.offline),
2663 "DRAINED": str(self.op.drained),
2665 nl = [self.cfg.GetMasterNode(),
2669 def CheckPrereq(self):
2670 """Check prerequisites.
2672 This checks the requested modifications against the current node state.
2675 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2677 if ((self.op.master_candidate == False or self.op.offline == True or
2678 self.op.drained == True) and node.master_candidate):
2679 # we will demote the node from master_candidate
2680 if self.op.node_name == self.cfg.GetMasterNode():
2681 raise errors.OpPrereqError("The master node has to be a"
2682 " master candidate, online and not drained")
2683 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2684 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2685 if num_candidates <= cp_size:
2686 msg = ("Not enough master candidates (desired"
2687 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2689 self.LogWarning(msg)
2691 raise errors.OpPrereqError(msg)
2693 if (self.op.master_candidate == True and
2694 ((node.offline and not self.op.offline == False) or
2695 (node.drained and not self.op.drained == False))):
2696 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2697 " to master_candidate" % node.name)
2701 def Exec(self, feedback_fn):
2710 if self.op.offline is not None:
2711 node.offline = self.op.offline
2712 result.append(("offline", str(self.op.offline)))
2713 if self.op.offline == True:
2714 if node.master_candidate:
2715 node.master_candidate = False
2717 result.append(("master_candidate", "auto-demotion due to offline"))
2719 node.drained = False
2720 result.append(("drained", "clear drained status due to offline"))
2722 if self.op.master_candidate is not None:
2723 node.master_candidate = self.op.master_candidate
2725 result.append(("master_candidate", str(self.op.master_candidate)))
2726 if self.op.master_candidate == False:
2727 rrc = self.rpc.call_node_demote_from_mc(node.name)
2730 self.LogWarning("Node failed to demote itself: %s" % msg)
2732 if self.op.drained is not None:
2733 node.drained = self.op.drained
2734 result.append(("drained", str(self.op.drained)))
2735 if self.op.drained == True:
2736 if node.master_candidate:
2737 node.master_candidate = False
2739 result.append(("master_candidate", "auto-demotion due to drain"))
2740 rrc = self.rpc.call_node_demote_from_mc(node.name)
2741 msg = rrc.RemoteFailMsg()
2743 self.LogWarning("Node failed to demote itself: %s" % msg)
2745 node.offline = False
2746 result.append(("offline", "clear offline status due to drain"))
2748 # this will trigger configuration file update, if needed
2749 self.cfg.Update(node)
2750 # this will trigger job queue propagation or cleanup
2752 self.context.ReaddNode(node)
2757 class LUPowercycleNode(NoHooksLU):
2758 """Powercycles a node.
2761 _OP_REQP = ["node_name", "force"]
2764 def CheckArguments(self):
2765 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2766 if node_name is None:
2767 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2768 self.op.node_name = node_name
2769 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2770 raise errors.OpPrereqError("The node is the master and the force"
2771 " parameter was not set")
2773 def ExpandNames(self):
2774 """Locking for PowercycleNode.
2776 This is a last-resort option and shouldn't block on other
2777 jobs. Therefore, we grab no locks.
2780 self.needed_locks = {}
2782 def CheckPrereq(self):
2783 """Check prerequisites.
2785 This LU has no prereqs.
2790 def Exec(self, feedback_fn):
2794 result = self.rpc.call_node_powercycle(self.op.node_name,
2795 self.cfg.GetHypervisorType())
2796 result.Raise("Failed to schedule the reboot")
2797 return result.payload
2800 class LUQueryClusterInfo(NoHooksLU):
2801 """Query cluster configuration.
2807 def ExpandNames(self):
2808 self.needed_locks = {}
2810 def CheckPrereq(self):
2811 """No prerequsites needed for this LU.
2816 def Exec(self, feedback_fn):
2817 """Return cluster config.
2820 cluster = self.cfg.GetClusterInfo()
2822 "software_version": constants.RELEASE_VERSION,
2823 "protocol_version": constants.PROTOCOL_VERSION,
2824 "config_version": constants.CONFIG_VERSION,
2825 "os_api_version": max(constants.OS_API_VERSIONS),
2826 "export_version": constants.EXPORT_VERSION,
2827 "architecture": (platform.architecture()[0], platform.machine()),
2828 "name": cluster.cluster_name,
2829 "master": cluster.master_node,
2830 "default_hypervisor": cluster.enabled_hypervisors[0],
2831 "enabled_hypervisors": cluster.enabled_hypervisors,
2832 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2833 for hypervisor_name in cluster.enabled_hypervisors]),
2834 "beparams": cluster.beparams,
2835 "nicparams": cluster.nicparams,
2836 "candidate_pool_size": cluster.candidate_pool_size,
2837 "master_netdev": cluster.master_netdev,
2838 "volume_group_name": cluster.volume_group_name,
2839 "file_storage_dir": cluster.file_storage_dir,
2845 class LUQueryConfigValues(NoHooksLU):
2846 """Return configuration values.
2851 _FIELDS_DYNAMIC = utils.FieldSet()
2852 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2854 def ExpandNames(self):
2855 self.needed_locks = {}
2857 _CheckOutputFields(static=self._FIELDS_STATIC,
2858 dynamic=self._FIELDS_DYNAMIC,
2859 selected=self.op.output_fields)
2861 def CheckPrereq(self):
2862 """No prerequisites.
2867 def Exec(self, feedback_fn):
2868 """Dump a representation of the cluster config to the standard output.
2872 for field in self.op.output_fields:
2873 if field == "cluster_name":
2874 entry = self.cfg.GetClusterName()
2875 elif field == "master_node":
2876 entry = self.cfg.GetMasterNode()
2877 elif field == "drain_flag":
2878 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2880 raise errors.ParameterError(field)
2881 values.append(entry)
2885 class LUActivateInstanceDisks(NoHooksLU):
2886 """Bring up an instance's disks.
2889 _OP_REQP = ["instance_name"]
2892 def ExpandNames(self):
2893 self._ExpandAndLockInstance()
2894 self.needed_locks[locking.LEVEL_NODE] = []
2895 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2897 def DeclareLocks(self, level):
2898 if level == locking.LEVEL_NODE:
2899 self._LockInstancesNodes()
2901 def CheckPrereq(self):
2902 """Check prerequisites.
2904 This checks that the instance is in the cluster.
2907 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2908 assert self.instance is not None, \
2909 "Cannot retrieve locked instance %s" % self.op.instance_name
2910 _CheckNodeOnline(self, self.instance.primary_node)
2912 def Exec(self, feedback_fn):
2913 """Activate the disks.
2916 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2918 raise errors.OpExecError("Cannot activate block devices")
2923 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2924 """Prepare the block devices for an instance.
2926 This sets up the block devices on all nodes.
2928 @type lu: L{LogicalUnit}
2929 @param lu: the logical unit on whose behalf we execute
2930 @type instance: L{objects.Instance}
2931 @param instance: the instance for whose disks we assemble
2932 @type ignore_secondaries: boolean
2933 @param ignore_secondaries: if true, errors on secondary nodes
2934 won't result in an error return from the function
2935 @return: False if the operation failed, otherwise a list of
2936 (host, instance_visible_name, node_visible_name)
2937 with the mapping from node devices to instance devices
2942 iname = instance.name
2943 # With the two passes mechanism we try to reduce the window of
2944 # opportunity for the race condition of switching DRBD to primary
2945 # before handshaking occurred, but we do not eliminate it
2947 # The proper fix would be to wait (with some limits) until the
2948 # connection has been made and drbd transitions from WFConnection
2949 # into any other network-connected state (Connected, SyncTarget,
2952 # 1st pass, assemble on all nodes in secondary mode
2953 for inst_disk in instance.disks:
2954 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2955 lu.cfg.SetDiskID(node_disk, node)
2956 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2957 msg = result.fail_msg
2959 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2960 " (is_primary=False, pass=1): %s",
2961 inst_disk.iv_name, node, msg)
2962 if not ignore_secondaries:
2965 # FIXME: race condition on drbd migration to primary
2967 # 2nd pass, do only the primary node
2968 for inst_disk in instance.disks:
2969 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2970 if node != instance.primary_node:
2972 lu.cfg.SetDiskID(node_disk, node)
2973 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2974 msg = result.fail_msg
2976 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2977 " (is_primary=True, pass=2): %s",
2978 inst_disk.iv_name, node, msg)
2980 device_info.append((instance.primary_node, inst_disk.iv_name,
2983 # leave the disks configured for the primary node
2984 # this is a workaround that would be fixed better by
2985 # improving the logical/physical id handling
2986 for disk in instance.disks:
2987 lu.cfg.SetDiskID(disk, instance.primary_node)
2989 return disks_ok, device_info
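# Hedged usage sketch (mirrors LUActivateInstanceDisks above and
# _StartInstanceDisks below): callers typically do
#
#   disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
#   if not disks_ok:
#     _ShutdownInstanceDisks(lu, instance)
#     raise errors.OpExecError("Cannot activate block devices")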
2992 def _StartInstanceDisks(lu, instance, force):
2993 """Start the disks of an instance.
2996 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2997 ignore_secondaries=force)
2999 _ShutdownInstanceDisks(lu, instance)
3000 if force is not None and not force:
3001 lu.proc.LogWarning("", hint="If the message above refers to a"
3003 " you can retry the operation using '--force'.")
3004 raise errors.OpExecError("Disk consistency error")
3007 class LUDeactivateInstanceDisks(NoHooksLU):
3008 """Shutdown an instance's disks.
3011 _OP_REQP = ["instance_name"]
3014 def ExpandNames(self):
3015 self._ExpandAndLockInstance()
3016 self.needed_locks[locking.LEVEL_NODE] = []
3017 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3019 def DeclareLocks(self, level):
3020 if level == locking.LEVEL_NODE:
3021 self._LockInstancesNodes()
3023 def CheckPrereq(self):
3024 """Check prerequisites.
3026 This checks that the instance is in the cluster.
3029 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3030 assert self.instance is not None, \
3031 "Cannot retrieve locked instance %s" % self.op.instance_name
3033 def Exec(self, feedback_fn):
3034 """Deactivate the disks
3037 instance = self.instance
3038 _SafeShutdownInstanceDisks(self, instance)
3041 def _SafeShutdownInstanceDisks(lu, instance):
3042 """Shutdown block devices of an instance.
3044 This function checks if an instance is running, before calling
3045 _ShutdownInstanceDisks.
3048 pnode = instance.primary_node
3049 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3050 ins_l.Raise("Can't contact node %s" % pnode)
3052 if instance.name in ins_l.payload:
3053 raise errors.OpExecError("Instance is running, can't shutdown"
3056 _ShutdownInstanceDisks(lu, instance)
3059 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3060 """Shutdown block devices of an instance.
3062 This does the shutdown on all nodes of the instance.
3064 If ignore_primary is false, errors on the primary node are
3069 for disk in instance.disks:
3070 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3071 lu.cfg.SetDiskID(top_disk, node)
3072 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3073 msg = result.fail_msg
3075 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3076 disk.iv_name, node, msg)
3077 if not ignore_primary or node != instance.primary_node:
3082 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3083 """Checks if a node has enough free memory.
3085 This function checks if a given node has the needed amount of free
3086 memory. In case the node has less memory or we cannot get the
3087 information from the node, this function raises an OpPrereqError
3090 @type lu: C{LogicalUnit}
3091 @param lu: a logical unit from which we get configuration data
3093 @param node: the node to check
3094 @type reason: C{str}
3095 @param reason: string to use in the error message
3096 @type requested: C{int}
3097 @param requested: the amount of memory in MiB to check for
3098 @type hypervisor_name: C{str}
3099 @param hypervisor_name: the hypervisor to ask for memory stats
3100 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3101 we cannot check the node
3104 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3105 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
3106 free_mem = nodeinfo[node].payload.get('memory_free', None)
3107 if not isinstance(free_mem, int):
3108 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3109 " was '%s'" % (node, free_mem))
3110 if requested > free_mem:
3111 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3112 " needed %s MiB, available %s MiB" %
3113 (node, reason, requested, free_mem))
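# Hedged usage sketch (matches how LUStartupInstance below calls the helper):
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)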
3116 class LUStartupInstance(LogicalUnit):
3117 """Starts an instance.
3120 HPATH = "instance-start"
3121 HTYPE = constants.HTYPE_INSTANCE
3122 _OP_REQP = ["instance_name", "force"]
3125 def ExpandNames(self):
3126 self._ExpandAndLockInstance()
3128 def BuildHooksEnv(self):
3131 This runs on master, primary and secondary nodes of the instance.
3135 "FORCE": self.op.force,
3137 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3138 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3141 def CheckPrereq(self):
3142 """Check prerequisites.
3144 This checks that the instance is in the cluster.
3147 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3148 assert self.instance is not None, \
3149 "Cannot retrieve locked instance %s" % self.op.instance_name
3152 self.beparams = getattr(self.op, "beparams", {})
3154 if not isinstance(self.beparams, dict):
3155 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3156 " dict" % (type(self.beparams), ))
3157 # fill the beparams dict
3158 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3159 self.op.beparams = self.beparams
3162 self.hvparams = getattr(self.op, "hvparams", {})
3164 if not isinstance(self.hvparams, dict):
3165 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3166 " dict" % (type(self.hvparams), ))
3168 # check hypervisor parameter syntax (locally)
3169 cluster = self.cfg.GetClusterInfo()
3170 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3171 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3173 filled_hvp.update(self.hvparams)
3174 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3175 hv_type.CheckParameterSyntax(filled_hvp)
3176 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3177 self.op.hvparams = self.hvparams
3179 _CheckNodeOnline(self, instance.primary_node)
3181 bep = self.cfg.GetClusterInfo().FillBE(instance)
3182 # check bridges existence
3183 _CheckInstanceBridgesExist(self, instance)
3185 remote_info = self.rpc.call_instance_info(instance.primary_node,
3187 instance.hypervisor)
3188 remote_info.Raise("Error checking node %s" % instance.primary_node,
3190 if not remote_info.payload: # not running already
3191 _CheckNodeFreeMemory(self, instance.primary_node,
3192 "starting instance %s" % instance.name,
3193 bep[constants.BE_MEMORY], instance.hypervisor)
3195 def Exec(self, feedback_fn):
3196 """Start the instance.
3199 instance = self.instance
3200 force = self.op.force
3202 self.cfg.MarkInstanceUp(instance.name)
3204 node_current = instance.primary_node
3206 _StartInstanceDisks(self, instance, force)
3208 result = self.rpc.call_instance_start(node_current, instance,
3209 self.hvparams, self.beparams)
3210 msg = result.fail_msg
3212 _ShutdownInstanceDisks(self, instance)
3213 raise errors.OpExecError("Could not start instance: %s" % msg)
3216 class LURebootInstance(LogicalUnit):
3217 """Reboot an instance.
3220 HPATH = "instance-reboot"
3221 HTYPE = constants.HTYPE_INSTANCE
3222 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3225 def ExpandNames(self):
3226 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3227 constants.INSTANCE_REBOOT_HARD,
3228 constants.INSTANCE_REBOOT_FULL]:
3229 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3230 (constants.INSTANCE_REBOOT_SOFT,
3231 constants.INSTANCE_REBOOT_HARD,
3232 constants.INSTANCE_REBOOT_FULL))
3233 self._ExpandAndLockInstance()
3235 def BuildHooksEnv(self):
3238 This runs on master, primary and secondary nodes of the instance.
3242 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3243 "REBOOT_TYPE": self.op.reboot_type,
3245 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3246 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3249 def CheckPrereq(self):
3250 """Check prerequisites.
3252 This checks that the instance is in the cluster.
3255 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3256 assert self.instance is not None, \
3257 "Cannot retrieve locked instance %s" % self.op.instance_name
3259 _CheckNodeOnline(self, instance.primary_node)
3261 # check bridges existence
3262 _CheckInstanceBridgesExist(self, instance)
3264 def Exec(self, feedback_fn):
3265 """Reboot the instance.
3268 instance = self.instance
3269 ignore_secondaries = self.op.ignore_secondaries
3270 reboot_type = self.op.reboot_type
3272 node_current = instance.primary_node
3274 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3275 constants.INSTANCE_REBOOT_HARD]:
3276 for disk in instance.disks:
3277 self.cfg.SetDiskID(disk, node_current)
3278 result = self.rpc.call_instance_reboot(node_current, instance,
3280 result.Raise("Could not reboot instance")
3282 result = self.rpc.call_instance_shutdown(node_current, instance)
3283 result.Raise("Could not shutdown instance for full reboot")
3284 _ShutdownInstanceDisks(self, instance)
3285 _StartInstanceDisks(self, instance, ignore_secondaries)
3286 result = self.rpc.call_instance_start(node_current, instance, None, None)
3287 msg = result.fail_msg
3289 _ShutdownInstanceDisks(self, instance)
3290 raise errors.OpExecError("Could not start instance for"
3291 " full reboot: %s" % msg)
3293 self.cfg.MarkInstanceUp(instance.name)
3296 class LUShutdownInstance(LogicalUnit):
3297 """Shutdown an instance.
3300 HPATH = "instance-stop"
3301 HTYPE = constants.HTYPE_INSTANCE
3302 _OP_REQP = ["instance_name"]
3305 def ExpandNames(self):
3306 self._ExpandAndLockInstance()
3308 def BuildHooksEnv(self):
3311 This runs on master, primary and secondary nodes of the instance.
3314 env = _BuildInstanceHookEnvByObject(self, self.instance)
3315 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3318 def CheckPrereq(self):
3319 """Check prerequisites.
3321 This checks that the instance is in the cluster.
3324 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3325 assert self.instance is not None, \
3326 "Cannot retrieve locked instance %s" % self.op.instance_name
3327 _CheckNodeOnline(self, self.instance.primary_node)
3329 def Exec(self, feedback_fn):
3330 """Shutdown the instance.
3333 instance = self.instance
3334 node_current = instance.primary_node
3335 self.cfg.MarkInstanceDown(instance.name)
3336 result = self.rpc.call_instance_shutdown(node_current, instance)
3337 msg = result.fail_msg
3339 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3341 _ShutdownInstanceDisks(self, instance)
3344 class LUReinstallInstance(LogicalUnit):
3345 """Reinstall an instance.
3348 HPATH = "instance-reinstall"
3349 HTYPE = constants.HTYPE_INSTANCE
3350 _OP_REQP = ["instance_name"]
3353 def ExpandNames(self):
3354 self._ExpandAndLockInstance()
3356 def BuildHooksEnv(self):
3359 This runs on master, primary and secondary nodes of the instance.
3362 env = _BuildInstanceHookEnvByObject(self, self.instance)
3363 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3366 def CheckPrereq(self):
3367 """Check prerequisites.
3369 This checks that the instance is in the cluster and is not running.
3372 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3373 assert instance is not None, \
3374 "Cannot retrieve locked instance %s" % self.op.instance_name
3375 _CheckNodeOnline(self, instance.primary_node)
3377 if instance.disk_template == constants.DT_DISKLESS:
3378 raise errors.OpPrereqError("Instance '%s' has no disks" %
3379 self.op.instance_name)
3380 if instance.admin_up:
3381 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3382 self.op.instance_name)
3383 remote_info = self.rpc.call_instance_info(instance.primary_node,
3385 instance.hypervisor)
3386 remote_info.Raise("Error checking node %s" % instance.primary_node,
3388 if remote_info.payload:
3389 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3390 (self.op.instance_name,
3391 instance.primary_node))
3393 self.op.os_type = getattr(self.op, "os_type", None)
3394 if self.op.os_type is not None:
3396 pnode = self.cfg.GetNodeInfo(
3397 self.cfg.ExpandNodeName(instance.primary_node))
3399 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3401 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3402 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3403 (self.op.os_type, pnode.name), prereq=True)
3405 self.instance = instance
3407 def Exec(self, feedback_fn):
3408 """Reinstall the instance.
3411 inst = self.instance
3413 if self.op.os_type is not None:
3414 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3415 inst.os = self.op.os_type
3416 self.cfg.Update(inst)
3418 _StartInstanceDisks(self, inst, None)
3420 feedback_fn("Running the instance OS create scripts...")
3421 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3422 result.Raise("Could not install OS for instance %s on node %s" %
3423 (inst.name, inst.primary_node))
3425 _ShutdownInstanceDisks(self, inst)
3428 class LURenameInstance(LogicalUnit):
3429 """Rename an instance.
3432 HPATH = "instance-rename"
3433 HTYPE = constants.HTYPE_INSTANCE
3434 _OP_REQP = ["instance_name", "new_name"]
3436 def BuildHooksEnv(self):
3439 This runs on master, primary and secondary nodes of the instance.
3442 env = _BuildInstanceHookEnvByObject(self, self.instance)
3443 env["INSTANCE_NEW_NAME"] = self.op.new_name
3444 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3447 def CheckPrereq(self):
3448 """Check prerequisites.
3450 This checks that the instance is in the cluster and is not running.
3453 instance = self.cfg.GetInstanceInfo(
3454 self.cfg.ExpandInstanceName(self.op.instance_name))
3455 if instance is None:
3456 raise errors.OpPrereqError("Instance '%s' not known" %
3457 self.op.instance_name)
3458 _CheckNodeOnline(self, instance.primary_node)
3460 if instance.admin_up:
3461 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3462 self.op.instance_name)
3463 remote_info = self.rpc.call_instance_info(instance.primary_node,
3465 instance.hypervisor)
3466 remote_info.Raise("Error checking node %s" % instance.primary_node,
3468 if remote_info.payload:
3469 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3470 (self.op.instance_name,
3471 instance.primary_node))
3472 self.instance = instance
3474 # new name verification
3475 name_info = utils.HostInfo(self.op.new_name)
3477 self.op.new_name = new_name = name_info.name
3478 instance_list = self.cfg.GetInstanceList()
3479 if new_name in instance_list:
3480 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3483 if not getattr(self.op, "ignore_ip", False):
3484 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3485 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3486 (name_info.ip, new_name))
3489 def Exec(self, feedback_fn):
3490 """Reinstall the instance.
3493 inst = self.instance
3494 old_name = inst.name
3496 if inst.disk_template == constants.DT_FILE:
3497 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3499 self.cfg.RenameInstance(inst.name, self.op.new_name)
3500 # Change the instance lock. This is definitely safe while we hold the BGL
3501 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3502 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3504 # re-read the instance from the configuration after rename
3505 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3507 if inst.disk_template == constants.DT_FILE:
3508 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3509 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3510 old_file_storage_dir,
3511 new_file_storage_dir)
3512 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3513 " (but the instance has been renamed in Ganeti)" %
3514 (inst.primary_node, old_file_storage_dir,
3515 new_file_storage_dir))
3517 _StartInstanceDisks(self, inst, None)
3519 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3521 msg = result.fail_msg
3523 msg = ("Could not run OS rename script for instance %s on node %s"
3524 " (but the instance has been renamed in Ganeti): %s" %
3525 (inst.name, inst.primary_node, msg))
3526 self.proc.LogWarning(msg)
3528 _ShutdownInstanceDisks(self, inst)
3531 class LURemoveInstance(LogicalUnit):
3532 """Remove an instance.
3535 HPATH = "instance-remove"
3536 HTYPE = constants.HTYPE_INSTANCE
3537 _OP_REQP = ["instance_name", "ignore_failures"]
3540 def ExpandNames(self):
3541 self._ExpandAndLockInstance()
3542 self.needed_locks[locking.LEVEL_NODE] = []
3543 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3545 def DeclareLocks(self, level):
3546 if level == locking.LEVEL_NODE:
3547 self._LockInstancesNodes()
3549 def BuildHooksEnv(self):
3552 This runs on master, primary and secondary nodes of the instance.
3555 env = _BuildInstanceHookEnvByObject(self, self.instance)
3556 nl = [self.cfg.GetMasterNode()]
3559 def CheckPrereq(self):
3560 """Check prerequisites.
3562 This checks that the instance is in the cluster.
3565 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3566 assert self.instance is not None, \
3567 "Cannot retrieve locked instance %s" % self.op.instance_name
3569 def Exec(self, feedback_fn):
3570 """Remove the instance.
3573 instance = self.instance
3574 logging.info("Shutting down instance %s on node %s",
3575 instance.name, instance.primary_node)
3577 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3578 msg = result.fail_msg
3580 if self.op.ignore_failures:
3581 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3583 raise errors.OpExecError("Could not shutdown instance %s on"
3585 (instance.name, instance.primary_node, msg))
3587 logging.info("Removing block devices for instance %s", instance.name)
3589 if not _RemoveDisks(self, instance):
3590 if self.op.ignore_failures:
3591 feedback_fn("Warning: can't remove instance's disks")
3593 raise errors.OpExecError("Can't remove instance's disks")
3595 logging.info("Removing instance %s out of cluster config", instance.name)
3597 self.cfg.RemoveInstance(instance.name)
3598 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3601 class LUQueryInstances(NoHooksLU):
3602 """Logical unit for querying instances.
3605 _OP_REQP = ["output_fields", "names", "use_locking"]
3607 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3609 "disk_template", "ip", "mac", "bridge",
3610 "nic_mode", "nic_link",
3611 "sda_size", "sdb_size", "vcpus", "tags",
3612 "network_port", "beparams",
3613 r"(disk)\.(size)/([0-9]+)",
3614 r"(disk)\.(sizes)", "disk_usage",
3615 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3616 r"(nic)\.(bridge)/([0-9]+)",
3617 r"(nic)\.(macs|ips|modes|links|bridges)",
3618 r"(disk|nic)\.(count)",
3619 "serial_no", "hypervisor", "hvparams",] +
3621 for name in constants.HVS_PARAMETERS] +
3623 for name in constants.BES_PARAMETERS])
3624 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
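# Illustrative note: the regex-style entries in _FIELDS_STATIC match
# per-index fields such as "disk.size/0" or "nic.mac/1" (hypothetical
# requests), while the plain entries match field names exactly.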
3627 def ExpandNames(self):
3628 _CheckOutputFields(static=self._FIELDS_STATIC,
3629 dynamic=self._FIELDS_DYNAMIC,
3630 selected=self.op.output_fields)
3632 self.needed_locks = {}
3633 self.share_locks[locking.LEVEL_INSTANCE] = 1
3634 self.share_locks[locking.LEVEL_NODE] = 1
3637 self.wanted = _GetWantedInstances(self, self.op.names)
3639 self.wanted = locking.ALL_SET
3641 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3642 self.do_locking = self.do_node_query and self.op.use_locking
3644 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3645 self.needed_locks[locking.LEVEL_NODE] = []
3646 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3648 def DeclareLocks(self, level):
3649 if level == locking.LEVEL_NODE and self.do_locking:
3650 self._LockInstancesNodes()
3652 def CheckPrereq(self):
3653 """Check prerequisites.
3658 def Exec(self, feedback_fn):
3659 """Computes the list of nodes and their attributes.
3662 all_info = self.cfg.GetAllInstancesInfo()
3663 if self.wanted == locking.ALL_SET:
3664 # caller didn't specify instance names, so ordering is not important
3666 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3668 instance_names = all_info.keys()
3669 instance_names = utils.NiceSort(instance_names)
3671 # caller did specify names, so we must keep the ordering
3673 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3675 tgt_set = all_info.keys()
3676 missing = set(self.wanted).difference(tgt_set)
3678 raise errors.OpExecError("Some instances were removed before"
3679 " retrieving their data: %s" % missing)
3680 instance_names = self.wanted
3682 instance_list = [all_info[iname] for iname in instance_names]
3684 # begin data gathering
3686 nodes = frozenset([inst.primary_node for inst in instance_list])
3687 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3691 if self.do_node_query:
3693 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3695 result = node_data[name]
3697 # offline nodes will be in both lists
3698 off_nodes.append(name)
3699 if result.failed or result.fail_msg:
3700 bad_nodes.append(name)
3703 live_data.update(result.payload)
3704 # else no instance is alive
3706 live_data = dict([(name, {}) for name in instance_names])
3708 # end data gathering
3713 cluster = self.cfg.GetClusterInfo()
3714 for instance in instance_list:
3716 i_hv = cluster.FillHV(instance)
3717 i_be = cluster.FillBE(instance)
3718 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
3719 nic.nicparams) for nic in instance.nics]
3720 for field in self.op.output_fields:
3721 st_match = self._FIELDS_STATIC.Matches(field)
3726 elif field == "pnode":
3727 val = instance.primary_node
3728 elif field == "snodes":
3729 val = list(instance.secondary_nodes)
3730 elif field == "admin_state":
3731 val = instance.admin_up
3732 elif field == "oper_state":
3733 if instance.primary_node in bad_nodes:
3736 val = bool(live_data.get(instance.name))
3737 elif field == "status":
3738 if instance.primary_node in off_nodes:
3739 val = "ERROR_nodeoffline"
3740 elif instance.primary_node in bad_nodes:
3741 val = "ERROR_nodedown"
3743 running = bool(live_data.get(instance.name))
3745 if instance.admin_up:
3750 if instance.admin_up:
3754 elif field == "oper_ram":
3755 if instance.primary_node in bad_nodes:
3757 elif instance.name in live_data:
3758 val = live_data[instance.name].get("memory", "?")
3761 elif field == "vcpus":
3762 val = i_be[constants.BE_VCPUS]
3763 elif field == "disk_template":
3764 val = instance.disk_template
3767 val = instance.nics[0].ip
3770 elif field == "nic_mode":
3772 val = i_nicp[0][constants.NIC_MODE]
3775 elif field == "nic_link":
3777 val = i_nicp[0][constants.NIC_LINK]
3780 elif field == "bridge":
3781 if (instance.nics and
3782 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
3783 val = i_nicp[0][constants.NIC_LINK]
3786 elif field == "mac":
3788 val = instance.nics[0].mac
3791 elif field == "sda_size" or field == "sdb_size":
3792 idx = ord(field[2]) - ord('a')
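# e.g. "sda_size" gives ord('a') - ord('a') == 0, while "sdb_size" gives
# ord('b') - ord('a') == 1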
3794 val = instance.FindDisk(idx).size
3795 except errors.OpPrereqError:
3797 elif field == "disk_usage": # total disk usage per node
3798 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3799 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3800 elif field == "tags":
3801 val = list(instance.GetTags())
3802 elif field == "serial_no":
3803 val = instance.serial_no
3804 elif field == "network_port":
3805 val = instance.network_port
3806 elif field == "hypervisor":
3807 val = instance.hypervisor
3808 elif field == "hvparams":
3810 elif (field.startswith(HVPREFIX) and
3811 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3812 val = i_hv.get(field[len(HVPREFIX):], None)
3813 elif field == "beparams":
3815 elif (field.startswith(BEPREFIX) and
3816 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3817 val = i_be.get(field[len(BEPREFIX):], None)
3818 elif st_match and st_match.groups():
3819 # matches a variable list
3820 st_groups = st_match.groups()
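# e.g. the field "nic.mac/1" (hypothetical request) matches
# r"(nic)\.(mac|ip|mode|link)/([0-9]+)" and yields
# st_groups == ("nic", "mac", "1")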
3821 if st_groups and st_groups[0] == "disk":
3822 if st_groups[1] == "count":
3823 val = len(instance.disks)
3824 elif st_groups[1] == "sizes":
3825 val = [disk.size for disk in instance.disks]
3826 elif st_groups[1] == "size":
3828 val = instance.FindDisk(st_groups[2]).size
3829 except errors.OpPrereqError:
3832 assert False, "Unhandled disk parameter"
3833 elif st_groups[0] == "nic":
3834 if st_groups[1] == "count":
3835 val = len(instance.nics)
3836 elif st_groups[1] == "macs":
3837 val = [nic.mac for nic in instance.nics]
3838 elif st_groups[1] == "ips":
3839 val = [nic.ip for nic in instance.nics]
3840 elif st_groups[1] == "modes":
3841 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
3842 elif st_groups[1] == "links":
3843 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
3844 elif st_groups[1] == "bridges":
3847 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3848 val.append(nicp[constants.NIC_LINK])
3853 nic_idx = int(st_groups[2])
3854 if nic_idx >= len(instance.nics):
3857 if st_groups[1] == "mac":
3858 val = instance.nics[nic_idx].mac
3859 elif st_groups[1] == "ip":
3860 val = instance.nics[nic_idx].ip
3861 elif st_groups[1] == "mode":
3862 val = i_nicp[nic_idx][constants.NIC_MODE]
3863 elif st_groups[1] == "link":
3864 val = i_nicp[nic_idx][constants.NIC_LINK]
3865 elif st_groups[1] == "bridge":
3866 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
3867 if nic_mode == constants.NIC_MODE_BRIDGED:
3868 val = i_nicp[nic_idx][constants.NIC_LINK]
3872 assert False, "Unhandled NIC parameter"
3874 assert False, ("Declared but unhandled variable parameter '%s'" %
3877 assert False, "Declared but unhandled parameter '%s'" % field
3884 class LUFailoverInstance(LogicalUnit):
3885 """Failover an instance.
3888 HPATH = "instance-failover"
3889 HTYPE = constants.HTYPE_INSTANCE
3890 _OP_REQP = ["instance_name", "ignore_consistency"]
3893 def ExpandNames(self):
3894 self._ExpandAndLockInstance()
3895 self.needed_locks[locking.LEVEL_NODE] = []
3896 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3898 def DeclareLocks(self, level):
3899 if level == locking.LEVEL_NODE:
3900 self._LockInstancesNodes()
3902 def BuildHooksEnv(self):
3905 This runs on master, primary and secondary nodes of the instance.
3909 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3911 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3912 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3915 def CheckPrereq(self):
3916 """Check prerequisites.
3918 This checks that the instance is in the cluster.
3921 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3922 assert self.instance is not None, \
3923 "Cannot retrieve locked instance %s" % self.op.instance_name
3925 bep = self.cfg.GetClusterInfo().FillBE(instance)
3926 if instance.disk_template not in constants.DTS_NET_MIRROR:
3927 raise errors.OpPrereqError("Instance's disk layout is not"
3928 " network mirrored, cannot failover.")
3930 secondary_nodes = instance.secondary_nodes
3931 if not secondary_nodes:
3932 raise errors.ProgrammerError("no secondary node but using "
3933 "a mirrored disk template")
3935 target_node = secondary_nodes[0]
3936 _CheckNodeOnline(self, target_node)
3937 _CheckNodeNotDrained(self, target_node)
3938 if instance.admin_up:
3939 # check memory requirements on the secondary node
3940 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3941 instance.name, bep[constants.BE_MEMORY],
3942 instance.hypervisor)
3944 self.LogInfo("Not checking memory on the secondary node as"
3945 " instance will not be started")
3947 # check bridge existence
3948 _CheckInstanceBridgesExist(self, instance, node=target_node)
3950 def Exec(self, feedback_fn):
3951 """Failover an instance.
3953 The failover is done by shutting it down on its present node and
3954 starting it on the secondary.
3957 instance = self.instance
3959 source_node = instance.primary_node
3960 target_node = instance.secondary_nodes[0]
3962 feedback_fn("* checking disk consistency between source and target")
3963 for dev in instance.disks:
3964 # for drbd, these are drbd over lvm
3965 if not _CheckDiskConsistency(self, dev, target_node, False):
3966 if instance.admin_up and not self.op.ignore_consistency:
3967 raise errors.OpExecError("Disk %s is degraded on target node,"
3968 " aborting failover." % dev.iv_name)
3970 feedback_fn("* shutting down instance on source node")
3971 logging.info("Shutting down instance %s on node %s",
3972 instance.name, source_node)
3974 result = self.rpc.call_instance_shutdown(source_node, instance)
3975 msg = result.fail_msg
3977 if self.op.ignore_consistency:
3978 self.proc.LogWarning("Could not shut down instance %s on node %s."
3979 " Proceeding anyway. Please make sure node"
3980 " %s is down. Error details: %s",
3981 instance.name, source_node, source_node, msg)
3983 raise errors.OpExecError("Could not shut down instance %s on"
3985 (instance.name, source_node, msg))
3987 feedback_fn("* deactivating the instance's disks on source node")
3988 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3989 raise errors.OpExecError("Can't shut down the instance's disks.")
3991 instance.primary_node = target_node
3992 # distribute new instance config to the other nodes
3993 self.cfg.Update(instance)
3995 # Only start the instance if it's marked as up
3996 if instance.admin_up:
3997 feedback_fn("* activating the instance's disks on target node")
3998 logging.info("Starting instance %s on node %s",
3999 instance.name, target_node)
4001 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4002 ignore_secondaries=True)
4004 _ShutdownInstanceDisks(self, instance)
4005 raise errors.OpExecError("Can't activate the instance's disks")
4007 feedback_fn("* starting the instance on the target node")
4008 result = self.rpc.call_instance_start(target_node, instance, None, None)
4009 msg = result.fail_msg
4011 _ShutdownInstanceDisks(self, instance)
4012 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4013 (instance.name, target_node, msg))
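# Usage sketch (added for illustration, not part of the original module): a
# client triggers this LU by submitting the matching opcode, whose required
# parameters are the _OP_REQP entries above. The opcode class name below is
# an assumption about the opcodes module.
#
#   op = opcodes.OpFailoverInstance(instance_name="inst1.example.com",
#                                   ignore_consistency=False)
#   # the opcode is then dispatched by the processor, which runs this LU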
4016 class LUMigrateInstance(LogicalUnit):
4017 """Migrate an instance.
4019 This is migration without shutting the instance down; by contrast,
4020 failover shuts the instance down first.
4023 HPATH = "instance-migrate"
4024 HTYPE = constants.HTYPE_INSTANCE
4025 _OP_REQP = ["instance_name", "live", "cleanup"]
4029 def ExpandNames(self):
4030 self._ExpandAndLockInstance()
4032 self.needed_locks[locking.LEVEL_NODE] = []
4033 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4035 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4036 self.op.live, self.op.cleanup)
4037 self.tasklets = [self._migrater]
4039 def DeclareLocks(self, level):
4040 if level == locking.LEVEL_NODE:
4041 self._LockInstancesNodes()
4043 def BuildHooksEnv(self):
4046 This runs on master, primary and secondary nodes of the instance.
4049 instance = self._migrater.instance
4050 env = _BuildInstanceHookEnvByObject(self, instance)
4051 env["MIGRATE_LIVE"] = self.op.live
4052 env["MIGRATE_CLEANUP"] = self.op.cleanup
4053 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
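# Usage sketch (illustrative assumption, mirroring _OP_REQP above): the
# corresponding opcode carries the instance name plus the live/cleanup flags,
# e.g.
#
#   op = opcodes.OpMigrateInstance(instance_name="inst1.example.com",
#                                  live=True, cleanup=False)
#
# The actual migration work is delegated to the TLMigrateInstance tasklet.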
4057 class LUMigrateNode(LogicalUnit):
4058 """Migrate all instances from a node.
4061 HPATH = "node-migrate"
4062 HTYPE = constants.HTYPE_NODE
4063 _OP_REQP = ["node_name", "live"]
4066 def ExpandNames(self):
4067 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4068 if self.op.node_name is None:
4069 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
4071 self.needed_locks = {
4072 locking.LEVEL_NODE: [self.op.node_name],
4075 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4077 # Create tasklets for migrating instances for all instances on this node
4081 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4082 logging.debug("Migrating instance %s", inst.name)
4083 names.append(inst.name)
4085 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4087 self.tasklets = tasklets
4089 # Declare instance locks
4090 self.needed_locks[locking.LEVEL_INSTANCE] = names
4092 def DeclareLocks(self, level):
4093 if level == locking.LEVEL_NODE:
4094 self._LockInstancesNodes()
4096 def BuildHooksEnv(self):
4099 This runs on the master, the primary and all the secondaries.
4103 "NODE_NAME": self.op.node_name,
4106 nl = [self.cfg.GetMasterNode()]
4108 return (env, nl, nl)
4111 class TLMigrateInstance(Tasklet):
4112 def __init__(self, lu, instance_name, live, cleanup):
4113 """Initializes this class.
4116 Tasklet.__init__(self, lu)
4119 self.instance_name = instance_name
4121 self.cleanup = cleanup
4123 def CheckPrereq(self):
4124 """Check prerequisites.
4126 This checks that the instance is in the cluster.
4129 instance = self.cfg.GetInstanceInfo(
4130 self.cfg.ExpandInstanceName(self.instance_name))
4131 if instance is None:
4132 raise errors.OpPrereqError("Instance '%s' not known" %
4135 if instance.disk_template != constants.DT_DRBD8:
4136 raise errors.OpPrereqError("Instance's disk layout is not"
4137 " drbd8, cannot migrate.")
4139 secondary_nodes = instance.secondary_nodes
4140 if not secondary_nodes:
4141 raise errors.ConfigurationError("No secondary node but using"
4142 " drbd8 disk template")
4144 i_be = self.cfg.GetClusterInfo().FillBE(instance)
4146 target_node = secondary_nodes[0]
4147 # check memory requirements on the secondary node
4148 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4149 instance.name, i_be[constants.BE_MEMORY],
4150 instance.hypervisor)
4152 # check bridge existence
4153 _CheckInstanceBridgesExist(self, instance, node=target_node)
4155 if not self.cleanup:
4156 _CheckNodeNotDrained(self, target_node)
4157 result = self.rpc.call_instance_migratable(instance.primary_node,
4159 result.Raise("Can't migrate, please use failover", prereq=True)
4161 self.instance = instance
4163 def _WaitUntilSync(self):
4164 """Poll with custom rpc for disk sync.
4166 This uses our own step-based rpc call.
4169 self.feedback_fn("* wait until resync is done")
4173 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4175 self.instance.disks)
4177 for node, nres in result.items():
4178 nres.Raise("Cannot resync disks on node %s" % node)
4179 node_done, node_percent = nres.payload
4180 all_done = all_done and node_done
4181 if node_percent is not None:
4182 min_percent = min(min_percent, node_percent)
4184 if min_percent < 100:
4185 self.feedback_fn(" - progress: %.1f%%" % min_percent)
4188 def _EnsureSecondary(self, node):
4189 """Demote a node to secondary.
4192 self.feedback_fn("* switching node %s to secondary mode" % node)
4194 for dev in self.instance.disks:
4195 self.cfg.SetDiskID(dev, node)
4197 result = self.rpc.call_blockdev_close(node, self.instance.name,
4198 self.instance.disks)
4199 result.Raise("Cannot change disk to secondary on node %s" % node)
4201 def _GoStandalone(self):
4202 """Disconnect from the network.
4205 self.feedback_fn("* changing into standalone mode")
4206 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4207 self.instance.disks)
4208 for node, nres in result.items():
4209 nres.Raise("Cannot disconnect disks on node %s" % node)
4211 def _GoReconnect(self, multimaster):
4212 """Reconnect to the network.
4218 msg = "single-master"
4219 self.feedback_fn("* changing disks into %s mode" % msg)
4220 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4221 self.instance.disks,
4222 self.instance.name, multimaster)
4223 for node, nres in result.items():
4224 nres.Raise("Cannot change disks config on node %s" % node)
4226 def _ExecCleanup(self):
4227 """Try to cleanup after a failed migration.
4229 The cleanup is done by:
4230 - check that the instance is running only on one node
4231 (and update the config if needed)
4232 - change disks on its secondary node to secondary
4233 - wait until disks are fully synchronized
4234 - disconnect from the network
4235 - change disks into single-master mode
4236 - wait again until disks are fully synchronized
4239 instance = self.instance
4240 target_node = self.target_node
4241 source_node = self.source_node
4243 # check running on only one node
4244 self.feedback_fn("* checking where the instance actually runs"
4245 " (if this hangs, the hypervisor might be in"
4247 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4248 for node, result in ins_l.items():
4249 result.Raise("Can't contact node %s" % node)
4251 runningon_source = instance.name in ins_l[source_node].payload
4252 runningon_target = instance.name in ins_l[target_node].payload
4254 if runningon_source and runningon_target:
4255 raise errors.OpExecError("Instance seems to be running on two nodes,"
4256 " or the hypervisor is confused. You will have"
4257 " to ensure manually that it runs only on one"
4258 " and restart this operation.")
4260 if not (runningon_source or runningon_target):
4261 raise errors.OpExecError("Instance does not seem to be running at all."
4262 " In this case, it's safer to repair by"
4263 " running 'gnt-instance stop' to ensure disk"
4264 " shutdown, and then restarting it.")
4266 if runningon_target:
4267 # the migration has actually succeeded, we need to update the config
4268 self.feedback_fn("* instance running on secondary node (%s),"
4269 " updating config" % target_node)
4270 instance.primary_node = target_node
4271 self.cfg.Update(instance)
4272 demoted_node = source_node
4274 self.feedback_fn("* instance confirmed to be running on its"
4275 " primary node (%s)" % source_node)
4276 demoted_node = target_node
4278 self._EnsureSecondary(demoted_node)
4280 self._WaitUntilSync()
4281 except errors.OpExecError:
4282 # we ignore errors here, since if the device is standalone, it
4283 # won't be able to sync
4285 self._GoStandalone()
4286 self._GoReconnect(False)
4287 self._WaitUntilSync()
4289 self.feedback_fn("* done")
4291 def _RevertDiskStatus(self):
4292 """Try to revert the disk status after a failed migration.
4295 target_node = self.target_node
4297 self._EnsureSecondary(target_node)
4298 self._GoStandalone()
4299 self._GoReconnect(False)
4300 self._WaitUntilSync()
4301 except errors.OpExecError, err:
4302 self.lu.LogWarning("Migration failed and I can't reconnect the"
4303 " drives: error '%s'\n"
4304 "Please look and recover the instance status" %
4307 def _AbortMigration(self):
4308 """Call the hypervisor code to abort a started migration.
4311 instance = self.instance
4312 target_node = self.target_node
4313 migration_info = self.migration_info
4315 abort_result = self.rpc.call_finalize_migration(target_node,
4319 abort_msg = abort_result.fail_msg
4321 logging.error("Aborting migration failed on target node %s: %s",
4322 target_node, abort_msg)
4323 # Don't raise an exception here, as we still have to try to revert the
4324 # disk status, even if this step failed.
4326 def _ExecMigration(self):
4327 """Migrate an instance.
4329 The migration is done by:
4330 - change the disks into dual-master mode
4331 - wait until disks are fully synchronized again
4332 - migrate the instance
4333 - change disks on the new secondary node (the old primary) to secondary
4334 - wait until disks are fully synchronized
4335 - change disks into single-master mode
4338 instance = self.instance
4339 target_node = self.target_node
4340 source_node = self.source_node
4342 self.feedback_fn("* checking disk consistency between source and target")
4343 for dev in instance.disks:
4344 if not _CheckDiskConsistency(self, dev, target_node, False):
4345 raise errors.OpExecError("Disk %s is degraded or not fully"
4346 " synchronized on target node,"
4347 " aborting migrate." % dev.iv_name)
4349 # First get the migration information from the remote node
4350 result = self.rpc.call_migration_info(source_node, instance)
4351 msg = result.fail_msg
4353 log_err = ("Failed fetching source migration information from %s: %s" %
4355 logging.error(log_err)
4356 raise errors.OpExecError(log_err)
4358 self.migration_info = migration_info = result.payload
4360 # Then switch the disks to master/master mode
4361 self._EnsureSecondary(target_node)
4362 self._GoStandalone()
4363 self._GoReconnect(True)
4364 self._WaitUntilSync()
4366 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4367 result = self.rpc.call_accept_instance(target_node,
4370 self.nodes_ip[target_node])
4372 msg = result.fail_msg
4374 logging.error("Instance pre-migration failed, trying to revert"
4375 " disk status: %s", msg)
4376 self._AbortMigration()
4377 self._RevertDiskStatus()
4378 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4379 (instance.name, msg))
4381 self.feedback_fn("* migrating instance to %s" % target_node)
4383 result = self.rpc.call_instance_migrate(source_node, instance,
4384 self.nodes_ip[target_node],
4386 msg = result.fail_msg
4388 logging.error("Instance migration failed, trying to revert"
4389 " disk status: %s", msg)
4390 self._AbortMigration()
4391 self._RevertDiskStatus()
4392 raise errors.OpExecError("Could not migrate instance %s: %s" %
4393 (instance.name, msg))
4396 instance.primary_node = target_node
4397 # distribute new instance config to the other nodes
4398 self.cfg.Update(instance)
4400 result = self.rpc.call_finalize_migration(target_node,
4404 msg = result.fail_msg
4406 logging.error("Instance migration succeeded, but finalization failed:"
4408 raise errors.OpExecError("Could not finalize instance migration: %s" %
4411 self._EnsureSecondary(source_node)
4412 self._WaitUntilSync()
4413 self._GoStandalone()
4414 self._GoReconnect(False)
4415 self._WaitUntilSync()
4417 self.feedback_fn("* done")
4419 def Exec(self, feedback_fn):
4420 """Perform the migration.
4423 feedback_fn("Migrating instance %s" % self.instance.name)
4425 self.feedback_fn = feedback_fn
4427 self.source_node = self.instance.primary_node
4428 self.target_node = self.instance.secondary_nodes[0]
4429 self.all_nodes = [self.source_node, self.target_node]
4431 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4432 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4436 return self._ExecCleanup()
4438 return self._ExecMigration()
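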
4441 def _CreateBlockDev(lu, node, instance, device, force_create,
4443 """Create a tree of block devices on a given node.
4445 If this device type has to be created on secondaries, create it and all its children.
4448 If not, just recurse to children keeping the same 'force' value.
4450 @param lu: the lu on whose behalf we execute
4451 @param node: the node on which to create the device
4452 @type instance: L{objects.Instance}
4453 @param instance: the instance which owns the device
4454 @type device: L{objects.Disk}
4455 @param device: the device to create
4456 @type force_create: boolean
4457 @param force_create: whether to force creation of this device; this
4458 will be changed to True whenever we find a device which has the
4459 CreateOnSecondary() attribute
4460 @param info: the extra 'metadata' we should attach to the device
4461 (this will be represented as an LVM tag)
4462 @type force_open: boolean
4463 @param force_open: this parameter will be passed to the
4464 L{backend.BlockdevCreate} function where it specifies
4465 whether we run on primary or not, and it affects both
4466 the child assembly and the device's own Open() execution
4469 if device.CreateOnSecondary():
4473 for child in device.children:
4474 _CreateBlockDev(lu, node, instance, child, force_create,
4477 if not force_create:
4480 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4483 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4484 """Create a single block device on a given node.
4486 This will not recurse over children of the device, so they must be created in advance.
4489 @param lu: the lu on whose behalf we execute
4490 @param node: the node on which to create the device
4491 @type instance: L{objects.Instance}
4492 @param instance: the instance which owns the device
4493 @type device: L{objects.Disk}
4494 @param device: the device to create
4495 @param info: the extra 'metadata' we should attach to the device
4496 (this will be represented as an LVM tag)
4497 @type force_open: boolean
4498 @param force_open: this parameter will be passed to the
4499 L{backend.BlockdevCreate} function where it specifies
4500 whether we run on primary or not, and it affects both
4501 the child assembly and the device's own Open() execution
4504 lu.cfg.SetDiskID(device, node)
4505 result = lu.rpc.call_blockdev_create(node, device, device.size,
4506 instance.name, force_open, info)
4507 result.Raise("Can't create block device %s on"
4508 " node %s for instance %s" % (device, node, instance.name))
4509 if device.physical_id is None:
4510 device.physical_id = result.payload
4513 def _GenerateUniqueNames(lu, exts):
4514 """Generate a suitable LV name.
4516 This will generate a logical volume name for the given instance.
4521 new_id = lu.cfg.GenerateUniqueID()
4522 results.append("%s%s" % (new_id, val))
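# Example (sketch): _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
# yields names of the form ["<unique-id>.disk0_data", "<unique-id>.disk0_meta"],
# i.e. a configuration-generated unique id prepended to each requested suffix.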
4526 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4528 """Generate a drbd8 device complete with its children.
4531 port = lu.cfg.AllocatePort()
4532 vgname = lu.cfg.GetVGName()
4533 shared_secret = lu.cfg.GenerateDRBDSecret()
4534 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4535 logical_id=(vgname, names[0]))
4536 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4537 logical_id=(vgname, names[1]))
4538 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4539 logical_id=(primary, secondary, port,
4542 children=[dev_data, dev_meta],
4547 def _GenerateDiskTemplate(lu, template_name,
4548 instance_name, primary_node,
4549 secondary_nodes, disk_info,
4550 file_storage_dir, file_driver,
4552 """Generate the entire disk layout for a given template type.
4555 #TODO: compute space requirements
4557 vgname = lu.cfg.GetVGName()
4558 disk_count = len(disk_info)
4560 if template_name == constants.DT_DISKLESS:
4562 elif template_name == constants.DT_PLAIN:
4563 if len(secondary_nodes) != 0:
4564 raise errors.ProgrammerError("Wrong template configuration")
4566 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4567 for i in range(disk_count)])
4568 for idx, disk in enumerate(disk_info):
4569 disk_index = idx + base_index
4570 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4571 logical_id=(vgname, names[idx]),
4572 iv_name="disk/%d" % disk_index,
4574 disks.append(disk_dev)
4575 elif template_name == constants.DT_DRBD8:
4576 if len(secondary_nodes) != 1:
4577 raise errors.ProgrammerError("Wrong template configuration")
4578 remote_node = secondary_nodes[0]
4579 minors = lu.cfg.AllocateDRBDMinor(
4580 [primary_node, remote_node] * len(disk_info), instance_name)
4583 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4584 for i in range(disk_count)]):
4585 names.append(lv_prefix + "_data")
4586 names.append(lv_prefix + "_meta")
4587 for idx, disk in enumerate(disk_info):
4588 disk_index = idx + base_index
4589 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4590 disk["size"], names[idx*2:idx*2+2],
4591 "disk/%d" % disk_index,
4592 minors[idx*2], minors[idx*2+1])
4593 disk_dev.mode = disk["mode"]
4594 disks.append(disk_dev)
4595 elif template_name == constants.DT_FILE:
4596 if len(secondary_nodes) != 0:
4597 raise errors.ProgrammerError("Wrong template configuration")
4599 for idx, disk in enumerate(disk_info):
4600 disk_index = idx + base_index
4601 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4602 iv_name="disk/%d" % disk_index,
4603 logical_id=(file_driver,
4604 "%s/disk%d" % (file_storage_dir,
4607 disks.append(disk_dev)
4609 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4613 def _GetInstanceInfoText(instance):
4614 """Compute that text that should be added to the disk's metadata.
4617 return "originstname+%s" % instance.name
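# Example: an instance named "inst1.example.com" gets the metadata text
# "originstname+inst1.example.com", which is later attached to its volumes
# as an LVM tag.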
4620 def _CreateDisks(lu, instance):
4621 """Create all disks for an instance.
4623 This abstracts away some work from AddInstance.
4625 @type lu: L{LogicalUnit}
4626 @param lu: the logical unit on whose behalf we execute
4627 @type instance: L{objects.Instance}
4628 @param instance: the instance whose disks we should create
4630 @return: the success of the creation
4633 info = _GetInstanceInfoText(instance)
4634 pnode = instance.primary_node
4636 if instance.disk_template == constants.DT_FILE:
4637 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4638 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4640 result.Raise("Failed to create directory '%s' on"
4641 " node %s: %s" % (file_storage_dir, pnode))
4643 # Note: this needs to be kept in sync with adding of disks in
4644 # LUSetInstanceParams
4645 for device in instance.disks:
4646 logging.info("Creating volume %s for instance %s",
4647 device.iv_name, instance.name)
4649 for node in instance.all_nodes:
4650 f_create = node == pnode
4651 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4654 def _RemoveDisks(lu, instance):
4655 """Remove all disks for an instance.
4657 This abstracts away some work from `AddInstance()` and
4658 `RemoveInstance()`. Note that in case some of the devices couldn't
4659 be removed, the removal will continue with the other ones (compare
4660 with `_CreateDisks()`).
4662 @type lu: L{LogicalUnit}
4663 @param lu: the logical unit on whose behalf we execute
4664 @type instance: L{objects.Instance}
4665 @param instance: the instance whose disks we should remove
4667 @return: the success of the removal
4670 logging.info("Removing block devices for instance %s", instance.name)
4673 for device in instance.disks:
4674 for node, disk in device.ComputeNodeTree(instance.primary_node):
4675 lu.cfg.SetDiskID(disk, node)
4676 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4678 lu.LogWarning("Could not remove block device %s on node %s,"
4679 " continuing anyway: %s", device.iv_name, node, msg)
4682 if instance.disk_template == constants.DT_FILE:
4683 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4684 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4686 msg = result.fail_msg
4688 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4689 file_storage_dir, instance.primary_node, msg)
4695 def _ComputeDiskSize(disk_template, disks):
4696 """Compute disk size requirements in the volume group
4699 # Required free disk space as a function of the disk template and disk sizes
4701 constants.DT_DISKLESS: None,
4702 constants.DT_PLAIN: sum(d["size"] for d in disks),
4703 # 128 MB are added for drbd metadata for each disk
4704 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4705 constants.DT_FILE: None,
4708 if disk_template not in req_size_dict:
4709 raise errors.ProgrammerError("Disk template '%s' size requirement"
4710 " is unknown" % disk_template)
4712 return req_size_dict[disk_template]
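# Worked example (sketch): for disks = [{"size": 1024}, {"size": 2048}]
#   _ComputeDiskSize(constants.DT_PLAIN, disks) == 1024 + 2048 == 3072
#   _ComputeDiskSize(constants.DT_DRBD8, disks) == (1024 + 128) + (2048 + 128) == 3328
# while DT_DISKLESS and DT_FILE need no volume group space and yield None.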
4715 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4716 """Hypervisor parameter validation.
4718 This function abstracts the hypervisor parameter validation to be
4719 used in both instance create and instance modify.
4721 @type lu: L{LogicalUnit}
4722 @param lu: the logical unit for which we check
4723 @type nodenames: list
4724 @param nodenames: the list of nodes on which we should check
4725 @type hvname: string
4726 @param hvname: the name of the hypervisor we should use
4727 @type hvparams: dict
4728 @param hvparams: the parameters which we need to check
4729 @raise errors.OpPrereqError: if the parameters are not valid
4732 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4735 for node in nodenames:
4739 info.Raise("Hypervisor parameter validation failed on node %s" % node)
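# Typical call (as done in the instance creation LU below):
#   _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
# i.e. validate on every involved node the hypervisor parameters the instance
# would end up with.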
4742 class LUCreateInstance(LogicalUnit):
4743 """Create an instance.
4746 HPATH = "instance-add"
4747 HTYPE = constants.HTYPE_INSTANCE
4748 _OP_REQP = ["instance_name", "disks", "disk_template",
4750 "wait_for_sync", "ip_check", "nics",
4751 "hvparams", "beparams"]
4754 def _ExpandNode(self, node):
4755 """Expands and checks one node name.
4758 node_full = self.cfg.ExpandNodeName(node)
4759 if node_full is None:
4760 raise errors.OpPrereqError("Unknown node %s" % node)
4763 def ExpandNames(self):
4764 """ExpandNames for CreateInstance.
4766 Figure out the right locks for instance creation.
4769 self.needed_locks = {}
4771 # set optional parameters to none if they don't exist
4772 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4773 if not hasattr(self.op, attr):
4774 setattr(self.op, attr, None)
4776 # cheap checks, mostly valid constants given
4778 # verify creation mode
4779 if self.op.mode not in (constants.INSTANCE_CREATE,
4780 constants.INSTANCE_IMPORT):
4781 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4784 # disk template and mirror node verification
4785 if self.op.disk_template not in constants.DISK_TEMPLATES:
4786 raise errors.OpPrereqError("Invalid disk template name")
4788 if self.op.hypervisor is None:
4789 self.op.hypervisor = self.cfg.GetHypervisorType()
4791 cluster = self.cfg.GetClusterInfo()
4792 enabled_hvs = cluster.enabled_hypervisors
4793 if self.op.hypervisor not in enabled_hvs:
4794 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4795 " cluster (%s)" % (self.op.hypervisor,
4796 ",".join(enabled_hvs)))
4798 # check hypervisor parameter syntax (locally)
4799 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4800 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4802 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4803 hv_type.CheckParameterSyntax(filled_hvp)
4804 self.hv_full = filled_hvp
4806 # fill and remember the beparams dict
4807 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4808 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4811 #### instance parameters check
4813 # instance name verification
4814 hostname1 = utils.HostInfo(self.op.instance_name)
4815 self.op.instance_name = instance_name = hostname1.name
4817 # this is just a preventive check, but someone might still add this
4818 # instance in the meantime, and creation will fail at lock-add time
4819 if instance_name in self.cfg.GetInstanceList():
4820 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4823 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4827 for idx, nic in enumerate(self.op.nics):
4828 nic_mode_req = nic.get("mode", None)
4829 nic_mode = nic_mode_req
4830 if nic_mode is None:
4831 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4833 # in routed mode, for the first nic, the default ip is 'auto'
4834 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4835 default_ip_mode = constants.VALUE_AUTO
4837 default_ip_mode = constants.VALUE_NONE
4839 # ip validity checks
4840 ip = nic.get("ip", default_ip_mode)
4841 if ip is None or ip.lower() == constants.VALUE_NONE:
4843 elif ip.lower() == constants.VALUE_AUTO:
4844 nic_ip = hostname1.ip
4846 if not utils.IsValidIP(ip):
4847 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4848 " like a valid IP" % ip)
4851 # TODO: check the ip for uniqueness !!
4852 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4853 raise errors.OpPrereqError("Routed nic mode requires an ip address")
4855 # MAC address verification
4856 mac = nic.get("mac", constants.VALUE_AUTO)
4857 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4858 if not utils.IsValidMac(mac.lower()):
4859 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4861 # bridge verification
4862 bridge = nic.get("bridge", None)
4863 link = nic.get("link", None)
4865 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
4866 " at the same time")
4867 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4868 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4874 nicparams[constants.NIC_MODE] = nic_mode_req
4876 nicparams[constants.NIC_LINK] = link
4878 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4880 objects.NIC.CheckParameterSyntax(check_params)
4881 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
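# Illustrative nic specifications accepted by the loop above; the literal
# "bridged"/"routed" mode values and the "auto" value for the MAC are
# assumptions about the constants module:
#   {"mode": "bridged", "link": "xen-br0", "mac": "auto"}
#   {"mode": "routed", "ip": "198.51.100.10", "mac": "00:11:22:33:44:55"}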
4883 # disk checks/pre-build
4885 for disk in self.op.disks:
4886 mode = disk.get("mode", constants.DISK_RDWR)
4887 if mode not in constants.DISK_ACCESS_SET:
4888 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4890 size = disk.get("size", None)
4892 raise errors.OpPrereqError("Missing disk size")
4896 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4897 self.disks.append({"size": size, "mode": mode})
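# Illustrative disk specifications accepted by the loop above; sizes are
# plain integers (megabytes) and "rw" is assumed to be the literal value of
# constants.DISK_RDWR:
#   [{"size": 10240}, {"size": 2048, "mode": "rw"}]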
4899 # used in CheckPrereq for ip ping check
4900 self.check_ip = hostname1.ip
4902 # file storage checks
4903 if (self.op.file_driver and
4904 not self.op.file_driver in constants.FILE_DRIVER):
4905 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4906 self.op.file_driver)
4908 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4909 raise errors.OpPrereqError("File storage directory path not absolute")
4911 ### Node/iallocator related checks
4912 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4913 raise errors.OpPrereqError("One and only one of iallocator and primary"
4914 " node must be given")
4916 if self.op.iallocator:
4917 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4919 self.op.pnode = self._ExpandNode(self.op.pnode)
4920 nodelist = [self.op.pnode]
4921 if self.op.snode is not None:
4922 self.op.snode = self._ExpandNode(self.op.snode)
4923 nodelist.append(self.op.snode)
4924 self.needed_locks[locking.LEVEL_NODE] = nodelist
4926 # in case of import lock the source node too
4927 if self.op.mode == constants.INSTANCE_IMPORT:
4928 src_node = getattr(self.op, "src_node", None)
4929 src_path = getattr(self.op, "src_path", None)
4931 if src_path is None:
4932 self.op.src_path = src_path = self.op.instance_name
4934 if src_node is None:
4935 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4936 self.op.src_node = None
4937 if os.path.isabs(src_path):
4938 raise errors.OpPrereqError("Importing an instance from an absolute"
4939 " path requires a source node option.")
4941 self.op.src_node = src_node = self._ExpandNode(src_node)
4942 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4943 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4944 if not os.path.isabs(src_path):
4945 self.op.src_path = src_path = \
4946 os.path.join(constants.EXPORT_DIR, src_path)
4948 else: # INSTANCE_CREATE
4949 if getattr(self.op, "os_type", None) is None:
4950 raise errors.OpPrereqError("No guest OS specified")
4952 def _RunAllocator(self):
4953 """Run the allocator based on input opcode.
4956 nics = [n.ToDict() for n in self.nics]
4957 ial = IAllocator(self.cfg, self.rpc,
4958 mode=constants.IALLOCATOR_MODE_ALLOC,
4959 name=self.op.instance_name,
4960 disk_template=self.op.disk_template,
4963 vcpus=self.be_full[constants.BE_VCPUS],
4964 mem_size=self.be_full[constants.BE_MEMORY],
4967 hypervisor=self.op.hypervisor,
4970 ial.Run(self.op.iallocator)
4973 raise errors.OpPrereqError("Can't compute nodes using"
4974 " iallocator '%s': %s" % (self.op.iallocator,
4976 if len(ial.nodes) != ial.required_nodes:
4977 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4978 " of nodes (%s), required %s" %
4979 (self.op.iallocator, len(ial.nodes),
4980 ial.required_nodes))
4981 self.op.pnode = ial.nodes[0]
4982 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4983 self.op.instance_name, self.op.iallocator,
4984 ", ".join(ial.nodes))
4985 if ial.required_nodes == 2:
4986 self.op.snode = ial.nodes[1]
4988 def BuildHooksEnv(self):
4991 This runs on master, primary and secondary nodes of the instance.
4995 "ADD_MODE": self.op.mode,
4997 if self.op.mode == constants.INSTANCE_IMPORT:
4998 env["SRC_NODE"] = self.op.src_node
4999 env["SRC_PATH"] = self.op.src_path
5000 env["SRC_IMAGES"] = self.src_images
5002 env.update(_BuildInstanceHookEnv(
5003 name=self.op.instance_name,
5004 primary_node=self.op.pnode,
5005 secondary_nodes=self.secondaries,
5006 status=self.op.start,
5007 os_type=self.op.os_type,
5008 memory=self.be_full[constants.BE_MEMORY],
5009 vcpus=self.be_full[constants.BE_VCPUS],
5010 nics=_NICListToTuple(self, self.nics),
5011 disk_template=self.op.disk_template,
5012 disks=[(d["size"], d["mode"]) for d in self.disks],
5015 hypervisor_name=self.op.hypervisor,
5018 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5023 def CheckPrereq(self):
5024 """Check prerequisites.
5027 if (not self.cfg.GetVGName() and
5028 self.op.disk_template not in constants.DTS_NOT_LVM):
5029 raise errors.OpPrereqError("Cluster does not support lvm-based"
5032 if self.op.mode == constants.INSTANCE_IMPORT:
5033 src_node = self.op.src_node
5034 src_path = self.op.src_path
5036 if src_node is None:
5037 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5038 exp_list = self.rpc.call_export_list(locked_nodes)
5040 for node in exp_list:
5041 if exp_list[node].fail_msg:
5043 if src_path in exp_list[node].payload:
5045 self.op.src_node = src_node = node
5046 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5050 raise errors.OpPrereqError("No export found for relative path %s" %
5053 _CheckNodeOnline(self, src_node)
5054 result = self.rpc.call_export_info(src_node, src_path)
5055 result.Raise("No export or invalid export found in dir %s" % src_path)
5057 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5058 if not export_info.has_section(constants.INISECT_EXP):
5059 raise errors.ProgrammerError("Corrupted export config")
5061 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5062 if (int(ei_version) != constants.EXPORT_VERSION):
5063 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5064 (ei_version, constants.EXPORT_VERSION))
5066 # Check that the new instance doesn't have fewer disks than the export
5067 instance_disks = len(self.disks)
5068 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
5069 if instance_disks < export_disks:
5070 raise errors.OpPrereqError("Not enough disks to import."
5071 " (instance: %d, export: %d)" %
5072 (instance_disks, export_disks))
5074 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
5076 for idx in range(export_disks):
5077 option = 'disk%d_dump' % idx
5078 if export_info.has_option(constants.INISECT_INS, option):
5079 # FIXME: are the old os-es, disk sizes, etc. useful?
5080 export_name = export_info.get(constants.INISECT_INS, option)
5081 image = os.path.join(src_path, export_name)
5082 disk_images.append(image)
5084 disk_images.append(False)
5086 self.src_images = disk_images
5088 old_name = export_info.get(constants.INISECT_INS, 'name')
5089 # FIXME: int() here could throw a ValueError on broken exports
5090 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
5091 if self.op.instance_name == old_name:
5092 for idx, nic in enumerate(self.nics):
5093 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
5094 nic_mac_ini = 'nic%d_mac' % idx
5095 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
5097 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
5098 # ip ping checks (we use the same ip that was resolved in ExpandNames)
5099 if self.op.start and not self.op.ip_check:
5100 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
5101 " adding an instance in start mode")
5103 if self.op.ip_check:
5104 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
5105 raise errors.OpPrereqError("IP %s of instance %s already in use" %
5106 (self.check_ip, self.op.instance_name))
5108 #### mac address generation
5109 # By generating the MAC address here, both the allocator and the hooks get
5110 # the real, final MAC address rather than the 'auto' or 'generate' value.
5111 # There is a race condition between generation and instance object
5112 # creation, which means that we know the MAC is valid now, but we're not
5113 # sure it still will be when we actually add the instance. If things go
5114 # wrong, adding the instance will abort because of a duplicate MAC, and
5115 # the creation job will fail.
5116 for nic in self.nics:
5117 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5118 nic.mac = self.cfg.GenerateMAC()
5122 if self.op.iallocator is not None:
5123 self._RunAllocator()
5125 #### node related checks
5127 # check primary node
5128 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
5129 assert self.pnode is not None, \
5130 "Cannot retrieve locked node %s" % self.op.pnode
5132 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
5135 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
5138 self.secondaries = []
5140 # mirror node verification
5141 if self.op.disk_template in constants.DTS_NET_MIRROR:
5142 if self.op.snode is None:
5143 raise errors.OpPrereqError("The networked disk templates need"
5145 if self.op.snode == pnode.name:
5146 raise errors.OpPrereqError("The secondary node cannot be"
5147 " the primary node.")
5148 _CheckNodeOnline(self, self.op.snode)
5149 _CheckNodeNotDrained(self, self.op.snode)
5150 self.secondaries.append(self.op.snode)
5152 nodenames = [pnode.name] + self.secondaries
5154 req_size = _ComputeDiskSize(self.op.disk_template,
5157 # Check lv size requirements
5158 if req_size is not None:
5159 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5161 for node in nodenames:
5162 info = nodeinfo[node]
5163 info.Raise("Cannot get current information from node %s" % node)
5165 vg_free = info.get('vg_free', None)
5166 if not isinstance(vg_free, int):
5167 raise errors.OpPrereqError("Can't compute free disk space on"
5169 if req_size > vg_free:
5170 raise errors.OpPrereqError("Not enough disk space on target node %s."
5171 " %d MB available, %d MB required" %
5172 (node, vg_free, req_size))
5174 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5177 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5178 result.Raise("OS '%s' not in supported os list for primary node %s" %
5179 (self.op.os_type, pnode.name), prereq=True)
5181 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
5183 # memory check on primary node
5185 _CheckNodeFreeMemory(self, self.pnode.name,
5186 "creating instance %s" % self.op.instance_name,
5187 self.be_full[constants.BE_MEMORY],
5190 self.dry_run_result = list(nodenames)
5192 def Exec(self, feedback_fn):
5193 """Create and add the instance to the cluster.
5196 instance = self.op.instance_name
5197 pnode_name = self.pnode.name
5199 ht_kind = self.op.hypervisor
5200 if ht_kind in constants.HTS_REQ_PORT:
5201 network_port = self.cfg.AllocatePort()
5205 ##if self.op.vnc_bind_address is None:
5206 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
5208 # this is needed because os.path.join does not accept None arguments
5209 if self.op.file_storage_dir is None:
5210 string_file_storage_dir = ""
5212 string_file_storage_dir = self.op.file_storage_dir
5214 # build the full file storage dir path
5215 file_storage_dir = os.path.normpath(os.path.join(
5216 self.cfg.GetFileStorageDir(),
5217 string_file_storage_dir, instance))
5220 disks = _GenerateDiskTemplate(self,
5221 self.op.disk_template,
5222 instance, pnode_name,
5226 self.op.file_driver,
5229 iobj = objects.Instance(name=instance, os=self.op.os_type,
5230 primary_node=pnode_name,
5231 nics=self.nics, disks=disks,
5232 disk_template=self.op.disk_template,
5234 network_port=network_port,
5235 beparams=self.op.beparams,
5236 hvparams=self.op.hvparams,
5237 hypervisor=self.op.hypervisor,
5240 feedback_fn("* creating instance disks...")
5242 _CreateDisks(self, iobj)
5243 except errors.OpExecError:
5244 self.LogWarning("Device creation failed, reverting...")
5246 _RemoveDisks(self, iobj)
5248 self.cfg.ReleaseDRBDMinors(instance)
5251 feedback_fn("adding instance %s to cluster config" % instance)
5253 self.cfg.AddInstance(iobj)
5254 # Declare that we don't want to remove the instance lock anymore, as we've
5255 # added the instance to the config
5256 del self.remove_locks[locking.LEVEL_INSTANCE]
5257 # Unlock all the nodes
5258 if self.op.mode == constants.INSTANCE_IMPORT:
5259 nodes_keep = [self.op.src_node]
5260 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5261 if node != self.op.src_node]
5262 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5263 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5265 self.context.glm.release(locking.LEVEL_NODE)
5266 del self.acquired_locks[locking.LEVEL_NODE]
5268 if self.op.wait_for_sync:
5269 disk_abort = not _WaitForSync(self, iobj)
5270 elif iobj.disk_template in constants.DTS_NET_MIRROR:
5271 # make sure the disks are not degraded (still sync-ing is ok)
5273 feedback_fn("* checking mirrors status")
5274 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5279 _RemoveDisks(self, iobj)
5280 self.cfg.RemoveInstance(iobj.name)
5281 # Make sure the instance lock gets removed
5282 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5283 raise errors.OpExecError("There are some degraded disks for"
5286 feedback_fn("creating os for instance %s on node %s" %
5287 (instance, pnode_name))
5289 if iobj.disk_template != constants.DT_DISKLESS:
5290 if self.op.mode == constants.INSTANCE_CREATE:
5291 feedback_fn("* running the instance OS create scripts...")
5292 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5293 result.Raise("Could not add os for instance %s"
5294 " on node %s" % (instance, pnode_name))
5296 elif self.op.mode == constants.INSTANCE_IMPORT:
5297 feedback_fn("* running the instance OS import scripts...")
5298 src_node = self.op.src_node
5299 src_images = self.src_images
5300 cluster_name = self.cfg.GetClusterName()
5301 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5302 src_node, src_images,
5304 msg = import_result.fail_msg
5306 self.LogWarning("Error while importing the disk images for instance"
5307 " %s on node %s: %s" % (instance, pnode_name, msg))
5309 # also checked in the prereq part
5310 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5314 iobj.admin_up = True
5315 self.cfg.Update(iobj)
5316 logging.info("Starting instance %s on node %s", instance, pnode_name)
5317 feedback_fn("* starting instance...")
5318 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5319 result.Raise("Could not start instance")
5321 return list(iobj.all_nodes)
5324 class LUConnectConsole(NoHooksLU):
5325 """Connect to an instance's console.
5327 This is somewhat special in that it returns the command line that
5328 you need to run on the master node in order to connect to the console.
5332 _OP_REQP = ["instance_name"]
5335 def ExpandNames(self):
5336 self._ExpandAndLockInstance()
5338 def CheckPrereq(self):
5339 """Check prerequisites.
5341 This checks that the instance is in the cluster.
5344 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5345 assert self.instance is not None, \
5346 "Cannot retrieve locked instance %s" % self.op.instance_name
5347 _CheckNodeOnline(self, self.instance.primary_node)
5349 def Exec(self, feedback_fn):
5350 """Connect to the console of an instance
5353 instance = self.instance
5354 node = instance.primary_node
5356 node_insts = self.rpc.call_instance_list([node],
5357 [instance.hypervisor])[node]
5358 node_insts.Raise("Can't get node information from %s" % node)
5360 if instance.name not in node_insts.payload:
5361 raise errors.OpExecError("Instance %s is not running." % instance.name)
5363 logging.debug("Connecting to console of %s on %s", instance.name, node)
5365 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5366 cluster = self.cfg.GetClusterInfo()
5367 # beparams and hvparams are passed separately, to avoid editing the
5368 # instance and then saving the defaults in the instance itself.
5369 hvparams = cluster.FillHV(instance)
5370 beparams = cluster.FillBE(instance)
5371 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5374 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5377 class LUReplaceDisks(LogicalUnit):
5378 """Replace the disks of an instance.
5381 HPATH = "mirrors-replace"
5382 HTYPE = constants.HTYPE_INSTANCE
5383 _OP_REQP = ["instance_name", "mode", "disks"]
5386 def CheckArguments(self):
5387 if not hasattr(self.op, "remote_node"):
5388 self.op.remote_node = None
5389 if not hasattr(self.op, "iallocator"):
5390 self.op.iallocator = None
5392 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
5395 def ExpandNames(self):
5396 self._ExpandAndLockInstance()
5398 if self.op.iallocator is not None:
5399 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5401 elif self.op.remote_node is not None:
5402 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5403 if remote_node is None:
5404 raise errors.OpPrereqError("Node '%s' not known" %
5405 self.op.remote_node)
5407 self.op.remote_node = remote_node
5409 # Warning: do not remove the locking of the new secondary here
5410 # unless DRBD8.AddChildren is changed to work in parallel;
5411 # currently it doesn't since parallel invocations of
5412 # FindUnusedMinor will conflict
5413 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5414 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5417 self.needed_locks[locking.LEVEL_NODE] = []
5418 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5420 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
5421 self.op.iallocator, self.op.remote_node,
5424 self.tasklets = [self.replacer]
5426 def DeclareLocks(self, level):
5427 # If we're not already locking all nodes in the set, we have to declare the
5428 # instance's primary/secondary nodes.
5429 if (level == locking.LEVEL_NODE and
5430 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5431 self._LockInstancesNodes()
5433 def BuildHooksEnv(self):
5436 This runs on the master, the primary and all the secondaries.
5439 instance = self.replacer.instance
5441 "MODE": self.op.mode,
5442 "NEW_SECONDARY": self.op.remote_node,
5443 "OLD_SECONDARY": instance.secondary_nodes[0],
5445 env.update(_BuildInstanceHookEnvByObject(self, instance))
5447 self.cfg.GetMasterNode(),
5448 instance.primary_node,
5450 if self.op.remote_node is not None:
5451 nl.append(self.op.remote_node)
5455 class LUEvacuateNode(LogicalUnit):
5456 """Relocate the secondary instances from a node.
5459 HPATH = "node-evacuate"
5460 HTYPE = constants.HTYPE_NODE
5461 _OP_REQP = ["node_name"]
5464 def CheckArguments(self):
5465 if not hasattr(self.op, "remote_node"):
5466 self.op.remote_node = None
5467 if not hasattr(self.op, "iallocator"):
5468 self.op.iallocator = None
5470 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
5471 self.op.remote_node,
5474 def ExpandNames(self):
5475 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
5476 if self.op.node_name is None:
5477 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
5479 self.needed_locks = {}
5481 # Declare node locks
5482 if self.op.iallocator is not None:
5483 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5485 elif self.op.remote_node is not None:
5486 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5487 if remote_node is None:
5488 raise errors.OpPrereqError("Node '%s' not known" %
5489 self.op.remote_node)
5491 self.op.remote_node = remote_node
5493 # Warning: do not remove the locking of the new secondary here
5494 # unless DRBD8.AddChildren is changed to work in parallel;
5495 # currently it doesn't since parallel invocations of
5496 # FindUnusedMinor will conflict
5497 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5498 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5501 raise errors.OpPrereqError("Invalid parameters")
5503 # Create tasklets for replacing disks for all secondary instances on this node
5508 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
5509 logging.debug("Replacing disks for instance %s", inst.name)
5510 names.append(inst.name)
5512 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
5513 self.op.iallocator, self.op.remote_node, [])
5514 tasklets.append(replacer)
5516 self.tasklets = tasklets
5517 self.instance_names = names
5519 # Declare instance locks
5520 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
5522 def DeclareLocks(self, level):
5523 # If we're not already locking all nodes in the set, we have to declare the
5524 # instance's primary/secondary nodes.
5525 if (level == locking.LEVEL_NODE and
5526 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5527 self._LockInstancesNodes()
5529 def BuildHooksEnv(self):
5532 This runs on the master, the primary and all the secondaries.
5536 "NODE_NAME": self.op.node_name,
5539 nl = [self.cfg.GetMasterNode()]
5541 if self.op.remote_node is not None:
5542 env["NEW_SECONDARY"] = self.op.remote_node
5543 nl.append(self.op.remote_node)
5545 return (env, nl, nl)
5548 class TLReplaceDisks(Tasklet):
5549 """Replaces disks for an instance.
5551 Note: Locking is not within the scope of this class.
5554 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
5556 """Initializes this class.
5559 Tasklet.__init__(self, lu)
5562 self.instance_name = instance_name
5564 self.iallocator_name = iallocator_name
5565 self.remote_node = remote_node
5569 self.instance = None
5570 self.new_node = None
5571 self.target_node = None
5572 self.other_node = None
5573 self.remote_node_info = None
5574 self.node_secondary_ip = None
5577 def CheckArguments(mode, remote_node, iallocator):
5578 """Helper function for users of this class.
5581 # check for valid parameter combination
5582 cnt = [remote_node, iallocator].count(None)
5583 if mode == constants.REPLACE_DISK_CHG:
5585 raise errors.OpPrereqError("When changing the secondary either an"
5586 " iallocator script must be used or the"
5589 raise errors.OpPrereqError("Give either the iallocator or the new"
5590 " secondary, not both")
5591 else: # not replacing the secondary
5593 raise errors.OpPrereqError("The iallocator and new node options can"
5594 " be used only when changing the"
5598 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
5599 """Compute a new secondary node using an IAllocator.
5602 ial = IAllocator(lu.cfg, lu.rpc,
5603 mode=constants.IALLOCATOR_MODE_RELOC,
5605 relocate_from=relocate_from)
5607 ial.Run(iallocator_name)
5610 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
5611 " %s" % (iallocator_name, ial.info))
5613 if len(ial.nodes) != ial.required_nodes:
5614 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5615 " of nodes (%s), required %s" %
5616 (iallocator_name, len(ial.nodes), ial.required_nodes))
5618 remote_node_name = ial.nodes[0]
5620 lu.LogInfo("Selected new secondary for instance '%s': %s",
5621 instance_name, remote_node_name)
5623 return remote_node_name
5625 def CheckPrereq(self):
5626 """Check prerequisites.
5628 This checks that the instance is in the cluster.
5631 self.instance = self.cfg.GetInstanceInfo(self.instance_name)
5632 assert self.instance is not None, \
5633 "Cannot retrieve locked instance %s" % self.instance_name
5635 if self.instance.disk_template != constants.DT_DRBD8:
5636 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5639 if len(self.instance.secondary_nodes) != 1:
5640 raise errors.OpPrereqError("The instance has a strange layout,"
5641 " expected one secondary but found %d" %
5642 len(self.instance.secondary_nodes))
5644 secondary_node = self.instance.secondary_nodes[0]
5646 if self.iallocator_name is None:
5647 remote_node = self.remote_node
5649 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
5650 self.instance.name, secondary_node)
5652 if remote_node is not None:
5653 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5654 assert self.remote_node_info is not None, \
5655 "Cannot retrieve locked node %s" % remote_node
5657 self.remote_node_info = None
5659 if remote_node == self.instance.primary_node:
5660 raise errors.OpPrereqError("The specified node is the primary node of"
5663 if remote_node == secondary_node:
5664 raise errors.OpPrereqError("The specified node is already the"
5665 " secondary node of the instance.")
5667 if self.mode == constants.REPLACE_DISK_PRI:
5668 self.target_node = self.instance.primary_node
5669 self.other_node = secondary_node
5670 check_nodes = [self.target_node, self.other_node]
5672 elif self.mode == constants.REPLACE_DISK_SEC:
5673 self.target_node = secondary_node
5674 self.other_node = self.instance.primary_node
5675 check_nodes = [self.target_node, self.other_node]
5677 elif self.mode == constants.REPLACE_DISK_CHG:
5678 self.new_node = remote_node
5679 self.other_node = self.instance.primary_node
5680 self.target_node = secondary_node
5681 check_nodes = [self.new_node, self.other_node]
5683 _CheckNodeNotDrained(self.lu, remote_node)
5686 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
5689 for node in check_nodes:
5690 _CheckNodeOnline(self.lu, node)
5692 # If not specified, all disks should be replaced
5694 self.disks = range(len(self.instance.disks))
5696 # Check whether disks are valid
5697 for disk_idx in self.disks:
5698 self.instance.FindDisk(disk_idx)
5700 # Get secondary node IP addresses
5703 for node_name in [self.target_node, self.other_node, self.new_node]:
5704 if node_name is not None:
5705 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
5707 self.node_secondary_ip = node_2nd_ip
5709 def Exec(self, feedback_fn):
5710 """Execute disk replacement.
5712 This dispatches the disk replacement to the appropriate handler.
5715 feedback_fn("Replacing disks for %s" % self.instance.name)
5717 activate_disks = (not self.instance.admin_up)
5719 # Activate the instance disks if we're replacing them on a down instance
5721 _StartInstanceDisks(self.lu, self.instance, True)
5724 if self.mode == constants.REPLACE_DISK_CHG:
5725 return self._ExecDrbd8Secondary()
5727 return self._ExecDrbd8DiskOnly()
5730 # Deactivate the instance disks if we're replacing them on a down instance
5732 _SafeShutdownInstanceDisks(self.lu, self.instance)
5734 def _CheckVolumeGroup(self, nodes):
5735 self.lu.LogInfo("Checking volume groups")
5737 vgname = self.cfg.GetVGName()
5739 # Make sure volume group exists on all involved nodes
5740 results = self.rpc.call_vg_list(nodes)
5742 raise errors.OpExecError("Can't list volume groups on the nodes")
5746 res.Raise("Error checking node %s" % node)
5747 if vgname not in res.payload:
5748 raise errors.OpExecError("Volume group '%s' not found on node %s" %
5751 def _CheckDisksExistence(self, nodes):
5752 # Check disk existence
5753 for idx, dev in enumerate(self.instance.disks):
5754 if idx not in self.disks:
5758 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
5759 self.cfg.SetDiskID(dev, node)
5761 result = self.rpc.call_blockdev_find(node, dev)
5763 msg = result.fail_msg
5764 if msg or not result.payload:
5766 msg = "disk not found"
5767 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5770 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
5771 for idx, dev in enumerate(self.instance.disks):
5772 if idx not in self.disks:
5775 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
5778 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
5780 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
5781 " replace disks for instance %s" %
5782 (node_name, self.instance.name))
5784 def _CreateNewStorage(self, node_name):
5785 vgname = self.cfg.GetVGName()
5788 for idx, dev in enumerate(self.instance.disks):
5789 if idx not in self.disks:
5792 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
5794 self.cfg.SetDiskID(dev, node_name)
5796 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
5797 names = _GenerateUniqueNames(self.lu, lv_names)
5799 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
5800 logical_id=(vgname, names[0]))
5801 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5802 logical_id=(vgname, names[1]))
5804 new_lvs = [lv_data, lv_meta]
5805 old_lvs = dev.children
5806 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5808 # we pass force_create=True to force the LVM creation
5809 for new_lv in new_lvs:
5810 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
5811 _GetInstanceInfoText(self.instance), False)
5815 def _CheckDevices(self, node_name, iv_names):
5816 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5817 self.cfg.SetDiskID(dev, node_name)
5819 result = self.rpc.call_blockdev_find(node_name, dev)
5821 msg = result.fail_msg
5822 if msg or not result.payload:
5824 msg = "disk not found"
5825 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5828 if result.payload[5]:
5829 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5831 def _RemoveOldStorage(self, node_name, iv_names):
5832 for name, (dev, old_lvs, _) in iv_names.iteritems():
5833 self.lu.LogInfo("Remove logical volumes for %s" % name)
5836 self.cfg.SetDiskID(lv, node_name)
5838 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
5840 self.lu.LogWarning("Can't remove old LV: %s" % msg,
5841 hint="remove unused LVs manually")
5843 def _ExecDrbd8DiskOnly(self):
5844 """Replace a disk on the primary or secondary for DRBD 8.
5846 The algorithm for replace is quite complicated:
5848 1. for each disk to be replaced:
5850 1. create new LVs on the target node with unique names
5851 1. detach old LVs from the drbd device
5852 1. rename old LVs to name_replaced.<time_t>
5853 1. rename new LVs to old LVs
5854 1. attach the new LVs (with the old names now) to the drbd device
5856 1. wait for sync across all devices
5858 1. for each modified disk:
5860 1. remove old LVs (which have the name name_replaced.<time_t>)
5862 Failures are not very well handled.
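# Reading aid (not part of the original flow; LV names below are only
# illustrative): for a single DRBD disk the step-4 dance below is roughly
#   drbd0 -> [old data LV, old meta LV]        (the degraded side)
#   1. new LVs with freshly generated unique names are created on target_node
#   2. drbd0 drops its children (call_blockdev_removechildren)
#   3. the old LVs are renamed to <name>_replaced-<time_t>
#   4. the new LVs are renamed to the old names and re-attached as children
# so from DRBD's point of view the backing devices keep their names.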
5867 # Step: check device activation
5868 self.lu.LogStep(1, steps_total, "Check device existence")
5869 self._CheckDisksExistence([self.other_node, self.target_node])
5870 self._CheckVolumeGroup([self.target_node, self.other_node])
5872 # Step: check other node consistency
5873 self.lu.LogStep(2, steps_total, "Check peer consistency")
5874 self._CheckDisksConsistency(self.other_node,
5875 self.other_node == self.instance.primary_node,
5878 # Step: create new storage
5879 self.lu.LogStep(3, steps_total, "Allocate new storage")
5880 iv_names = self._CreateNewStorage(self.target_node)
5882 # Step: for each lv, detach+rename*2+attach
5883 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
5884 for dev, old_lvs, new_lvs in iv_names.itervalues():
5885 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
5887 result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs)
5888 result.Raise("Can't detach drbd from local storage on node"
5889 " %s for device %s" % (self.target_node, dev.iv_name))
5891 #cfg.Update(instance)
5893 # ok, we created the new LVs, so now we know we have the needed
5894 # storage; as such, we proceed on the target node to rename
5895 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5896 # using the assumption that logical_id == physical_id (which in
5897 # turn is the unique_id on that node)
5899 # FIXME(iustin): use a better name for the replaced LVs
5900 temp_suffix = int(time.time())
5901 ren_fn = lambda d, suff: (d.physical_id[0],
5902 d.physical_id[1] + "_replaced-%s" % suff)
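# Hypothetical example of what ren_fn produces (VG and LV names invented):
# with temp_suffix == 1234567890, a physical_id of
#   ('xenvg', 'abc.disk0_data')
# becomes
#   ('xenvg', 'abc.disk0_data_replaced-1234567890')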
5904 # Build the rename list based on what LVs exist on the node
5905 rename_old_to_new = []
5906 for to_ren in old_lvs:
5907 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
5908 if not result.fail_msg and result.payload:
5910 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
5912 self.lu.LogInfo("Renaming the old LVs on the target node")
5913 result = self.rpc.call_blockdev_rename(self.target_node, rename_old_to_new)
5914 result.Raise("Can't rename old LVs on node %s" % self.target_node)
5916 # Now we rename the new LVs to the old LVs
5917 self.lu.LogInfo("Renaming the new LVs on the target node")
5918 rename_new_to_old = [(new, old.physical_id)
5919 for old, new in zip(old_lvs, new_lvs)]
5920 result = self.rpc.call_blockdev_rename(self.target_node, rename_new_to_old)
5921 result.Raise("Can't rename new LVs on node %s" % self.target_node)
5923 for old, new in zip(old_lvs, new_lvs):
5924 new.logical_id = old.logical_id
5925 self.cfg.SetDiskID(new, self.target_node)
5927 for disk in old_lvs:
5928 disk.logical_id = ren_fn(disk, temp_suffix)
5929 self.cfg.SetDiskID(disk, self.target_node)
5931 # Now that the new lvs have the old name, we can add them to the device
5932 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
5933 result = self.rpc.call_blockdev_addchildren(self.target_node, dev, new_lvs)
5934 msg = result.fail_msg
5936 for new_lv in new_lvs:
5937 msg2 = self.rpc.call_blockdev_remove(self.target_node, new_lv).fail_msg
5939 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
5940 hint=("cleanup manually the unused logical"
5942 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5944 dev.children = new_lvs
5946 self.cfg.Update(self.instance)
5949 # This can fail as the old devices are degraded and _WaitForSync
5950 # does a combined result over all disks, so we don't check its return value
5951 self.lu.LogStep(5, steps_total, "Sync devices")
5952 _WaitForSync(self.lu, self.instance, unlock=True)
5954 # Check all devices manually
5955 self._CheckDevices(self.instance.primary_node, iv_names)
5957 # Step: remove old storage
5958 self.lu.LogStep(6, steps_total, "Removing old storage")
5959 self._RemoveOldStorage(self.target_node, iv_names)
5961 def _ExecDrbd8Secondary(self):
5962 """Replace the secondary node for DRBD 8.
5964 The algorithm for replace is quite complicated:
5965 - for all disks of the instance:
5966 - create new LVs on the new node with same names
5967 - shutdown the drbd device on the old secondary
5968 - disconnect the drbd network on the primary
5969 - create the drbd device on the new secondary
5970 - network attach the drbd on the primary, using an artifice:
5971 the drbd code for Attach() will connect to the network if it
5972 finds a device which is connected to the good local disks but
5974 - wait for sync across all devices
5975 - remove all disks from the old secondary
5977 Failures are not very well handled.
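# Rough picture of step 4 below (a reading aid, not original code): the
# primary's DRBD devices are first disconnected from the network
# (call_drbd_disconnect_net, i.e. connected => standalone), the instance
# configuration is rewritten so every disk's logical_id points at the new
# secondary, and only then does call_drbd_attach_net re-establish the
# network (standalone => connected) against the new peer.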
5982 # Step: check device activation
5983 self.lu.LogStep(1, steps_total, "Check device existence")
5984 self._CheckDisksExistence([self.instance.primary_node])
5985 self._CheckVolumeGroup([self.instance.primary_node])
5987 # Step: check other node consistency
5988 self.lu.LogStep(2, steps_total, "Check peer consistency")
5989 self._CheckDisksConsistency(self.instance.primary_node, True, True)
5991 # Step: create new storage
5992 self.lu.LogStep(3, steps_total, "Allocate new storage")
5993 for idx, dev in enumerate(self.instance.disks):
5994 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
5995 (self.new_node, idx))
5996 # we pass force_create=True to force LVM creation
5997 for new_lv in dev.children:
5998 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
5999 _GetInstanceInfoText(self.instance), False)
6001 # Step 4: drbd minors and drbd setup changes
6002 # after this, we must manually remove the drbd minors on both the
6003 # error and the success paths
6004 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6005 minors = self.cfg.AllocateDRBDMinor([self.new_node for dev in self.instance.disks],
6007 logging.debug("Allocated minors %r" % (minors,))
6010 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
6011 self.lu.LogInfo("activating a new drbd on %s for disk/%d" % (self.new_node, idx))
6012 # create new devices on new_node; note that we create two IDs:
6013 # one without port, so the drbd will be activated without
6014 # networking information on the new node at this stage, and one
6015 # with network, for the later activation in step 4
6016 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
6017 if self.instance.primary_node == o_node1:
6022 new_alone_id = (self.instance.primary_node, self.new_node, None, p_minor, new_minor, o_secret)
6023 new_net_id = (self.instance.primary_node, self.new_node, o_port, p_minor, new_minor, o_secret)
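# As unpacked above, a DRBD8 logical_id is a 6-tuple
# (node_A, node_B, port, minor_A, minor_B, secret). new_alone_id keeps the
# port slot as None so the device can be brought up on the new node without
# any networking, while new_net_id carries the real port (o_port) for the
# later network attach; p_minor is whichever original minor belongs to the
# primary node. (Field meaning inferred from the unpacking above.)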
6025 iv_names[idx] = (dev, dev.children, new_net_id)
6026 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
6028 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
6029 logical_id=new_alone_id,
6030 children=dev.children,
6033 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
6034 _GetInstanceInfoText(self.instance), False)
6035 except errors.GenericError:
6036 self.cfg.ReleaseDRBDMinors(self.instance.name)
6039 # We have new devices, shutdown the drbd on the old secondary
6040 for idx, dev in enumerate(self.instance.disks):
6041 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
6042 self.cfg.SetDiskID(dev, self.target_node)
6043 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
6045 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
6046 "node: %s" % (idx, msg),
6047 hint=("Please cleanup this device manually as"
6048 " soon as possible"))
6050 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
6051 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], self.node_secondary_ip,
6052 self.instance.disks)[self.instance.primary_node]
6054 msg = result.fail_msg
6056 # detaches didn't succeed (unlikely)
6057 self.cfg.ReleaseDRBDMinors(self.instance.name)
6058 raise errors.OpExecError("Can't detach the disks from the network on"
6059 " old node: %s" % (msg,))
6061 # if we managed to detach at least one, we update all the disks of
6062 # the instance to point to the new secondary
6063 self.lu.LogInfo("Updating instance configuration")
6064 for dev, _, new_logical_id in iv_names.itervalues():
6065 dev.logical_id = new_logical_id
6066 self.cfg.SetDiskID(dev, self.instance.primary_node)
6068 self.cfg.Update(self.instance)
6070 # and now perform the drbd attach
6071 self.lu.LogInfo("Attaching primary drbds to new secondary"
6072 " (standalone => connected)")
6073 result = self.rpc.call_drbd_attach_net([self.instance.primary_node, self.new_node], self.node_secondary_ip,
6074 self.instance.disks, self.instance.name,
6076 for to_node, to_result in result.items():
6077 msg = to_result.fail_msg
6079 self.lu.LogWarning("Can't attach drbd disks on node %s: %s", to_node, msg,
6080 hint=("please do a gnt-instance info to see the"
6081 " status of disks"))
6084 # This can fail as the old devices are degraded and _WaitForSync
6085 # does a combined result over all disks, so we don't check its return value
6086 self.lu.LogStep(5, steps_total, "Sync devices")
6087 _WaitForSync(self.lu, self.instance, unlock=True)
6089 # Check all devices manually
6090 self._CheckDevices(self.instance.primary_node, iv_names)
6092 # Step: remove old storage
6093 self.lu.LogStep(6, steps_total, "Removing old storage")
6094 self._RemoveOldStorage(self.target_node, iv_names)
6097 class LUGrowDisk(LogicalUnit):
6098 """Grow a disk of an instance.
6102 HTYPE = constants.HTYPE_INSTANCE
6103 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
6106 def ExpandNames(self):
6107 self._ExpandAndLockInstance()
6108 self.needed_locks[locking.LEVEL_NODE] = []
6109 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6111 def DeclareLocks(self, level):
6112 if level == locking.LEVEL_NODE:
6113 self._LockInstancesNodes()
6115 def BuildHooksEnv(self):
6118 This runs on the master, the primary and all the secondaries.
6122 "DISK": self.op.disk,
6123 "AMOUNT": self.op.amount,
6125 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6127 self.cfg.GetMasterNode(),
6128 self.instance.primary_node,
6132 def CheckPrereq(self):
6133 """Check prerequisites.
6135 This checks that the instance is in the cluster.
6138 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6139 assert instance is not None, \
6140 "Cannot retrieve locked instance %s" % self.op.instance_name
6141 nodenames = list(instance.all_nodes)
6142 for node in nodenames:
6143 _CheckNodeOnline(self, node)
6146 self.instance = instance
6148 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
6149 raise errors.OpPrereqError("Instance's disk layout does not support"
6152 self.disk = instance.FindDisk(self.op.disk)
6154 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6155 instance.hypervisor)
6156 for node in nodenames:
6157 info = nodeinfo[node]
6158 info.Raise("Cannot get current information from node %s" % node)
6159 vg_free = info.payload.get('vg_free', None)
6160 if not isinstance(vg_free, int):
6161 raise errors.OpPrereqError("Can't compute free disk space on"
6163 if self.op.amount > vg_free:
6164 raise errors.OpPrereqError("Not enough disk space on target node %s:"
6165 " %d MiB available, %d MiB required" %
6166 (node, vg_free, self.op.amount))
6168 def Exec(self, feedback_fn):
6169 """Execute disk grow.
6172 instance = self.instance
6174 for node in instance.all_nodes:
6175 self.cfg.SetDiskID(disk, node)
6176 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
6177 result.Raise("Grow request failed to node %s" % node)
6178 disk.RecordGrow(self.op.amount)
6179 self.cfg.Update(instance)
6180 if self.op.wait_for_sync:
6181 disk_abort = not _WaitForSync(self, instance)
6183 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
6184 " status.\nPlease check the instance.")
6187 class LUQueryInstanceData(NoHooksLU):
6188 """Query runtime instance data.
6191 _OP_REQP = ["instances", "static"]
6194 def ExpandNames(self):
6195 self.needed_locks = {}
6196 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
6198 if not isinstance(self.op.instances, list):
6199 raise errors.OpPrereqError("Invalid argument type 'instances'")
6201 if self.op.instances:
6202 self.wanted_names = []
6203 for name in self.op.instances:
6204 full_name = self.cfg.ExpandInstanceName(name)
6205 if full_name is None:
6206 raise errors.OpPrereqError("Instance '%s' not known" % name)
6207 self.wanted_names.append(full_name)
6208 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
6210 self.wanted_names = None
6211 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
6213 self.needed_locks[locking.LEVEL_NODE] = []
6214 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6216 def DeclareLocks(self, level):
6217 if level == locking.LEVEL_NODE:
6218 self._LockInstancesNodes()
6220 def CheckPrereq(self):
6221 """Check prerequisites.
6223 This only checks the optional instance list against the existing names.
6226 if self.wanted_names is None:
6227 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
6229 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
6230 in self.wanted_names]
6233 def _ComputeDiskStatus(self, instance, snode, dev):
6234 """Compute block device status.
6237 static = self.op.static
6239 self.cfg.SetDiskID(dev, instance.primary_node)
6240 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
6241 if dev_pstatus.offline:
6244 dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
6245 dev_pstatus = dev_pstatus.payload
6249 if dev.dev_type in constants.LDS_DRBD:
6250 # we change the snode then (otherwise we use the one passed in)
6251 if dev.logical_id[0] == instance.primary_node:
6252 snode = dev.logical_id[1]
6254 snode = dev.logical_id[0]
6256 if snode and not static:
6257 self.cfg.SetDiskID(dev, snode)
6258 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
6259 if dev_sstatus.offline:
6262 dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
6263 dev_sstatus = dev_sstatus.payload
6268 dev_children = [self._ComputeDiskStatus(instance, snode, child)
6269 for child in dev.children]
6274 "iv_name": dev.iv_name,
6275 "dev_type": dev.dev_type,
6276 "logical_id": dev.logical_id,
6277 "physical_id": dev.physical_id,
6278 "pstatus": dev_pstatus,
6279 "sstatus": dev_sstatus,
6280 "children": dev_children,
6287 def Exec(self, feedback_fn):
6288 """Gather and return data"""
6291 cluster = self.cfg.GetClusterInfo()
6293 for instance in self.wanted_instances:
6294 if not self.op.static:
6295 remote_info = self.rpc.call_instance_info(instance.primary_node,
6297 instance.hypervisor)
6298 remote_info.Raise("Error checking node %s" % instance.primary_node)
6299 remote_info = remote_info.payload
6300 if remote_info and "state" in remote_info:
6303 remote_state = "down"
6306 if instance.admin_up:
6309 config_state = "down"
6311 disks = [self._ComputeDiskStatus(instance, None, device)
6312 for device in instance.disks]
6315 "name": instance.name,
6316 "config_state": config_state,
6317 "run_state": remote_state,
6318 "pnode": instance.primary_node,
6319 "snodes": instance.secondary_nodes,
6321 # this happens to be the same format used for hooks
6322 "nics": _NICListToTuple(self, instance.nics),
6324 "hypervisor": instance.hypervisor,
6325 "network_port": instance.network_port,
6326 "hv_instance": instance.hvparams,
6327 "hv_actual": cluster.FillHV(instance),
6328 "be_instance": instance.beparams,
6329 "be_actual": cluster.FillBE(instance),
6332 result[instance.name] = idict
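# Shape of the per-instance data assembled above (values illustrative only):
#   result["instance1"] = {
#     "name": ..., "config_state": "up"/"down", "run_state": ...,
#     "pnode": ..., "snodes": [...], "nics": [...],
#     "disks": [<dicts built by _ComputeDiskStatus>],
#     "hypervisor": ..., "hv_instance"/"hv_actual": {...},
#     "be_instance"/"be_actual": {...},
#   }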
6337 class LUSetInstanceParams(LogicalUnit):
6338 """Modifies an instances's parameters.
6341 HPATH = "instance-modify"
6342 HTYPE = constants.HTYPE_INSTANCE
6343 _OP_REQP = ["instance_name"]
6346 def CheckArguments(self):
6347 if not hasattr(self.op, 'nics'):
6349 if not hasattr(self.op, 'disks'):
6351 if not hasattr(self.op, 'beparams'):
6352 self.op.beparams = {}
6353 if not hasattr(self.op, 'hvparams'):
6354 self.op.hvparams = {}
6355 self.op.force = getattr(self.op, "force", False)
6356 if not (self.op.nics or self.op.disks or
6357 self.op.hvparams or self.op.beparams):
6358 raise errors.OpPrereqError("No changes submitted")
6362 for disk_op, disk_dict in self.op.disks:
6363 if disk_op == constants.DDM_REMOVE:
6366 elif disk_op == constants.DDM_ADD:
6369 if not isinstance(disk_op, int):
6370 raise errors.OpPrereqError("Invalid disk index")
6371 if not isinstance(disk_dict, dict):
6372 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
6373 raise errors.OpPrereqError(msg)
6375 if disk_op == constants.DDM_ADD:
6376 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
6377 if mode not in constants.DISK_ACCESS_SET:
6378 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
6379 size = disk_dict.get('size', None)
6381 raise errors.OpPrereqError("Required disk parameter size missing")
6384 except ValueError, err:
6385 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
6387 disk_dict['size'] = size
6389 # modification of disk
6390 if 'size' in disk_dict:
6391 raise errors.OpPrereqError("Disk size change not possible, use"
6394 if disk_addremove > 1:
6395 raise errors.OpPrereqError("Only one disk add or remove operation"
6396 " supported at a time")
6400 for nic_op, nic_dict in self.op.nics:
6401 if nic_op == constants.DDM_REMOVE:
6404 elif nic_op == constants.DDM_ADD:
6407 if not isinstance(nic_op, int):
6408 raise errors.OpPrereqError("Invalid nic index")
6409 if not isinstance(nic_dict, dict):
6410 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
6411 raise errors.OpPrereqError(msg)
6413 # nic_dict should be a dict
6414 nic_ip = nic_dict.get('ip', None)
6415 if nic_ip is not None:
6416 if nic_ip.lower() == constants.VALUE_NONE:
6417 nic_dict['ip'] = None
6419 if not utils.IsValidIP(nic_ip):
6420 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
6422 nic_bridge = nic_dict.get('bridge', None)
6423 nic_link = nic_dict.get('link', None)
6424 if nic_bridge and nic_link:
6425 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
6426 " at the same time")
6427 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
6428 nic_dict['bridge'] = None
6429 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
6430 nic_dict['link'] = None
6432 if nic_op == constants.DDM_ADD:
6433 nic_mac = nic_dict.get('mac', None)
6435 nic_dict['mac'] = constants.VALUE_AUTO
6437 if 'mac' in nic_dict:
6438 nic_mac = nic_dict['mac']
6439 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6440 if not utils.IsValidMac(nic_mac):
6441 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6442 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6443 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6444 " modifying an existing nic")
6446 if nic_addremove > 1:
6447 raise errors.OpPrereqError("Only one NIC add or remove operation"
6448 " supported at a time")
6450 def ExpandNames(self):
6451 self._ExpandAndLockInstance()
6452 self.needed_locks[locking.LEVEL_NODE] = []
6453 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6455 def DeclareLocks(self, level):
6456 if level == locking.LEVEL_NODE:
6457 self._LockInstancesNodes()
6459 def BuildHooksEnv(self):
6462 This runs on the master, primary and secondaries.
6466 if constants.BE_MEMORY in self.be_new:
6467 args['memory'] = self.be_new[constants.BE_MEMORY]
6468 if constants.BE_VCPUS in self.be_new:
6469 args['vcpus'] = self.be_new[constants.BE_VCPUS]
6470 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6471 # information at all.
6474 nic_override = dict(self.op.nics)
6475 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6476 for idx, nic in enumerate(self.instance.nics):
6477 if idx in nic_override:
6478 this_nic_override = nic_override[idx]
6480 this_nic_override = {}
6481 if 'ip' in this_nic_override:
6482 ip = this_nic_override['ip']
6485 if 'mac' in this_nic_override:
6486 mac = this_nic_override['mac']
6489 if idx in self.nic_pnew:
6490 nicparams = self.nic_pnew[idx]
6492 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6493 mode = nicparams[constants.NIC_MODE]
6494 link = nicparams[constants.NIC_LINK]
6495 args['nics'].append((ip, mac, mode, link))
6496 if constants.DDM_ADD in nic_override:
6497 ip = nic_override[constants.DDM_ADD].get('ip', None)
6498 mac = nic_override[constants.DDM_ADD]['mac']
6499 nicparams = self.nic_pnew[constants.DDM_ADD]
6500 mode = nicparams[constants.NIC_MODE]
6501 link = nicparams[constants.NIC_LINK]
6502 args['nics'].append((ip, mac, mode, link))
6503 elif constants.DDM_REMOVE in nic_override:
6504 del args['nics'][-1]
6506 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6507 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6510 def _GetUpdatedParams(self, old_params, update_dict,
6511 default_values, parameter_types):
6512 """Return the new params dict for the given params.
6514 @type old_params: dict
6515 @param old_params: old parameters
6516 @type update_dict: dict
6517 @param update_dict: dict containing new parameter values,
6518 or constants.VALUE_DEFAULT to reset the
6519 parameter to its default value
6520 @type default_values: dict
6521 @param default_values: default values for the filled parameters
6522 @type parameter_types: dict
6523 @param parameter_types: dict mapping target dict keys to types
6524 in constants.ENFORCEABLE_TYPES
6525 @rtype: (dict, dict)
6526 @return: (new_parameters, filled_parameters)
6529 params_copy = copy.deepcopy(old_params)
6530 for key, val in update_dict.iteritems():
6531 if val == constants.VALUE_DEFAULT:
6533 del params_copy[key]
6537 params_copy[key] = val
6538 utils.ForceDictType(params_copy, parameter_types)
6539 params_filled = objects.FillDict(default_values, params_copy)
6540 return (params_copy, params_filled)
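# Worked example (values invented): with
#   old_params     = {'memory': 512, 'vcpus': 2}
#   update_dict    = {'memory': constants.VALUE_DEFAULT, 'vcpus': 4}
#   default_values = {'memory': 128, 'vcpus': 1}
# the 'memory' override is dropped and 'vcpus' is replaced, giving
#   new_parameters    = {'vcpus': 4}
#   filled_parameters = {'memory': 128, 'vcpus': 4}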
6542 def CheckPrereq(self):
6543 """Check prerequisites.
6545 This only checks the instance list against the existing names.
6548 self.force = self.op.force
6550 # checking the new params on the primary/secondary nodes
6552 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6553 cluster = self.cluster = self.cfg.GetClusterInfo()
6554 assert self.instance is not None, \
6555 "Cannot retrieve locked instance %s" % self.op.instance_name
6556 pnode = instance.primary_node
6557 nodelist = list(instance.all_nodes)
6559 # hvparams processing
6560 if self.op.hvparams:
6561 i_hvdict, hv_new = self._GetUpdatedParams(
6562 instance.hvparams, self.op.hvparams,
6563 cluster.hvparams[instance.hypervisor],
6564 constants.HVS_PARAMETER_TYPES)
6566 hypervisor.GetHypervisor(
6567 instance.hypervisor).CheckParameterSyntax(hv_new)
6568 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6569 self.hv_new = hv_new # the new actual values
6570 self.hv_inst = i_hvdict # the new dict (without defaults)
6572 self.hv_new = self.hv_inst = {}
6574 # beparams processing
6575 if self.op.beparams:
6576 i_bedict, be_new = self._GetUpdatedParams(
6577 instance.beparams, self.op.beparams,
6578 cluster.beparams[constants.PP_DEFAULT],
6579 constants.BES_PARAMETER_TYPES)
6580 self.be_new = be_new # the new actual values
6581 self.be_inst = i_bedict # the new dict (without defaults)
6583 self.be_new = self.be_inst = {}
6587 if constants.BE_MEMORY in self.op.beparams and not self.force:
6588 mem_check_list = [pnode]
6589 if be_new[constants.BE_AUTO_BALANCE]:
6590 # either we changed auto_balance to yes or it was from before
6591 mem_check_list.extend(instance.secondary_nodes)
6592 instance_info = self.rpc.call_instance_info(pnode, instance.name,
6593 instance.hypervisor)
6594 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6595 instance.hypervisor)
6596 pninfo = nodeinfo[pnode]
6597 msg = pninfo.fail_msg
6599 # Assume the primary node is unreachable and go ahead
6600 self.warn.append("Can't get info from primary node %s: %s" %
6602 elif not isinstance(pninfo.payload.get('memory_free', None), int):
6603 self.warn.append("Node data from primary node %s doesn't contain"
6604 " free memory information" % pnode)
6605 elif instance_info.fail_msg:
6606 self.warn.append("Can't get instance runtime information: %s" %
6607 instance_info.fail_msg)
6609 if instance_info.payload:
6610 current_mem = int(instance_info.payload['memory'])
6612 # Assume instance not running
6613 # (there is a slight race condition here, but it's not very probable,
6614 # and we have no other way to check)
6616 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6617 pninfo.payload['memory_free'])
6619 raise errors.OpPrereqError("This change will prevent the instance"
6620 " from starting, due to %d MB of memory"
6621 " missing on its primary node" % miss_mem)
6623 if be_new[constants.BE_AUTO_BALANCE]:
6624 for node, nres in nodeinfo.items():
6625 if node not in instance.secondary_nodes:
6629 self.warn.append("Can't get info from secondary node %s: %s" %
6631 elif not isinstance(nres.payload.get('memory_free', None), int):
6632 self.warn.append("Secondary node %s didn't return free"
6633 " memory information" % node)
6634 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6635 self.warn.append("Not enough memory to failover instance to"
6636 " secondary node %s" % node)
6641 for nic_op, nic_dict in self.op.nics:
6642 if nic_op == constants.DDM_REMOVE:
6643 if not instance.nics:
6644 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6646 if nic_op != constants.DDM_ADD:
6648 if nic_op < 0 or nic_op >= len(instance.nics):
6649 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6651 (nic_op, len(instance.nics)))
6652 old_nic_params = instance.nics[nic_op].nicparams
6653 old_nic_ip = instance.nics[nic_op].ip
6658 update_params_dict = dict([(key, nic_dict[key])
6659 for key in constants.NICS_PARAMETERS
6660 if key in nic_dict])
6662 if 'bridge' in nic_dict:
6663 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6665 new_nic_params, new_filled_nic_params = \
6666 self._GetUpdatedParams(old_nic_params, update_params_dict,
6667 cluster.nicparams[constants.PP_DEFAULT],
6668 constants.NICS_PARAMETER_TYPES)
6669 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6670 self.nic_pinst[nic_op] = new_nic_params
6671 self.nic_pnew[nic_op] = new_filled_nic_params
6672 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6674 if new_nic_mode == constants.NIC_MODE_BRIDGED:
6675 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6676 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6678 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6680 self.warn.append(msg)
6682 raise errors.OpPrereqError(msg)
6683 if new_nic_mode == constants.NIC_MODE_ROUTED:
6684 if 'ip' in nic_dict:
6685 nic_ip = nic_dict['ip']
6689 raise errors.OpPrereqError('Cannot set the nic ip to None'
6691 if 'mac' in nic_dict:
6692 nic_mac = nic_dict['mac']
6694 raise errors.OpPrereqError('Cannot set the nic mac to None')
6695 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6696 # otherwise generate the mac
6697 nic_dict['mac'] = self.cfg.GenerateMAC()
6699 # or validate/reserve the current one
6700 if self.cfg.IsMacInUse(nic_mac):
6701 raise errors.OpPrereqError("MAC address %s already in use"
6702 " in cluster" % nic_mac)
6705 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6706 raise errors.OpPrereqError("Disk operations not supported for"
6707 " diskless instances")
6708 for disk_op, disk_dict in self.op.disks:
6709 if disk_op == constants.DDM_REMOVE:
6710 if len(instance.disks) == 1:
6711 raise errors.OpPrereqError("Cannot remove the last disk of"
6713 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6714 ins_l = ins_l[pnode]
6715 msg = ins_l.fail_msg
6717 raise errors.OpPrereqError("Can't contact node %s: %s" %
6719 if instance.name in ins_l.payload:
6720 raise errors.OpPrereqError("Instance is running, can't remove"
6723 if (disk_op == constants.DDM_ADD and
6724 len(instance.disks) >= constants.MAX_DISKS):
6725 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6726 " add more" % constants.MAX_DISKS)
6727 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6729 if disk_op < 0 or disk_op >= len(instance.disks):
6730 raise errors.OpPrereqError("Invalid disk index %s, valid values"
6732 (disk_op, len(instance.disks)))
6736 def Exec(self, feedback_fn):
6737 """Modifies an instance.
6739 All parameters take effect only at the next restart of the instance.
6742 # Process here the warnings from CheckPrereq, as we don't have a
6743 # feedback_fn there.
6744 for warn in self.warn:
6745 feedback_fn("WARNING: %s" % warn)
6748 instance = self.instance
6749 cluster = self.cluster
6751 for disk_op, disk_dict in self.op.disks:
6752 if disk_op == constants.DDM_REMOVE:
6753 # remove the last disk
6754 device = instance.disks.pop()
6755 device_idx = len(instance.disks)
6756 for node, disk in device.ComputeNodeTree(instance.primary_node):
6757 self.cfg.SetDiskID(disk, node)
6758 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6760 self.LogWarning("Could not remove disk/%d on node %s: %s,"
6761 " continuing anyway", device_idx, node, msg)
6762 result.append(("disk/%d" % device_idx, "remove"))
6763 elif disk_op == constants.DDM_ADD:
6765 if instance.disk_template == constants.DT_FILE:
6766 file_driver, file_path = instance.disks[0].logical_id
6767 file_path = os.path.dirname(file_path)
6769 file_driver = file_path = None
6770 disk_idx_base = len(instance.disks)
6771 new_disk = _GenerateDiskTemplate(self,
6772 instance.disk_template,
6773 instance.name, instance.primary_node,
6774 instance.secondary_nodes,
6779 instance.disks.append(new_disk)
6780 info = _GetInstanceInfoText(instance)
6782 logging.info("Creating volume %s for instance %s",
6783 new_disk.iv_name, instance.name)
6784 # Note: this needs to be kept in sync with _CreateDisks
6786 for node in instance.all_nodes:
6787 f_create = node == instance.primary_node
6789 _CreateBlockDev(self, node, instance, new_disk,
6790 f_create, info, f_create)
6791 except errors.OpExecError, err:
6792 self.LogWarning("Failed to create volume %s (%s) on"
6794 new_disk.iv_name, new_disk, node, err)
6795 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6796 (new_disk.size, new_disk.mode)))
6798 # change a given disk
6799 instance.disks[disk_op].mode = disk_dict['mode']
6800 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6802 for nic_op, nic_dict in self.op.nics:
6803 if nic_op == constants.DDM_REMOVE:
6804 # remove the last nic
6805 del instance.nics[-1]
6806 result.append(("nic.%d" % len(instance.nics), "remove"))
6807 elif nic_op == constants.DDM_ADD:
6808 # mac and bridge should be set by now
6809 mac = nic_dict['mac']
6810 ip = nic_dict.get('ip', None)
6811 nicparams = self.nic_pinst[constants.DDM_ADD]
6812 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6813 instance.nics.append(new_nic)
6814 result.append(("nic.%d" % (len(instance.nics) - 1),
6815 "add:mac=%s,ip=%s,mode=%s,link=%s" %
6816 (new_nic.mac, new_nic.ip,
6817 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6818 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6821 for key in 'mac', 'ip':
6823 setattr(instance.nics[nic_op], key, nic_dict[key])
6824 if nic_op in self.nic_pnew:
6825 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6826 for key, val in nic_dict.iteritems():
6827 result.append(("nic.%s/%d" % (key, nic_op), val))
6830 if self.op.hvparams:
6831 instance.hvparams = self.hv_inst
6832 for key, val in self.op.hvparams.iteritems():
6833 result.append(("hv/%s" % key, val))
6836 if self.op.beparams:
6837 instance.beparams = self.be_inst
6838 for key, val in self.op.beparams.iteritems():
6839 result.append(("be/%s" % key, val))
6841 self.cfg.Update(instance)
6846 class LUQueryExports(NoHooksLU):
6847 """Query the exports list
6850 _OP_REQP = ['nodes']
6853 def ExpandNames(self):
6854 self.needed_locks = {}
6855 self.share_locks[locking.LEVEL_NODE] = 1
6856 if not self.op.nodes:
6857 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6859 self.needed_locks[locking.LEVEL_NODE] = \
6860 _GetWantedNodes(self, self.op.nodes)
6862 def CheckPrereq(self):
6863 """Check prerequisites.
6866 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6868 def Exec(self, feedback_fn):
6869 """Compute the list of all the exported system images.
6872 @return: a dictionary with the structure node->(export-list)
6873 where export-list is a list of the instances exported on
6877 rpcresult = self.rpc.call_export_list(self.nodes)
6879 for node in rpcresult:
6880 if rpcresult[node].fail_msg:
6881 result[node] = False
6883 result[node] = rpcresult[node].payload
6888 class LUExportInstance(LogicalUnit):
6889 """Export an instance to an image in the cluster.
6892 HPATH = "instance-export"
6893 HTYPE = constants.HTYPE_INSTANCE
6894 _OP_REQP = ["instance_name", "target_node", "shutdown"]
6897 def ExpandNames(self):
6898 self._ExpandAndLockInstance()
6899 # FIXME: lock only instance primary and destination node
6901 # Sad but true, for now we have to lock all nodes, as we don't know where
6902 # the previous export might be, and in this LU we search for it and
6903 # remove it from its current node. In the future we could fix this by:
6904 # - making a tasklet to search (share-lock all), then create the new one,
6905 # then one to remove, after
6906 # - removing the removal operation altogether
6907 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6909 def DeclareLocks(self, level):
6910 """Last minute lock declaration."""
6911 # All nodes are locked anyway, so nothing to do here.
6913 def BuildHooksEnv(self):
6916 This will run on the master, primary node and target node.
6920 "EXPORT_NODE": self.op.target_node,
6921 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6923 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6924 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6925 self.op.target_node]
6928 def CheckPrereq(self):
6929 """Check prerequisites.
6931 This checks that the instance and node names are valid.
6934 instance_name = self.op.instance_name
6935 self.instance = self.cfg.GetInstanceInfo(instance_name)
6936 assert self.instance is not None, \
6937 "Cannot retrieve locked instance %s" % self.op.instance_name
6938 _CheckNodeOnline(self, self.instance.primary_node)
6940 self.dst_node = self.cfg.GetNodeInfo(
6941 self.cfg.ExpandNodeName(self.op.target_node))
6943 if self.dst_node is None:
6944 # This is a wrong node name, not a non-locked node
6945 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6946 _CheckNodeOnline(self, self.dst_node.name)
6947 _CheckNodeNotDrained(self, self.dst_node.name)
6949 # instance disk type verification
6950 for disk in self.instance.disks:
6951 if disk.dev_type == constants.LD_FILE:
6952 raise errors.OpPrereqError("Export not supported for instances with"
6953 " file-based disks")
6955 def Exec(self, feedback_fn):
6956 """Export an instance to an image in the cluster.
6959 instance = self.instance
6960 dst_node = self.dst_node
6961 src_node = instance.primary_node
6962 if self.op.shutdown:
6963 # shutdown the instance, but not the disks
6964 result = self.rpc.call_instance_shutdown(src_node, instance)
6965 result.Raise("Could not shutdown instance %s on"
6966 " node %s" % (instance.name, src_node))
6968 vgname = self.cfg.GetVGName()
6972 # set the disks ID correctly since call_instance_start needs the
6973 # correct drbd minor to create the symlinks
6974 for disk in instance.disks:
6975 self.cfg.SetDiskID(disk, src_node)
6978 for idx, disk in enumerate(instance.disks):
6979 # result.payload will be a snapshot of an LVM leaf of the disk we passed
6980 result = self.rpc.call_blockdev_snapshot(src_node, disk)
6981 msg = result.fail_msg
6983 self.LogWarning("Could not snapshot disk/%s on node %s: %s",
6985 snap_disks.append(False)
6987 disk_id = (vgname, result.payload)
6988 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6989 logical_id=disk_id, physical_id=disk_id,
6990 iv_name=disk.iv_name)
6991 snap_disks.append(new_dev)
6994 if self.op.shutdown and instance.admin_up:
6995 result = self.rpc.call_instance_start(src_node, instance, None, None)
6996 msg = result.fail_msg
6998 _ShutdownInstanceDisks(self, instance)
6999 raise errors.OpExecError("Could not start instance: %s" % msg)
7001 # TODO: check for size
7003 cluster_name = self.cfg.GetClusterName()
7004 for idx, dev in enumerate(snap_disks):
7006 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
7007 instance, cluster_name, idx)
7008 msg = result.fail_msg
7010 self.LogWarning("Could not export disk/%s from node %s to"
7011 " node %s: %s", idx, src_node, dst_node.name, msg)
7012 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
7014 self.LogWarning("Could not remove snapshot for disk/%d from node"
7015 " %s: %s", idx, src_node, msg)
7017 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
7018 msg = result.fail_msg
7020 self.LogWarning("Could not finalize export for instance %s"
7021 " on node %s: %s", instance.name, dst_node.name, msg)
7023 nodelist = self.cfg.GetNodeList()
7024 nodelist.remove(dst_node.name)
7026 # on one-node clusters nodelist will be empty after the removal
7027 # if we proceed the backup would be removed because OpQueryExports
7028 # substitutes an empty list with the full cluster node list.
7029 iname = instance.name
7031 exportlist = self.rpc.call_export_list(nodelist)
7032 for node in exportlist:
7033 if exportlist[node].fail_msg:
7035 if iname in exportlist[node].payload:
7036 msg = self.rpc.call_export_remove(node, iname).fail_msg
7038 self.LogWarning("Could not remove older export for instance %s"
7039 " on node %s: %s", iname, node, msg)
7042 class LURemoveExport(NoHooksLU):
7043 """Remove exports related to the named instance.
7046 _OP_REQP = ["instance_name"]
7049 def ExpandNames(self):
7050 self.needed_locks = {}
7051 # We need all nodes to be locked in order for RemoveExport to work, but we
7052 # don't need to lock the instance itself, as nothing will happen to it (and
7053 # we can also remove exports for an already-removed instance)
7054 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7056 def CheckPrereq(self):
7057 """Check prerequisites.
7061 def Exec(self, feedback_fn):
7062 """Remove any export.
7065 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
7066 # If the instance was not found we'll try with the name that was passed in.
7067 # This will only work if it was an FQDN, though.
7069 if not instance_name:
7071 instance_name = self.op.instance_name
7073 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
7074 exportlist = self.rpc.call_export_list(locked_nodes)
7076 for node in exportlist:
7077 msg = exportlist[node].fail_msg
7079 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
7081 if instance_name in exportlist[node].payload:
7083 result = self.rpc.call_export_remove(node, instance_name)
7084 msg = result.fail_msg
7086 logging.error("Could not remove export for instance %s"
7087 " on node %s: %s", instance_name, node, msg)
7089 if fqdn_warn and not found:
7090 feedback_fn("Export not found. If trying to remove an export belonging"
7091 " to a deleted instance please use its Fully Qualified"
7095 class TagsLU(NoHooksLU):
7098 This is an abstract class which is the parent of all the other tags LUs.
7102 def ExpandNames(self):
7103 self.needed_locks = {}
7104 if self.op.kind == constants.TAG_NODE:
7105 name = self.cfg.ExpandNodeName(self.op.name)
7107 raise errors.OpPrereqError("Invalid node name (%s)" %
7110 self.needed_locks[locking.LEVEL_NODE] = name
7111 elif self.op.kind == constants.TAG_INSTANCE:
7112 name = self.cfg.ExpandInstanceName(self.op.name)
7114 raise errors.OpPrereqError("Invalid instance name (%s)" %
7117 self.needed_locks[locking.LEVEL_INSTANCE] = name
7119 def CheckPrereq(self):
7120 """Check prerequisites.
7123 if self.op.kind == constants.TAG_CLUSTER:
7124 self.target = self.cfg.GetClusterInfo()
7125 elif self.op.kind == constants.TAG_NODE:
7126 self.target = self.cfg.GetNodeInfo(self.op.name)
7127 elif self.op.kind == constants.TAG_INSTANCE:
7128 self.target = self.cfg.GetInstanceInfo(self.op.name)
7130 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
7134 class LUGetTags(TagsLU):
7135 """Returns the tags of a given object.
7138 _OP_REQP = ["kind", "name"]
7141 def Exec(self, feedback_fn):
7142 """Returns the tag list.
7145 return list(self.target.GetTags())
7148 class LUSearchTags(NoHooksLU):
7149 """Searches the tags for a given pattern.
7152 _OP_REQP = ["pattern"]
7155 def ExpandNames(self):
7156 self.needed_locks = {}
7158 def CheckPrereq(self):
7159 """Check prerequisites.
7161 This checks the pattern passed for validity by compiling it.
7165 self.re = re.compile(self.op.pattern)
7166 except re.error, err:
7167 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
7168 (self.op.pattern, err))
7170 def Exec(self, feedback_fn):
7171 """Returns the tag list.
7175 tgts = [("/cluster", cfg.GetClusterInfo())]
7176 ilist = cfg.GetAllInstancesInfo().values()
7177 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
7178 nlist = cfg.GetAllNodesInfo().values()
7179 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
7181 for path, target in tgts:
7182 for tag in target.GetTags():
7183 if self.re.search(tag):
7184 results.append((path, tag))
7188 class LUAddTags(TagsLU):
7189 """Sets a tag on a given object.
7192 _OP_REQP = ["kind", "name", "tags"]
7195 def CheckPrereq(self):
7196 """Check prerequisites.
7198 This checks the type and length of the tag name and value.
7201 TagsLU.CheckPrereq(self)
7202 for tag in self.op.tags:
7203 objects.TaggableObject.ValidateTag(tag)
7205 def Exec(self, feedback_fn):
7210 for tag in self.op.tags:
7211 self.target.AddTag(tag)
7212 except errors.TagError, err:
7213 raise errors.OpExecError("Error while setting tag: %s" % str(err))
7215 self.cfg.Update(self.target)
7216 except errors.ConfigurationError:
7217 raise errors.OpRetryError("There has been a modification to the"
7218 " config file and the operation has been"
7219 " aborted. Please retry.")
7222 class LUDelTags(TagsLU):
7223 """Delete a list of tags from a given object.
7226 _OP_REQP = ["kind", "name", "tags"]
7229 def CheckPrereq(self):
7230 """Check prerequisites.
7232 This checks that we have the given tag.
7235 TagsLU.CheckPrereq(self)
7236 for tag in self.op.tags:
7237 objects.TaggableObject.ValidateTag(tag)
7238 del_tags = frozenset(self.op.tags)
7239 cur_tags = self.target.GetTags()
7240 if not del_tags <= cur_tags:
7241 diff_tags = del_tags - cur_tags
7242 diff_names = ["'%s'" % tag for tag in diff_tags]
7244 raise errors.OpPrereqError("Tag(s) %s not found" %
7245 (",".join(diff_names)))
7247 def Exec(self, feedback_fn):
7248 """Remove the tag from the object.
7251 for tag in self.op.tags:
7252 self.target.RemoveTag(tag)
7254 self.cfg.Update(self.target)
7255 except errors.ConfigurationError:
7256 raise errors.OpRetryError("There has been a modification to the"
7257 " config file and the operation has been"
7258 " aborted. Please retry.")
7261 class LUTestDelay(NoHooksLU):
7262 """Sleep for a specified amount of time.
7264 This LU sleeps on the master and/or nodes for a specified amount of
7268 _OP_REQP = ["duration", "on_master", "on_nodes"]
7271 def ExpandNames(self):
7272 """Expand names and set required locks.
7274 This expands the node list, if any.
7277 self.needed_locks = {}
7278 if self.op.on_nodes:
7279 # _GetWantedNodes can be used here, but is not always appropriate to use
7280 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
7282 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
7283 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
7285 def CheckPrereq(self):
7286 """Check prerequisites.
7290 def Exec(self, feedback_fn):
7291 """Do the actual sleep.
7294 if self.op.on_master:
7295 if not utils.TestDelay(self.op.duration):
7296 raise errors.OpExecError("Error during master delay test")
7297 if self.op.on_nodes:
7298 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
7299 for node, node_result in result.items():
7300 node_result.Raise("Failure during rpc call to node %s" % node)
7303 class IAllocator(object):
7304 """IAllocator framework.
7306 An IAllocator instance has several sets of attributes:
7307 - cfg that is needed to query the cluster
7308 - input data (all members of the _KEYS class attribute are required)
7309 - four buffer attributes (in|out_data|text), that represent the
7310 input (to the external script) in text and data structure format,
7311 and the output from it, again in two formats
7312 - the result variables from the script (success, info, nodes) for
7317 "mem_size", "disks", "disk_template",
7318 "os", "tags", "nics", "vcpus", "hypervisor",
7324 def __init__(self, cfg, rpc, mode, name, **kwargs):
7327 # init buffer variables
7328 self.in_text = self.out_text = self.in_data = self.out_data = None
7329 # init all input fields so that pylint is happy
7332 self.mem_size = self.disks = self.disk_template = None
7333 self.os = self.tags = self.nics = self.vcpus = None
7334 self.hypervisor = None
7335 self.relocate_from = None
7337 self.required_nodes = None
7338 # init result fields
7339 self.success = self.info = self.nodes = None
7340 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7341 keyset = self._ALLO_KEYS
7342 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
7343 keyset = self._RELO_KEYS
7345 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
7346 " IAllocator" % self.mode)
7348 if key not in keyset:
7349 raise errors.ProgrammerError("Invalid input parameter '%s' to"
7350 " IAllocator" % key)
7351 setattr(self, key, kwargs[key])
7353 if key not in kwargs:
7354 raise errors.ProgrammerError("Missing input parameter '%s' to"
7355 " IAllocator" % key)
7356 self._BuildInputData()
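# A minimal usage sketch (not taken from this file; all argument values are
# placeholders): for a new-instance placement one would build something like
#   ial = IAllocator(self.cfg, self.rpc,
#                    mode=constants.IALLOCATOR_MODE_ALLOC,
#                    name="instance1.example.com",
#                    mem_size=1024, disks=[{'size': 10240, 'mode': 'w'}],
#                    disk_template=constants.DT_DRBD8,
#                    os="debootstrap", tags=[], nics=[], vcpus=1,
#                    hypervisor=constants.HT_XEN_PVM)
#   ial.Run("my-allocator")   # name of an installed iallocator script
# and then inspect ial.success / ial.info / ial.nodes.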
7358 def _ComputeClusterData(self):
7359 """Compute the generic allocator input data.
7361 This is the data that is independent of the actual operation.
7365 cluster_info = cfg.GetClusterInfo()
7368 "version": constants.IALLOCATOR_VERSION,
7369 "cluster_name": cfg.GetClusterName(),
7370 "cluster_tags": list(cluster_info.GetTags()),
7371 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
7372 # we don't have job IDs
7374 iinfo = cfg.GetAllInstancesInfo().values()
7375 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
7379 node_list = cfg.GetNodeList()
7381 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7382 hypervisor_name = self.hypervisor
7383 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
7384 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
7386 node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
7389 self.rpc.call_all_instances_info(node_list,
7390 cluster_info.enabled_hypervisors)
7391 for nname, nresult in node_data.items():
7392 # first fill in static (config-based) values
7393 ninfo = cfg.GetNodeInfo(nname)
7395 "tags": list(ninfo.GetTags()),
7396 "primary_ip": ninfo.primary_ip,
7397 "secondary_ip": ninfo.secondary_ip,
7398 "offline": ninfo.offline,
7399 "drained": ninfo.drained,
7400 "master_candidate": ninfo.master_candidate,
7403 if not ninfo.offline:
7404 nresult.Raise("Can't get data for node %s" % nname)
7405 node_iinfo[nname].Raise("Can't get node instance info from node %s" %
7407 remote_info = nresult.payload
7408 for attr in ['memory_total', 'memory_free', 'memory_dom0',
7409 'vg_size', 'vg_free', 'cpu_total']:
7410 if attr not in remote_info:
7411 raise errors.OpExecError("Node '%s' didn't return attribute"
7412 " '%s'" % (nname, attr))
7413 if not isinstance(remote_info[attr], int):
7414 raise errors.OpExecError("Node '%s' returned invalid value"
7416 (nname, attr, remote_info[attr]))
7417 # compute memory used by primary instances
7418 i_p_mem = i_p_up_mem = 0
7419 for iinfo, beinfo in i_list:
7420 if iinfo.primary_node == nname:
7421 i_p_mem += beinfo[constants.BE_MEMORY]
7422 if iinfo.name not in node_iinfo[nname].payload:
7425 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
7426 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
7427 remote_info['memory_free'] -= max(0, i_mem_diff)
7430 i_p_up_mem += beinfo[constants.BE_MEMORY]
7432 # compute memory used by instances
7434 "total_memory": remote_info['memory_total'],
7435 "reserved_memory": remote_info['memory_dom0'],
7436 "free_memory": remote_info['memory_free'],
7437 "total_disk": remote_info['vg_size'],
7438 "free_disk": remote_info['vg_free'],
7439 "total_cpus": remote_info['cpu_total'],
7440 "i_pri_memory": i_p_mem,
7441 "i_pri_up_memory": i_p_up_mem,
7445 node_results[nname] = pnr
7446 data["nodes"] = node_results
7450 for iinfo, beinfo in i_list:
7452 for nic in iinfo.nics:
7453 filled_params = objects.FillDict(
7454 cluster_info.nicparams[constants.PP_DEFAULT],
7456 nic_dict = {"mac": nic.mac,
7458 "mode": filled_params[constants.NIC_MODE],
7459 "link": filled_params[constants.NIC_LINK],
7461 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7462 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7463 nic_data.append(nic_dict)
7465 "tags": list(iinfo.GetTags()),
7466 "admin_up": iinfo.admin_up,
7467 "vcpus": beinfo[constants.BE_VCPUS],
7468 "memory": beinfo[constants.BE_MEMORY],
7470 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7472 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7473 "disk_template": iinfo.disk_template,
7474 "hypervisor": iinfo.hypervisor,
7476 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7478 instance_data[iinfo.name] = pir
7480 data["instances"] = instance_data
7484 def _AddNewInstance(self):
7485 """Add new instance data to allocator structure.
7487 This in combination with _AllocatorGetClusterData will create the
7488 correct structure needed as input for the allocator.
7490 The checks for the completeness of the opcode must have already been
7496 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7498 if self.disk_template in constants.DTS_NET_MIRROR:
7499 self.required_nodes = 2
7501 self.required_nodes = 1
7505 "disk_template": self.disk_template,
7508 "vcpus": self.vcpus,
7509 "memory": self.mem_size,
7510 "disks": self.disks,
7511 "disk_space_total": disk_space,
7513 "required_nodes": self.required_nodes,
7515 data["request"] = request
7517 def _AddRelocateInstance(self):
7518 """Add relocate instance data to allocator structure.
7520 This in combination with _IAllocatorGetClusterData will create the
7521 correct structure needed as input for the allocator.
7523 The checks for the completeness of the opcode must have already been
7527 instance = self.cfg.GetInstanceInfo(self.name)
7528 if instance is None:
7529 raise errors.ProgrammerError("Unknown instance '%s' passed to"
7530 " IAllocator" % self.name)
7532 if instance.disk_template not in constants.DTS_NET_MIRROR:
7533 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7535 if len(instance.secondary_nodes) != 1:
7536 raise errors.OpPrereqError("Instance has not exactly one secondary node")
7538 self.required_nodes = 1
7539 disk_sizes = [{'size': disk.size} for disk in instance.disks]
7540 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7545 "disk_space_total": disk_space,
7546 "required_nodes": self.required_nodes,
7547 "relocate_from": self.relocate_from,
7549 self.in_data["request"] = request
7551 def _BuildInputData(self):
7552 """Build input data structures.
7555 self._ComputeClusterData()
7557 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7558 self._AddNewInstance()
7560 self._AddRelocateInstance()
7562 self.in_text = serializer.Dump(self.in_data)
7564 def Run(self, name, validate=True, call_fn=None):
7565 """Run an instance allocator and return the results.
7569 call_fn = self.rpc.call_iallocator_runner
7571 result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
7572 result.Raise("Failure while running the iallocator script")
7574 self.out_text = result.payload
7576 self._ValidateResult()
7578 def _ValidateResult(self):
7579 """Process the allocator results.
7581 This will process and if successful save the result in
7582 self.out_data and the other parameters.
7586 rdict = serializer.Load(self.out_text)
7587 except Exception, err:
7588 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7590 if not isinstance(rdict, dict):
7591 raise errors.OpExecError("Can't parse iallocator results: not a dict")
7593 for key in "success", "info", "nodes":
7594 if key not in rdict:
7595 raise errors.OpExecError("Can't parse iallocator results:"
7596 " missing key '%s'" % key)
7597 setattr(self, key, rdict[key])
7599 if not isinstance(rdict["nodes"], list):
7600 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7602 self.out_data = rdict
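# A well-formed allocator reply therefore looks roughly like (example
# values only):
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node2.example.com", "node3.example.com"]}
# only the presence of "success", "info" and "nodes", and the list type of
# "nodes", are enforced above.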
7605 class LUTestAllocator(NoHooksLU):
7606 """Run allocator tests.
7608 This LU runs the allocator tests
7611 _OP_REQP = ["direction", "mode", "name"]
7613 def CheckPrereq(self):
7614 """Check prerequisites.
7616 This checks the opcode parameters depending on the test direction and mode.
7619 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7620 for attr in ["name", "mem_size", "disks", "disk_template",
7621 "os", "tags", "nics", "vcpus"]:
7622 if not hasattr(self.op, attr):
7623 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7625 iname = self.cfg.ExpandInstanceName(self.op.name)
7626 if iname is not None:
7627 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7629 if not isinstance(self.op.nics, list):
7630 raise errors.OpPrereqError("Invalid parameter 'nics'")
7631 for row in self.op.nics:
7632 if (not isinstance(row, dict) or
7635 "bridge" not in row):
7636 raise errors.OpPrereqError("Invalid contents of the"
7637 " 'nics' parameter")
7638 if not isinstance(self.op.disks, list):
7639 raise errors.OpPrereqError("Invalid parameter 'disks'")
7640 for row in self.op.disks:
7641 if (not isinstance(row, dict) or
7642 "size" not in row or
7643 not isinstance(row["size"], int) or
7644 "mode" not in row or
7645 row["mode"] not in ['r', 'w']):
7646 raise errors.OpPrereqError("Invalid contents of the"
7647 " 'disks' parameter")
7648 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7649 self.op.hypervisor = self.cfg.GetHypervisorType()
7650 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7651 if not hasattr(self.op, "name"):
7652 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7653 fname = self.cfg.ExpandInstanceName(self.op.name)
7655 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7657 self.op.name = fname
7658 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7660 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7663 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7664 if not hasattr(self.op, "allocator") or self.op.allocator is None:
7665 raise errors.OpPrereqError("Missing allocator name")
7666 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7667 raise errors.OpPrereqError("Wrong allocator test '%s'" %
7670 def Exec(self, feedback_fn):
7671 """Run the allocator test.
7674 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7675 ial = IAllocator(self.cfg, self.rpc,
7678 mem_size=self.op.mem_size,
7679 disks=self.op.disks,
7680 disk_template=self.op.disk_template,
7684 vcpus=self.op.vcpus,
7685 hypervisor=self.op.hypervisor,
7688 ial = IAllocator(self.cfg, self.rpc,
7691 relocate_from=list(self.relocate_from),
7694 if self.op.direction == constants.IALLOCATOR_DIR_IN:
7695 result = ial.in_text
7697 ial.Run(self.op.allocator, validate=False)
7698 result = ial.out_text