4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
26 import os
27 import os.path
28 import time
29 import re
31 import logging
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
45 class LogicalUnit(object):
46 """Logical Unit base class.
48 Subclasses must follow these rules:
49 - implement ExpandNames
50 - implement CheckPrereq (except when tasklets are used)
51 - implement Exec (except when tasklets are used)
52 - implement BuildHooksEnv
53 - redefine HPATH and HTYPE
54 - optionally redefine their run requirements:
55 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
57 Note that all commands require root permissions.
59 @ivar dry_run_result: the value (if any) that will be returned to the caller
60 in dry-run mode (signalled by opcode dry_run parameter)
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overridden in derived classes in order to check op
72 validity.
75 self.proc = processor
76 self.op = op
77 self.cfg = context.cfg
78 self.context = context
79 self.rpc = rpc
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
88 self.__ssh = None
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
92 self.LogStep = processor.LogStep
94 self.dry_run_result = None
99 for attr_name in self._OP_REQP:
100 attr_val = getattr(op, attr_name, None)
101 if attr_val is None:
102 raise errors.OpPrereqError("Required parameter '%s' missing" %
103 attr_name)
105 self.CheckArguments()
107 def __GetSSH(self):
108 """Returns the SshRunner object
110 """
111 if not self.__ssh:
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
113 return self.__ssh
115 ssh = property(fget=__GetSSH)
117 def CheckArguments(self):
118 """Check syntactic validity for the opcode arguments.
120 This method is for doing a simple syntactic check and to ensure the
121 validity of opcode parameters, without any cluster-related
122 checks. While the same can be accomplished in ExpandNames and/or
123 CheckPrereq, doing these separately is better because:
125 - ExpandNames is left as a purely lock-related function
126 - CheckPrereq is run after we have acquired locks (and possibly
127 waited for them)
129 The function is allowed to change the self.op attribute so that
130 later methods need not worry about missing parameters.
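# Illustrative sketch (hypothetical, not from the original code): an LU could
# use CheckArguments to validate and normalize an optional opcode attribute;
# the "force" parameter below is an assumption, not a real opcode field.
#
#   def CheckArguments(self):
#     force = getattr(self.op, "force", False)
#     if not isinstance(force, bool):
#       raise errors.OpPrereqError("Parameter 'force' must be a boolean")
#     self.op.force = force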
135 def ExpandNames(self):
136 """Expand names for this LU.
138 This method is called before starting to execute the opcode, and it should
139 update all the parameters of the opcode to their canonical form (e.g. a
140 short node name must be fully expanded after this method has successfully
141 completed). This way locking, hooks, logging, etc. can work correctly.
143 LUs which implement this method must also populate the self.needed_locks
144 member, as a dict with lock levels as keys, and a list of needed lock names
145 as values. Rules:
147 - use an empty dict if you don't need any lock
148 - if you don't need any lock at a particular level omit that level
149 - don't put anything for the BGL level
150 - if you want all locks at a level use locking.ALL_SET as a value
152 If you need to share locks (rather than acquire them exclusively) at one
153 level you can modify self.share_locks, setting a true value (usually 1) for
154 that level. By default locks are not shared.
156 This function can also define a list of tasklets, which then will be
157 executed in order instead of the usual LU-level CheckPrereq and Exec
158 functions, if those are not defined by the LU.
162 # Acquire all nodes and one instance
163 self.needed_locks = {
164 locking.LEVEL_NODE: locking.ALL_SET,
165 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
166 }
167 # Acquire just two nodes
168 self.needed_locks = {
169 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
170 }
171 # Acquire no locks
172 self.needed_locks = {} # No, you can't leave it to the default value None
175 # The implementation of this method is mandatory only if the new LU is
176 # concurrent, so that old LUs don't need to be changed all at the same
177 # time.
179 self.needed_locks = {} # Exclusive LUs don't need locks.
181 raise NotImplementedError
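# Illustrative sketch (hypothetical, not from the original code): a read-only
# LU could acquire all node locks in shared mode, as described above.
#
#   def ExpandNames(self):
#     self.needed_locks = {
#       locking.LEVEL_NODE: locking.ALL_SET,
#     }
#     self.share_locks[locking.LEVEL_NODE] = 1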
183 def DeclareLocks(self, level):
184 """Declare LU locking needs for a level
186 While most LUs can just declare their locking needs at ExpandNames time,
187 sometimes there's the need to calculate some locks after having acquired
188 the ones before. This function is called just before acquiring locks at a
189 particular level, but after acquiring the ones at lower levels, and permits
190 such calculations. It can be used to modify self.needed_locks, and by
191 default it does nothing.
193 This function is only called if you have something already set in
194 self.needed_locks for the level.
196 @param level: Locking level which is going to be locked
197 @type level: member of ganeti.locking.LEVELS
201 def CheckPrereq(self):
202 """Check prerequisites for this LU.
204 This method should check that the prerequisites for the execution
205 of this LU are fulfilled. It can do internode communication, but
206 it should be idempotent - no cluster or system changes are
207 allowed.
209 The method should raise errors.OpPrereqError in case something is
210 not fulfilled. Its return value is ignored.
212 This method should also update all the parameters of the opcode to
213 their canonical form if it hasn't been done by ExpandNames before.
216 if self.tasklets is not None:
217 for (idx, tl) in enumerate(self.tasklets):
218 logging.debug("Checking prerequisites for tasklet %s/%s",
219 idx + 1, len(self.tasklets))
220 tl.CheckPrereq()
221 else:
222 raise NotImplementedError
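# Illustrative sketch (hypothetical, not from the original code): a minimal
# CheckPrereq for an LU acting on one instance, canonicalizing the name and
# verifying that the instance exists.
#
#   def CheckPrereq(self):
#     full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
#     if full_name is None:
#       raise errors.OpPrereqError("Instance '%s' not known" %
#                                  self.op.instance_name)
#     self.instance = self.cfg.GetInstanceInfo(full_name)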
224 def Exec(self, feedback_fn):
225 """Execute the LU.
227 This method should implement the actual work. It should raise
228 errors.OpExecError for failures that are somewhat dealt with in
229 code, or expected.
231 """
232 if self.tasklets is not None:
233 for (idx, tl) in enumerate(self.tasklets):
234 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
235 tl.Exec(feedback_fn)
236 else:
237 raise NotImplementedError
239 def BuildHooksEnv(self):
240 """Build hooks environment for this LU.
242 This method should return a three-element tuple consisting of: a dict
243 containing the environment that will be used for running the
244 specific hook for this LU, a list of node names on which the hook
245 should run before the execution, and a list of node names on which
246 the hook should run after the execution.
248 The keys of the dict must not be prefixed with 'GANETI_', as this will
249 be handled in the hooks runner. Also note additional keys will be
250 added by the hooks runner. If the LU doesn't define any
251 environment, an empty dict (and not None) should be returned.
253 If no nodes apply, an empty list should be returned (and not None).
255 Note that if the HPATH for a LU class is None, this function will
256 not be called.
259 raise NotImplementedError
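# Illustrative sketch (hypothetical, not from the original code): a
# BuildHooksEnv returning the three-element tuple described above
# (environment dict, pre-hook node list, post-hook node list).
#
#   def BuildHooksEnv(self):
#     env = {"OP_TARGET": self.op.instance_name}
#     nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
#     return env, nl, nl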
261 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
262 """Notify the LU about the results of its hooks.
264 This method is called every time a hooks phase is executed, and notifies
265 the Logical Unit about the hooks' result. The LU can then use it to alter
266 its result based on the hooks. By default the method does nothing and the
267 previous result is passed back unchanged but any LU can define it if it
268 wants to use the local cluster hook-scripts somehow.
270 @param phase: one of L{constants.HOOKS_PHASE_POST} or
271 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
272 @param hook_results: the results of the multi-node hooks rpc call
273 @param feedback_fn: function used to send feedback back to the caller
274 @param lu_result: the previous Exec result this LU had, or None
276 @return: the new Exec result, based on the previous result
278 """
279 return lu_result
282 def _ExpandAndLockInstance(self):
283 """Helper function to expand and lock an instance.
285 Many LUs that work on an instance take its name in self.op.instance_name
286 and need to expand it and then declare the expanded name for locking. This
287 function does it, and then updates self.op.instance_name to the expanded
288 name. It also initializes needed_locks as a dict, if this hasn't been done
289 before.
291 """
292 if self.needed_locks is None:
293 self.needed_locks = {}
294 else:
295 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
296 "_ExpandAndLockInstance called with instance-level locks set"
297 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
298 if expanded_name is None:
299 raise errors.OpPrereqError("Instance '%s' not known" %
300 self.op.instance_name)
301 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
302 self.op.instance_name = expanded_name
304 def _LockInstancesNodes(self, primary_only=False):
305 """Helper function to declare instances' nodes for locking.
307 This function should be called after locking one or more instances to lock
308 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
309 with all primary or secondary nodes for instances already locked and
310 present in self.needed_locks[locking.LEVEL_INSTANCE].
312 It should be called from DeclareLocks, and for safety only works if
313 self.recalculate_locks[locking.LEVEL_NODE] is set.
315 In the future it may grow parameters to just lock some instance's nodes, or
316 to just lock primaries or secondary nodes, if needed.
318 It should be called from DeclareLocks in a way similar to::
320 if level == locking.LEVEL_NODE:
321 self._LockInstancesNodes()
323 @type primary_only: boolean
324 @param primary_only: only lock primary nodes of locked instances
327 assert locking.LEVEL_NODE in self.recalculate_locks, \
328 "_LockInstancesNodes helper function called with no nodes to recalculate"
330 # TODO: check if we've really been called with the instance locks held
332 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
333 # future we might want to have different behaviors depending on the value
334 # of self.recalculate_locks[locking.LEVEL_NODE]
335 wanted_nodes = []
336 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
337 instance = self.context.cfg.GetInstanceInfo(instance_name)
338 wanted_nodes.append(instance.primary_node)
339 if not primary_only:
340 wanted_nodes.extend(instance.secondary_nodes)
342 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
343 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
344 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
345 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
347 del self.recalculate_locks[locking.LEVEL_NODE]
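# Illustrative sketch (hypothetical, not from the original code): the typical
# pairing of ExpandNames and DeclareLocks for an LU that needs the nodes of
# the instance it locked, as described in the docstring above.
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes(primary_only=True)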
350 class NoHooksLU(LogicalUnit):
351 """Simple LU which runs no hooks.
353 This LU is intended as a parent for other LogicalUnits which will
354 run no hooks, in order to reduce duplicate code.
356 """
357 HPATH = None
358 HTYPE = None
361 class Tasklet:
362 """Tasklet base class.
364 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365 they can mix legacy code with tasklets. Locking needs to be done in the LU,
366 tasklets know nothing about locks.
368 Subclasses must follow these rules:
369 - Implement CheckPrereq
370 - Implement Exec
373 def __init__(self, lu):
374 self.lu = lu
375 # Shortcuts
376 self.cfg = lu.cfg
377 self.rpc = lu.rpc
380 def CheckPrereq(self):
381 """Check prerequisites for this tasklet.
383 This method should check whether the prerequisites for the execution of
384 this tasklet are fulfilled. It can do internode communication, but it
385 should be idempotent - no cluster or system changes are allowed.
387 The method should raise errors.OpPrereqError in case something is not
388 fulfilled. Its return value is ignored.
390 This method should also update all parameters to their canonical form if it
391 hasn't been done before.
394 raise NotImplementedError
396 def Exec(self, feedback_fn):
397 """Execute the tasklet.
399 This method should implement the actual work. It should raise
400 errors.OpExecError for failures that are somewhat dealt with in code, or
401 expected.
403 """
404 raise NotImplementedError
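# Illustrative sketch (hypothetical, not from the original code): a tasklet
# subclass and how an owning LU could chain it via self.tasklets from
# ExpandNames; all names below are assumptions.
#
#   class _HypotheticalTasklet(Tasklet):
#     def __init__(self, lu, instance_name):
#       Tasklet.__init__(self, lu)
#       self.instance_name = instance_name
#
#     def CheckPrereq(self):
#       if self.cfg.GetInstanceInfo(self.instance_name) is None:
#         raise errors.OpPrereqError("Instance '%s' not known" %
#                                    self.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Processing instance %s" % self.instance_name)
#
#   # in the owning LU's ExpandNames:
#   #   self.tasklets = [_HypotheticalTasklet(self, name) for name in names]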
407 def _GetWantedNodes(lu, nodes):
408 """Returns list of checked and expanded node names.
410 @type lu: L{LogicalUnit}
411 @param lu: the logical unit on whose behalf we execute
413 @param nodes: list of node names or None for all nodes
415 @return: the list of nodes, sorted
416 @raise errors.OpProgrammerError: if the nodes parameter is wrong type
419 if not isinstance(nodes, list):
420 raise errors.OpPrereqError("Invalid argument type 'nodes'")
422 if not nodes:
423 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
424 " non-empty list of nodes whose name is to be expanded.")
426 wanted = []
427 for name in nodes:
428 node = lu.cfg.ExpandNodeName(name)
429 if node is None:
430 raise errors.OpPrereqError("No such node name '%s'" % name)
431 wanted.append(node)
433 return utils.NiceSort(wanted)
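# Illustrative sketch (hypothetical, not from the original code): an LU whose
# opcode carries an optional "nodes" list might canonicalize it in CheckPrereq
# as follows.
#
#   self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)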
436 def _GetWantedInstances(lu, instances):
437 """Returns list of checked and expanded instance names.
439 @type lu: L{LogicalUnit}
440 @param lu: the logical unit on whose behalf we execute
441 @type instances: list
442 @param instances: list of instance names or None for all instances
444 @return: the list of instances, sorted
445 @raise errors.OpPrereqError: if the instances parameter is wrong type
446 @raise errors.OpPrereqError: if any of the passed instances is not found
449 if not isinstance(instances, list):
450 raise errors.OpPrereqError("Invalid argument type 'instances'")
452 if instances:
453 wanted = []
455 for name in instances:
456 instance = lu.cfg.ExpandInstanceName(name)
457 if instance is None:
458 raise errors.OpPrereqError("No such instance name '%s'" % name)
459 wanted.append(instance)
461 else:
462 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
463 return wanted
466 def _CheckOutputFields(static, dynamic, selected):
467 """Checks whether all selected fields are valid.
469 @type static: L{utils.FieldSet}
470 @param static: static fields set
471 @type dynamic: L{utils.FieldSet}
472 @param dynamic: dynamic fields set
474 """
475 f = utils.FieldSet()
476 f.Extend(static)
477 f.Extend(dynamic)
479 delta = f.NonMatching(selected)
480 if delta:
481 raise errors.OpPrereqError("Unknown output fields selected: %s"
482 % ",".join(delta))
485 def _CheckBooleanOpField(op, name):
486 """Validates boolean opcode parameters.
488 This will ensure that an opcode parameter is either a boolean value,
489 or None (but that it always exists).
492 val = getattr(op, name, None)
493 if not (val is None or isinstance(val, bool)):
494 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
495 (name, str(val)))
496 setattr(op, name, val)
499 def _CheckNodeOnline(lu, node):
500 """Ensure that a given node is online.
502 @param lu: the LU on behalf of which we make the check
503 @param node: the node to check
504 @raise errors.OpPrereqError: if the node is offline
507 if lu.cfg.GetNodeInfo(node).offline:
508 raise errors.OpPrereqError("Can't use offline node %s" % node)
511 def _CheckNodeNotDrained(lu, node):
512 """Ensure that a given node is not drained.
514 @param lu: the LU on behalf of which we make the check
515 @param node: the node to check
516 @raise errors.OpPrereqError: if the node is drained
519 if lu.cfg.GetNodeInfo(node).drained:
520 raise errors.OpPrereqError("Can't use drained node %s" % node)
523 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
524 memory, vcpus, nics, disk_template, disks,
525 bep, hvp, hypervisor_name):
526 """Builds instance related env variables for hooks
528 This builds the hook environment from individual variables.
531 @param name: the name of the instance
532 @type primary_node: string
533 @param primary_node: the name of the instance's primary node
534 @type secondary_nodes: list
535 @param secondary_nodes: list of secondary nodes as strings
536 @type os_type: string
537 @param os_type: the name of the instance's OS
538 @type status: boolean
539 @param status: the should_run status of the instance
541 @param memory: the memory size of the instance
543 @param vcpus: the count of VCPUs the instance has
545 @param nics: list of tuples (ip, mac, mode, link) representing
546 the NICs the instance has
547 @type disk_template: string
548 @param disk_template: the disk template of the instance
550 @param disks: the list of (size, mode) pairs
552 @param bep: the backend parameters for the instance
554 @param hvp: the hypervisor parameters for the instance
555 @type hypervisor_name: string
556 @param hypervisor_name: the hypervisor for the instance
558 @return: the hook environment for this instance
560 """
561 if status:
562 str_status = "up"
563 else:
564 str_status = "down"
565 env = {
566 "OP_TARGET": name,
567 "INSTANCE_NAME": name,
568 "INSTANCE_PRIMARY": primary_node,
569 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
570 "INSTANCE_OS_TYPE": os_type,
571 "INSTANCE_STATUS": str_status,
572 "INSTANCE_MEMORY": memory,
573 "INSTANCE_VCPUS": vcpus,
574 "INSTANCE_DISK_TEMPLATE": disk_template,
575 "INSTANCE_HYPERVISOR": hypervisor_name,
576 }
578 if nics:
579 nic_count = len(nics)
580 for idx, (ip, mac, mode, link) in enumerate(nics):
581 if ip is None:
582 ip = ""
583 env["INSTANCE_NIC%d_IP" % idx] = ip
584 env["INSTANCE_NIC%d_MAC" % idx] = mac
585 env["INSTANCE_NIC%d_MODE" % idx] = mode
586 env["INSTANCE_NIC%d_LINK" % idx] = link
587 if mode == constants.NIC_MODE_BRIDGED:
588 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
589 else:
590 nic_count = 0
592 env["INSTANCE_NIC_COUNT"] = nic_count
594 if disks:
595 disk_count = len(disks)
596 for idx, (size, mode) in enumerate(disks):
597 env["INSTANCE_DISK%d_SIZE" % idx] = size
598 env["INSTANCE_DISK%d_MODE" % idx] = mode
599 else:
600 disk_count = 0
602 env["INSTANCE_DISK_COUNT"] = disk_count
604 for source, kind in [(bep, "BE"), (hvp, "HV")]:
605 for key, value in source.items():
606 env["INSTANCE_%s_%s" % (kind, key)] = value
608 return env
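# Illustrative sketch (hypothetical, not from the original code): for a
# single-NIC, single-disk instance the resulting environment would contain
# entries along these lines (values invented for illustration):
#
#   {
#     "OP_TARGET": "instance1.example.tld",
#     "INSTANCE_NAME": "instance1.example.tld",
#     "INSTANCE_PRIMARY": "node1.example.tld",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_MODE": "bridged",
#     "INSTANCE_DISK_COUNT": 1,
#     "INSTANCE_DISK0_SIZE": 10240,
#     ...
#   }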
611 def _NICListToTuple(lu, nics):
612 """Build a list of nic information tuples.
614 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
615 value in LUQueryInstanceData.
617 @type lu: L{LogicalUnit}
618 @param lu: the logical unit on whose behalf we execute
619 @type nics: list of L{objects.NIC}
620 @param nics: list of nics to convert to hooks tuples
622 """
623 hooks_nics = []
624 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
625 for nic in nics:
626 ip = nic.ip
627 mac = nic.mac
628 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
629 mode = filled_params[constants.NIC_MODE]
630 link = filled_params[constants.NIC_LINK]
631 hooks_nics.append((ip, mac, mode, link))
632 return hooks_nics
635 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
636 """Builds instance related env variables for hooks from an object.
638 @type lu: L{LogicalUnit}
639 @param lu: the logical unit on whose behalf we execute
640 @type instance: L{objects.Instance}
641 @param instance: the instance for which we should build the
642 environment
643 @type override: dict
644 @param override: dictionary with key/values that will override
645 the defaults
647 @return: the hook environment dictionary
650 cluster = lu.cfg.GetClusterInfo()
651 bep = cluster.FillBE(instance)
652 hvp = cluster.FillHV(instance)
653 args = {
654 'name': instance.name,
655 'primary_node': instance.primary_node,
656 'secondary_nodes': instance.secondary_nodes,
657 'os_type': instance.os,
658 'status': instance.admin_up,
659 'memory': bep[constants.BE_MEMORY],
660 'vcpus': bep[constants.BE_VCPUS],
661 'nics': _NICListToTuple(lu, instance.nics),
662 'disk_template': instance.disk_template,
663 'disks': [(disk.size, disk.mode) for disk in instance.disks],
664 'bep': bep,
665 'hvp': hvp,
666 'hypervisor_name': instance.hypervisor,
667 }
668 if override:
669 args.update(override)
670 return _BuildInstanceHookEnv(**args)
673 def _AdjustCandidatePool(lu):
674 """Adjust the candidate pool after node operations.
676 """
677 mod_list = lu.cfg.MaintainCandidatePool()
678 if mod_list:
679 lu.LogInfo("Promoted nodes to master candidate role: %s",
680 ", ".join(node.name for node in mod_list))
681 for name in mod_list:
682 lu.context.ReaddNode(name)
683 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
684 if mc_now > mc_max:
685 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
686 (mc_now, mc_max))
689 def _CheckNicsBridgesExist(lu, target_nics, target_node,
690 profile=constants.PP_DEFAULT):
691 """Check that the bridges needed by a list of NICs exist.
694 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
695 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
696 for nic in target_nics]
697 brlist = [params[constants.NIC_LINK] for params in paramslist
698 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
699 if brlist:
700 result = lu.rpc.call_bridges_exist(target_node, brlist)
701 result.Raise("Error checking bridges on destination node '%s'" %
702 target_node, prereq=True)
705 def _CheckInstanceBridgesExist(lu, instance, node=None):
706 """Check that the bridges needed by an instance exist.
708 """
709 if node is None:
710 node = instance.primary_node
711 _CheckNicsBridgesExist(lu, instance.nics, node)
714 def _GetNodeInstancesInner(cfg, fn):
715 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
718 def _GetNodeInstances(cfg, node_name):
719 """Returns a list of all primary and secondary instances on a node.
723 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
726 def _GetNodePrimaryInstances(cfg, node_name):
727 """Returns primary instances on a node.
730 return _GetNodeInstancesInner(cfg,
731 lambda inst: node_name == inst.primary_node)
734 def _GetNodeSecondaryInstances(cfg, node_name):
735 """Returns secondary instances on a node.
738 return _GetNodeInstancesInner(cfg,
739 lambda inst: node_name in inst.secondary_nodes)
742 def _GetStorageTypeArgs(cfg, storage_type):
743 """Returns the arguments for a storage type.
746 # Special case for file storage
747 if storage_type == constants.ST_FILE:
748 # storage.FileStorage wants a list of storage directories
749 return [[cfg.GetFileStorageDir()]]
751 return []
754 def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
755 faulty = []
757 for dev in instance.disks:
758 cfg.SetDiskID(dev, node_name)
760 result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
761 result.Raise("Failed to get disk status from node %s" % node_name,
762 prereq=prereq)
764 for idx, bdev_status in enumerate(result.payload):
765 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
766 faulty.append(idx)
768 return faulty
771 class LUPostInitCluster(LogicalUnit):
772 """Logical unit for running hooks after cluster initialization.
775 HPATH = "cluster-init"
776 HTYPE = constants.HTYPE_CLUSTER
779 def BuildHooksEnv(self):
783 env = {"OP_TARGET": self.cfg.GetClusterName()}
784 mn = self.cfg.GetMasterNode()
785 return env, [], [mn]
787 def CheckPrereq(self):
788 """No prerequisites to check.
793 def Exec(self, feedback_fn):
794 """Nothing to do.
796 """
797 return True
800 class LUDestroyCluster(NoHooksLU):
801 """Logical unit for destroying the cluster.
806 def CheckPrereq(self):
807 """Check prerequisites.
809 This checks whether the cluster is empty.
811 Any errors are signaled by raising errors.OpPrereqError.
814 master = self.cfg.GetMasterNode()
816 nodelist = self.cfg.GetNodeList()
817 if len(nodelist) != 1 or nodelist[0] != master:
818 raise errors.OpPrereqError("There are still %d node(s) in"
819 " this cluster." % (len(nodelist) - 1))
820 instancelist = self.cfg.GetInstanceList()
821 if instancelist:
822 raise errors.OpPrereqError("There are still %d instance(s) in"
823 " this cluster." % len(instancelist))
825 def Exec(self, feedback_fn):
826 """Destroys the cluster.
829 master = self.cfg.GetMasterNode()
830 result = self.rpc.call_node_stop_master(master, False)
831 result.Raise("Could not disable the master role")
832 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
833 utils.CreateBackup(priv_key)
834 utils.CreateBackup(pub_key)
836 return master
838 class LUVerifyCluster(LogicalUnit):
839 """Verifies the cluster status.
842 HPATH = "cluster-verify"
843 HTYPE = constants.HTYPE_CLUSTER
844 _OP_REQP = ["skip_checks"]
847 def ExpandNames(self):
848 self.needed_locks = {
849 locking.LEVEL_NODE: locking.ALL_SET,
850 locking.LEVEL_INSTANCE: locking.ALL_SET,
852 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
854 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
855 node_result, feedback_fn, master_files,
856 drbd_map, vg_name):
857 """Run multiple tests against a node.
861 - compares ganeti version
862 - checks vg existence and size > 20G
863 - checks config file checksum
864 - checks ssh to other nodes
866 @type nodeinfo: L{objects.Node}
867 @param nodeinfo: the node to check
868 @param file_list: required list of files
869 @param local_cksum: dictionary of local files and their checksums
870 @param node_result: the results from the node
871 @param feedback_fn: function used to accumulate results
872 @param master_files: list of files that only masters should have
873 @param drbd_map: the used DRBD minors for this node, in
874 the form of minor: (instance, must_exist), which correspond to instances
875 and their running status
876 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
878 """
879 node = nodeinfo.name
881 # main result, node_result should be a non-empty dict
882 if not node_result or not isinstance(node_result, dict):
883 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
884 return True
886 # compares ganeti version
887 local_version = constants.PROTOCOL_VERSION
888 remote_version = node_result.get('version', None)
889 if not (remote_version and isinstance(remote_version, (list, tuple)) and
890 len(remote_version) == 2):
891 feedback_fn(" - ERROR: connection to %s failed" % (node))
892 return True
894 if local_version != remote_version[0]:
895 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
896 " node %s %s" % (local_version, node, remote_version[0]))
897 return True
899 # node seems compatible, we can actually try to look into its results
903 # full package version
904 if constants.RELEASE_VERSION != remote_version[1]:
905 feedback_fn(" - WARNING: software version mismatch: master %s,"
906 " node %s %s" %
907 (constants.RELEASE_VERSION, node, remote_version[1]))
909 # checks vg existence and size > 20G
910 if vg_name is not None:
911 vglist = node_result.get(constants.NV_VGLIST, None)
912 if not vglist:
913 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
914 (node,))
916 else:
917 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
918 constants.MIN_VG_SIZE)
919 if vgstatus:
920 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
923 # checks config file checksum
925 remote_cksum = node_result.get(constants.NV_FILELIST, None)
926 if not isinstance(remote_cksum, dict):
928 feedback_fn(" - ERROR: node hasn't returned file checksum data")
930 for file_name in file_list:
931 node_is_mc = nodeinfo.master_candidate
932 must_have_file = file_name not in master_files
933 if file_name not in remote_cksum:
934 if node_is_mc or must_have_file:
936 feedback_fn(" - ERROR: file '%s' missing" % file_name)
937 elif remote_cksum[file_name] != local_cksum[file_name]:
938 if node_is_mc or must_have_file:
940 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
942 # not candidate and this is not a must-have file
944 feedback_fn(" - ERROR: file '%s' should not exist on non master"
945 " candidates (and the file is outdated)" % file_name)
947 # all good, except non-master/non-must have combination
948 if not node_is_mc and not must_have_file:
949 feedback_fn(" - ERROR: file '%s' should not exist on non master"
950 " candidates" % file_name)
954 if constants.NV_NODELIST not in node_result:
956 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
958 if node_result[constants.NV_NODELIST]:
960 for node in node_result[constants.NV_NODELIST]:
961 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
962 (node, node_result[constants.NV_NODELIST][node]))
964 if constants.NV_NODENETTEST not in node_result:
966 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
968 if node_result[constants.NV_NODENETTEST]:
970 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
971 for node in nlist:
972 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
973 (node, node_result[constants.NV_NODENETTEST][node]))
975 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
976 if isinstance(hyp_result, dict):
977 for hv_name, hv_result in hyp_result.iteritems():
978 if hv_result is not None:
979 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
980 (hv_name, hv_result))
982 # check used drbd list
983 if vg_name is not None:
984 used_minors = node_result.get(constants.NV_DRBDLIST, [])
985 if not isinstance(used_minors, (tuple, list)):
986 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
987 str(used_minors))
988 else:
989 for minor, (iname, must_exist) in drbd_map.items():
990 if minor not in used_minors and must_exist:
991 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
992 " not active" % (minor, iname))
994 for minor in used_minors:
995 if minor not in drbd_map:
996 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
997 minor)
1002 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1003 node_instance, feedback_fn, n_offline):
1004 """Verify an instance.
1006 This function checks to see if the required block devices are
1007 available on the instance's node.
1012 node_current = instanceconfig.primary_node
1014 node_vol_should = {}
1015 instanceconfig.MapLVsByNode(node_vol_should)
1017 for node in node_vol_should:
1018 if node in n_offline:
1019 # ignore missing volumes on offline nodes
1020 continue
1021 for volume in node_vol_should[node]:
1022 if node not in node_vol_is or volume not in node_vol_is[node]:
1023 feedback_fn(" - ERROR: volume %s missing on node %s" %
1024 (volume, node))
1027 if instanceconfig.admin_up:
1028 if ((node_current not in node_instance or
1029 not instance in node_instance[node_current]) and
1030 node_current not in n_offline):
1031 feedback_fn(" - ERROR: instance %s not running on node %s" %
1032 (instance, node_current))
1035 for node in node_instance:
1036 if node != node_current:
1037 if instance in node_instance[node]:
1038 feedback_fn(" - ERROR: instance %s should not run on node %s" %
1039 (instance, node))
1044 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
1045 """Verify if there are any unknown volumes in the cluster.
1047 The .os, .swap and backup volumes are ignored. All other volumes are
1048 reported as unknown.
1053 for node in node_vol_is:
1054 for volume in node_vol_is[node]:
1055 if node not in node_vol_should or volume not in node_vol_should[node]:
1056 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
1057 (volume, node))
1061 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
1062 """Verify the list of running instances.
1064 This checks what instances are running but unknown to the cluster.
1068 for node in node_instance:
1069 for runninginstance in node_instance[node]:
1070 if runninginstance not in instancelist:
1071 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
1072 (runninginstance, node))
1076 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
1077 """Verify N+1 Memory Resilience.
1079 Check that if one single node dies we can still start all the instances it
1080 was primary for.
1085 for node, nodeinfo in node_info.iteritems():
1086 # This code checks that every node which is now listed as secondary has
1087 # enough memory to host all instances it is supposed to should a single
1088 # other node in the cluster fail.
1089 # FIXME: not ready for failover to an arbitrary node
1090 # FIXME: does not support file-backed instances
1091 # WARNING: we currently take into account down instances as well as up
1092 # ones, considering that even if they're down someone might want to start
1093 # them even in the event of a node failure.
1094 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1095 needed_mem = 0
1096 for instance in instances:
1097 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1098 if bep[constants.BE_AUTO_BALANCE]:
1099 needed_mem += bep[constants.BE_MEMORY]
1100 if nodeinfo['mfree'] < needed_mem:
1101 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
1102 " failovers should node %s fail" % (node, prinode))
1106 def CheckPrereq(self):
1107 """Check prerequisites.
1109 Transform the list of checks we're going to skip into a set and check that
1110 all its members are valid.
1113 self.skip_set = frozenset(self.op.skip_checks)
1114 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1115 raise errors.OpPrereqError("Invalid checks to be skipped specified")
1117 def BuildHooksEnv(self):
1120 Cluster-Verify hooks are run only in the post phase; their failure causes
1121 the output to be logged in the verify output and the verification to fail.
1124 all_nodes = self.cfg.GetNodeList()
1125 env = {
1126 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1127 }
1128 for node in self.cfg.GetAllNodesInfo().values():
1129 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1131 return env, [], all_nodes
1133 def Exec(self, feedback_fn):
1134 """Verify integrity of cluster, performing various tests on nodes.
1138 feedback_fn("* Verifying global settings")
1139 for msg in self.cfg.VerifyConfig():
1140 feedback_fn(" - ERROR: %s" % msg)
1142 vg_name = self.cfg.GetVGName()
1143 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1144 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1145 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1146 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1147 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1148 for iname in instancelist)
1149 i_non_redundant = [] # Non redundant instances
1150 i_non_a_balanced = [] # Non auto-balanced instances
1151 n_offline = [] # List of offline nodes
1152 n_drained = [] # List of nodes being drained
1153 node_volume = {}
1154 node_instance = {}
1155 node_info = {}
1156 instance_cfg = {}
1158 # FIXME: verify OS list
1159 # do local checksums
1160 master_files = [constants.CLUSTER_CONF_FILE]
1162 file_names = ssconf.SimpleStore().GetFileList()
1163 file_names.append(constants.SSL_CERT_FILE)
1164 file_names.append(constants.RAPI_CERT_FILE)
1165 file_names.extend(master_files)
1167 local_checksums = utils.FingerprintFiles(file_names)
1169 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1170 node_verify_param = {
1171 constants.NV_FILELIST: file_names,
1172 constants.NV_NODELIST: [node.name for node in nodeinfo
1173 if not node.offline],
1174 constants.NV_HYPERVISOR: hypervisors,
1175 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1176 node.secondary_ip) for node in nodeinfo
1177 if not node.offline],
1178 constants.NV_INSTANCELIST: hypervisors,
1179 constants.NV_VERSION: None,
1180 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1182 if vg_name is not None:
1183 node_verify_param[constants.NV_VGLIST] = None
1184 node_verify_param[constants.NV_LVLIST] = vg_name
1185 node_verify_param[constants.NV_DRBDLIST] = None
1186 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1187 self.cfg.GetClusterName())
1189 cluster = self.cfg.GetClusterInfo()
1190 master_node = self.cfg.GetMasterNode()
1191 all_drbd_map = self.cfg.ComputeDRBDMap()
1193 for node_i in nodeinfo:
1194 node = node_i.name
1196 if node_i.offline:
1197 feedback_fn("* Skipping offline node %s" % (node,))
1198 n_offline.append(node)
1199 continue
1201 if node == master_node:
1202 ntype = "master"
1203 elif node_i.master_candidate:
1204 ntype = "master candidate"
1205 elif node_i.drained:
1206 ntype = "drained"
1207 n_drained.append(node)
1208 else:
1209 ntype = "regular"
1210 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1212 msg = all_nvinfo[node].fail_msg
1213 if msg:
1214 feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg))
1216 continue
1218 nresult = all_nvinfo[node].payload
1219 node_drbd = {}
1220 for minor, instance in all_drbd_map[node].items():
1221 if instance not in instanceinfo:
1222 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1223 instance)
1224 # ghost instance should not be running, but otherwise we
1225 # don't give double warnings (both ghost instance and
1226 # unallocated minor in use)
1227 node_drbd[minor] = (instance, False)
1228 else:
1229 instance = instanceinfo[instance]
1230 node_drbd[minor] = (instance.name, instance.admin_up)
1231 result = self._VerifyNode(node_i, file_names, local_checksums,
1232 nresult, feedback_fn, master_files,
1233 node_drbd, vg_name)
1236 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1237 if vg_name is None:
1238 node_volume[node] = {}
1239 elif isinstance(lvdata, basestring):
1240 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1241 (node, utils.SafeEncode(lvdata)))
1243 node_volume[node] = {}
1244 elif not isinstance(lvdata, dict):
1245 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1246 continue
1248 else:
1249 node_volume[node] = lvdata
1252 idata = nresult.get(constants.NV_INSTANCELIST, None)
1253 if not isinstance(idata, list):
1254 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1259 node_instance[node] = idata
1262 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1263 if not isinstance(nodeinfo, dict):
1264 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1270 "mfree": int(nodeinfo['memory_free']),
1273 # dictionary holding all instances this node is secondary for,
1274 # grouped by their primary node. Each key is a cluster node, and each
1275 # value is a list of instances which have the key as primary and the
1276 # current node as secondary. this is handy to calculate N+1 memory
1277 # availability if you can only failover from a primary to its
1279 "sinst-by-pnode": {},
1281 # FIXME: devise a free space model for file based instances as well
1282 if vg_name is not None:
1283 if (constants.NV_VGLIST not in nresult or
1284 vg_name not in nresult[constants.NV_VGLIST]):
1285 feedback_fn(" - ERROR: node %s didn't return data for the"
1286 " volume group '%s' - it is either missing or broken" %
1290 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1291 except (ValueError, KeyError):
1292 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1293 " from node %s" % (node,))
1297 node_vol_should = {}
1299 for instance in instancelist:
1300 feedback_fn("* Verifying instance %s" % instance)
1301 inst_config = instanceinfo[instance]
1302 result = self._VerifyInstance(instance, inst_config, node_volume,
1303 node_instance, feedback_fn, n_offline)
1305 inst_nodes_offline = []
1307 inst_config.MapLVsByNode(node_vol_should)
1309 instance_cfg[instance] = inst_config
1311 pnode = inst_config.primary_node
1312 if pnode in node_info:
1313 node_info[pnode]['pinst'].append(instance)
1314 elif pnode not in n_offline:
1315 feedback_fn(" - ERROR: instance %s, connection to primary node"
1316 " %s failed" % (instance, pnode))
1319 if pnode in n_offline:
1320 inst_nodes_offline.append(pnode)
1322 # If the instance is non-redundant we cannot survive losing its primary
1323 # node, so we are not N+1 compliant. On the other hand we have no disk
1324 # templates with more than one secondary so that situation is not well
1325 # supported either.
1326 # FIXME: does not support file-backed instances
1327 if len(inst_config.secondary_nodes) == 0:
1328 i_non_redundant.append(instance)
1329 elif len(inst_config.secondary_nodes) > 1:
1330 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1331 % instance)
1333 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1334 i_non_a_balanced.append(instance)
1336 for snode in inst_config.secondary_nodes:
1337 if snode in node_info:
1338 node_info[snode]['sinst'].append(instance)
1339 if pnode not in node_info[snode]['sinst-by-pnode']:
1340 node_info[snode]['sinst-by-pnode'][pnode] = []
1341 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1342 elif snode not in n_offline:
1343 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1344 " %s failed" % (instance, snode))
1346 if snode in n_offline:
1347 inst_nodes_offline.append(snode)
1349 if inst_nodes_offline:
1350 # warn that the instance lives on offline nodes, and set bad=True
1351 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1352 ", ".join(inst_nodes_offline))
1355 feedback_fn("* Verifying orphan volumes")
1356 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1360 feedback_fn("* Verifying remaining instances")
1361 result = self._VerifyOrphanInstances(instancelist, node_instance,
1365 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1366 feedback_fn("* Verifying N+1 Memory redundancy")
1367 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1370 feedback_fn("* Other Notes")
1371 if i_non_redundant:
1372 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1373 % len(i_non_redundant))
1375 if i_non_a_balanced:
1376 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1377 % len(i_non_a_balanced))
1379 if n_offline:
1380 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1382 if n_drained:
1383 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1387 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1388 """Analyze the post-hooks' result
1390 This method analyses the hook result, handles it, and sends some
1391 nicely-formatted feedback back to the user.
1393 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1394 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1395 @param hooks_results: the results of the multi-node hooks rpc call
1396 @param feedback_fn: function used to send feedback back to the caller
1397 @param lu_result: previous Exec result
1398 @return: the new Exec result, based on the previous result
1402 # We only really run POST phase hooks, and are only interested in
1403 # their results
1404 if phase == constants.HOOKS_PHASE_POST:
1405 # Used to change hooks' output to proper indentation
1406 indent_re = re.compile('^', re.M)
1407 feedback_fn("* Hooks Results")
1408 if not hooks_results:
1409 feedback_fn(" - ERROR: general communication failure")
1412 for node_name in hooks_results:
1413 show_node_header = True
1414 res = hooks_results[node_name]
1418 # no need to warn or set fail return value
1420 feedback_fn(" Communication failure in hooks execution: %s" %
1424 for script, hkr, output in res.payload:
1425 if hkr == constants.HKR_FAIL:
1426 # The node header is only shown once, if there are
1427 # failing hooks on that node
1428 if show_node_header:
1429 feedback_fn(" Node %s:" % node_name)
1430 show_node_header = False
1431 feedback_fn(" ERROR: Script %s failed, output:" % script)
1432 output = indent_re.sub(' ', output)
1433 feedback_fn("%s" % output)
1434 lu_result = 1
1436 return lu_result
1439 class LUVerifyDisks(NoHooksLU):
1440 """Verifies the cluster disks status.
1446 def ExpandNames(self):
1447 self.needed_locks = {
1448 locking.LEVEL_NODE: locking.ALL_SET,
1449 locking.LEVEL_INSTANCE: locking.ALL_SET,
1451 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1453 def CheckPrereq(self):
1454 """Check prerequisites.
1456 This has no prerequisites.
1461 def Exec(self, feedback_fn):
1462 """Verify integrity of cluster disks.
1464 @rtype: tuple of three items
1465 @return: a tuple of (dict of node-to-node_error, list of instances
1466 which need activate-disks, dict of instance: (node, volume) for
1470 result = res_nodes, res_instances, res_missing = {}, [], {}
1472 vg_name = self.cfg.GetVGName()
1473 nodes = utils.NiceSort(self.cfg.GetNodeList())
1474 instances = [self.cfg.GetInstanceInfo(name)
1475 for name in self.cfg.GetInstanceList()]
1477 nv_dict = {}
1478 for inst in instances:
1479 inst_lvs = {}
1480 if (not inst.admin_up or
1481 inst.disk_template not in constants.DTS_NET_MIRROR):
1482 continue
1483 inst.MapLVsByNode(inst_lvs)
1484 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1485 for node, vol_list in inst_lvs.iteritems():
1486 for vol in vol_list:
1487 nv_dict[(node, vol)] = inst
1489 if not nv_dict:
1490 return result
1492 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1494 for node in nodes:
1496 node_res = node_lvs[node]
1497 if node_res.offline:
1498 continue
1499 msg = node_res.fail_msg
1500 if msg:
1501 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1502 res_nodes[node] = msg
1503 continue
1505 lvs = node_res.payload
1506 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1507 inst = nv_dict.pop((node, lv_name), None)
1508 if (not lv_online and inst is not None
1509 and inst.name not in res_instances):
1510 res_instances.append(inst.name)
1512 # any leftover items in nv_dict are missing LVs, let's arrange the
1513 # data better
1514 for key, inst in nv_dict.iteritems():
1515 if inst.name not in res_missing:
1516 res_missing[inst.name] = []
1517 res_missing[inst.name].append(key)
1519 return result
1522 class LURepairDiskSizes(NoHooksLU):
1523 """Verifies the cluster disk sizes.
1526 _OP_REQP = ["instances"]
1529 def ExpandNames(self):
1531 if not isinstance(self.op.instances, list):
1532 raise errors.OpPrereqError("Invalid argument type 'instances'")
1534 if self.op.instances:
1535 self.wanted_names = []
1536 for name in self.op.instances:
1537 full_name = self.cfg.ExpandInstanceName(name)
1538 if full_name is None:
1539 raise errors.OpPrereqError("Instance '%s' not known" % name)
1540 self.wanted_names.append(full_name)
1541 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
1542 self.needed_locks = {
1543 locking.LEVEL_NODE: [],
1544 locking.LEVEL_INSTANCE: self.wanted_names,
1546 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1547 else:
1548 self.wanted_names = None
1549 self.needed_locks = {
1550 locking.LEVEL_NODE: locking.ALL_SET,
1551 locking.LEVEL_INSTANCE: locking.ALL_SET,
1553 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1555 def DeclareLocks(self, level):
1556 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1557 self._LockInstancesNodes(primary_only=True)
1559 def CheckPrereq(self):
1560 """Check prerequisites.
1562 This only checks the optional instance list against the existing names.
1565 if self.wanted_names is None:
1566 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1568 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1569 in self.wanted_names]
1571 def Exec(self, feedback_fn):
1572 """Verify the size of cluster disks.
1575 # TODO: check child disks too
1576 # TODO: check differences in size between primary/secondary nodes
1577 per_node_disks = {}
1578 for instance in self.wanted_instances:
1579 pnode = instance.primary_node
1580 if pnode not in per_node_disks:
1581 per_node_disks[pnode] = []
1582 for idx, disk in enumerate(instance.disks):
1583 per_node_disks[pnode].append((instance, idx, disk))
1585 changed = []
1586 for node, dskl in per_node_disks.items():
1587 result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
1588 if result.fail_msg:
1589 self.LogWarning("Failure in blockdev_getsizes call to node"
1590 " %s, ignoring", node)
1591 continue
1592 if len(result.data) != len(dskl):
1593 self.LogWarning("Invalid result from node %s, ignoring node results",
1594 node)
1595 continue
1596 for ((instance, idx, disk), size) in zip(dskl, result.data):
1597 if size is None:
1598 self.LogWarning("Disk %d of instance %s did not return size"
1599 " information, ignoring", idx, instance.name)
1600 continue
1601 if not isinstance(size, (int, long)):
1602 self.LogWarning("Disk %d of instance %s did not return valid"
1603 " size information, ignoring", idx, instance.name)
1604 continue
1606 if size != disk.size:
1607 self.LogInfo("Disk %d of instance %s has mismatched size,"
1608 " correcting: recorded %d, actual %d", idx,
1609 instance.name, disk.size, size)
1610 disk.size = size
1611 self.cfg.Update(instance)
1612 changed.append((instance.name, idx, size))
1614 return changed
1616 class LURenameCluster(LogicalUnit):
1617 """Rename the cluster.
1620 HPATH = "cluster-rename"
1621 HTYPE = constants.HTYPE_CLUSTER
1624 def BuildHooksEnv(self):
1628 env = {
1629 "OP_TARGET": self.cfg.GetClusterName(),
1630 "NEW_NAME": self.op.name,
1631 }
1632 mn = self.cfg.GetMasterNode()
1633 return env, [mn], [mn]
1635 def CheckPrereq(self):
1636 """Verify that the passed name is a valid one.
1639 hostname = utils.HostInfo(self.op.name)
1641 new_name = hostname.name
1642 self.ip = new_ip = hostname.ip
1643 old_name = self.cfg.GetClusterName()
1644 old_ip = self.cfg.GetMasterIP()
1645 if new_name == old_name and new_ip == old_ip:
1646 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1647 " cluster has changed")
1648 if new_ip != old_ip:
1649 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1650 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1651 " reachable on the network. Aborting." %
1652 new_ip)
1654 self.op.name = new_name
1656 def Exec(self, feedback_fn):
1657 """Rename the cluster.
1660 clustername = self.op.name
1661 ip = self.ip
1663 # shutdown the master IP
1664 master = self.cfg.GetMasterNode()
1665 result = self.rpc.call_node_stop_master(master, False)
1666 result.Raise("Could not disable the master role")
1668 try:
1669 cluster = self.cfg.GetClusterInfo()
1670 cluster.cluster_name = clustername
1671 cluster.master_ip = ip
1672 self.cfg.Update(cluster)
1674 # update the known hosts file
1675 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1676 node_list = self.cfg.GetNodeList()
1678 node_list.remove(master)
1681 result = self.rpc.call_upload_file(node_list,
1682 constants.SSH_KNOWN_HOSTS_FILE)
1683 for to_node, to_result in result.iteritems():
1684 msg = to_result.fail_msg
1685 if msg:
1686 msg = ("Copy of file %s to node %s failed: %s" %
1687 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1688 self.proc.LogWarning(msg)
1690 finally:
1691 result = self.rpc.call_node_start_master(master, False, False)
1692 msg = result.fail_msg
1693 if msg:
1694 self.LogWarning("Could not re-enable the master role on"
1695 " the master, please restart manually: %s", msg)
1698 def _RecursiveCheckIfLVMBased(disk):
1699 """Check if the given disk or its children are lvm-based.
1701 @type disk: L{objects.Disk}
1702 @param disk: the disk to check
1704 @return: boolean indicating whether a LD_LV dev_type was found or not
1707 if disk.children:
1708 for chdisk in disk.children:
1709 if _RecursiveCheckIfLVMBased(chdisk):
1710 return True
1711 return disk.dev_type == constants.LD_LV
1714 class LUSetClusterParams(LogicalUnit):
1715 """Change the parameters of the cluster.
1718 HPATH = "cluster-modify"
1719 HTYPE = constants.HTYPE_CLUSTER
1723 def CheckArguments(self):
1727 if not hasattr(self.op, "candidate_pool_size"):
1728 self.op.candidate_pool_size = None
1729 if self.op.candidate_pool_size is not None:
1730 try:
1731 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1732 except (ValueError, TypeError), err:
1733 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1734 str(err))
1735 if self.op.candidate_pool_size < 1:
1736 raise errors.OpPrereqError("At least one master candidate needed")
1738 def ExpandNames(self):
1739 # FIXME: in the future maybe other cluster params won't require checking on
1740 # all nodes to be modified.
1741 self.needed_locks = {
1742 locking.LEVEL_NODE: locking.ALL_SET,
1744 self.share_locks[locking.LEVEL_NODE] = 1
1746 def BuildHooksEnv(self):
1750 env = {
1751 "OP_TARGET": self.cfg.GetClusterName(),
1752 "NEW_VG_NAME": self.op.vg_name,
1753 }
1754 mn = self.cfg.GetMasterNode()
1755 return env, [mn], [mn]
1757 def CheckPrereq(self):
1758 """Check prerequisites.
1760 This checks whether the given params don't conflict and
1761 if the given volume group is valid.
1764 if self.op.vg_name is not None and not self.op.vg_name:
1765 instances = self.cfg.GetAllInstancesInfo().values()
1766 for inst in instances:
1767 for disk in inst.disks:
1768 if _RecursiveCheckIfLVMBased(disk):
1769 raise errors.OpPrereqError("Cannot disable lvm storage while"
1770 " lvm-based instances exist")
1772 node_list = self.acquired_locks[locking.LEVEL_NODE]
1774 # if vg_name not None, checks given volume group on all nodes
1775 if self.op.vg_name:
1776 vglist = self.rpc.call_vg_list(node_list)
1777 for node in node_list:
1778 msg = vglist[node].fail_msg
1779 if msg:
1780 # ignoring down node
1781 self.LogWarning("Error while gathering data on node %s"
1782 " (ignoring node): %s", node, msg)
1783 continue
1784 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1785 self.op.vg_name,
1786 constants.MIN_VG_SIZE)
1787 if vgstatus:
1788 raise errors.OpPrereqError("Error on node '%s': %s" %
1789 (node, vgstatus))
1791 self.cluster = cluster = self.cfg.GetClusterInfo()
1792 # validate params changes
1793 if self.op.beparams:
1794 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1795 self.new_beparams = objects.FillDict(
1796 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1798 if self.op.nicparams:
1799 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1800 self.new_nicparams = objects.FillDict(
1801 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1802 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1804 # hypervisor list/parameters
1805 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1806 if self.op.hvparams:
1807 if not isinstance(self.op.hvparams, dict):
1808 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1809 for hv_name, hv_dict in self.op.hvparams.items():
1810 if hv_name not in self.new_hvparams:
1811 self.new_hvparams[hv_name] = hv_dict
1812 else:
1813 self.new_hvparams[hv_name].update(hv_dict)
1815 if self.op.enabled_hypervisors is not None:
1816 self.hv_list = self.op.enabled_hypervisors
1817 if not self.hv_list:
1818 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1819 " least one member")
1820 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1821 if invalid_hvs:
1822 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1823 " entries: %s" % invalid_hvs)
1824 else:
1825 self.hv_list = cluster.enabled_hypervisors
1827 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1828 # either the enabled list has changed, or the parameters have, validate
1829 for hv_name, hv_params in self.new_hvparams.items():
1830 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1831 (self.op.enabled_hypervisors and
1832 hv_name in self.op.enabled_hypervisors)):
1833 # either this is a new hypervisor, or its parameters have changed
1834 hv_class = hypervisor.GetHypervisor(hv_name)
1835 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1836 hv_class.CheckParameterSyntax(hv_params)
1837 _CheckHVParams(self, node_list, hv_name, hv_params)
1839 def Exec(self, feedback_fn):
1840 """Change the parameters of the cluster.
1843 if self.op.vg_name is not None:
1844 new_volume = self.op.vg_name
1847 if new_volume != self.cfg.GetVGName():
1848 self.cfg.SetVGName(new_volume)
1849 else:
1850 feedback_fn("Cluster LVM configuration already in desired"
1851 " state, not changing")
1852 if self.op.hvparams:
1853 self.cluster.hvparams = self.new_hvparams
1854 if self.op.enabled_hypervisors is not None:
1855 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1856 if self.op.beparams:
1857 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1858 if self.op.nicparams:
1859 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1861 if self.op.candidate_pool_size is not None:
1862 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1863 # we need to update the pool size here, otherwise the save will fail
1864 _AdjustCandidatePool(self)
1866 self.cfg.Update(self.cluster)
1869 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1870 """Distribute additional files which are part of the cluster configuration.
1872 ConfigWriter takes care of distributing the config and ssconf files, but
1873 there are more files which should be distributed to all nodes. This function
1874 makes sure those are copied.
1876 @param lu: calling logical unit
1877 @param additional_nodes: list of nodes not in the config to distribute to
1880 # 1. Gather target nodes
1881 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1882 dist_nodes = lu.cfg.GetNodeList()
1883 if additional_nodes is not None:
1884 dist_nodes.extend(additional_nodes)
1885 if myself.name in dist_nodes:
1886 dist_nodes.remove(myself.name)
1887 # 2. Gather files to distribute
1888 dist_files = set([constants.ETC_HOSTS,
1889 constants.SSH_KNOWN_HOSTS_FILE,
1890 constants.RAPI_CERT_FILE,
1891 constants.RAPI_USERS_FILE,
1892 constants.HMAC_CLUSTER_KEY,
1894 ])
1895 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1896 for hv_name in enabled_hypervisors:
1897 hv_class = hypervisor.GetHypervisor(hv_name)
1898 dist_files.update(hv_class.GetAncillaryFiles())
1900 # 3. Perform the files upload
1901 for fname in dist_files:
1902 if os.path.exists(fname):
1903 result = lu.rpc.call_upload_file(dist_nodes, fname)
1904 for to_node, to_result in result.items():
1905 msg = to_result.fail_msg
1906 if msg:
1907 msg = ("Copy of file %s to node %s failed: %s" %
1908 (fname, to_node, msg))
1909 lu.proc.LogWarning(msg)
1912 class LURedistributeConfig(NoHooksLU):
1913 """Force the redistribution of cluster configuration.
1915 This is a very simple LU.
1921 def ExpandNames(self):
1922 self.needed_locks = {
1923 locking.LEVEL_NODE: locking.ALL_SET,
1925 self.share_locks[locking.LEVEL_NODE] = 1
1927 def CheckPrereq(self):
1928 """Check prerequisites.
1932 def Exec(self, feedback_fn):
1933 """Redistribute the configuration.
1936 self.cfg.Update(self.cfg.GetClusterInfo())
1937 _RedistributeAncillaryFiles(self)
1940 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1941 """Sleep and poll for an instance's disk to sync.
1944 if not instance.disks:
1945 return True
1948 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1950 node = instance.primary_node
1952 for dev in instance.disks:
1953 lu.cfg.SetDiskID(dev, node)
1955 retries = 0
1956 degr_retries = 10 # in seconds, as we sleep 1 second each time
1957 while True:
1958 max_time = 0
1959 done = True
1960 cumul_degraded = False
1961 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1962 msg = rstats.fail_msg
1963 if msg:
1964 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1965 retries += 1
1966 if retries >= 10:
1967 raise errors.RemoteError("Can't contact node %s for mirror data,"
1968 " aborting." % node)
1971 rstats = rstats.payload
1973 for i, mstat in enumerate(rstats):
1975 lu.LogWarning("Can't compute data for node %s/%s",
1976 node, instance.disks[i].iv_name)
1979 cumul_degraded = (cumul_degraded or
1980 (mstat.is_degraded and mstat.sync_percent is None))
1981 if mstat.sync_percent is not None:
1983 if mstat.estimated_time is not None:
1984 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
1985 max_time = mstat.estimated_time
1987 rem_time = "no time estimate"
1988 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1989 (instance.disks[i].iv_name, mstat.sync_percent, rem_time))
1991 # if we're done but degraded, let's do a few small retries, to
1992 # make sure we see a stable and not transient situation; therefore
1993 # we force restart of the loop
1994 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1995 logging.info("Degraded disks found, %d retries left", degr_retries)
2003 time.sleep(min(60, max_time))
2006 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2007 return not cumul_degraded
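# A True result means no disk was left degraded once syncing (and the short
# degraded-retry grace period above) finished; callers can treat False as a
# sign that the mirrors need attention before relying on them.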
2010 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2011 """Check that mirrors are not degraded.
2013 The ldisk parameter, if True, will change the test from the
2014 is_degraded attribute (which represents overall non-ok status for
2015 the device(s)) to the ldisk (representing the local storage status).
2018 lu.cfg.SetDiskID(dev, node)
2022 if on_primary or dev.AssembleOnSecondary():
2023 rstats = lu.rpc.call_blockdev_find(node, dev)
2024 msg = rstats.fail_msg
2026 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2028 elif not rstats.payload:
2029 lu.LogWarning("Can't find disk on node %s", node)
2033 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2035 result = result and not rstats.payload.is_degraded
2038 for child in dev.children:
2039 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
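# The check recurses into dev.children, so for a DRBD disk both the DRBD
# device itself and its underlying logical volumes are examined.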
2044 class LUDiagnoseOS(NoHooksLU):
2045 """Logical unit for OS diagnose/query.
2048 _OP_REQP = ["output_fields", "names"]
2050 _FIELDS_STATIC = utils.FieldSet()
2051 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
2053 def ExpandNames(self):
2055 raise errors.OpPrereqError("Selective OS query not supported")
2057 _CheckOutputFields(static=self._FIELDS_STATIC,
2058 dynamic=self._FIELDS_DYNAMIC,
2059 selected=self.op.output_fields)
2061 # Lock all nodes, in shared mode
2062 # Temporary removal of locks, should be reverted later
2063 # TODO: reintroduce locks when they are lighter-weight
2064 self.needed_locks = {}
2065 #self.share_locks[locking.LEVEL_NODE] = 1
2066 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2068 def CheckPrereq(self):
2069 """Check prerequisites.
2074 def _DiagnoseByOS(node_list, rlist):
2075 """Remaps a per-node return list into an a per-os per-node dictionary
2077 @param node_list: a list with the names of all nodes
2078 @param rlist: a map with node names as keys and OS objects as values
2081 @return: a dictionary with osnames as keys and as value another map, with
2082 nodes as keys and tuples of (path, status, diagnose) as values, eg::
2084 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2085 (/srv/..., False, "invalid api")],
2086 "node2": [(/srv/..., True, "")]}
2091 # we build here the list of nodes that didn't fail the RPC (at RPC
2092 # level), so that nodes with a non-responding node daemon don't
2093 # make all OSes invalid
2094 good_nodes = [node_name for node_name in rlist
2095 if not rlist[node_name].fail_msg]
2096 for node_name, nr in rlist.items():
2097 if nr.fail_msg or not nr.payload:
2099 for name, path, status, diagnose in nr.payload:
2100 if name not in all_os:
2101 # build a list of nodes for this os containing empty lists
2102 # for each node in node_list
2104 for nname in good_nodes:
2105 all_os[name][nname] = []
2106 all_os[name][node_name].append((path, status, diagnose))
2109 def Exec(self, feedback_fn):
2110 """Compute the list of OSes.
2113 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2114 node_data = self.rpc.call_os_diagnose(valid_nodes)
2115 pol = self._DiagnoseByOS(valid_nodes, node_data)
2117 for os_name, os_data in pol.items():
2119 for field in self.op.output_fields:
2122 elif field == "valid":
2123 val = utils.all([osl and osl[0][1] for osl in os_data.values()])
2124 elif field == "node_status":
2125 # this is just a copy of the dict
2127 for node_name, nos_list in os_data.items():
2128 val[node_name] = nos_list
2130 raise errors.ParameterError(field)
2137 class LURemoveNode(LogicalUnit):
2138 """Logical unit for removing a node.
2141 HPATH = "node-remove"
2142 HTYPE = constants.HTYPE_NODE
2143 _OP_REQP = ["node_name"]
2145 def BuildHooksEnv(self):
2148 This doesn't run on the target node in the pre phase as a failed
2149 node would then be impossible to remove.
2153 "OP_TARGET": self.op.node_name,
2154 "NODE_NAME": self.op.node_name,
2156 all_nodes = self.cfg.GetNodeList()
2157 all_nodes.remove(self.op.node_name)
2158 return env, all_nodes, all_nodes
2160 def CheckPrereq(self):
2161 """Check prerequisites.
2164 - the node exists in the configuration
2165 - it does not have primary or secondary instances
2166 - it's not the master
2168 Any errors are signaled by raising errors.OpPrereqError.
2171 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2173 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2175 instance_list = self.cfg.GetInstanceList()
2177 masternode = self.cfg.GetMasterNode()
2178 if node.name == masternode:
2179 raise errors.OpPrereqError("Node is the master node,"
2180 " you need to failover first.")
2182 for instance_name in instance_list:
2183 instance = self.cfg.GetInstanceInfo(instance_name)
2184 if node.name in instance.all_nodes:
2185 raise errors.OpPrereqError("Instance %s is still running on the node,"
2186 " please remove first." % instance_name)
2187 self.op.node_name = node.name
2190 def Exec(self, feedback_fn):
2191 """Removes the node from the cluster.
2195 logging.info("Stopping the node daemon and removing configs from node %s",
2198 self.context.RemoveNode(node.name)
2200 result = self.rpc.call_node_leave_cluster(node.name)
2201 msg = result.fail_msg
2203 self.LogWarning("Errors encountered on the remote node while leaving"
2204 " the cluster: %s", msg)
2206 # Promote nodes to master candidate as needed
2207 _AdjustCandidatePool(self)
2210 class LUQueryNodes(NoHooksLU):
2211 """Logical unit for querying nodes.
2214 _OP_REQP = ["output_fields", "names", "use_locking"]
2216 _FIELDS_DYNAMIC = utils.FieldSet(
2218 "mtotal", "mnode", "mfree",
2220 "ctotal", "cnodes", "csockets",
2223 _FIELDS_STATIC = utils.FieldSet(
2224 "name", "pinst_cnt", "sinst_cnt",
2225 "pinst_list", "sinst_list",
2226 "pip", "sip", "tags",
2235 def ExpandNames(self):
2236 _CheckOutputFields(static=self._FIELDS_STATIC,
2237 dynamic=self._FIELDS_DYNAMIC,
2238 selected=self.op.output_fields)
2240 self.needed_locks = {}
2241 self.share_locks[locking.LEVEL_NODE] = 1
2244 self.wanted = _GetWantedNodes(self, self.op.names)
2246 self.wanted = locking.ALL_SET
2248 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2249 self.do_locking = self.do_node_query and self.op.use_locking
2251 # if we don't request only static fields, we need to lock the nodes
2252 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2255 def CheckPrereq(self):
2256 """Check prerequisites.
2259 # The validation of the node list is done in _GetWantedNodes if the
2260 # list is not empty; if it is empty, there is no validation to do
2263 def Exec(self, feedback_fn):
2264 """Computes the list of nodes and their attributes.
2267 all_info = self.cfg.GetAllNodesInfo()
2269 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2270 elif self.wanted != locking.ALL_SET:
2271 nodenames = self.wanted
2272 missing = set(nodenames).difference(all_info.keys())
2274 raise errors.OpExecError(
2275 "Some nodes were removed before retrieving their data: %s" % missing)
2277 nodenames = all_info.keys()
2279 nodenames = utils.NiceSort(nodenames)
2280 nodelist = [all_info[name] for name in nodenames]
2282 # begin data gathering
2284 if self.do_node_query:
2286 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2287 self.cfg.GetHypervisorType())
2288 for name in nodenames:
2289 nodeinfo = node_data[name]
2290 if not nodeinfo.fail_msg and nodeinfo.payload:
2291 nodeinfo = nodeinfo.payload
2292 fn = utils.TryConvert
2294 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2295 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2296 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2297 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2298 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2299 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2300 "bootid": nodeinfo.get('bootid', None),
2301 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2302 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2305 live_data[name] = {}
2307 live_data = dict.fromkeys(nodenames, {})
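# dict.fromkeys() makes all node names share one empty dict; this is
# acceptable here only because the placeholder is read, never mutated per node.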
2309 node_to_primary = dict([(name, set()) for name in nodenames])
2310 node_to_secondary = dict([(name, set()) for name in nodenames])
2312 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2313 "sinst_cnt", "sinst_list"))
2314 if inst_fields & frozenset(self.op.output_fields):
2315 instancelist = self.cfg.GetInstanceList()
2317 for instance_name in instancelist:
2318 inst = self.cfg.GetInstanceInfo(instance_name)
2319 if inst.primary_node in node_to_primary:
2320 node_to_primary[inst.primary_node].add(inst.name)
2321 for secnode in inst.secondary_nodes:
2322 if secnode in node_to_secondary:
2323 node_to_secondary[secnode].add(inst.name)
2325 master_node = self.cfg.GetMasterNode()
2327 # end data gathering
2330 for node in nodelist:
2332 for field in self.op.output_fields:
2335 elif field == "pinst_list":
2336 val = list(node_to_primary[node.name])
2337 elif field == "sinst_list":
2338 val = list(node_to_secondary[node.name])
2339 elif field == "pinst_cnt":
2340 val = len(node_to_primary[node.name])
2341 elif field == "sinst_cnt":
2342 val = len(node_to_secondary[node.name])
2343 elif field == "pip":
2344 val = node.primary_ip
2345 elif field == "sip":
2346 val = node.secondary_ip
2347 elif field == "tags":
2348 val = list(node.GetTags())
2349 elif field == "serial_no":
2350 val = node.serial_no
2351 elif field == "master_candidate":
2352 val = node.master_candidate
2353 elif field == "master":
2354 val = node.name == master_node
2355 elif field == "offline":
2357 elif field == "drained":
2359 elif self._FIELDS_DYNAMIC.Matches(field):
2360 val = live_data[node.name].get(field, None)
2361 elif field == "role":
2362 if node.name == master_node:
2364 elif node.master_candidate:
2373 raise errors.ParameterError(field)
2374 node_output.append(val)
2375 output.append(node_output)
2380 class LUQueryNodeVolumes(NoHooksLU):
2381 """Logical unit for getting volumes on node(s).
2384 _OP_REQP = ["nodes", "output_fields"]
2386 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2387 _FIELDS_STATIC = utils.FieldSet("node")
2389 def ExpandNames(self):
2390 _CheckOutputFields(static=self._FIELDS_STATIC,
2391 dynamic=self._FIELDS_DYNAMIC,
2392 selected=self.op.output_fields)
2394 self.needed_locks = {}
2395 self.share_locks[locking.LEVEL_NODE] = 1
2396 if not self.op.nodes:
2397 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2399 self.needed_locks[locking.LEVEL_NODE] = \
2400 _GetWantedNodes(self, self.op.nodes)
2402 def CheckPrereq(self):
2403 """Check prerequisites.
2405 This checks that the required fields are valid output fields.
2408 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2410 def Exec(self, feedback_fn):
2411 """Computes the list of nodes and their attributes.
2414 nodenames = self.nodes
2415 volumes = self.rpc.call_node_volumes(nodenames)
2417 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2418 in self.cfg.GetInstanceList()]
2420 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2423 for node in nodenames:
2424 nresult = volumes[node]
2427 msg = nresult.fail_msg
2429 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2432 node_vols = nresult.payload[:]
2433 node_vols.sort(key=lambda vol: vol['dev'])
2435 for vol in node_vols:
2437 for field in self.op.output_fields:
2440 elif field == "phys":
2444 elif field == "name":
2446 elif field == "size":
2447 val = int(float(vol['size']))
2448 elif field == "instance":
2450 if node not in lv_by_node[inst]:
2452 if vol['name'] in lv_by_node[inst][node]:
2458 raise errors.ParameterError(field)
2459 node_output.append(str(val))
2461 output.append(node_output)
2466 class LUQueryNodeStorage(NoHooksLU):
2467 """Logical unit for getting information on storage units on node(s).
2470 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2472 _FIELDS_STATIC = utils.FieldSet("node")
2474 def ExpandNames(self):
2475 storage_type = self.op.storage_type
2477 if storage_type not in constants.VALID_STORAGE_FIELDS:
2478 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2480 dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2482 _CheckOutputFields(static=self._FIELDS_STATIC,
2483 dynamic=utils.FieldSet(*dynamic_fields),
2484 selected=self.op.output_fields)
2486 self.needed_locks = {}
2487 self.share_locks[locking.LEVEL_NODE] = 1
2490 self.needed_locks[locking.LEVEL_NODE] = \
2491 _GetWantedNodes(self, self.op.nodes)
2493 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2495 def CheckPrereq(self):
2496 """Check prerequisites.
2498 This checks that the required fields are valid output fields.
2501 self.op.name = getattr(self.op, "name", None)
2503 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2505 def Exec(self, feedback_fn):
2506 """Computes the list of nodes and their attributes.
2509 # Always get name to sort by
2510 if constants.SF_NAME in self.op.output_fields:
2511 fields = self.op.output_fields[:]
2513 fields = [constants.SF_NAME] + self.op.output_fields
2515 # Never ask for node as it's only known to the LU
2516 while "node" in fields:
2517 fields.remove("node")
2519 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2520 name_idx = field_idx[constants.SF_NAME]
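# field_idx maps each requested field to its column index in the rows
# returned by call_storage_list; name_idx is used below to key and sort the
# rows by the storage unit name.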
2522 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2523 data = self.rpc.call_storage_list(self.nodes,
2524 self.op.storage_type, st_args,
2525 self.op.name, fields)
2529 for node in utils.NiceSort(self.nodes):
2530 nresult = data[node]
2534 msg = nresult.fail_msg
2536 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2539 rows = dict([(row[name_idx], row) for row in nresult.payload])
2541 for name in utils.NiceSort(rows.keys()):
2546 for field in self.op.output_fields:
2549 elif field in field_idx:
2550 val = row[field_idx[field]]
2552 raise errors.ParameterError(field)
2561 class LUModifyNodeStorage(NoHooksLU):
2562 """Logical unit for modifying a storage volume on a node.
2565 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2568 def CheckArguments(self):
2569 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2570 if node_name is None:
2571 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2573 self.op.node_name = node_name
2575 storage_type = self.op.storage_type
2576 if storage_type not in constants.VALID_STORAGE_FIELDS:
2577 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2579 def ExpandNames(self):
2580 self.needed_locks = {
2581 locking.LEVEL_NODE: self.op.node_name,
2584 def CheckPrereq(self):
2585 """Check prerequisites.
2588 storage_type = self.op.storage_type
2591 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2593 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2594 " modified" % storage_type)
2596 diff = set(self.op.changes.keys()) - modifiable
2598 raise errors.OpPrereqError("The following fields can not be modified for"
2599 " storage units of type '%s': %r" %
2600 (storage_type, list(diff)))
2602 def Exec(self, feedback_fn):
2603 """Computes the list of nodes and their attributes.
2606 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2607 result = self.rpc.call_storage_modify(self.op.node_name,
2608 self.op.storage_type, st_args,
2609 self.op.name, self.op.changes)
2610 result.Raise("Failed to modify storage unit '%s' on %s" %
2611 (self.op.name, self.op.node_name))
2614 class LUAddNode(LogicalUnit):
2615 """Logical unit for adding node to the cluster.
2619 HTYPE = constants.HTYPE_NODE
2620 _OP_REQP = ["node_name"]
2622 def BuildHooksEnv(self):
2625 This will run on all nodes before, and on all nodes + the new node after.
2629 "OP_TARGET": self.op.node_name,
2630 "NODE_NAME": self.op.node_name,
2631 "NODE_PIP": self.op.primary_ip,
2632 "NODE_SIP": self.op.secondary_ip,
2634 nodes_0 = self.cfg.GetNodeList()
2635 nodes_1 = nodes_0 + [self.op.node_name, ]
2636 return env, nodes_0, nodes_1
2638 def CheckPrereq(self):
2639 """Check prerequisites.
2642 - the new node is not already in the config
2644 - its parameters (single/dual homed) matches the cluster
2646 Any errors are signaled by raising errors.OpPrereqError.
2649 node_name = self.op.node_name
2652 dns_data = utils.HostInfo(node_name)
2654 node = dns_data.name
2655 primary_ip = self.op.primary_ip = dns_data.ip
2656 secondary_ip = getattr(self.op, "secondary_ip", None)
2657 if secondary_ip is None:
2658 secondary_ip = primary_ip
2659 if not utils.IsValidIP(secondary_ip):
2660 raise errors.OpPrereqError("Invalid secondary IP given")
2661 self.op.secondary_ip = secondary_ip
2663 node_list = cfg.GetNodeList()
2664 if not self.op.readd and node in node_list:
2665 raise errors.OpPrereqError("Node %s is already in the configuration" %
2667 elif self.op.readd and node not in node_list:
2668 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2670 for existing_node_name in node_list:
2671 existing_node = cfg.GetNodeInfo(existing_node_name)
2673 if self.op.readd and node == existing_node_name:
2674 if (existing_node.primary_ip != primary_ip or
2675 existing_node.secondary_ip != secondary_ip):
2676 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2677 " address configuration as before")
2680 if (existing_node.primary_ip == primary_ip or
2681 existing_node.secondary_ip == primary_ip or
2682 existing_node.primary_ip == secondary_ip or
2683 existing_node.secondary_ip == secondary_ip):
2684 raise errors.OpPrereqError("New node ip address(es) conflict with"
2685 " existing node %s" % existing_node.name)
2687 # check that the type of the node (single versus dual homed) is the
2688 # same as for the master
2689 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2690 master_singlehomed = myself.secondary_ip == myself.primary_ip
2691 newbie_singlehomed = secondary_ip == primary_ip
2692 if master_singlehomed != newbie_singlehomed:
2693 if master_singlehomed:
2694 raise errors.OpPrereqError("The master has no private ip but the"
2695 " new node has one")
2697 raise errors.OpPrereqError("The master has a private ip but the"
2698 " new node doesn't have one")
2700 # checks reachability
2701 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2702 raise errors.OpPrereqError("Node not reachable by ping")
2704 if not newbie_singlehomed:
2705 # check reachability from my secondary ip to newbie's secondary ip
2706 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2707 source=myself.secondary_ip):
2708 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2709 " based ping to noded port")
2711 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2716 mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2717 # the new node will increase mc_max by one, so:
2718 mc_max = min(mc_max + 1, cp_size)
2719 self.master_candidate = mc_now < mc_max
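# The node being added will itself count towards the candidate pool, so
# mc_max is bumped by one (capped at candidate_pool_size); the node is
# auto-promoted only while the pool is still below that limit.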
2722 self.new_node = self.cfg.GetNodeInfo(node)
2723 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2725 self.new_node = objects.Node(name=node,
2726 primary_ip=primary_ip,
2727 secondary_ip=secondary_ip,
2728 master_candidate=self.master_candidate,
2729 offline=False, drained=False)
2731 def Exec(self, feedback_fn):
2732 """Adds the new node to the cluster.
2735 new_node = self.new_node
2736 node = new_node.name
2738 # for re-adds, reset the offline/drained/master-candidate flags;
2739 # we need to reset here, otherwise offline would prevent RPC calls
2740 # later in the procedure; this also means that if the re-add
2741 # fails, we are left with a non-offlined, broken node
2743 new_node.drained = new_node.offline = False
2744 self.LogInfo("Readding a node, the offline/drained flags were reset")
2745 # if we demote the node, we do cleanup later in the procedure
2746 new_node.master_candidate = self.master_candidate
2748 # notify the user about any possible mc promotion
2749 if new_node.master_candidate:
2750 self.LogInfo("Node will be a master candidate")
2752 # check connectivity
2753 result = self.rpc.call_version([node])[node]
2754 result.Raise("Can't get version information from node %s" % node)
2755 if constants.PROTOCOL_VERSION == result.payload:
2756 logging.info("Communication to node %s fine, sw version %s match",
2757 node, result.payload)
2759 raise errors.OpExecError("Version mismatch master version %s,"
2760 " node version %s" %
2761 (constants.PROTOCOL_VERSION, result.payload))
2764 logging.info("Copy ssh key to node %s", node)
2765 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2767 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2768 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2774 keyarray.append(f.read())
2778 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2780 keyarray[3], keyarray[4], keyarray[5])
2781 result.Raise("Cannot transfer ssh keys to the new node")
2783 # Add node to our /etc/hosts, and add key to known_hosts
2784 if self.cfg.GetClusterInfo().modify_etc_hosts:
2785 utils.AddHostToEtcHosts(new_node.name)
2787 if new_node.secondary_ip != new_node.primary_ip:
2788 result = self.rpc.call_node_has_ip_address(new_node.name,
2789 new_node.secondary_ip)
2790 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2792 if not result.payload:
2793 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2794 " you gave (%s). Please fix and re-run this"
2795 " command." % new_node.secondary_ip)
2797 node_verify_list = [self.cfg.GetMasterNode()]
2798 node_verify_param = {
2800 # TODO: do a node-net-test as well?
2803 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2804 self.cfg.GetClusterName())
2805 for verifier in node_verify_list:
2806 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2807 nl_payload = result[verifier].payload['nodelist']
2809 for failed in nl_payload:
2810 feedback_fn("ssh/hostname verification failed %s -> %s" %
2811 (verifier, nl_payload[failed]))
2812 raise errors.OpExecError("ssh/hostname verification failed.")
2815 _RedistributeAncillaryFiles(self)
2816 self.context.ReaddNode(new_node)
2817 # make sure we redistribute the config
2818 self.cfg.Update(new_node)
2819 # and make sure the new node will not have old files around
2820 if not new_node.master_candidate:
2821 result = self.rpc.call_node_demote_from_mc(new_node.name)
2822 msg = result.RemoteFailMsg()
2824 self.LogWarning("Node failed to demote itself from master"
2825 " candidate status: %s" % msg)
2827 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2828 self.context.AddNode(new_node)
2831 class LUSetNodeParams(LogicalUnit):
2832 """Modifies the parameters of a node.
2835 HPATH = "node-modify"
2836 HTYPE = constants.HTYPE_NODE
2837 _OP_REQP = ["node_name"]
2840 def CheckArguments(self):
2841 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2842 if node_name is None:
2843 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2844 self.op.node_name = node_name
2845 _CheckBooleanOpField(self.op, 'master_candidate')
2846 _CheckBooleanOpField(self.op, 'offline')
2847 _CheckBooleanOpField(self.op, 'drained')
2848 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2849 if all_mods.count(None) == 3:
2850 raise errors.OpPrereqError("Please pass at least one modification")
2851 if all_mods.count(True) > 1:
2852 raise errors.OpPrereqError("Can't set the node into more than one"
2853 " state at the same time")
2855 def ExpandNames(self):
2856 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2858 def BuildHooksEnv(self):
2861 This runs on the master node.
2865 "OP_TARGET": self.op.node_name,
2866 "MASTER_CANDIDATE": str(self.op.master_candidate),
2867 "OFFLINE": str(self.op.offline),
2868 "DRAINED": str(self.op.drained),
2870 nl = [self.cfg.GetMasterNode(),
2874 def CheckPrereq(self):
2875 """Check prerequisites.
2877 This only checks the instance list against the existing names.
2880 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2882 if ((self.op.master_candidate == False or self.op.offline == True or
2883 self.op.drained == True) and node.master_candidate):
2884 # we will demote the node from master_candidate
2885 if self.op.node_name == self.cfg.GetMasterNode():
2886 raise errors.OpPrereqError("The master node has to be a"
2887 " master candidate, online and not drained")
2888 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2889 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2890 if num_candidates <= cp_size:
2891 msg = ("Not enough master candidates (desired"
2892 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2894 self.LogWarning(msg)
2896 raise errors.OpPrereqError(msg)
2898 if (self.op.master_candidate == True and
2899 ((node.offline and not self.op.offline == False) or
2900 (node.drained and not self.op.drained == False))):
2901 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2902 " to master_candidate" % node.name)
2906 def Exec(self, feedback_fn):
2915 if self.op.offline is not None:
2916 node.offline = self.op.offline
2917 result.append(("offline", str(self.op.offline)))
2918 if self.op.offline == True:
2919 if node.master_candidate:
2920 node.master_candidate = False
2922 result.append(("master_candidate", "auto-demotion due to offline"))
2924 node.drained = False
2925 result.append(("drained", "clear drained status due to offline"))
2927 if self.op.master_candidate is not None:
2928 node.master_candidate = self.op.master_candidate
2930 result.append(("master_candidate", str(self.op.master_candidate)))
2931 if self.op.master_candidate == False:
2932 rrc = self.rpc.call_node_demote_from_mc(node.name)
2935 self.LogWarning("Node failed to demote itself: %s" % msg)
2937 if self.op.drained is not None:
2938 node.drained = self.op.drained
2939 result.append(("drained", str(self.op.drained)))
2940 if self.op.drained == True:
2941 if node.master_candidate:
2942 node.master_candidate = False
2944 result.append(("master_candidate", "auto-demotion due to drain"))
2945 rrc = self.rpc.call_node_demote_from_mc(node.name)
2946 msg = rrc.RemoteFailMsg()
2948 self.LogWarning("Node failed to demote itself: %s" % msg)
2950 node.offline = False
2951 result.append(("offline", "clear offline status due to drain"))
2953 # this will trigger configuration file update, if needed
2954 self.cfg.Update(node)
2955 # this will trigger job queue propagation or cleanup
2957 self.context.ReaddNode(node)
2962 class LUPowercycleNode(NoHooksLU):
2963 """Powercycles a node.
2966 _OP_REQP = ["node_name", "force"]
2969 def CheckArguments(self):
2970 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2971 if node_name is None:
2972 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2973 self.op.node_name = node_name
2974 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2975 raise errors.OpPrereqError("The node is the master and the force"
2976 " parameter was not set")
2978 def ExpandNames(self):
2979 """Locking for PowercycleNode.
2981 This is a last-resort option and shouldn't block on other
2982 jobs. Therefore, we grab no locks.
2985 self.needed_locks = {}
2987 def CheckPrereq(self):
2988 """Check prerequisites.
2990 This LU has no prereqs.
2995 def Exec(self, feedback_fn):
2999 result = self.rpc.call_node_powercycle(self.op.node_name,
3000 self.cfg.GetHypervisorType())
3001 result.Raise("Failed to schedule the reboot")
3002 return result.payload
3005 class LUQueryClusterInfo(NoHooksLU):
3006 """Query cluster configuration.
3012 def ExpandNames(self):
3013 self.needed_locks = {}
3015 def CheckPrereq(self):
3016 """No prerequsites needed for this LU.
3021 def Exec(self, feedback_fn):
3022 """Return cluster config.
3025 cluster = self.cfg.GetClusterInfo()
3027 "software_version": constants.RELEASE_VERSION,
3028 "protocol_version": constants.PROTOCOL_VERSION,
3029 "config_version": constants.CONFIG_VERSION,
3030 "os_api_version": max(constants.OS_API_VERSIONS),
3031 "export_version": constants.EXPORT_VERSION,
3032 "architecture": (platform.architecture()[0], platform.machine()),
3033 "name": cluster.cluster_name,
3034 "master": cluster.master_node,
3035 "default_hypervisor": cluster.enabled_hypervisors[0],
3036 "enabled_hypervisors": cluster.enabled_hypervisors,
3037 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3038 for hypervisor_name in cluster.enabled_hypervisors]),
3039 "beparams": cluster.beparams,
3040 "nicparams": cluster.nicparams,
3041 "candidate_pool_size": cluster.candidate_pool_size,
3042 "master_netdev": cluster.master_netdev,
3043 "volume_group_name": cluster.volume_group_name,
3044 "file_storage_dir": cluster.file_storage_dir,
3050 class LUQueryConfigValues(NoHooksLU):
3051 """Return configuration values.
3056 _FIELDS_DYNAMIC = utils.FieldSet()
3057 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
3059 def ExpandNames(self):
3060 self.needed_locks = {}
3062 _CheckOutputFields(static=self._FIELDS_STATIC,
3063 dynamic=self._FIELDS_DYNAMIC,
3064 selected=self.op.output_fields)
3066 def CheckPrereq(self):
3067 """No prerequisites.
3072 def Exec(self, feedback_fn):
3073 """Dump a representation of the cluster config to the standard output.
3077 for field in self.op.output_fields:
3078 if field == "cluster_name":
3079 entry = self.cfg.GetClusterName()
3080 elif field == "master_node":
3081 entry = self.cfg.GetMasterNode()
3082 elif field == "drain_flag":
3083 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3085 raise errors.ParameterError(field)
3086 values.append(entry)
3090 class LUActivateInstanceDisks(NoHooksLU):
3091 """Bring up an instance's disks.
3094 _OP_REQP = ["instance_name"]
3097 def ExpandNames(self):
3098 self._ExpandAndLockInstance()
3099 self.needed_locks[locking.LEVEL_NODE] = []
3100 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3102 def DeclareLocks(self, level):
3103 if level == locking.LEVEL_NODE:
3104 self._LockInstancesNodes()
3106 def CheckPrereq(self):
3107 """Check prerequisites.
3109 This checks that the instance is in the cluster.
3112 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3113 assert self.instance is not None, \
3114 "Cannot retrieve locked instance %s" % self.op.instance_name
3115 _CheckNodeOnline(self, self.instance.primary_node)
3116 if not hasattr(self.op, "ignore_size"):
3117 self.op.ignore_size = False
3119 def Exec(self, feedback_fn):
3120 """Activate the disks.
3123 disks_ok, disks_info = \
3124 _AssembleInstanceDisks(self, self.instance,
3125 ignore_size=self.op.ignore_size)
3127 raise errors.OpExecError("Cannot activate block devices")
3132 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3134 """Prepare the block devices for an instance.
3136 This sets up the block devices on all nodes.
3138 @type lu: L{LogicalUnit}
3139 @param lu: the logical unit on whose behalf we execute
3140 @type instance: L{objects.Instance}
3141 @param instance: the instance for whose disks we assemble
3142 @type ignore_secondaries: boolean
3143 @param ignore_secondaries: if true, errors on secondary nodes
3144 won't result in an error return from the function
3145 @type ignore_size: boolean
3146 @param ignore_size: if true, the current known size of the disk
3147 will not be used during the disk activation, useful for cases
3148 when the size is wrong
3149 @return: False if the operation failed, otherwise a list of
3150 (host, instance_visible_name, node_visible_name)
3151 with the mapping from node devices to instance devices
3156 iname = instance.name
3157 # With the two-pass mechanism we try to reduce the window of
3158 # opportunity for the race condition of switching DRBD to primary
3159 # before handshaking has occurred, but we do not eliminate it
3161 # The proper fix would be to wait (with some limits) until the
3162 # connection has been made and drbd transitions from WFConnection
3163 # into any other network-connected state (Connected, SyncTarget,
3166 # 1st pass, assemble on all nodes in secondary mode
3167 for inst_disk in instance.disks:
3168 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3170 node_disk = node_disk.Copy()
3171 node_disk.UnsetSize()
3172 lu.cfg.SetDiskID(node_disk, node)
3173 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3174 msg = result.fail_msg
3176 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3177 " (is_primary=False, pass=1): %s",
3178 inst_disk.iv_name, node, msg)
3179 if not ignore_secondaries:
3182 # FIXME: race condition on drbd migration to primary
3184 # 2nd pass, do only the primary node
3185 for inst_disk in instance.disks:
3186 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3187 if node != instance.primary_node:
3190 node_disk = node_disk.Copy()
3191 node_disk.UnsetSize()
3192 lu.cfg.SetDiskID(node_disk, node)
3193 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3194 msg = result.fail_msg
3196 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3197 " (is_primary=True, pass=2): %s",
3198 inst_disk.iv_name, node, msg)
3200 device_info.append((instance.primary_node, inst_disk.iv_name,
3203 # leave the disks configured for the primary node
3204 # this is a workaround that would be better fixed by
3205 # improving the logical/physical id handling
3206 for disk in instance.disks:
3207 lu.cfg.SetDiskID(disk, instance.primary_node)
3209 return disks_ok, device_info
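# Callers inspect disks_ok: _StartInstanceDisks below shuts the disks down
# again and aborts when assembly failed, while LUActivateInstanceDisks above
# raises an OpExecError.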
3212 def _StartInstanceDisks(lu, instance, force):
3213 """Start the disks of an instance.
3216 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3217 ignore_secondaries=force)
3219 _ShutdownInstanceDisks(lu, instance)
3220 if force is not None and not force:
3221 lu.proc.LogWarning("", hint="If the message above refers to a"
3223 " you can retry the operation using '--force'.")
3224 raise errors.OpExecError("Disk consistency error")
3227 class LUDeactivateInstanceDisks(NoHooksLU):
3228 """Shutdown an instance's disks.
3231 _OP_REQP = ["instance_name"]
3234 def ExpandNames(self):
3235 self._ExpandAndLockInstance()
3236 self.needed_locks[locking.LEVEL_NODE] = []
3237 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3239 def DeclareLocks(self, level):
3240 if level == locking.LEVEL_NODE:
3241 self._LockInstancesNodes()
3243 def CheckPrereq(self):
3244 """Check prerequisites.
3246 This checks that the instance is in the cluster.
3249 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3250 assert self.instance is not None, \
3251 "Cannot retrieve locked instance %s" % self.op.instance_name
3253 def Exec(self, feedback_fn):
3254 """Deactivate the disks
3257 instance = self.instance
3258 _SafeShutdownInstanceDisks(self, instance)
3261 def _SafeShutdownInstanceDisks(lu, instance):
3262 """Shutdown block devices of an instance.
3264 This function checks if an instance is running, before calling
3265 _ShutdownInstanceDisks.
3268 pnode = instance.primary_node
3269 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3270 ins_l.Raise("Can't contact node %s" % pnode)
3272 if instance.name in ins_l.payload:
3273 raise errors.OpExecError("Instance is running, can't shutdown"
3276 _ShutdownInstanceDisks(lu, instance)
3279 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3280 """Shutdown block devices of an instance.
3282 This does the shutdown on all nodes of the instance.
3284 If ignore_primary is false, errors on the primary node are
3289 for disk in instance.disks:
3290 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3291 lu.cfg.SetDiskID(top_disk, node)
3292 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3293 msg = result.fail_msg
3295 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3296 disk.iv_name, node, msg)
3297 if not ignore_primary or node != instance.primary_node:
3302 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3303 """Checks if a node has enough free memory.
3305 This function checks whether a given node has the needed amount of free
3306 memory. In case the node has less memory, or we cannot get the
3307 information from the node, this function raises an OpPrereqError
3310 @type lu: C{LogicalUnit}
3311 @param lu: a logical unit from which we get configuration data
3313 @param node: the node to check
3314 @type reason: C{str}
3315 @param reason: string to use in the error message
3316 @type requested: C{int}
3317 @param requested: the amount of memory in MiB to check for
3318 @type hypervisor_name: C{str}
3319 @param hypervisor_name: the hypervisor to ask for memory stats
3320 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3321 we cannot check the node
3324 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3325 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
3326 free_mem = nodeinfo[node].payload.get('memory_free', None)
3327 if not isinstance(free_mem, int):
3328 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3329 " was '%s'" % (node, free_mem))
3330 if requested > free_mem:
3331 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3332 " needed %s MiB, available %s MiB" %
3333 (node, reason, requested, free_mem))
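# A typical call, as used by LUStartupInstance further down:
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)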
3336 class LUStartupInstance(LogicalUnit):
3337 """Starts an instance.
3340 HPATH = "instance-start"
3341 HTYPE = constants.HTYPE_INSTANCE
3342 _OP_REQP = ["instance_name", "force"]
3345 def ExpandNames(self):
3346 self._ExpandAndLockInstance()
3348 def BuildHooksEnv(self):
3351 This runs on master, primary and secondary nodes of the instance.
3355 "FORCE": self.op.force,
3357 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3358 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3361 def CheckPrereq(self):
3362 """Check prerequisites.
3364 This checks that the instance is in the cluster.
3367 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3368 assert self.instance is not None, \
3369 "Cannot retrieve locked instance %s" % self.op.instance_name
3372 self.beparams = getattr(self.op, "beparams", {})
3374 if not isinstance(self.beparams, dict):
3375 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3376 " dict" % (type(self.beparams), ))
3377 # fill the beparams dict
3378 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3379 self.op.beparams = self.beparams
3382 self.hvparams = getattr(self.op, "hvparams", {})
3384 if not isinstance(self.hvparams, dict):
3385 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3386 " dict" % (type(self.hvparams), ))
3388 # check hypervisor parameter syntax (locally)
3389 cluster = self.cfg.GetClusterInfo()
3390 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3391 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3393 filled_hvp.update(self.hvparams)
3394 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3395 hv_type.CheckParameterSyntax(filled_hvp)
3396 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3397 self.op.hvparams = self.hvparams
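# Note: the hvparams/beparams accepted here act only as overrides for this
# particular start: they are syntax-checked against the cluster defaults and
# handed to call_instance_start in Exec, but not stored in the configuration.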
3399 _CheckNodeOnline(self, instance.primary_node)
3401 bep = self.cfg.GetClusterInfo().FillBE(instance)
3402 # check bridges existence
3403 _CheckInstanceBridgesExist(self, instance)
3405 remote_info = self.rpc.call_instance_info(instance.primary_node,
3407 instance.hypervisor)
3408 remote_info.Raise("Error checking node %s" % instance.primary_node,
3410 if not remote_info.payload: # not running already
3411 _CheckNodeFreeMemory(self, instance.primary_node,
3412 "starting instance %s" % instance.name,
3413 bep[constants.BE_MEMORY], instance.hypervisor)
3415 def Exec(self, feedback_fn):
3416 """Start the instance.
3419 instance = self.instance
3420 force = self.op.force
3422 self.cfg.MarkInstanceUp(instance.name)
3424 node_current = instance.primary_node
3426 _StartInstanceDisks(self, instance, force)
3428 result = self.rpc.call_instance_start(node_current, instance,
3429 self.hvparams, self.beparams)
3430 msg = result.fail_msg
3432 _ShutdownInstanceDisks(self, instance)
3433 raise errors.OpExecError("Could not start instance: %s" % msg)
3436 class LURebootInstance(LogicalUnit):
3437 """Reboot an instance.
3440 HPATH = "instance-reboot"
3441 HTYPE = constants.HTYPE_INSTANCE
3442 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3445 def ExpandNames(self):
3446 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3447 constants.INSTANCE_REBOOT_HARD,
3448 constants.INSTANCE_REBOOT_FULL]:
3449 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3450 (constants.INSTANCE_REBOOT_SOFT,
3451 constants.INSTANCE_REBOOT_HARD,
3452 constants.INSTANCE_REBOOT_FULL))
3453 self._ExpandAndLockInstance()
3455 def BuildHooksEnv(self):
3458 This runs on master, primary and secondary nodes of the instance.
3462 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3463 "REBOOT_TYPE": self.op.reboot_type,
3465 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3466 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3469 def CheckPrereq(self):
3470 """Check prerequisites.
3472 This checks that the instance is in the cluster.
3475 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3476 assert self.instance is not None, \
3477 "Cannot retrieve locked instance %s" % self.op.instance_name
3479 _CheckNodeOnline(self, instance.primary_node)
3481 # check bridges existence
3482 _CheckInstanceBridgesExist(self, instance)
3484 def Exec(self, feedback_fn):
3485 """Reboot the instance.
3488 instance = self.instance
3489 ignore_secondaries = self.op.ignore_secondaries
3490 reboot_type = self.op.reboot_type
3492 node_current = instance.primary_node
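# SOFT and HARD reboots are delegated to the hypervisor on the primary node;
# a FULL reboot is implemented as a shutdown, disk deactivation/reactivation
# and a fresh start of the instance.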
3494 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3495 constants.INSTANCE_REBOOT_HARD]:
3496 for disk in instance.disks:
3497 self.cfg.SetDiskID(disk, node_current)
3498 result = self.rpc.call_instance_reboot(node_current, instance,
3500 result.Raise("Could not reboot instance")
3502 result = self.rpc.call_instance_shutdown(node_current, instance)
3503 result.Raise("Could not shutdown instance for full reboot")
3504 _ShutdownInstanceDisks(self, instance)
3505 _StartInstanceDisks(self, instance, ignore_secondaries)
3506 result = self.rpc.call_instance_start(node_current, instance, None, None)
3507 msg = result.fail_msg
3509 _ShutdownInstanceDisks(self, instance)
3510 raise errors.OpExecError("Could not start instance for"
3511 " full reboot: %s" % msg)
3513 self.cfg.MarkInstanceUp(instance.name)
3516 class LUShutdownInstance(LogicalUnit):
3517 """Shutdown an instance.
3520 HPATH = "instance-stop"
3521 HTYPE = constants.HTYPE_INSTANCE
3522 _OP_REQP = ["instance_name"]
3525 def ExpandNames(self):
3526 self._ExpandAndLockInstance()
3528 def BuildHooksEnv(self):
3531 This runs on master, primary and secondary nodes of the instance.
3534 env = _BuildInstanceHookEnvByObject(self, self.instance)
3535 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3538 def CheckPrereq(self):
3539 """Check prerequisites.
3541 This checks that the instance is in the cluster.
3544 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3545 assert self.instance is not None, \
3546 "Cannot retrieve locked instance %s" % self.op.instance_name
3547 _CheckNodeOnline(self, self.instance.primary_node)
3549 def Exec(self, feedback_fn):
3550 """Shutdown the instance.
3553 instance = self.instance
3554 node_current = instance.primary_node
3555 self.cfg.MarkInstanceDown(instance.name)
3556 result = self.rpc.call_instance_shutdown(node_current, instance)
3557 msg = result.fail_msg
3559 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3561 _ShutdownInstanceDisks(self, instance)
3564 class LUReinstallInstance(LogicalUnit):
3565 """Reinstall an instance.
3568 HPATH = "instance-reinstall"
3569 HTYPE = constants.HTYPE_INSTANCE
3570 _OP_REQP = ["instance_name"]
3573 def ExpandNames(self):
3574 self._ExpandAndLockInstance()
3576 def BuildHooksEnv(self):
3579 This runs on master, primary and secondary nodes of the instance.
3582 env = _BuildInstanceHookEnvByObject(self, self.instance)
3583 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3586 def CheckPrereq(self):
3587 """Check prerequisites.
3589 This checks that the instance is in the cluster and is not running.
3592 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3593 assert instance is not None, \
3594 "Cannot retrieve locked instance %s" % self.op.instance_name
3595 _CheckNodeOnline(self, instance.primary_node)
3597 if instance.disk_template == constants.DT_DISKLESS:
3598 raise errors.OpPrereqError("Instance '%s' has no disks" %
3599 self.op.instance_name)
3600 if instance.admin_up:
3601 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3602 self.op.instance_name)
3603 remote_info = self.rpc.call_instance_info(instance.primary_node,
3605 instance.hypervisor)
3606 remote_info.Raise("Error checking node %s" % instance.primary_node,
3608 if remote_info.payload:
3609 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3610 (self.op.instance_name,
3611 instance.primary_node))
3613 self.op.os_type = getattr(self.op, "os_type", None)
3614 if self.op.os_type is not None:
3616 pnode = self.cfg.GetNodeInfo(
3617 self.cfg.ExpandNodeName(instance.primary_node))
3619 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3621 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3622 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3623 (self.op.os_type, pnode.name), prereq=True)
3625 self.instance = instance
3627 def Exec(self, feedback_fn):
3628 """Reinstall the instance.
3631 inst = self.instance
3633 if self.op.os_type is not None:
3634 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3635 inst.os = self.op.os_type
3636 self.cfg.Update(inst)
3638 _StartInstanceDisks(self, inst, None)
3640 feedback_fn("Running the instance OS create scripts...")
3641 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3642 result.Raise("Could not install OS for instance %s on node %s" %
3643 (inst.name, inst.primary_node))
3645 _ShutdownInstanceDisks(self, inst)
3648 class LURecreateInstanceDisks(LogicalUnit):
3649 """Recreate an instance's missing disks.
3652 HPATH = "instance-recreate-disks"
3653 HTYPE = constants.HTYPE_INSTANCE
3654 _OP_REQP = ["instance_name", "disks"]
3657 def CheckArguments(self):
3658 """Check the arguments.
3661 if not isinstance(self.op.disks, list):
3662 raise errors.OpPrereqError("Invalid disks parameter")
3663 for item in self.op.disks:
3664 if (not isinstance(item, int) or
3666 raise errors.OpPrereqError("Invalid disk specification '%s'" %
3669 def ExpandNames(self):
3670 self._ExpandAndLockInstance()
3672 def BuildHooksEnv(self):
3675 This runs on master, primary and secondary nodes of the instance.
3678 env = _BuildInstanceHookEnvByObject(self, self.instance)
3679 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3682 def CheckPrereq(self):
3683 """Check prerequisites.
3685 This checks that the instance is in the cluster and is not running.
3688 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3689 assert instance is not None, \
3690 "Cannot retrieve locked instance %s" % self.op.instance_name
3691 _CheckNodeOnline(self, instance.primary_node)
3693 if instance.disk_template == constants.DT_DISKLESS:
3694 raise errors.OpPrereqError("Instance '%s' has no disks" %
3695 self.op.instance_name)
3696 if instance.admin_up:
3697 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3698 self.op.instance_name)
3699 remote_info = self.rpc.call_instance_info(instance.primary_node,
3701 instance.hypervisor)
3702 remote_info.Raise("Error checking node %s" % instance.primary_node,
3704 if remote_info.payload:
3705 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3706 (self.op.instance_name,
3707 instance.primary_node))
3709 if not self.op.disks:
3710 self.op.disks = range(len(instance.disks))
3712 for idx in self.op.disks:
3713 if idx >= len(instance.disks):
3714 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)
3716 self.instance = instance
3718 def Exec(self, feedback_fn):
3719 """Recreate the disks.
3723 for idx, disk in enumerate(self.instance.disks):
3724 if idx not in self.op.disks: # disk idx has not been passed in
3728 _CreateDisks(self, self.instance, to_skip=to_skip)
3731 class LURenameInstance(LogicalUnit):
3732 """Rename an instance.
3735 HPATH = "instance-rename"
3736 HTYPE = constants.HTYPE_INSTANCE
3737 _OP_REQP = ["instance_name", "new_name"]
3739 def BuildHooksEnv(self):
3742 This runs on master, primary and secondary nodes of the instance.
3745 env = _BuildInstanceHookEnvByObject(self, self.instance)
3746 env["INSTANCE_NEW_NAME"] = self.op.new_name
3747 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3750 def CheckPrereq(self):
3751 """Check prerequisites.
3753 This checks that the instance is in the cluster and is not running.
3756 instance = self.cfg.GetInstanceInfo(
3757 self.cfg.ExpandInstanceName(self.op.instance_name))
3758 if instance is None:
3759 raise errors.OpPrereqError("Instance '%s' not known" %
3760 self.op.instance_name)
3761 _CheckNodeOnline(self, instance.primary_node)
3763 if instance.admin_up:
3764 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3765 self.op.instance_name)
3766 remote_info = self.rpc.call_instance_info(instance.primary_node,
3768 instance.hypervisor)
3769 remote_info.Raise("Error checking node %s" % instance.primary_node,
3771 if remote_info.payload:
3772 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3773 (self.op.instance_name,
3774 instance.primary_node))
3775 self.instance = instance
3777 # new name verification
3778 name_info = utils.HostInfo(self.op.new_name)
3780 self.op.new_name = new_name = name_info.name
3781 instance_list = self.cfg.GetInstanceList()
3782 if new_name in instance_list:
3783 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3786 if not getattr(self.op, "ignore_ip", False):
3787 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3788 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3789 (name_info.ip, new_name))
3792 def Exec(self, feedback_fn):
3793 """Reinstall the instance.
3796 inst = self.instance
3797 old_name = inst.name
3799 if inst.disk_template == constants.DT_FILE:
3800 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3802 self.cfg.RenameInstance(inst.name, self.op.new_name)
3803 # Change the instance lock. This is definitely safe while we hold the BGL
3804 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3805 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3807 # re-read the instance from the configuration after rename
3808 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3810 if inst.disk_template == constants.DT_FILE:
3811 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3812 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3813 old_file_storage_dir,
3814 new_file_storage_dir)
3815 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3816 " (but the instance has been renamed in Ganeti)" %
3817 (inst.primary_node, old_file_storage_dir,
3818 new_file_storage_dir))
3820 _StartInstanceDisks(self, inst, None)
3822 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3824 msg = result.fail_msg
3826 msg = ("Could not run OS rename script for instance %s on node %s"
3827 " (but the instance has been renamed in Ganeti): %s" %
3828 (inst.name, inst.primary_node, msg))
3829 self.proc.LogWarning(msg)
3831 _ShutdownInstanceDisks(self, inst)
3834 class LURemoveInstance(LogicalUnit):
3835 """Remove an instance.
3838 HPATH = "instance-remove"
3839 HTYPE = constants.HTYPE_INSTANCE
3840 _OP_REQP = ["instance_name", "ignore_failures"]
3843 def ExpandNames(self):
3844 self._ExpandAndLockInstance()
3845 self.needed_locks[locking.LEVEL_NODE] = []
3846 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3848 def DeclareLocks(self, level):
3849 if level == locking.LEVEL_NODE:
3850 self._LockInstancesNodes()
3852 def BuildHooksEnv(self):
3855 This runs on master, primary and secondary nodes of the instance.
3858 env = _BuildInstanceHookEnvByObject(self, self.instance)
3859 nl = [self.cfg.GetMasterNode()]
3862 def CheckPrereq(self):
3863 """Check prerequisites.
3865 This checks that the instance is in the cluster.
3868 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3869 assert self.instance is not None, \
3870 "Cannot retrieve locked instance %s" % self.op.instance_name
3872 def Exec(self, feedback_fn):
3873 """Remove the instance.
3876 instance = self.instance
3877 logging.info("Shutting down instance %s on node %s",
3878 instance.name, instance.primary_node)
3880 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3881 msg = result.fail_msg
3883 if self.op.ignore_failures:
3884 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3886 raise errors.OpExecError("Could not shutdown instance %s on"
3888 (instance.name, instance.primary_node, msg))
3890 logging.info("Removing block devices for instance %s", instance.name)
3892 if not _RemoveDisks(self, instance):
3893 if self.op.ignore_failures:
3894 feedback_fn("Warning: can't remove instance's disks")
3896 raise errors.OpExecError("Can't remove instance's disks")
3898 logging.info("Removing instance %s out of cluster config", instance.name)
3900 self.cfg.RemoveInstance(instance.name)
3901 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3904 class LUQueryInstances(NoHooksLU):
3905 """Logical unit for querying instances.
3908 _OP_REQP = ["output_fields", "names", "use_locking"]
3910 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3912 "disk_template", "ip", "mac", "bridge",
3913 "nic_mode", "nic_link",
3914 "sda_size", "sdb_size", "vcpus", "tags",
3915 "network_port", "beparams",
3916 r"(disk)\.(size)/([0-9]+)",
3917 r"(disk)\.(sizes)", "disk_usage",
3918 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3919 r"(nic)\.(bridge)/([0-9]+)",
3920 r"(nic)\.(macs|ips|modes|links|bridges)",
3921 r"(disk|nic)\.(count)",
3922 "serial_no", "hypervisor", "hvparams",] +
3924 for name in constants.HVS_PARAMETERS] +
3926 for name in constants.BES_PARAMETERS])
3927 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3930 def ExpandNames(self):
3931 _CheckOutputFields(static=self._FIELDS_STATIC,
3932 dynamic=self._FIELDS_DYNAMIC,
3933 selected=self.op.output_fields)
3935 self.needed_locks = {}
3936 self.share_locks[locking.LEVEL_INSTANCE] = 1
3937 self.share_locks[locking.LEVEL_NODE] = 1
3940 self.wanted = _GetWantedInstances(self, self.op.names)
3942 self.wanted = locking.ALL_SET
3944 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3945 self.do_locking = self.do_node_query and self.op.use_locking
3947 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3948 self.needed_locks[locking.LEVEL_NODE] = []
3949 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3951 def DeclareLocks(self, level):
3952 if level == locking.LEVEL_NODE and self.do_locking:
3953 self._LockInstancesNodes()
3955 def CheckPrereq(self):
3956 """Check prerequisites.
3961 def Exec(self, feedback_fn):
3962 """Computes the list of nodes and their attributes.
3965 all_info = self.cfg.GetAllInstancesInfo()
3966 if self.wanted == locking.ALL_SET:
3967 # caller didn't specify instance names, so ordering is not important
3969 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3971 instance_names = all_info.keys()
3972 instance_names = utils.NiceSort(instance_names)
3974 # caller did specify names, so we must keep the ordering
3976 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3978 tgt_set = all_info.keys()
3979 missing = set(self.wanted).difference(tgt_set)
3981 raise errors.OpExecError("Some instances were removed before"
3982 " retrieving their data: %s" % missing)
3983 instance_names = self.wanted
3985 instance_list = [all_info[iname] for iname in instance_names]
3987 # begin data gathering
3989 nodes = frozenset([inst.primary_node for inst in instance_list])
3990 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3994 if self.do_node_query:
3996 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3998 result = node_data[name]
4000 # offline nodes will be in both lists
4001 off_nodes.append(name)
4002 if result.failed or result.fail_msg:
4003 bad_nodes.append(name)
4006 live_data.update(result.payload)
4007 # else no instance is alive
4009 live_data = dict([(name, {}) for name in instance_names])
4011 # end data gathering
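# At this point off_nodes holds nodes marked offline, bad_nodes holds nodes
# whose RPC call failed, and live_data maps instance names to runtime info.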
4016 cluster = self.cfg.GetClusterInfo()
4017 for instance in instance_list:
4019 i_hv = cluster.FillHV(instance)
4020 i_be = cluster.FillBE(instance)
4021 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4022 nic.nicparams) for nic in instance.nics]
4023 for field in self.op.output_fields:
4024 st_match = self._FIELDS_STATIC.Matches(field)
4029 elif field == "pnode":
4030 val = instance.primary_node
4031 elif field == "snodes":
4032 val = list(instance.secondary_nodes)
4033 elif field == "admin_state":
4034 val = instance.admin_up
4035 elif field == "oper_state":
4036 if instance.primary_node in bad_nodes:
4039 val = bool(live_data.get(instance.name))
4040 elif field == "status":
4041 if instance.primary_node in off_nodes:
4042 val = "ERROR_nodeoffline"
4043 elif instance.primary_node in bad_nodes:
4044 val = "ERROR_nodedown"
4046 running = bool(live_data.get(instance.name))
4048 if instance.admin_up:
4053 if instance.admin_up:
4057 elif field == "oper_ram":
4058 if instance.primary_node in bad_nodes:
4060 elif instance.name in live_data:
4061 val = live_data[instance.name].get("memory", "?")
4064 elif field == "vcpus":
4065 val = i_be[constants.BE_VCPUS]
4066 elif field == "disk_template":
4067 val = instance.disk_template
4070 val = instance.nics[0].ip
4073 elif field == "nic_mode":
4075 val = i_nicp[0][constants.NIC_MODE]
4078 elif field == "nic_link":
4080 val = i_nicp[0][constants.NIC_LINK]
4083 elif field == "bridge":
4084 if (instance.nics and
4085 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4086 val = i_nicp[0][constants.NIC_LINK]
4089 elif field == "mac":
4091 val = instance.nics[0].mac
4094 elif field == "sda_size" or field == "sdb_size":
4095 idx = ord(field[2]) - ord('a')
4097 val = instance.FindDisk(idx).size
4098 except errors.OpPrereqError:
4100 elif field == "disk_usage": # total disk usage per node
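# Per-node usage as computed by _ComputeDiskSize below; for drbd8 this
# includes the extra 128 MB of DRBD metadata per disk.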
4101 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4102 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4103 elif field == "tags":
4104 val = list(instance.GetTags())
4105 elif field == "serial_no":
4106 val = instance.serial_no
4107 elif field == "network_port":
4108 val = instance.network_port
4109 elif field == "hypervisor":
4110 val = instance.hypervisor
4111 elif field == "hvparams":
4113 elif (field.startswith(HVPREFIX) and
4114 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
4115 val = i_hv.get(field[len(HVPREFIX):], None)
4116 elif field == "beparams":
4118 elif (field.startswith(BEPREFIX) and
4119 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4120 val = i_be.get(field[len(BEPREFIX):], None)
4121 elif st_match and st_match.groups():
4122 # matches a variable list
4123 st_groups = st_match.groups()
4124 if st_groups and st_groups[0] == "disk":
4125 if st_groups[1] == "count":
4126 val = len(instance.disks)
4127 elif st_groups[1] == "sizes":
4128 val = [disk.size for disk in instance.disks]
4129 elif st_groups[1] == "size":
4131 val = instance.FindDisk(st_groups[2]).size
4132 except errors.OpPrereqError:
4135 assert False, "Unhandled disk parameter"
4136 elif st_groups[0] == "nic":
4137 if st_groups[1] == "count":
4138 val = len(instance.nics)
4139 elif st_groups[1] == "macs":
4140 val = [nic.mac for nic in instance.nics]
4141 elif st_groups[1] == "ips":
4142 val = [nic.ip for nic in instance.nics]
4143 elif st_groups[1] == "modes":
4144 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4145 elif st_groups[1] == "links":
4146 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4147 elif st_groups[1] == "bridges":
4150 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4151 val.append(nicp[constants.NIC_LINK])
4156 nic_idx = int(st_groups[2])
4157 if nic_idx >= len(instance.nics):
4160 if st_groups[1] == "mac":
4161 val = instance.nics[nic_idx].mac
4162 elif st_groups[1] == "ip":
4163 val = instance.nics[nic_idx].ip
4164 elif st_groups[1] == "mode":
4165 val = i_nicp[nic_idx][constants.NIC_MODE]
4166 elif st_groups[1] == "link":
4167 val = i_nicp[nic_idx][constants.NIC_LINK]
4168 elif st_groups[1] == "bridge":
4169 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4170 if nic_mode == constants.NIC_MODE_BRIDGED:
4171 val = i_nicp[nic_idx][constants.NIC_LINK]
4175 assert False, "Unhandled NIC parameter"
4177 assert False, ("Declared but unhandled variable parameter '%s'" %
4180 assert False, "Declared but unhandled parameter '%s'" % field
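# The asserts above catch fields that are declared in _FIELDS_STATIC or
# _FIELDS_DYNAMIC but not handled by this dispatch chain, so the field list
# and this loop have to be kept in sync.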
4187 class LUFailoverInstance(LogicalUnit):
4188 """Failover an instance.
4191 HPATH = "instance-failover"
4192 HTYPE = constants.HTYPE_INSTANCE
4193 _OP_REQP = ["instance_name", "ignore_consistency"]
4196 def ExpandNames(self):
4197 self._ExpandAndLockInstance()
4198 self.needed_locks[locking.LEVEL_NODE] = []
4199 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4201 def DeclareLocks(self, level):
4202 if level == locking.LEVEL_NODE:
4203 self._LockInstancesNodes()
4205 def BuildHooksEnv(self):
4208 This runs on master, primary and secondary nodes of the instance.
4212 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4214 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4215 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4218 def CheckPrereq(self):
4219 """Check prerequisites.
4221 This checks that the instance is in the cluster.
4224 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4225 assert self.instance is not None, \
4226 "Cannot retrieve locked instance %s" % self.op.instance_name
4228 bep = self.cfg.GetClusterInfo().FillBE(instance)
4229 if instance.disk_template not in constants.DTS_NET_MIRROR:
4230 raise errors.OpPrereqError("Instance's disk layout is not"
4231 " network mirrored, cannot failover.")
4233 secondary_nodes = instance.secondary_nodes
4234 if not secondary_nodes:
4235 raise errors.ProgrammerError("no secondary node but using "
4236 "a mirrored disk template")
4238 target_node = secondary_nodes[0]
4239 _CheckNodeOnline(self, target_node)
4240 _CheckNodeNotDrained(self, target_node)
4241 if instance.admin_up:
4242 # check memory requirements on the secondary node
4243 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4244 instance.name, bep[constants.BE_MEMORY],
4245 instance.hypervisor)
4247 self.LogInfo("Not checking memory on the secondary node as"
4248 " instance will not be started")
4250 # check bridge existence
4251 _CheckInstanceBridgesExist(self, instance, node=target_node)
4253 def Exec(self, feedback_fn):
4254 """Failover an instance.
4256 The failover is done by shutting it down on its present node and
4257 starting it on the secondary.
4260 instance = self.instance
4262 source_node = instance.primary_node
4263 target_node = instance.secondary_nodes[0]
4265 feedback_fn("* checking disk consistency between source and target")
4266 for dev in instance.disks:
4267 # for drbd, these are drbd over lvm
4268 if not _CheckDiskConsistency(self, dev, target_node, False):
4269 if instance.admin_up and not self.op.ignore_consistency:
4270 raise errors.OpExecError("Disk %s is degraded on target node,"
4271 " aborting failover." % dev.iv_name)
4273 feedback_fn("* shutting down instance on source node")
4274 logging.info("Shutting down instance %s on node %s",
4275 instance.name, source_node)
4277 result = self.rpc.call_instance_shutdown(source_node, instance)
4278 msg = result.fail_msg
4280 if self.op.ignore_consistency:
4281 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4282 " Proceeding anyway. Please make sure node"
4283 " %s is down. Error details: %s",
4284 instance.name, source_node, source_node, msg)
4286 raise errors.OpExecError("Could not shutdown instance %s on"
4288 (instance.name, source_node, msg))
4290 feedback_fn("* deactivating the instance's disks on source node")
4291 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4292 raise errors.OpExecError("Can't shut down the instance's disks.")
4294 instance.primary_node = target_node
4295 # distribute new instance config to the other nodes
4296 self.cfg.Update(instance)
4298 # Only start the instance if it's marked as up
4299 if instance.admin_up:
4300 feedback_fn("* activating the instance's disks on target node")
4301 logging.info("Starting instance %s on node %s",
4302 instance.name, target_node)
4304 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4305 ignore_secondaries=True)
4307 _ShutdownInstanceDisks(self, instance)
4308 raise errors.OpExecError("Can't activate the instance's disks")
4310 feedback_fn("* starting the instance on the target node")
4311 result = self.rpc.call_instance_start(target_node, instance, None, None)
4312 msg = result.fail_msg
4314 _ShutdownInstanceDisks(self, instance)
4315 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4316 (instance.name, target_node, msg))
4319 class LUMigrateInstance(LogicalUnit):
4320 """Migrate an instance.
4322 This is migration without shutting the instance down, compared to
4323 failover, which is done with a shutdown.
4326 HPATH = "instance-migrate"
4327 HTYPE = constants.HTYPE_INSTANCE
4328 _OP_REQP = ["instance_name", "live", "cleanup"]
4332 def ExpandNames(self):
4333 self._ExpandAndLockInstance()
4335 self.needed_locks[locking.LEVEL_NODE] = []
4336 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4338 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4339 self.op.live, self.op.cleanup)
4340 self.tasklets = [self._migrater]
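# The actual prerequisite checks and the migration itself are delegated to
# the TLMigrateInstance tasklet, which LUMigrateNode reuses as well.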
4342 def DeclareLocks(self, level):
4343 if level == locking.LEVEL_NODE:
4344 self._LockInstancesNodes()
4346 def BuildHooksEnv(self):
4349 This runs on master, primary and secondary nodes of the instance.
4352 instance = self._migrater.instance
4353 env = _BuildInstanceHookEnvByObject(self, instance)
4354 env["MIGRATE_LIVE"] = self.op.live
4355 env["MIGRATE_CLEANUP"] = self.op.cleanup
4356 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4360 class LUMigrateNode(LogicalUnit):
4361 """Migrate all instances from a node.
4364 HPATH = "node-migrate"
4365 HTYPE = constants.HTYPE_NODE
4366 _OP_REQP = ["node_name", "live"]
4369 def ExpandNames(self):
4370 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4371 if self.op.node_name is None:
4372 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
4374 self.needed_locks = {
4375 locking.LEVEL_NODE: [self.op.node_name],
4378 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4380 # Create tasklets for migrating all instances on this node
4384 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4385 logging.debug("Migrating instance %s", inst.name)
4386 names.append(inst.name)
4388 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4390 self.tasklets = tasklets
4392 # Declare instance locks
4393 self.needed_locks[locking.LEVEL_INSTANCE] = names
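# One TLMigrateInstance tasklet is created per primary instance on the node,
# and instance locks for all of them are declared up front.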
4395 def DeclareLocks(self, level):
4396 if level == locking.LEVEL_NODE:
4397 self._LockInstancesNodes()
4399 def BuildHooksEnv(self):
4402 This runs on the master, the primary and all the secondaries.
4406 "NODE_NAME": self.op.node_name,
4409 nl = [self.cfg.GetMasterNode()]
4411 return (env, nl, nl)
4414 class TLMigrateInstance(Tasklet):
4415 def __init__(self, lu, instance_name, live, cleanup):
4416 """Initializes this class.
4419 Tasklet.__init__(self, lu)
4422 self.instance_name = instance_name
4424 self.cleanup = cleanup
4426 def CheckPrereq(self):
4427 """Check prerequisites.
4429 This checks that the instance is in the cluster.
4432 instance = self.cfg.GetInstanceInfo(
4433 self.cfg.ExpandInstanceName(self.instance_name))
4434 if instance is None:
4435 raise errors.OpPrereqError("Instance '%s' not known" %
4438 if instance.disk_template != constants.DT_DRBD8:
4439 raise errors.OpPrereqError("Instance's disk layout is not"
4440 " drbd8, cannot migrate.")
4442 secondary_nodes = instance.secondary_nodes
4443 if not secondary_nodes:
4444 raise errors.ConfigurationError("No secondary node but using"
4445 " drbd8 disk template")
4447 i_be = self.cfg.GetClusterInfo().FillBE(instance)
4449 target_node = secondary_nodes[0]
4450 # check memory requirements on the secondary node
4451 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4452 instance.name, i_be[constants.BE_MEMORY],
4453 instance.hypervisor)
4455 # check bridge existence
4456 _CheckInstanceBridgesExist(self, instance, node=target_node)
4458 if not self.cleanup:
4459 _CheckNodeNotDrained(self, target_node)
4460 result = self.rpc.call_instance_migratable(instance.primary_node,
4462 result.Raise("Can't migrate, please use failover", prereq=True)
4464 self.instance = instance
4466 def _WaitUntilSync(self):
4467 """Poll with custom rpc for disk sync.
4469 This uses our own step-based rpc call.
4472 self.feedback_fn("* wait until resync is done")
4476 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4478 self.instance.disks)
4480 for node, nres in result.items():
4481 nres.Raise("Cannot resync disks on node %s" % node)
4482 node_done, node_percent = nres.payload
4483 all_done = all_done and node_done
4484 if node_percent is not None:
4485 min_percent = min(min_percent, node_percent)
4487 if min_percent < 100:
4488 self.feedback_fn(" - progress: %.1f%%" % min_percent)
4491 def _EnsureSecondary(self, node):
4492 """Demote a node to secondary.
4495 self.feedback_fn("* switching node %s to secondary mode" % node)
4497 for dev in self.instance.disks:
4498 self.cfg.SetDiskID(dev, node)
4500 result = self.rpc.call_blockdev_close(node, self.instance.name,
4501 self.instance.disks)
4502 result.Raise("Cannot change disk to secondary on node %s" % node)
4504 def _GoStandalone(self):
4505 """Disconnect from the network.
4508 self.feedback_fn("* changing into standalone mode")
4509 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4510 self.instance.disks)
4511 for node, nres in result.items():
4512 nres.Raise("Cannot disconnect disks on node %s" % node)
4514 def _GoReconnect(self, multimaster):
4515 """Reconnect to the network.
4521 msg = "single-master"
4522 self.feedback_fn("* changing disks into %s mode" % msg)
4523 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4524 self.instance.disks,
4525 self.instance.name, multimaster)
4526 for node, nres in result.items():
4527 nres.Raise("Cannot change disks config on node %s" % node)
4529 def _ExecCleanup(self):
4530 """Try to cleanup after a failed migration.
4532 The cleanup is done by:
4533 - check that the instance is running only on one node
4534 (and update the config if needed)
4535 - change disks on its secondary node to secondary
4536 - wait until disks are fully synchronized
4537 - disconnect from the network
4538 - change disks into single-master mode
4539 - wait again until disks are fully synchronized
4542 instance = self.instance
4543 target_node = self.target_node
4544 source_node = self.source_node
4546 # check running on only one node
4547 self.feedback_fn("* checking where the instance actually runs"
4548 " (if this hangs, the hypervisor might be in"
4550 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4551 for node, result in ins_l.items():
4552 result.Raise("Can't contact node %s" % node)
4554 runningon_source = instance.name in ins_l[source_node].payload
4555 runningon_target = instance.name in ins_l[target_node].payload
4557 if runningon_source and runningon_target:
4558 raise errors.OpExecError("Instance seems to be running on two nodes,"
4559 " or the hypervisor is confused. You will have"
4560 " to ensure manually that it runs only on one"
4561 " and restart this operation.")
4563 if not (runningon_source or runningon_target):
4564 raise errors.OpExecError("Instance does not seem to be running at all."
4565 " In this case, it's safer to repair by"
4566 " running 'gnt-instance stop' to ensure disk"
4567 " shutdown, and then restarting it.")
4569 if runningon_target:
4570 # the migration has actually succeeded, we need to update the config
4571 self.feedback_fn("* instance running on secondary node (%s),"
4572 " updating config" % target_node)
4573 instance.primary_node = target_node
4574 self.cfg.Update(instance)
4575 demoted_node = source_node
4577 self.feedback_fn("* instance confirmed to be running on its"
4578 " primary node (%s)" % source_node)
4579 demoted_node = target_node
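# Whichever node the instance turned out not to be running on is demoted to
# secondary; the disks are then reconnected in single-master mode and resynced.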
4581 self._EnsureSecondary(demoted_node)
4583 self._WaitUntilSync()
4584 except errors.OpExecError:
4585 # we ignore here errors, since if the device is standalone, it
4586 # won't be able to sync
4588 self._GoStandalone()
4589 self._GoReconnect(False)
4590 self._WaitUntilSync()
4592 self.feedback_fn("* done")
4594 def _RevertDiskStatus(self):
4595 """Try to revert the disk status after a failed migration.
4598 target_node = self.target_node
4600 self._EnsureSecondary(target_node)
4601 self._GoStandalone()
4602 self._GoReconnect(False)
4603 self._WaitUntilSync()
4604 except errors.OpExecError, err:
4605 self.lu.LogWarning("Migration failed and I can't reconnect the"
4606 " drives: error '%s'\n"
4607 "Please look and recover the instance status" %
4610 def _AbortMigration(self):
4611 """Call the hypervisor code to abort a started migration.
4614 instance = self.instance
4615 target_node = self.target_node
4616 migration_info = self.migration_info
4618 abort_result = self.rpc.call_finalize_migration(target_node,
4622 abort_msg = abort_result.fail_msg
4624 logging.error("Aborting migration failed on target node %s: %s",
4625 target_node, abort_msg)
4626 # Don't raise an exception here, as we still have to try to revert the
4627 # disk status, even if this step failed.
4629 def _ExecMigration(self):
4630 """Migrate an instance.
4632 The migrate is done by:
4633 - change the disks into dual-master mode
4634 - wait until disks are fully synchronized again
4635 - migrate the instance
4636 - change disks on the new secondary node (the old primary) to secondary
4637 - wait until disks are fully synchronized
4638 - change disks into single-master mode
4641 instance = self.instance
4642 target_node = self.target_node
4643 source_node = self.source_node
4645 self.feedback_fn("* checking disk consistency between source and target")
4646 for dev in instance.disks:
4647 if not _CheckDiskConsistency(self, dev, target_node, False):
4648 raise errors.OpExecError("Disk %s is degraded or not fully"
4649 " synchronized on target node,"
4650 " aborting migrate." % dev.iv_name)
4652 # First get the migration information from the remote node
4653 result = self.rpc.call_migration_info(source_node, instance)
4654 msg = result.fail_msg
4656 log_err = ("Failed fetching source migration information from %s: %s" %
4658 logging.error(log_err)
4659 raise errors.OpExecError(log_err)
4661 self.migration_info = migration_info = result.payload
4663 # Then switch the disks to master/master mode
4664 self._EnsureSecondary(target_node)
4665 self._GoStandalone()
4666 self._GoReconnect(True)
4667 self._WaitUntilSync()
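# The disks are now connected in dual-master (multimaster) mode on both
# nodes, which is what allows the live migration below to proceed.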
4669 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4670 result = self.rpc.call_accept_instance(target_node,
4673 self.nodes_ip[target_node])
4675 msg = result.fail_msg
4677 logging.error("Instance pre-migration failed, trying to revert"
4678 " disk status: %s", msg)
4679 self._AbortMigration()
4680 self._RevertDiskStatus()
4681 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4682 (instance.name, msg))
4684 self.feedback_fn("* migrating instance to %s" % target_node)
4686 result = self.rpc.call_instance_migrate(source_node, instance,
4687 self.nodes_ip[target_node],
4689 msg = result.fail_msg
4691 logging.error("Instance migration failed, trying to revert"
4692 " disk status: %s", msg)
4693 self._AbortMigration()
4694 self._RevertDiskStatus()
4695 raise errors.OpExecError("Could not migrate instance %s: %s" %
4696 (instance.name, msg))
4699 instance.primary_node = target_node
4700 # distribute new instance config to the other nodes
4701 self.cfg.Update(instance)
4703 result = self.rpc.call_finalize_migration(target_node,
4707 msg = result.fail_msg
4709 logging.error("Instance migration succeeded, but finalization failed:"
4711 raise errors.OpExecError("Could not finalize instance migration: %s" %
4714 self._EnsureSecondary(source_node)
4715 self._WaitUntilSync()
4716 self._GoStandalone()
4717 self._GoReconnect(False)
4718 self._WaitUntilSync()
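# At this point the roles are swapped: the old primary is now the secondary
# and the disks are back in single-master mode.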
4720 self.feedback_fn("* done")
4722 def Exec(self, feedback_fn):
4723 """Perform the migration.
4726 feedback_fn("Migrating instance %s" % self.instance.name)
4728 self.feedback_fn = feedback_fn
4730 self.source_node = self.instance.primary_node
4731 self.target_node = self.instance.secondary_nodes[0]
4732 self.all_nodes = [self.source_node, self.target_node]
4734 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4735 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4739 return self._ExecCleanup()
4741 return self._ExecMigration()
4744 def _CreateBlockDev(lu, node, instance, device, force_create,
4746 """Create a tree of block devices on a given node.
4748 If this device type has to be created on secondaries, create it and
4751 If not, just recurse into the children, keeping the same 'force' value.
4753 @param lu: the lu on whose behalf we execute
4754 @param node: the node on which to create the device
4755 @type instance: L{objects.Instance}
4756 @param instance: the instance which owns the device
4757 @type device: L{objects.Disk}
4758 @param device: the device to create
4759 @type force_create: boolean
4760 @param force_create: whether to force creation of this device; this
4761 will be changed to True whenever we find a device which has the
4762 CreateOnSecondary() attribute
4763 @param info: the extra 'metadata' we should attach to the device
4764 (this will be represented as a LVM tag)
4765 @type force_open: boolean
4766 @param force_open: this parameter will be passed to the
4767 L{backend.BlockdevCreate} function where it specifies
4768 whether we run on the primary or not, and it affects both
4769 the child assembly and the device's own Open() execution
4772 if device.CreateOnSecondary():
4776 for child in device.children:
4777 _CreateBlockDev(lu, node, instance, child, force_create,
4780 if not force_create:
4783 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4786 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4787 """Create a single block device on a given node.
4789 This will not recurse over children of the device, so they must be created in advance.
4792 @param lu: the lu on whose behalf we execute
4793 @param node: the node on which to create the device
4794 @type instance: L{objects.Instance}
4795 @param instance: the instance which owns the device
4796 @type device: L{objects.Disk}
4797 @param device: the device to create
4798 @param info: the extra 'metadata' we should attach to the device
4799 (this will be represented as a LVM tag)
4800 @type force_open: boolean
4801 @param force_open: this parameter will be passed to the
4802 L{backend.BlockdevCreate} function where it specifies
4803 whether we run on the primary or not, and it affects both
4804 the child assembly and the device's own Open() execution
4807 lu.cfg.SetDiskID(device, node)
4808 result = lu.rpc.call_blockdev_create(node, device, device.size,
4809 instance.name, force_open, info)
4810 result.Raise("Can't create block device %s on"
4811 " node %s for instance %s" % (device, node, instance.name))
4812 if device.physical_id is None:
4813 device.physical_id = result.payload
4816 def _GenerateUniqueNames(lu, exts):
4817 """Generate a suitable LV name.
4819 This will generate a logical volume name for the given instance.
4824 new_id = lu.cfg.GenerateUniqueID()
4825 results.append("%s%s" % (new_id, val))
4829 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4831 """Generate a drbd8 device complete with its children.
4834 port = lu.cfg.AllocatePort()
4835 vgname = lu.cfg.GetVGName()
4836 shared_secret = lu.cfg.GenerateDRBDSecret()
4837 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4838 logical_id=(vgname, names[0]))
4839 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4840 logical_id=(vgname, names[1]))
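# Each drbd8 disk is built from two logical volumes: the data LV of the
# requested size plus a 128 MB metadata LV.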
4841 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4842 logical_id=(primary, secondary, port,
4845 children=[dev_data, dev_meta],
4850 def _GenerateDiskTemplate(lu, template_name,
4851 instance_name, primary_node,
4852 secondary_nodes, disk_info,
4853 file_storage_dir, file_driver,
4855 """Generate the entire disk layout for a given template type.
4858 #TODO: compute space requirements
4860 vgname = lu.cfg.GetVGName()
4861 disk_count = len(disk_info)
4863 if template_name == constants.DT_DISKLESS:
4865 elif template_name == constants.DT_PLAIN:
4866 if len(secondary_nodes) != 0:
4867 raise errors.ProgrammerError("Wrong template configuration")
4869 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4870 for i in range(disk_count)])
4871 for idx, disk in enumerate(disk_info):
4872 disk_index = idx + base_index
4873 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4874 logical_id=(vgname, names[idx]),
4875 iv_name="disk/%d" % disk_index,
4877 disks.append(disk_dev)
4878 elif template_name == constants.DT_DRBD8:
4879 if len(secondary_nodes) != 1:
4880 raise errors.ProgrammerError("Wrong template configuration")
4881 remote_node = secondary_nodes[0]
4882 minors = lu.cfg.AllocateDRBDMinor(
4883 [primary_node, remote_node] * len(disk_info), instance_name)
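# The node pair is repeated once per disk, so minors[2*i] and minors[2*i+1]
# end up being used as the minors for disk i on the primary and secondary node.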
4886 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4887 for i in range(disk_count)]):
4888 names.append(lv_prefix + "_data")
4889 names.append(lv_prefix + "_meta")
4890 for idx, disk in enumerate(disk_info):
4891 disk_index = idx + base_index
4892 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4893 disk["size"], names[idx*2:idx*2+2],
4894 "disk/%d" % disk_index,
4895 minors[idx*2], minors[idx*2+1])
4896 disk_dev.mode = disk["mode"]
4897 disks.append(disk_dev)
4898 elif template_name == constants.DT_FILE:
4899 if len(secondary_nodes) != 0:
4900 raise errors.ProgrammerError("Wrong template configuration")
4902 for idx, disk in enumerate(disk_info):
4903 disk_index = idx + base_index
4904 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4905 iv_name="disk/%d" % disk_index,
4906 logical_id=(file_driver,
4907 "%s/disk%d" % (file_storage_dir,
4910 disks.append(disk_dev)
4912 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4916 def _GetInstanceInfoText(instance):
4917 """Compute that text that should be added to the disk's metadata.
4920 return "originstname+%s" % instance.name
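# Example: an instance named inst1.example.com (hypothetical hostname) would
# get its volumes tagged "originstname+inst1.example.com".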
4923 def _CreateDisks(lu, instance, to_skip=None):
4924 """Create all disks for an instance.
4926 This abstracts away some work from AddInstance.
4928 @type lu: L{LogicalUnit}
4929 @param lu: the logical unit on whose behalf we execute
4930 @type instance: L{objects.Instance}
4931 @param instance: the instance whose disks we should create
4933 @param to_skip: list of indices to skip
4935 @return: the success of the creation
4938 info = _GetInstanceInfoText(instance)
4939 pnode = instance.primary_node
4941 if instance.disk_template == constants.DT_FILE:
4942 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4943 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4945 result.Raise("Failed to create directory '%s' on"
4946 " node %s: %s" % (file_storage_dir, pnode))
4948 # Note: this needs to be kept in sync with adding of disks in
4949 # LUSetInstanceParams
4950 for idx, device in enumerate(instance.disks):
4951 if to_skip and idx in to_skip:
4953 logging.info("Creating volume %s for instance %s",
4954 device.iv_name, instance.name)
4956 for node in instance.all_nodes:
4957 f_create = node == pnode
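# force_create/force_open are only set on the primary node; on the secondaries
# _CreateBlockDev only creates the devices that have to exist there (see
# CreateOnSecondary above).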
4958 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4961 def _RemoveDisks(lu, instance):
4962 """Remove all disks for an instance.
4964 This abstracts away some work from `AddInstance()` and
4965 `RemoveInstance()`. Note that in case some of the devices couldn't
4966 be removed, the removal will continue with the other ones (compare
4967 with `_CreateDisks()`).
4969 @type lu: L{LogicalUnit}
4970 @param lu: the logical unit on whose behalf we execute
4971 @type instance: L{objects.Instance}
4972 @param instance: the instance whose disks we should remove
4974 @return: the success of the removal
4977 logging.info("Removing block devices for instance %s", instance.name)
4980 for device in instance.disks:
4981 for node, disk in device.ComputeNodeTree(instance.primary_node):
4982 lu.cfg.SetDiskID(disk, node)
4983 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4985 lu.LogWarning("Could not remove block device %s on node %s,"
4986 " continuing anyway: %s", device.iv_name, node, msg)
4989 if instance.disk_template == constants.DT_FILE:
4990 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4991 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4993 msg = result.fail_msg
4995 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4996 file_storage_dir, instance.primary_node, msg)
5002 def _ComputeDiskSize(disk_template, disks):
5003 """Compute disk size requirements in the volume group
5006 # Required free disk space as a function of disk and swap space
5008 constants.DT_DISKLESS: None,
5009 constants.DT_PLAIN: sum(d["size"] for d in disks),
5010 # 128 MB are added for drbd metadata for each disk
5011 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5012 constants.DT_FILE: None,
5015 if disk_template not in req_size_dict:
5016 raise errors.ProgrammerError("Disk template '%s' size requirement"
5017 " is unknown" % disk_template)
5019 return req_size_dict[disk_template]
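# Worked example (assumed figures): two drbd8 disks of 10240 MB and 2048 MB
# require (10240 + 128) + (2048 + 128) = 12544 MB of free space in the volume
# group, while the same disks as plain LVs need 12288 MB.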
5022 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5023 """Hypervisor parameter validation.
5025 This function abstracts the hypervisor parameter validation to be
5026 used in both instance create and instance modify.
5028 @type lu: L{LogicalUnit}
5029 @param lu: the logical unit for which we check
5030 @type nodenames: list
5031 @param nodenames: the list of nodes on which we should check
5032 @type hvname: string
5033 @param hvname: the name of the hypervisor we should use
5034 @type hvparams: dict
5035 @param hvparams: the parameters which we need to check
5036 @raise errors.OpPrereqError: if the parameters are not valid
5039 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5042 for node in nodenames:
5046 info.Raise("Hypervisor parameter validation failed on node %s" % node)
5049 class LUCreateInstance(LogicalUnit):
5050 """Create an instance.
5053 HPATH = "instance-add"
5054 HTYPE = constants.HTYPE_INSTANCE
5055 _OP_REQP = ["instance_name", "disks", "disk_template",
5057 "wait_for_sync", "ip_check", "nics",
5058 "hvparams", "beparams"]
5061 def _ExpandNode(self, node):
5062 """Expands and checks one node name.
5065 node_full = self.cfg.ExpandNodeName(node)
5066 if node_full is None:
5067 raise errors.OpPrereqError("Unknown node %s" % node)
5070 def ExpandNames(self):
5071 """ExpandNames for CreateInstance.
5073 Figure out the right locks for instance creation.
5076 self.needed_locks = {}
5078 # set optional parameters to none if they don't exist
5079 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5080 if not hasattr(self.op, attr):
5081 setattr(self.op, attr, None)
5083 # cheap checks, mostly valid constants given
5085 # verify creation mode
5086 if self.op.mode not in (constants.INSTANCE_CREATE,
5087 constants.INSTANCE_IMPORT):
5088 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5091 # disk template and mirror node verification
5092 if self.op.disk_template not in constants.DISK_TEMPLATES:
5093 raise errors.OpPrereqError("Invalid disk template name")
5095 if self.op.hypervisor is None:
5096 self.op.hypervisor = self.cfg.GetHypervisorType()
5098 cluster = self.cfg.GetClusterInfo()
5099 enabled_hvs = cluster.enabled_hypervisors
5100 if self.op.hypervisor not in enabled_hvs:
5101 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5102 " cluster (%s)" % (self.op.hypervisor,
5103 ",".join(enabled_hvs)))
5105 # check hypervisor parameter syntax (locally)
5106 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5107 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5109 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5110 hv_type.CheckParameterSyntax(filled_hvp)
5111 self.hv_full = filled_hvp
5113 # fill and remember the beparams dict
5114 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5115 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5118 #### instance parameters check
5120 # instance name verification
5121 hostname1 = utils.HostInfo(self.op.instance_name)
5122 self.op.instance_name = instance_name = hostname1.name
5124 # this is just a preventive check, but someone might still add this
5125 # instance in the meantime, and creation will fail at lock-add time
5126 if instance_name in self.cfg.GetInstanceList():
5127 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5130 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5134 for idx, nic in enumerate(self.op.nics):
5135 nic_mode_req = nic.get("mode", None)
5136 nic_mode = nic_mode_req
5137 if nic_mode is None:
5138 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5140 # in routed mode, for the first nic, the default ip is 'auto'
5141 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5142 default_ip_mode = constants.VALUE_AUTO
5144 default_ip_mode = constants.VALUE_NONE
5146 # ip validity checks
5147 ip = nic.get("ip", default_ip_mode)
5148 if ip is None or ip.lower() == constants.VALUE_NONE:
5150 elif ip.lower() == constants.VALUE_AUTO:
5151 nic_ip = hostname1.ip
5153 if not utils.IsValidIP(ip):
5154 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5155 " like a valid IP" % ip)
5158 # TODO: check the ip for uniqueness !!
5159 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5160 raise errors.OpPrereqError("Routed nic mode requires an ip address")
5162 # MAC address verification
5163 mac = nic.get("mac", constants.VALUE_AUTO)
5164 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5165 if not utils.IsValidMac(mac.lower()):
5166 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5168 # bridge verification
5169 bridge = nic.get("bridge", None)
5170 link = nic.get("link", None)
5172 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5173 " at the same time")
5174 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5175 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
5181 nicparams[constants.NIC_MODE] = nic_mode_req
5183 nicparams[constants.NIC_LINK] = link
5185 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5187 objects.NIC.CheckParameterSyntax(check_params)
5188 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5190 # disk checks/pre-build
5192 for disk in self.op.disks:
5193 mode = disk.get("mode", constants.DISK_RDWR)
5194 if mode not in constants.DISK_ACCESS_SET:
5195 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5197 size = disk.get("size", None)
5199 raise errors.OpPrereqError("Missing disk size")
5203 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
5204 self.disks.append({"size": size, "mode": mode})
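# Each disk is kept internally as a dict with its size (in MB) and access
# mode; these dicts are later used by _GenerateDiskTemplate and the hooks
# environment.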
5206 # used in CheckPrereq for ip ping check
5207 self.check_ip = hostname1.ip
5209 # file storage checks
5210 if (self.op.file_driver and
5211 not self.op.file_driver in constants.FILE_DRIVER):
5212 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5213 self.op.file_driver)
5215 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5216 raise errors.OpPrereqError("File storage directory path not absolute")
5218 ### Node/iallocator related checks
5219 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5220 raise errors.OpPrereqError("One and only one of iallocator and primary"
5221 " node must be given")
5223 if self.op.iallocator:
5224 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5226 self.op.pnode = self._ExpandNode(self.op.pnode)
5227 nodelist = [self.op.pnode]
5228 if self.op.snode is not None:
5229 self.op.snode = self._ExpandNode(self.op.snode)
5230 nodelist.append(self.op.snode)
5231 self.needed_locks[locking.LEVEL_NODE] = nodelist
5233 # in case of import lock the source node too
5234 if self.op.mode == constants.INSTANCE_IMPORT:
5235 src_node = getattr(self.op, "src_node", None)
5236 src_path = getattr(self.op, "src_path", None)
5238 if src_path is None:
5239 self.op.src_path = src_path = self.op.instance_name
5241 if src_node is None:
5242 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5243 self.op.src_node = None
5244 if os.path.isabs(src_path):
5245 raise errors.OpPrereqError("Importing an instance from an absolute"
5246 " path requires a source node option.")
5248 self.op.src_node = src_node = self._ExpandNode(src_node)
5249 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5250 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5251 if not os.path.isabs(src_path):
5252 self.op.src_path = src_path = \
5253 os.path.join(constants.EXPORT_DIR, src_path)
5255 else: # INSTANCE_CREATE
5256 if getattr(self.op, "os_type", None) is None:
5257 raise errors.OpPrereqError("No guest OS specified")
5259 def _RunAllocator(self):
5260 """Run the allocator based on input opcode.
5263 nics = [n.ToDict() for n in self.nics]
5264 ial = IAllocator(self.cfg, self.rpc,
5265 mode=constants.IALLOCATOR_MODE_ALLOC,
5266 name=self.op.instance_name,
5267 disk_template=self.op.disk_template,
5270 vcpus=self.be_full[constants.BE_VCPUS],
5271 mem_size=self.be_full[constants.BE_MEMORY],
5274 hypervisor=self.op.hypervisor,
5277 ial.Run(self.op.iallocator)
5280 raise errors.OpPrereqError("Can't compute nodes using"
5281 " iallocator '%s': %s" % (self.op.iallocator,
5283 if len(ial.nodes) != ial.required_nodes:
5284 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5285 " of nodes (%s), required %s" %
5286 (self.op.iallocator, len(ial.nodes),
5287 ial.required_nodes))
5288 self.op.pnode = ial.nodes[0]
5289 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5290 self.op.instance_name, self.op.iallocator,
5291 ", ".join(ial.nodes))
5292 if ial.required_nodes == 2:
5293 self.op.snode = ial.nodes[1]
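# When the allocator is asked for two nodes (mirrored disk templates), the
# first one becomes the primary and the second the secondary.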
5295 def BuildHooksEnv(self):
5298 This runs on master, primary and secondary nodes of the instance.
5302 "ADD_MODE": self.op.mode,
5304 if self.op.mode == constants.INSTANCE_IMPORT:
5305 env["SRC_NODE"] = self.op.src_node
5306 env["SRC_PATH"] = self.op.src_path
5307 env["SRC_IMAGES"] = self.src_images
5309 env.update(_BuildInstanceHookEnv(
5310 name=self.op.instance_name,
5311 primary_node=self.op.pnode,
5312 secondary_nodes=self.secondaries,
5313 status=self.op.start,
5314 os_type=self.op.os_type,
5315 memory=self.be_full[constants.BE_MEMORY],
5316 vcpus=self.be_full[constants.BE_VCPUS],
5317 nics=_NICListToTuple(self, self.nics),
5318 disk_template=self.op.disk_template,
5319 disks=[(d["size"], d["mode"]) for d in self.disks],
5322 hypervisor_name=self.op.hypervisor,
5325 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5330 def CheckPrereq(self):
5331 """Check prerequisites.
5334 if (not self.cfg.GetVGName() and
5335 self.op.disk_template not in constants.DTS_NOT_LVM):
5336 raise errors.OpPrereqError("Cluster does not support lvm-based"
5339 if self.op.mode == constants.INSTANCE_IMPORT:
5340 src_node = self.op.src_node
5341 src_path = self.op.src_path
5343 if src_node is None:
5344 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5345 exp_list = self.rpc.call_export_list(locked_nodes)
5347 for node in exp_list:
5348 if exp_list[node].fail_msg:
5350 if src_path in exp_list[node].payload:
5352 self.op.src_node = src_node = node
5353 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5357 raise errors.OpPrereqError("No export found for relative path %s" %
5360 _CheckNodeOnline(self, src_node)
5361 result = self.rpc.call_export_info(src_node, src_path)
5362 result.Raise("No export or invalid export found in dir %s" % src_path)
5364 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5365 if not export_info.has_section(constants.INISECT_EXP):
5366 raise errors.ProgrammerError("Corrupted export config")
5368 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5369 if (int(ei_version) != constants.EXPORT_VERSION):
5370 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5371 (ei_version, constants.EXPORT_VERSION))
5373 # Check that the new instance doesn't have less disks than the export
5374 instance_disks = len(self.disks)
5375 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
5376 if instance_disks < export_disks:
5377 raise errors.OpPrereqError("Not enough disks to import."
5378 " (instance: %d, export: %d)" %
5379 (instance_disks, export_disks))
5381 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
5383 for idx in range(export_disks):
5384 option = 'disk%d_dump' % idx
5385 if export_info.has_option(constants.INISECT_INS, option):
5386 # FIXME: are the old os-es, disk sizes, etc. useful?
5387 export_name = export_info.get(constants.INISECT_INS, option)
5388 image = os.path.join(src_path, export_name)
5389 disk_images.append(image)
5391 disk_images.append(False)
5393 self.src_images = disk_images
5395 old_name = export_info.get(constants.INISECT_INS, 'name')
5396 # FIXME: int() here could throw a ValueError on broken exports
5397 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
5398 if self.op.instance_name == old_name:
5399 for idx, nic in enumerate(self.nics):
5400 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
5401 nic_mac_ini = 'nic%d_mac' % idx
5402 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
5404 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
5405 # ip ping checks (we use the same ip that was resolved in ExpandNames)
5406 if self.op.start and not self.op.ip_check:
5407 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
5408 " adding an instance in start mode")
5410 if self.op.ip_check:
5411 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
5412 raise errors.OpPrereqError("IP %s of instance %s already in use" %
5413 (self.check_ip, self.op.instance_name))
5415 #### mac address generation
5416 # By generating the mac address here, both the allocator and the hooks get
5417 # the real final mac address rather than the 'auto' or 'generate' value.
5418 # There is a race condition between the generation and the instance object
5419 # creation, which means that we know the mac is valid now, but we're not
5420 # sure it will be when we actually add the instance. If things go bad
5421 # adding the instance will abort because of a duplicate mac, and the
5422 # creation job will fail.
5423 for nic in self.nics:
5424 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5425 nic.mac = self.cfg.GenerateMAC()
5429 if self.op.iallocator is not None:
5430 self._RunAllocator()
5432 #### node related checks
5434 # check primary node
5435 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
5436 assert self.pnode is not None, \
5437 "Cannot retrieve locked node %s" % self.op.pnode
5439 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
5442 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
5445 self.secondaries = []
5447 # mirror node verification
5448 if self.op.disk_template in constants.DTS_NET_MIRROR:
5449 if self.op.snode is None:
5450 raise errors.OpPrereqError("The networked disk templates need"
5452 if self.op.snode == pnode.name:
5453 raise errors.OpPrereqError("The secondary node cannot be"
5454 " the primary node.")
5455 _CheckNodeOnline(self, self.op.snode)
5456 _CheckNodeNotDrained(self, self.op.snode)
5457 self.secondaries.append(self.op.snode)
5459 nodenames = [pnode.name] + self.secondaries
5461 req_size = _ComputeDiskSize(self.op.disk_template,
5464 # Check lv size requirements
5465 if req_size is not None:
5466 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5468 for node in nodenames:
5469 info = nodeinfo[node]
5470 info.Raise("Cannot get current information from node %s" % node)
5472 vg_free = info.get('vg_free', None)
5473 if not isinstance(vg_free, int):
5474 raise errors.OpPrereqError("Can't compute free disk space on"
5476 if req_size > vg_free:
5477 raise errors.OpPrereqError("Not enough disk space on target node %s."
5478 " %d MB available, %d MB required" %
5479 (node, vg_free, req_size))
5481 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5484 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5485 result.Raise("OS '%s' not in supported os list for primary node %s" %
5486 (self.op.os_type, pnode.name), prereq=True)
5488 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
5490 # memory check on primary node
5492 _CheckNodeFreeMemory(self, self.pnode.name,
5493 "creating instance %s" % self.op.instance_name,
5494 self.be_full[constants.BE_MEMORY],
5497 self.dry_run_result = list(nodenames)
5499 def Exec(self, feedback_fn):
5500 """Create and add the instance to the cluster.
5503 instance = self.op.instance_name
5504 pnode_name = self.pnode.name
5506 ht_kind = self.op.hypervisor
5507 if ht_kind in constants.HTS_REQ_PORT:
5508 network_port = self.cfg.AllocatePort()
5512 ##if self.op.vnc_bind_address is None:
5513 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
5515 # this is needed because os.path.join does not accept None arguments
5516 if self.op.file_storage_dir is None:
5517 string_file_storage_dir = ""
5519 string_file_storage_dir = self.op.file_storage_dir
5521 # build the full file storage dir path
5522 file_storage_dir = os.path.normpath(os.path.join(
5523 self.cfg.GetFileStorageDir(),
5524 string_file_storage_dir, instance))
5527 disks = _GenerateDiskTemplate(self,
5528 self.op.disk_template,
5529 instance, pnode_name,
5533 self.op.file_driver,
5536 iobj = objects.Instance(name=instance, os=self.op.os_type,
5537 primary_node=pnode_name,
5538 nics=self.nics, disks=disks,
5539 disk_template=self.op.disk_template,
5541 network_port=network_port,
5542 beparams=self.op.beparams,
5543 hvparams=self.op.hvparams,
5544 hypervisor=self.op.hypervisor,
5547 feedback_fn("* creating instance disks...")
5549 _CreateDisks(self, iobj)
5550 except errors.OpExecError:
5551 self.LogWarning("Device creation failed, reverting...")
5553 _RemoveDisks(self, iobj)
5555 self.cfg.ReleaseDRBDMinors(instance)
5558 feedback_fn("adding instance %s to cluster config" % instance)
5560 self.cfg.AddInstance(iobj)
5561 # Declare that we don't want to remove the instance lock anymore, as we've
5562 # added the instance to the config
5563 del self.remove_locks[locking.LEVEL_INSTANCE]
5564 # Unlock all the nodes
5565 if self.op.mode == constants.INSTANCE_IMPORT:
5566 nodes_keep = [self.op.src_node]
5567 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5568 if node != self.op.src_node]
5569 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5570 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5572 self.context.glm.release(locking.LEVEL_NODE)
5573 del self.acquired_locks[locking.LEVEL_NODE]
5575 if self.op.wait_for_sync:
5576 disk_abort = not _WaitForSync(self, iobj)
5577 elif iobj.disk_template in constants.DTS_NET_MIRROR:
5578 # make sure the disks are not degraded (still sync-ing is ok)
5580 feedback_fn("* checking mirrors status")
5581 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5586 _RemoveDisks(self, iobj)
5587 self.cfg.RemoveInstance(iobj.name)
5588 # Make sure the instance lock gets removed
5589 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5590 raise errors.OpExecError("There are some degraded disks for"
5593 feedback_fn("creating os for instance %s on node %s" %
5594 (instance, pnode_name))
5596 if iobj.disk_template != constants.DT_DISKLESS:
5597 if self.op.mode == constants.INSTANCE_CREATE:
5598 feedback_fn("* running the instance OS create scripts...")
5599 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5600 result.Raise("Could not add os for instance %s"
5601 " on node %s" % (instance, pnode_name))
5603 elif self.op.mode == constants.INSTANCE_IMPORT:
5604 feedback_fn("* running the instance OS import scripts...")
5605 src_node = self.op.src_node
5606 src_images = self.src_images
5607 cluster_name = self.cfg.GetClusterName()
5608 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5609 src_node, src_images,
5611 msg = import_result.fail_msg
5613 self.LogWarning("Error while importing the disk images for instance"
5614 " %s on node %s: %s" % (instance, pnode_name, msg))
5616 # also checked in the prereq part
5617 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5621 iobj.admin_up = True
5622 self.cfg.Update(iobj)
5623 logging.info("Starting instance %s on node %s", instance, pnode_name)
5624 feedback_fn("* starting instance...")
5625 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5626 result.Raise("Could not start instance")
5628 return list(iobj.all_nodes)
5631 class LUConnectConsole(NoHooksLU):
5632 """Connect to an instance's console.
5634 This is somewhat special in that it returns the command line that
5635 you need to run on the master node in order to connect to the console.
5639 _OP_REQP = ["instance_name"]
5642 def ExpandNames(self):
5643 self._ExpandAndLockInstance()
5645 def CheckPrereq(self):
5646 """Check prerequisites.
5648 This checks that the instance is in the cluster.
5651 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5652 assert self.instance is not None, \
5653 "Cannot retrieve locked instance %s" % self.op.instance_name
5654 _CheckNodeOnline(self, self.instance.primary_node)
5656 def Exec(self, feedback_fn):
5657 """Connect to the console of an instance
5660 instance = self.instance
5661 node = instance.primary_node
5663 node_insts = self.rpc.call_instance_list([node],
5664 [instance.hypervisor])[node]
5665 node_insts.Raise("Can't get node information from %s" % node)
5667 if instance.name not in node_insts.payload:
5668 raise errors.OpExecError("Instance %s is not running." % instance.name)
5670 logging.debug("Connecting to console of %s on %s", instance.name, node)
5672 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5673 cluster = self.cfg.GetClusterInfo()
5674 # beparams and hvparams are passed separately, to avoid editing the
5675 # instance and then saving the defaults in the instance itself.
5676 hvparams = cluster.FillHV(instance)
5677 beparams = cluster.FillBE(instance)
5678 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5681 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
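# The LU only builds and returns the ssh command line; the caller is expected
# to run it from the master node to get the actual console.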
5684 class LUReplaceDisks(LogicalUnit):
5685 """Replace the disks of an instance.
5688 HPATH = "mirrors-replace"
5689 HTYPE = constants.HTYPE_INSTANCE
5690 _OP_REQP = ["instance_name", "mode", "disks"]
5693 def CheckArguments(self):
5694 if not hasattr(self.op, "remote_node"):
5695 self.op.remote_node = None
5696 if not hasattr(self.op, "iallocator"):
5697 self.op.iallocator = None
5699 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
5702 def ExpandNames(self):
5703 self._ExpandAndLockInstance()
5705 if self.op.iallocator is not None:
5706 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5708 elif self.op.remote_node is not None:
5709 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5710 if remote_node is None:
5711 raise errors.OpPrereqError("Node '%s' not known" %
5712 self.op.remote_node)
5714 self.op.remote_node = remote_node
5716 # Warning: do not remove the locking of the new secondary here
5717 # unless DRBD8.AddChildren is changed to work in parallel;
5718 # currently it doesn't since parallel invocations of
5719 # FindUnusedMinor will conflict
5720 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5721 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5724 self.needed_locks[locking.LEVEL_NODE] = []
5725 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5727 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
5728 self.op.iallocator, self.op.remote_node,
5731 self.tasklets = [self.replacer]
5733 def DeclareLocks(self, level):
5734 # If we're not already locking all nodes in the set we have to declare the
5735 # instance's primary/secondary nodes.
5736 if (level == locking.LEVEL_NODE and
5737 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5738 self._LockInstancesNodes()
5740 def BuildHooksEnv(self):
5743 This runs on the master, the primary and all the secondaries.
5746 instance = self.replacer.instance
5748 "MODE": self.op.mode,
5749 "NEW_SECONDARY": self.op.remote_node,
5750 "OLD_SECONDARY": instance.secondary_nodes[0],
5752 env.update(_BuildInstanceHookEnvByObject(self, instance))
5754 self.cfg.GetMasterNode(),
5755 instance.primary_node,
5757 if self.op.remote_node is not None:
5758 nl.append(self.op.remote_node)
5762 class LUEvacuateNode(LogicalUnit):
5763 """Relocate the secondary instances from a node.
5766 HPATH = "node-evacuate"
5767 HTYPE = constants.HTYPE_NODE
5768 _OP_REQP = ["node_name"]
5771 def CheckArguments(self):
5772 if not hasattr(self.op, "remote_node"):
5773 self.op.remote_node = None
5774 if not hasattr(self.op, "iallocator"):
5775 self.op.iallocator = None
5777 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
5778 self.op.remote_node,
5781 def ExpandNames(self):
5782 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
5783 if self.op.node_name is None:
5784 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
5786 self.needed_locks = {}
5788 # Declare node locks
5789 if self.op.iallocator is not None:
5790 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5792 elif self.op.remote_node is not None:
5793 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5794 if remote_node is None:
5795 raise errors.OpPrereqError("Node '%s' not known" %
5796 self.op.remote_node)
5798 self.op.remote_node = remote_node
5800 # Warning: do not remove the locking of the new secondary here
5801 # unless DRBD8.AddChildren is changed to work in parallel;
5802 # currently it doesn't since parallel invocations of
5803 # FindUnusedMinor will conflict
5804 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5805 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5808 raise errors.OpPrereqError("Invalid parameters")
5810 # Create tasklets for replacing disks for all secondary instances on this node
5815 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
5816 logging.debug("Replacing disks for instance %s", inst.name)
5817 names.append(inst.name)
5819 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
5820 self.op.iallocator, self.op.remote_node, [])
5821 tasklets.append(replacer)
5823 self.tasklets = tasklets
5824 self.instance_names = names
5826 # Declare instance locks
5827 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
5829 def DeclareLocks(self, level):
5830 # If we're not already locking all nodes in the set we have to declare the
5831 # instance's primary/secondary nodes.
5832 if (level == locking.LEVEL_NODE and
5833 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5834 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NODE_NAME": self.op.node_name,
      }

    nl = [self.cfg.GetMasterNode()]
5848 if self.op.remote_node is not None:
5849 env["NEW_SECONDARY"] = self.op.remote_node
5850 nl.append(self.op.remote_node)
5852 return (env, nl, nl)
5855 class TLReplaceDisks(Tasklet):
5856 """Replaces disks for an instance.
  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
               disks):
    """Initializes this class.
    """
    Tasklet.__init__(self, lu)
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node = remote_node
    self.disks = disks

    # Runtime data
    self.instance = None
5877 self.new_node = None
5878 self.target_node = None
5879 self.other_node = None
5880 self.remote_node_info = None
5881 self.node_secondary_ip = None
  @staticmethod
  def CheckArguments(mode, remote_node, iallocator):
    """Helper function for users of this class.

    """
    # check for valid parameter combination
5889 if mode == constants.REPLACE_DISK_CHG:
5890 if remote_node is None and iallocator is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given")
      if remote_node is not None and iallocator is not None:
5896 raise errors.OpPrereqError("Give either the iallocator or the new"
5897 " secondary, not both")
5899 elif remote_node is not None or iallocator is not None:
5900 # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node")
  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(lu.cfg, lu.rpc,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=instance_name,
                     relocate_from=relocate_from)

    ial.Run(iallocator_name)
    if not ial.success:
5918 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
5919 " %s" % (iallocator_name, ial.info))
5921 if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (iallocator_name, len(ial.nodes),
                                  ial.required_nodes))
5926 remote_node_name = ial.nodes[0]
5928 lu.LogInfo("Selected new secondary for instance '%s': %s",
5929 instance_name, remote_node_name)
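    # For reference, a successful relocation reply from the allocator script
    # is expected to look roughly like (sketch, not normative):
    #   {"success": true, "info": "...", "nodes": ["node3.example.com"]}
    # i.e. a single node, which becomes the new DRBD secondary.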
5931 return remote_node_name
  def _FindFaultyDisks(self, node_name):
    return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                    node_name)
5937 def CheckPrereq(self):
5938 """Check prerequisites.
5940 This checks that the instance is in the cluster.
5943 self.instance = self.cfg.GetInstanceInfo(self.instance_name)
5944 assert self.instance is not None, \
5945 "Cannot retrieve locked instance %s" % self.instance_name
5947 if self.instance.disk_template != constants.DT_DRBD8:
5948 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5951 if len(self.instance.secondary_nodes) != 1:
5952 raise errors.OpPrereqError("The instance has a strange layout,"
5953 " expected one secondary but found %d" %
5954 len(self.instance.secondary_nodes))
5956 secondary_node = self.instance.secondary_nodes[0]
    if self.iallocator_name is None:
      remote_node = self.remote_node
    else:
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                       self.instance.name, secondary_node)
5964 if remote_node is not None:
5965 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5966 assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if remote_node == secondary_node:
5976 raise errors.OpPrereqError("The specified node is already the"
5977 " secondary node of the instance.")
5979 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
5980 constants.REPLACE_DISK_CHG):
5981 raise errors.OpPrereqError("Cannot specify disks to be replaced")
5983 if self.mode == constants.REPLACE_DISK_AUTO:
5984 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
5985 faulty_secondary = self._FindFaultyDisks(secondary_node)
5987 if faulty_primary and faulty_secondary:
5988 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
5989 " one node and can not be repaired"
                                   " automatically" % self.instance_name)

      if faulty_primary:
        self.disks = faulty_primary
5994 self.target_node = self.instance.primary_node
5995 self.other_node = secondary_node
5996 check_nodes = [self.target_node, self.other_node]
5997 elif faulty_secondary:
5998 self.disks = faulty_secondary
5999 self.target_node = secondary_node
6000 self.other_node = self.instance.primary_node
        check_nodes = [self.target_node, self.other_node]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
6009 self.target_node = self.instance.primary_node
6010 self.other_node = secondary_node
6011 check_nodes = [self.target_node, self.other_node]
6013 elif self.mode == constants.REPLACE_DISK_SEC:
6014 self.target_node = secondary_node
6015 self.other_node = self.instance.primary_node
6016 check_nodes = [self.target_node, self.other_node]
6018 elif self.mode == constants.REPLACE_DISK_CHG:
6019 self.new_node = remote_node
6020 self.other_node = self.instance.primary_node
6021 self.target_node = secondary_node
6022 check_nodes = [self.new_node, self.other_node]
        _CheckNodeNotDrained(self.lu, remote_node)

      else:
        raise errors.OpPrereqError("Unhandled disk replace mode (%s)" %
                                   self.mode)

    # If not specified, all disks should be replaced
    if not self.disks:
      self.disks = range(len(self.instance.disks))
6034 for node in check_nodes:
6035 _CheckNodeOnline(self.lu, node)
6037 # Check whether disks are valid
6038 for disk_idx in self.disks:
6039 self.instance.FindDisk(disk_idx)
    # Get secondary node IP addresses
    node_2nd_ip = {}
    for node_name in [self.target_node, self.other_node, self.new_node]:
6045 if node_name is not None:
6046 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6048 self.node_secondary_ip = node_2nd_ip
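    # These secondary IPs are what the drbd_disconnect_net/drbd_attach_net
    # RPC calls later use to (re)build the replication network between the
    # primary and the (new) secondary node.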
  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if not self.disks:
      feedback_fn("No disks need replacement")
      return

    feedback_fn("Replacing disk(s) %s for %s" %
                (", ".join([str(i) for i in self.disks]), self.instance.name))
6063 activate_disks = (not self.instance.admin_up)
    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      _StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node is not None:
        return self._ExecDrbd8Secondary()
      else:
        return self._ExecDrbd8DiskOnly()
    finally:
      # Deactivate the instance disks if we're replacing them on a down
      # instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)
6081 def _CheckVolumeGroup(self, nodes):
6082 self.lu.LogInfo("Checking volume groups")
6084 vgname = self.cfg.GetVGName()
6086 # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(nodes)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node in nodes:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, node))
6098 def _CheckDisksExistence(self, nodes):
6099 # Check disk existence
6100 for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
        self.cfg.SetDiskID(dev, node)

        result = self.rpc.call_blockdev_find(node, dev)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))
6117 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6118 for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue
      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, node_name))
      if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
                                   ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6128 " replace disks for instance %s" %
6129 (node_name, self.instance.name))
6131 def _CreateNewStorage(self, node_name):
    vgname = self.cfg.GetVGName()

    iv_names = {}
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue
6139 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6141 self.cfg.SetDiskID(dev, node_name)
6143 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6144 names = _GenerateUniqueNames(self.lu, lv_names)
6146 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6147 logical_id=(vgname, names[0]))
6148 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6149 logical_id=(vgname, names[1]))
6151 new_lvs = [lv_data, lv_meta]
6152 old_lvs = dev.children
6153 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6155 # we pass force_create=True to force the LVM creation
6156 for new_lv in new_lvs:
        _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
                        _GetInstanceInfoText(self.instance), False)

    return iv_names
6162 def _CheckDevices(self, node_name, iv_names):
6163 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
6164 self.cfg.SetDiskID(dev, node_name)
6166 result = self.rpc.call_blockdev_find(node_name, dev)
6168 msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
6176 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6178 def _RemoveOldStorage(self, node_name, iv_names):
6179 for name, (dev, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s" % name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_name)

        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s" % msg,
6188 hint="remove unused LVs manually")
6190 def _ExecDrbd8DiskOnly(self):
6191 """Replace a disk on the primary or secondary for DRBD 8.
6193 The algorithm for replace is quite complicated:
6195 1. for each disk to be replaced:
6197 1. create new LVs on the target node with unique names
6198 1. detach old LVs from the drbd device
6199 1. rename old LVs to name_replaced.<time_t>
6200 1. rename new LVs to old LVs
6201 1. attach the new LVs (with the old names now) to the drbd device
6203 1. wait for sync across all devices
6205 1. for each modified disk:
      1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
6215 self.lu.LogStep(1, steps_total, "Check device existence")
6216 self._CheckDisksExistence([self.other_node, self.target_node])
6217 self._CheckVolumeGroup([self.target_node, self.other_node])
6219 # Step: check other node consistency
6220 self.lu.LogStep(2, steps_total, "Check peer consistency")
6221 self._CheckDisksConsistency(self.other_node,
6222 self.other_node == self.instance.primary_node,
6225 # Step: create new storage
6226 self.lu.LogStep(3, steps_total, "Allocate new storage")
6227 iv_names = self._CreateNewStorage(self.target_node)
6229 # Step: for each lv, detach+rename*2+attach
6230 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6231 for dev, old_lvs, new_lvs in iv_names.itervalues():
6232 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                     old_lvs)
6235 result.Raise("Can't detach drbd from local storage on node"
6236 " %s for device %s" % (self.target_node, dev.iv_name))
6238 #cfg.Update(instance)
6240 # ok, we created the new LVs, so now we know we have the needed
6241 # storage; as such, we proceed on the target node to rename
6242 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6243 # using the assumption that logical_id == physical_id (which in
6244 # turn is the unique_id on that node)
6246 # FIXME(iustin): use a better name for the replaced LVs
6247 temp_suffix = int(time.time())
6248 ren_fn = lambda d, suff: (d.physical_id[0],
6249 d.physical_id[1] + "_replaced-%s" % suff)
6251 # Build the rename list based on what LVs exist on the node
6252 rename_old_to_new = []
6253 for to_ren in old_lvs:
6254 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6255 if not result.fail_msg and result.payload:
6257 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6259 self.lu.LogInfo("Renaming the old LVs on the target node")
6260 result = self.rpc.call_blockdev_rename(self.target_node, rename_old_to_new)
6261 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6263 # Now we rename the new LVs to the old LVs
6264 self.lu.LogInfo("Renaming the new LVs on the target node")
6265 rename_new_to_old = [(new, old.physical_id)
6266 for old, new in zip(old_lvs, new_lvs)]
6267 result = self.rpc.call_blockdev_rename(self.target_node, rename_new_to_old)
6268 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6270 for old, new in zip(old_lvs, new_lvs):
6271 new.logical_id = old.logical_id
6272 self.cfg.SetDiskID(new, self.target_node)
6274 for disk in old_lvs:
6275 disk.logical_id = ren_fn(disk, temp_suffix)
6276 self.cfg.SetDiskID(disk, self.target_node)
6278 # Now that the new lvs have the old name, we can add them to the device
6279 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6280 result = self.rpc.call_blockdev_addchildren(self.target_node, dev, new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6291 dev.children = new_lvs
6293 self.cfg.Update(self.instance)
6296 # This can fail as the old devices are degraded and _WaitForSync
6297 # does a combined result over all disks, so we don't check its return value
6298 self.lu.LogStep(5, steps_total, "Sync devices")
6299 _WaitForSync(self.lu, self.instance, unlock=True)
6301 # Check all devices manually
6302 self._CheckDevices(self.instance.primary_node, iv_names)
6304 # Step: remove old storage
6305 self.lu.LogStep(6, steps_total, "Removing old storage")
6306 self._RemoveOldStorage(self.target_node, iv_names)
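  # Note: _ExecDrbd8Secondary below follows the same six-step structure, but
  # instead of swapping LVs locally it creates brand-new storage on a new
  # node and re-attaches the DRBD devices to it over the network.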
6308 def _ExecDrbd8Secondary(self):
6309 """Replace the secondary node for DRBD 8.
6311 The algorithm for replace is quite complicated:
6312 - for all disks of the instance:
6313 - create new LVs on the new node with same names
6314 - shutdown the drbd device on the old secondary
6315 - disconnect the drbd network on the primary
6316 - create the drbd device on the new secondary
6317 - network attach the drbd on the primary, using an artifice:
6318 the drbd code for Attach() will connect to the network if it
        finds a device which is connected to the good local disks but
        not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.
    """
    steps_total = 6

    # Step: check device activation
6330 self.lu.LogStep(1, steps_total, "Check device existence")
6331 self._CheckDisksExistence([self.instance.primary_node])
6332 self._CheckVolumeGroup([self.instance.primary_node])
6334 # Step: check other node consistency
6335 self.lu.LogStep(2, steps_total, "Check peer consistency")
6336 self._CheckDisksConsistency(self.instance.primary_node, True, True)
6338 # Step: create new storage
6339 self.lu.LogStep(3, steps_total, "Allocate new storage")
6340 for idx, dev in enumerate(self.instance.disks):
6341 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
6342 (self.new_node, idx))
6343 # we pass force_create=True to force LVM creation
6344 for new_lv in dev.children:
6345 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
6346 _GetInstanceInfoText(self.instance), False)
    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r" % (minors,))

    iv_names = {}
6357 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.new_node, idx))
6359 # create new devices on new_node; note that we create two IDs:
6360 # one without port, so the drbd will be activated without
6361 # networking information on the new node at this stage, and one
6362 # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        p_minor = o_minor2
      new_alone_id = (self.instance.primary_node, self.new_node, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
                    p_minor, new_minor, o_secret)
      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size)
      try:
        _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
                              _GetInstanceInfoText(self.instance), False)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise
6386 # We have new devices, shutdown the drbd on the old secondary
6387 for idx, dev in enumerate(self.instance.disks):
6388 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
6389 self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
6394 hint=("Please cleanup this device manually as"
6395 " soon as possible"))
6397 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net(
      [self.instance.primary_node], self.node_secondary_ip,
      self.instance.disks)[self.instance.primary_node]
    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
6404 self.cfg.ReleaseDRBDMinors(self.instance.name)
6405 raise errors.OpExecError("Can't detach the disks from the network on"
6406 " old node: %s" % (msg,))
6408 # if we managed to detach at least one, we update all the disks of
6409 # the instance to point to the new secondary
6410 self.lu.LogInfo("Updating instance configuration")
6411 for dev, _, new_logical_id in iv_names.itervalues():
6412 dev.logical_id = new_logical_id
6413 self.cfg.SetDiskID(dev, self.instance.primary_node)
6415 self.cfg.Update(self.instance)
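    # From this point on the cluster configuration already names the new
    # secondary, so even if the network attach below fails the disks can be
    # re-activated later against the new node.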
6417 # and now perform the drbd attach
6418 self.lu.LogInfo("Attaching primary drbds to new secondary"
6419 " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net(
      [self.instance.primary_node, self.new_node], self.node_secondary_ip,
      self.instance.disks, self.instance.name,
      False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           to_node, msg,
                           hint=("please do a gnt-instance info to see the"
6428 " status of disks"))
6431 # This can fail as the old devices are degraded and _WaitForSync
6432 # does a combined result over all disks, so we don't check its return value
6433 self.lu.LogStep(5, steps_total, "Sync devices")
6434 _WaitForSync(self.lu, self.instance, unlock=True)
6436 # Check all devices manually
6437 self._CheckDevices(self.instance.primary_node, iv_names)
6439 # Step: remove old storage
6440 self.lu.LogStep(6, steps_total, "Removing old storage")
6441 self._RemoveOldStorage(self.target_node, iv_names)
6444 class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
6454 self._ExpandAndLockInstance()
6455 self.needed_locks[locking.LEVEL_NODE] = []
6456 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6458 def DeclareLocks(self, level):
6459 if level == locking.LEVEL_NODE:
6460 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.
    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
    return (env, nl, nl)
6479 def CheckPrereq(self):
6480 """Check prerequisites.
6482 This checks that the instance is in the cluster.
6485 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6486 assert instance is not None, \
6487 "Cannot retrieve locked instance %s" % self.op.instance_name
6488 nodenames = list(instance.all_nodes)
6489 for node in nodenames:
6490 _CheckNodeOnline(self, node)
6493 self.instance = instance
6495 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    self.disk = instance.FindDisk(self.op.disk)
6501 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6502 instance.hypervisor)
6503 for node in nodenames:
6504 info = nodeinfo[node]
6505 info.Raise("Cannot get current information from node %s" % node)
6506 vg_free = info.payload.get('vg_free', None)
6507 if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > vg_free:
6511 raise errors.OpPrereqError("Not enough disk space on target node %s:"
6512 " %d MiB available, %d MiB required" %
6513 (node, vg_free, self.op.amount))
6515 def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
    for node in instance.all_nodes:
6522 self.cfg.SetDiskID(disk, node)
6523 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
6524 result.Raise("Grow request failed to node %s" % node)
6525 disk.RecordGrow(self.op.amount)
6526 self.cfg.Update(instance)
6527 if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")
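  # Usage sketch (the opcode class lives in the opcodes module, not shown
  # here): an OpGrowDisk with instance_name, disk (index), amount (in MiB)
  # and wait_for_sync drives this LU; "gnt-instance grow-disk" is the usual
  # CLI entry point.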
6534 class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False
6541 def ExpandNames(self):
6542 self.needed_locks = {}
6543 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
6545 if not isinstance(self.op.instances, list):
6546 raise errors.OpPrereqError("Invalid argument type 'instances'")
6548 if self.op.instances:
6549 self.wanted_names = []
6550 for name in self.op.instances:
6551 full_name = self.cfg.ExpandInstanceName(name)
6552 if full_name is None:
6553 raise errors.OpPrereqError("Instance '%s' not known" % name)
6554 self.wanted_names.append(full_name)
6555 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
6560 self.needed_locks[locking.LEVEL_NODE] = []
6561 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6563 def DeclareLocks(self, level):
6564 if level == locking.LEVEL_NODE:
6565 self._LockInstancesNodes()
6567 def CheckPrereq(self):
6568 """Check prerequisites.
6570 This only checks the optional instance list against the existing names.
6573 if self.wanted_names is None:
6574 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
6576 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
6577 in self.wanted_names]
6580 def _ComputeBlockdevStatus(self, node, instance_name, dev):
6581 """Returns the status of a block device
6587 self.cfg.SetDiskID(dev, node)
6589 result = self.rpc.call_blockdev_find(node, dev)
6593 result.Raise("Can't compute disk status for %s" % instance_name)
6595 status = result.payload
6599 return (status.dev_path, status.major, status.minor,
6600 status.sync_percent, status.estimated_time,
6601 status.is_degraded, status.ldisk_status)
6603 def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]
    dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
                                              instance.name, dev)
    dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []
    return {
      "iv_name": dev.iv_name,
6626 "dev_type": dev.dev_type,
6627 "logical_id": dev.logical_id,
6628 "physical_id": dev.physical_id,
6629 "pstatus": dev_pstatus,
6630 "sstatus": dev_sstatus,
      "children": dev_children,
      }
6638 def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()
6644 for instance in self.wanted_instances:
6645 if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise("Error checking node %s" % instance.primary_node)
        remote_info = remote_info.payload
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None

      if instance.admin_up:
        config_state = "up"
      else:
        config_state = "down"
6662 disks = [self._ComputeDiskStatus(instance, None, device)
6663 for device in instance.disks]
      idict = {
        "name": instance.name,
6667 "config_state": config_state,
6668 "run_state": remote_state,
6669 "pnode": instance.primary_node,
6670 "snodes": instance.secondary_nodes,
6672 # this happens to be the same format used for hooks
6673 "nics": _NICListToTuple(self, instance.nics),
6675 "hypervisor": instance.hypervisor,
6676 "network_port": instance.network_port,
6677 "hv_instance": instance.hvparams,
6678 "hv_actual": cluster.FillHV(instance),
6679 "be_instance": instance.beparams,
6680 "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result
6688 class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False
6697 def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
6702 if not hasattr(self.op, 'beparams'):
6703 self.op.beparams = {}
6704 if not hasattr(self.op, 'hvparams'):
6705 self.op.hvparams = {}
6706 self.op.force = getattr(self.op, "force", False)
6707 if not (self.op.nics or self.op.disks or
6708 self.op.hvparams or self.op.beparams):
6709 raise errors.OpPrereqError("No changes submitted")
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
6722 if not isinstance(disk_dict, dict):
6723 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
6724 raise errors.OpPrereqError(msg)
6726 if disk_op == constants.DDM_ADD:
6727 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
6728 if mode not in constants.DISK_ACCESS_SET:
6729 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")
6745 if disk_addremove > 1:
6746 raise errors.OpPrereqError("Only one disk add or remove operation"
6747 " supported at a time")
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")
6760 if not isinstance(nic_dict, dict):
6761 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
6762 raise errors.OpPrereqError(msg)
6764 # nic_dict should be a dict
6765 nic_ip = nic_dict.get('ip', None)
6766 if nic_ip is not None:
6767 if nic_ip.lower() == constants.VALUE_NONE:
6768 nic_dict['ip'] = None
6770 if not utils.IsValidIP(nic_ip):
6771 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
6773 nic_bridge = nic_dict.get('bridge', None)
6774 nic_link = nic_dict.get('link', None)
6775 if nic_bridge and nic_link:
6776 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
6777 " at the same time")
6778 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
6779 nic_dict['bridge'] = None
6780 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
6781 nic_dict['link'] = None
6783 if nic_op == constants.DDM_ADD:
        nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO
6788 if 'mac' in nic_dict:
6789 nic_mac = nic_dict['mac']
6790 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6791 if not utils.IsValidMac(nic_mac):
6792 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6793 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6794 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6795 " modifying an existing nic")
6797 if nic_addremove > 1:
6798 raise errors.OpPrereqError("Only one NIC add or remove operation"
6799 " supported at a time")
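    # For reference (sketch of the expected opcode format): self.op.disks and
    # self.op.nics are lists of (operation, parameters) pairs, e.g.
    #   disks=[(constants.DDM_ADD, {"size": 1024,
    #                               "mode": constants.DISK_RDWR})]
    #   nics=[(0, {"ip": "none"}), (constants.DDM_REMOVE, {})]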
6801 def ExpandNames(self):
6802 self._ExpandAndLockInstance()
6803 self.needed_locks[locking.LEVEL_NODE] = []
6804 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6806 def DeclareLocks(self, level):
6807 if level == locking.LEVEL_NODE:
6808 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.
    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
6819 if constants.BE_VCPUS in self.be_new:
6820 args['vcpus'] = self.be_new[constants.BE_VCPUS]
6821 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    args['nics'] = []
    nic_override = dict(self.op.nics)
6826 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6827 for idx, nic in enumerate(self.instance.nics):
6828 if idx in nic_override:
6829 this_nic_override = nic_override[idx]
6831 this_nic_override = {}
6832 if 'ip' in this_nic_override:
6833 ip = this_nic_override['ip']
6836 if 'mac' in this_nic_override:
6837 mac = this_nic_override['mac']
6840 if idx in self.nic_pnew:
6841 nicparams = self.nic_pnew[idx]
6843 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6844 mode = nicparams[constants.NIC_MODE]
6845 link = nicparams[constants.NIC_LINK]
6846 args['nics'].append((ip, mac, mode, link))
6847 if constants.DDM_ADD in nic_override:
6848 ip = nic_override[constants.DDM_ADD].get('ip', None)
6849 mac = nic_override[constants.DDM_ADD]['mac']
6850 nicparams = self.nic_pnew[constants.DDM_ADD]
6851 mode = nicparams[constants.NIC_MODE]
6852 link = nicparams[constants.NIC_LINK]
6853 args['nics'].append((ip, mac, mode, link))
6854 elif constants.DDM_REMOVE in nic_override:
6855 del args['nics'][-1]
6857 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (env, nl, nl)
6861 def _GetUpdatedParams(self, old_params, update_dict,
6862 default_values, parameter_types):
6863 """Return the new params dict for the given params.
6865 @type old_params: dict
6866 @param old_params: old parameters
6867 @type update_dict: dict
6868 @param update_dict: dict containing new parameter values,
6869 or constants.VALUE_DEFAULT to reset the
6870 parameter to its default value
6871 @type default_values: dict
6872 @param default_values: default values for the filled parameters
6873 @type parameter_types: dict
6874 @param parameter_types: dict mapping target dict keys to types
6875 in constants.ENFORCEABLE_TYPES
6876 @rtype: (dict, dict)
6877 @return: (new_parameters, filled_parameters)
6880 params_copy = copy.deepcopy(old_params)
6881 for key, val in update_dict.iteritems():
      if val == constants.VALUE_DEFAULT:
        try:
          del params_copy[key]
        except KeyError:
          pass
      else:
        params_copy[key] = val
6889 utils.ForceDictType(params_copy, parameter_types)
6890 params_filled = objects.FillDict(default_values, params_copy)
6891 return (params_copy, params_filled)
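  # A small illustration (sketch) of the merge semantics implemented above:
  #   old_params     = {"a": 1, "b": 2}
  #   update_dict    = {"a": constants.VALUE_DEFAULT, "c": 7}
  #   default_values = {"a": 10, "b": 20, "c": 30}
  #   -> new dict (without defaults): {"b": 2, "c": 7}
  #   -> filled dict:                 {"a": 10, "b": 2, "c": 7}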
6893 def CheckPrereq(self):
6894 """Check prerequisites.
6896 This only checks the instance list against the existing names.
6899 self.force = self.op.force
6901 # checking the new params on the primary/secondary nodes
6903 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6904 cluster = self.cluster = self.cfg.GetClusterInfo()
6905 assert self.instance is not None, \
6906 "Cannot retrieve locked instance %s" % self.op.instance_name
6907 pnode = instance.primary_node
6908 nodelist = list(instance.all_nodes)
6910 # hvparams processing
6911 if self.op.hvparams:
6912 i_hvdict, hv_new = self._GetUpdatedParams(
6913 instance.hvparams, self.op.hvparams,
6914 cluster.hvparams[instance.hypervisor],
6915 constants.HVS_PARAMETER_TYPES)
6917 hypervisor.GetHypervisor(
6918 instance.hypervisor).CheckParameterSyntax(hv_new)
6919 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6920 self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}
6925 # beparams processing
6926 if self.op.beparams:
6927 i_bedict, be_new = self._GetUpdatedParams(
6928 instance.beparams, self.op.beparams,
6929 cluster.beparams[constants.PP_DEFAULT],
6930 constants.BES_PARAMETER_TYPES)
6931 self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
6939 mem_check_list = [pnode]
6940 if be_new[constants.BE_AUTO_BALANCE]:
6941 # either we changed auto_balance to yes or it was from before
6942 mem_check_list.extend(instance.secondary_nodes)
6943 instance_info = self.rpc.call_instance_info(pnode, instance.name,
6944 instance.hypervisor)
6945 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6946 instance.hypervisor)
6947 pninfo = nodeinfo[pnode]
      msg = pninfo.fail_msg
      if msg:
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s: %s" %
                         (pnode, msg))
6953 elif not isinstance(pninfo.payload.get('memory_free', None), int):
6954 self.warn.append("Node data from primary node %s doesn't contain"
6955 " free memory information" % pnode)
6956 elif instance_info.fail_msg:
6957 self.warn.append("Can't get instance runtime information: %s" %
                         instance_info.fail_msg)
      else:
        if instance_info.payload:
          current_mem = int(instance_info.payload['memory'])
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very
          # probable, and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    pninfo.payload['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
6971 " from starting, due to %d MB of memory"
6972 " missing on its primary node" % miss_mem)
6974 if be_new[constants.BE_AUTO_BALANCE]:
6975 for node, nres in nodeinfo.items():
        if node not in instance.secondary_nodes:
          continue
        msg = nres.fail_msg
        if msg:
          self.warn.append("Can't get info from secondary node %s: %s" %
                           (node, msg))
        elif not isinstance(nres.payload.get('memory_free', None), int):
          self.warn.append("Secondary node %s didn't return free"
6984 " memory information" % node)
6985 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6986 self.warn.append("Not enough memory to failover instance to"
                           " secondary node %s" % node)
    # NIC changes
    self.nic_pnew = {}
    self.nic_pinst = {}
    for nic_op, nic_dict in self.op.nics:
6993 if nic_op == constants.DDM_REMOVE:
6994 if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue

      if nic_op != constants.DDM_ADD:
6999 if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
7003 old_nic_params = instance.nics[nic_op].nicparams
        old_nic_ip = instance.nics[nic_op].ip
      else:
        old_nic_params = cluster.nicparams[constants.PP_DEFAULT]
        old_nic_ip = None

      update_params_dict = dict([(key, nic_dict[key])
7010 for key in constants.NICS_PARAMETERS
7011 if key in nic_dict])
7013 if 'bridge' in nic_dict:
7014 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
7016 new_nic_params, new_filled_nic_params = \
7017 self._GetUpdatedParams(old_nic_params, update_params_dict,
7018 cluster.nicparams[constants.PP_DEFAULT],
7019 constants.NICS_PARAMETER_TYPES)
7020 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
7021 self.nic_pinst[nic_op] = new_nic_params
7022 self.nic_pnew[nic_op] = new_filled_nic_params
7023 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
7025 if new_nic_mode == constants.NIC_MODE_BRIDGED:
7026 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
        msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
        if msg:
          msg = "Error checking bridges on node %s: %s" % (pnode, msg)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)
7034 if new_nic_mode == constants.NIC_MODE_ROUTED:
7035 if 'ip' in nic_dict:
          nic_ip = nic_dict['ip']
        else:
          nic_ip = old_nic_ip
        if nic_ip is None:
          raise errors.OpPrereqError('Cannot set the nic ip to None'
                                     ' on a routed nic')
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None')
7046 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7047 # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC()
        else:
          # or validate/reserve the current one
7051 if self.cfg.IsMacInUse(nic_mac):
7052 raise errors.OpPrereqError("MAC address %s already in use"
7053 " in cluster" % nic_mac)
7056 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
7057 raise errors.OpPrereqError("Disk operations not supported for"
7058 " diskless instances")
7059 for disk_op, disk_dict in self.op.disks:
7060 if disk_op == constants.DDM_REMOVE:
7061 if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
7065 ins_l = ins_l[pnode]
        msg = ins_l.fail_msg
        if msg:
          raise errors.OpPrereqError("Can't contact node %s: %s" %
                                     (pnode, msg))
        if instance.name in ins_l.payload:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")
7074 if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
7076 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
7077 " add more" % constants.MAX_DISKS)
7078 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
7080 if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))
7087 def Exec(self, feedback_fn):
7088 """Modifies an instance.
    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
7094 # feedback_fn there.
7095 for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
7100 cluster = self.cluster
7102 for disk_op, disk_dict in self.op.disks:
7103 if disk_op == constants.DDM_REMOVE:
7104 # remove the last disk
7105 device = instance.disks.pop()
7106 device_idx = len(instance.disks)
7107 for node, disk in device.ComputeNodeTree(instance.primary_node):
7108 self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
7112 " continuing anyway", device_idx, node, msg)
7113 result.append(("disk/%d" % device_idx, "remove"))
7114 elif disk_op == constants.DDM_ADD:
7116 if instance.disk_template == constants.DT_FILE:
7117 file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
7121 disk_idx_base = len(instance.disks)
7122 new_disk = _GenerateDiskTemplate(self,
7123 instance.disk_template,
7124 instance.name, instance.primary_node,
7125 instance.secondary_nodes,
7130 instance.disks.append(new_disk)
7131 info = _GetInstanceInfoText(instance)
7133 logging.info("Creating volume %s for instance %s",
7134 new_disk.iv_name, instance.name)
7135 # Note: this needs to be kept in sync with _CreateDisks
7137 for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
7142 except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
7146 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
7147 (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
7151 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
7153 for nic_op, nic_dict in self.op.nics:
7154 if nic_op == constants.DDM_REMOVE:
7155 # remove the last nic
7156 del instance.nics[-1]
7157 result.append(("nic.%d" % len(instance.nics), "remove"))
7158 elif nic_op == constants.DDM_ADD:
7159 # mac and bridge should be set, by now
7160 mac = nic_dict['mac']
7161 ip = nic_dict.get('ip', None)
7162 nicparams = self.nic_pinst[constants.DDM_ADD]
7163 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
7164 instance.nics.append(new_nic)
7165 result.append(("nic.%d" % (len(instance.nics) - 1),
7166 "add:mac=%s,ip=%s,mode=%s,link=%s" %
7167 (new_nic.mac, new_nic.ip,
7168 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
                       )))
      else:
        for key in 'mac', 'ip':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
7175 if nic_op in self.nic_pnew:
7176 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
7177 for key, val in nic_dict.iteritems():
7178 result.append(("nic.%s/%d" % (key, nic_op), val))
7181 if self.op.hvparams:
7182 instance.hvparams = self.hv_inst
7183 for key, val in self.op.hvparams.iteritems():
7184 result.append(("hv/%s" % key, val))
7187 if self.op.beparams:
7188 instance.beparams = self.be_inst
7189 for key, val in self.op.beparams.iteritems():
7190 result.append(("be/%s" % key, val))
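    # Each (key, new value) pair collected in 'result' is reported back to
    # the caller as feedback, e.g. (sketch): ("be/memory", 512) or
    # ("nic.0/mac", "aa:00:00:27:ab:cd").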
    self.cfg.Update(instance)

    return result
7197 class LUQueryExports(NoHooksLU):
7198 """Query the exports list
7201 _OP_REQP = ['nodes']
7204 def ExpandNames(self):
7205 self.needed_locks = {}
7206 self.share_locks[locking.LEVEL_NODE] = 1
7207 if not self.op.nodes:
7208 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7210 self.needed_locks[locking.LEVEL_NODE] = \
7211 _GetWantedNodes(self, self.op.nodes)
7213 def CheckPrereq(self):
7214 """Check prerequisites.
7217 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
7219 def Exec(self, feedback_fn):
7220 """Compute the list of all the exported system images.
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
7231 if rpcresult[node].fail_msg:
        result[node] = False
      else:
        result[node] = rpcresult[node].payload

    return result
7239 class LUExportInstance(LogicalUnit):
7240 """Export an instance to an image in the cluster.
7243 HPATH = "instance-export"
7244 HTYPE = constants.HTYPE_INSTANCE
7245 _OP_REQP = ["instance_name", "target_node", "shutdown"]
7248 def ExpandNames(self):
7249 self._ExpandAndLockInstance()
7250 # FIXME: lock only instance primary and destination node
7252 # Sad but true, for now we have do lock all nodes, as we don't know where
7253 # the previous export might be, and and in this LU we search for it and
7254 # remove it from its current node. In the future we could fix this by:
7255 # - making a tasklet to search (share-lock all), then create the new one,
7256 # then one to remove, after
7257 # - removing the removal operation altogether
7258 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7260 def DeclareLocks(self, level):
7261 """Last minute lock declaration."""
7262 # All nodes are locked anyway, so nothing to do here.
  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return (env, nl, nl)
7279 def CheckPrereq(self):
7280 """Check prerequisites.
7282 This checks that the instance and node names are valid.
7285 instance_name = self.op.instance_name
7286 self.instance = self.cfg.GetInstanceInfo(instance_name)
7287 assert self.instance is not None, \
7288 "Cannot retrieve locked instance %s" % self.op.instance_name
7289 _CheckNodeOnline(self, self.instance.primary_node)
7291 self.dst_node = self.cfg.GetNodeInfo(
7292 self.cfg.ExpandNodeName(self.op.target_node))
7294 if self.dst_node is None:
7295 # This is wrong node name, not a non-locked node
7296 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
7297 _CheckNodeOnline(self, self.dst_node.name)
7298 _CheckNodeNotDrained(self, self.dst_node.name)
7300 # instance disk type verification
7301 for disk in self.instance.disks:
7302 if disk.dev_type == constants.LD_FILE:
7303 raise errors.OpPrereqError("Export not supported for instances with"
7304 " file-based disks")
7306 def Exec(self, feedback_fn):
7307 """Export an instance to an image in the cluster.
7310 instance = self.instance
7311 dst_node = self.dst_node
7312 src_node = instance.primary_node
7313 if self.op.shutdown:
7314 # shutdown the instance, but not the disks
7315 result = self.rpc.call_instance_shutdown(src_node, instance)
7316 result.Raise("Could not shutdown instance %s on"
7317 " node %s" % (instance.name, src_node))
    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
7324 # correct drbd minor to create the symlinks
7325 for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    try:
      for idx, disk in enumerate(instance.disks):
7332 # result.payload will be a snapshot of an lvm leaf of the one we passed
7333 result = self.rpc.call_blockdev_snapshot(src_node, disk)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Could not snapshot disk/%s on node %s: %s",
                          idx, src_node, msg)
          snap_disks.append(False)
        else:
          disk_id = (vgname, result.payload)
7341 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
7342 logical_id=disk_id, physical_id=disk_id,
7343 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.admin_up:
7348 result = self.rpc.call_instance_start(src_node, instance, None, None)
        msg = result.fail_msg
        if msg:
          _ShutdownInstanceDisks(self, instance)
7352 raise errors.OpExecError("Could not start instance: %s" % msg)
7354 # TODO: check for size
    cluster_name = self.cfg.GetClusterName()
    dresults = []
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
7360 instance, cluster_name, idx)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Could not export disk/%s from node %s to"
                          " node %s: %s", idx, src_node, dst_node.name, msg)
          dresults.append(False)
        else:
          dresults.append(True)
        msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
        if msg:
          self.LogWarning("Could not remove snapshot for disk/%d from node"
                          " %s: %s", idx, src_node, msg)
      else:
        dresults.append(False)
    result = self.rpc.call_finalize_export(dst_node.name, instance,
                                           snap_disks)
    fin_resu = True
    msg = result.fail_msg
    if msg:
      self.LogWarning("Could not finalize export for instance %s"
                      " on node %s: %s", instance.name, dst_node.name, msg)
      fin_resu = False
7383 nodelist = self.cfg.GetNodeList()
7384 nodelist.remove(dst_node.name)
7386 # on one-node clusters nodelist will be empty after the removal
7387 # if we proceed the backup would be removed because OpQueryExports
7388 # substitutes an empty list with the full cluster node list.
    iname = instance.name
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].fail_msg:
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
7398 self.LogWarning("Could not remove older export for instance %s"
7399 " on node %s: %s", iname, node, msg)
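    # fin_resu says whether finalizing the export on the target node worked;
    # dresults holds one boolean per instance disk, telling whether that
    # disk's snapshot was successfully transferred.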
7400 return fin_resu, dresults
7403 class LURemoveExport(NoHooksLU):
7404 """Remove exports related to the named instance.
7407 _OP_REQP = ["instance_name"]
7410 def ExpandNames(self):
7411 self.needed_locks = {}
7412 # We need all nodes to be locked in order for RemoveExport to work, but we
7413 # don't need to lock the instance itself, as nothing will happen to it (and
7414 # we can remove exports also for a removed instance)
7415 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7417 def CheckPrereq(self):
7418 """Check prerequisites.
7422 def Exec(self, feedback_fn):
7423 """Remove any export.
7426 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
7427 # If the instance was not found we'll try with the name that was passed in.
7428 # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name
7434 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s",
                        node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
7447 logging.error("Could not remove export for instance %s"
7448 " on node %s: %s", instance_name, node, msg)
7450 if fqdn_warn and not found:
7451 feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
7463 def ExpandNames(self):
7464 self.needed_locks = {}
7465 if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
7472 elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name
7480 def CheckPrereq(self):
7481 """Check prerequisites.
7484 if self.op.kind == constants.TAG_CLUSTER:
7485 self.target = self.cfg.GetClusterInfo()
7486 elif self.op.kind == constants.TAG_NODE:
7487 self.target = self.cfg.GetNodeInfo(self.op.name)
7488 elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
7495 class LUGetTags(TagsLU):
7496 """Returns the tags of a given object.
7499 _OP_REQP = ["kind", "name"]
7502 def Exec(self, feedback_fn):
7503 """Returns the tag list.
7506 return list(self.target.GetTags())
7509 class LUSearchTags(NoHooksLU):
7510 """Searches the tags for a given pattern.
7513 _OP_REQP = ["pattern"]
7516 def ExpandNames(self):
7517 self.needed_locks = {}
7519 def CheckPrereq(self):
7520 """Check prerequisites.
    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
7527 except re.error, err:
7528 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
7529 (self.op.pattern, err))
7531 def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
7537 ilist = cfg.GetAllInstancesInfo().values()
7538 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
7539 nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
7543 for tag in target.GetTags():
7544 if self.re.search(tag):
          results.append((path, tag))
    return results
7549 class LUAddTags(TagsLU):
7550 """Sets a tag on a given object.
7553 _OP_REQP = ["kind", "name", "tags"]
7556 def CheckPrereq(self):
7557 """Check prerequisites.
7559 This checks the type and length of the tag name and value.
7562 TagsLU.CheckPrereq(self)
7563 for tag in self.op.tags:
7564 objects.TaggableObject.ValidateTag(tag)
  def Exec(self, feedback_fn):
    """Sets the tag.
    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
7573 except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
7577 except errors.ConfigurationError:
7578 raise errors.OpRetryError("There has been a modification to the"
7579 " config file and the operation has been"
7580 " aborted. Please retry.")
7583 class LUDelTags(TagsLU):
7584 """Delete a list of tags from a given object.
7587 _OP_REQP = ["kind", "name", "tags"]
7590 def CheckPrereq(self):
7591 """Check prerequisites.
7593 This checks that we have the given tag.
7596 TagsLU.CheckPrereq(self)
7597 for tag in self.op.tags:
7598 objects.TaggableObject.ValidateTag(tag)
7599 del_tags = frozenset(self.op.tags)
7600 cur_tags = self.target.GetTags()
7601 if not del_tags <= cur_tags:
7602 diff_tags = del_tags - cur_tags
7603 diff_names = ["'%s'" % tag for tag in diff_tags]
7605 raise errors.OpPrereqError("Tag(s) %s not found" %
7606 (",".join(diff_names)))
7608 def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
7616 except errors.ConfigurationError:
7617 raise errors.OpRetryError("There has been a modification to the"
7618 " config file and the operation has been"
7619 " aborted. Please retry.")
7622 class LUTestDelay(NoHooksLU):
7623 """Sleep for a specified amount of time.
  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False
7632 def ExpandNames(self):
7633 """Expand names and set required locks.
7635 This expands the node list, if any.
7638 self.needed_locks = {}
7639 if self.op.on_nodes:
7640 # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
7644 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
7646 def CheckPrereq(self):
7647 """Check prerequisites.
7651 def Exec(self, feedback_fn):
7652 """Do the actual sleep.
7655 if self.op.on_master:
7656 if not utils.TestDelay(self.op.duration):
7657 raise errors.OpExecError("Error during master delay test")
7658 if self.op.on_nodes:
7659 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
7660 for node, node_result in result.items():
7661 node_result.Raise("Failure during rpc call to node %s" % node)
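  # Manual testing note (sketch): this LU is normally reached through the
  # debugging front-end, e.g. "gnt-debug delay -n node1.example.com 5", which
  # requests a five second delay on the selected node (and, depending on the
  # flags used, on the master as well).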
7664 class IAllocator(object):
7665 """IAllocator framework.
  An IAllocator instance has four sets of attributes:
7668 - cfg that is needed to query the cluster
7669 - input data (all members of the _KEYS class attribute are required)
7670 - four buffer attributes (in|out_data|text), that represent the
7671 input (to the external script) in text and data structure format,
7672 and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage
  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = ["relocate_from"]
  def __init__(self, cfg, rpc, mode, name, **kwargs):
    self.cfg = cfg
    self.rpc = rpc
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
7694 self.os = self.tags = self.nics = self.vcpus = None
7695 self.hypervisor = None
7696 self.relocate_from = None
7698 self.required_nodes = None
7699 # init result fields
7700 self.success = self.info = self.nodes = None
7701 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7702 keyset = self._ALLO_KEYS
7703 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
7704 keyset = self._RELO_KEYS
7706 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
7707 " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
7715 raise errors.ProgrammerError("Missing input parameter '%s' to"
7716 " IAllocator" % key)
7717 self._BuildInputData()
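
  # Example (illustrative only): constructing an IAllocator in allocation
  # mode from inside a logical unit.  The keyword arguments mirror _ALLO_KEYS;
  # all concrete values below are invented for the example.
  #
  #   ial = IAllocator(self.cfg, self.rpc,
  #                    mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="instance1.example.com",
  #                    mem_size=512,
  #                    disks=[{"size": 1024, "mode": "w"}],
  #                    disk_template=constants.DT_DRBD8,
  #                    os="debian-etch",
  #                    tags=[],
  #                    nics=[{"mac": "auto", "ip": None, "bridge": None}],
  #                    vcpus=1,
  #                    hypervisor=constants.HT_XEN_PVM)
  #
  # The constructor validates the keyword set against the mode and then calls
  # _BuildInputData, so ial.in_text is ready to be fed to the external script.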
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                        hypervisor_name)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload

        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results
    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
            cluster_info.nicparams[constants.PP_DEFAULT],
            nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                   }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request
  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
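
  # Example (illustrative only): the "request" sub-dictionary added to in_data
  # by the two methods above, one per mode.  Keys come from the code; the
  # values are invented.
  #
  #   allocate:  {"type": "allocate", "name": "instance1.example.com",
  #               "disk_template": "drbd", "tags": [], "os": "debian-etch",
  #               "vcpus": 1, "memory": 512,
  #               "disks": [{"size": 1024, "mode": "w"}],
  #               "disk_space_total": 1280, "nics": [{"mac": "auto"}],
  #               "required_nodes": 2}
  #
  #   relocate:  {"type": "relocate", "name": "instance1.example.com",
  #               "disk_space_total": 1152, "required_nodes": 1,
  #               "relocate_from": ["node3.example.com"]}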
  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner

    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()
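
  # Example (illustrative only): typical usage of Run from a logical unit,
  # after constructing the instance as in the allocation-mode sketch above.
  # "dumb-allocator" is just a placeholder for whichever iallocator script
  # name the opcode supplied; the error-handling pattern is one possibility.
  #
  #   ial.Run("dumb-allocator")   # RPC to the master node, then validation
  #   if not ial.success:
  #     raise errors.OpPrereqError("Can't compute nodes: %s" % ial.info)
  #   chosen_nodes = ial.nodes    # node names picked by the script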
  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
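
# Example (illustrative only): the minimal document an iallocator script has
# to emit for _ValidateResult to accept it.  The node names are invented;
# "success", "info" and "nodes" are the keys the validation above requires.
#
#   {
#     "success": true,
#     "info": "allocation successful",
#     "nodes": ["node2.example.com", "node3.example.com"]
#   }
#
# serializer.Load turns this into a dict, the three keys are copied onto the
# IAllocator instance, and the whole dict is stored as out_data.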
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]
  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode of
    the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)
  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result