4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
45 class LogicalUnit(object):
46 """Logical Unit base class.
48 Subclasses must follow these rules:
49 - implement ExpandNames
50 - implement CheckPrereq (except when tasklets are used)
51 - implement Exec (except when tasklets are used)
52 - implement BuildHooksEnv
53 - redefine HPATH and HTYPE
54 - optionally redefine their run requirements:
55 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
57 Note that all commands require root permissions.
59 @ivar dry_run_result: the value (if any) that will be returned to the caller
60 in dry-run mode (signalled by the opcode dry_run parameter)
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overridden in derived classes in order to check op
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
92 self.LogStep = processor.LogStep
94 self.dry_run_result = None
99 for attr_name in self._OP_REQP:
100 attr_val = getattr(op, attr_name, None)
102 raise errors.OpPrereqError("Required parameter '%s' missing" %
105 self.CheckArguments()
108 """Returns the SshRunner object
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115 ssh = property(fget=__GetSSH)
117 def CheckArguments(self):
118 """Check syntactic validity for the opcode arguments.
120 This method is for doing a simple syntactic check and ensuring the
121 validity of opcode parameters, without any cluster-related
122 checks. While the same can be accomplished in ExpandNames and/or
123 CheckPrereq, doing these separately is better because:
125 - ExpandNames is left as purely a lock-related function
126 - CheckPrereq is run after we have acquired locks (and possible
129 The function is allowed to change the self.op attribute so that
130 later methods no longer need to worry about missing parameters.
135 def ExpandNames(self):
136 """Expand names for this LU.
138 This method is called before starting to execute the opcode, and it should
139 update all the parameters of the opcode to their canonical form (e.g. a
140 short node name must be fully expanded after this method has successfully
141 completed). This way locking, hooks, logging, etc. can work correctly.
143 LUs which implement this method must also populate the self.needed_locks
144 member, as a dict with lock levels as keys, and a list of needed lock names
147 - use an empty dict if you don't need any lock
148 - if you don't need any lock at a particular level omit that level
149 - don't put anything for the BGL level
150 - if you want all locks at a level use locking.ALL_SET as a value
152 If you need to share locks (rather than acquire them exclusively) at one
153 level you can modify self.share_locks, setting a true value (usually 1) for
154 that level. By default locks are not shared.
156 This function can also define a list of tasklets, which then will be
157 executed in order instead of the usual LU-level CheckPrereq and Exec
158 functions, if those are not defined by the LU.
162 # Acquire all nodes and one instance
163 self.needed_locks = {
164 locking.LEVEL_NODE: locking.ALL_SET,
165 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
167 # Acquire just two nodes
168 self.needed_locks = {
169 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
172 self.needed_locks = {} # No, you can't leave it to the default value None
175 # The implementation of this method is mandatory only if the new LU is
176 # concurrent, so that old LUs don't need to be changed all at the same
179 self.needed_locks = {} # Exclusive LUs don't need locks.
181 raise NotImplementedError
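# Illustrative sketch (not part of the original module): how a hypothetical
# concurrent LU could fill in the attributes described above, locking one
# instance exclusively while sharing the node locks. The instance name is an
# assumption made only for this example.
#
#   def ExpandNames(self):
#     self.needed_locks = {
#       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
#       locking.LEVEL_NODE: locking.ALL_SET,
#       }
#     self.share_locks[locking.LEVEL_NODE] = 1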
183 def DeclareLocks(self, level):
184 """Declare LU locking needs for a level
186 While most LUs can just declare their locking needs at ExpandNames time,
187 sometimes there's the need to calculate some locks after having acquired
188 the ones before. This function is called just before acquiring locks at a
189 particular level, but after acquiring the ones at lower levels, and permits
190 such calculations. It can be used to modify self.needed_locks, and by
191 default it does nothing.
193 This function is only called if you have something already set in
194 self.needed_locks for the level.
196 @param level: Locking level which is going to be locked
197 @type level: member of ganeti.locking.LEVELS
201 def CheckPrereq(self):
202 """Check prerequisites for this LU.
204 This method should check that the prerequisites for the execution
205 of this LU are fulfilled. It can do internode communication, but
206 it should be idempotent - no cluster or system changes are
209 The method should raise errors.OpPrereqError in case something is
210 not fulfilled. Its return value is ignored.
212 This method should also update all the parameters of the opcode to
213 their canonical form if it hasn't been done by ExpandNames before.
216 if self.tasklets is not None:
217 for (idx, tl) in enumerate(self.tasklets):
218 logging.debug("Checking prerequisites for tasklet %s/%s",
219 idx + 1, len(self.tasklets))
222 raise NotImplementedError
224 def Exec(self, feedback_fn):
227 This method should implement the actual work. It should raise
228 errors.OpExecError for failures that are somewhat dealt with in
232 if self.tasklets is not None:
233 for (idx, tl) in enumerate(self.tasklets):
234 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
237 raise NotImplementedError
239 def BuildHooksEnv(self):
240 """Build hooks environment for this LU.
242 This method should return a three-element tuple consisting of: a dict
243 containing the environment that will be used for running the
244 specific hook for this LU, a list of node names on which the hook
245 should run before the execution, and a list of node names on which
246 the hook should run after the execution.
248 The keys of the dict must not be prefixed with 'GANETI_', as this will
249 be handled in the hooks runner. Also note that additional keys will be
250 added by the hooks runner. If the LU doesn't define any
251 environment, an empty dict (and not None) should be returned.
253 If a hook has no nodes to run on, an empty list (and not None) should be returned.
255 Note that if the HPATH for a LU class is None, this function will
259 raise NotImplementedError
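# Illustrative sketch (not part of the original module): a minimal BuildHooksEnv
# for a hypothetical instance LU; self.instance and self.op.instance_name are
# assumed to have been set up by ExpandNames/CheckPrereq.
#
#   def BuildHooksEnv(self):
#     env = {"INSTANCE_NAME": self.op.instance_name}
#     nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
#     # run both the pre- and the post-hook on master and primary node
#     return env, nl, nl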
261 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
262 """Notify the LU about the results of its hooks.
264 This method is called every time a hooks phase is executed, and notifies
265 the Logical Unit about the hooks' result. The LU can then use it to alter
266 its result based on the hooks. By default the method does nothing and the
267 previous result is passed back unchanged, but any LU can override it if it
268 wants to use the local cluster hook-scripts somehow.
270 @param phase: one of L{constants.HOOKS_PHASE_POST} or
271 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
272 @param hook_results: the results of the multi-node hooks rpc call
273 @param feedback_fn: function used to send feedback back to the caller
274 @param lu_result: the previous Exec result this LU had, or None
276 @return: the new Exec result, based on the previous result
282 def _ExpandAndLockInstance(self):
283 """Helper function to expand and lock an instance.
285 Many LUs that work on an instance take its name in self.op.instance_name
286 and need to expand it and then declare the expanded name for locking. This
287 function does it, and then updates self.op.instance_name to the expanded
288 name. It also initializes needed_locks as a dict, if this hasn't been done
292 if self.needed_locks is None:
293 self.needed_locks = {}
295 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
296 "_ExpandAndLockInstance called with instance-level locks set"
297 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
298 if expanded_name is None:
299 raise errors.OpPrereqError("Instance '%s' not known" %
300 self.op.instance_name)
301 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
302 self.op.instance_name = expanded_name
304 def _LockInstancesNodes(self, primary_only=False):
305 """Helper function to declare instances' nodes for locking.
307 This function should be called after locking one or more instances to lock
308 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
309 with all primary or secondary nodes for instances already locked and
310 present in self.needed_locks[locking.LEVEL_INSTANCE].
312 It should be called from DeclareLocks, and for safety only works if
313 self.recalculate_locks[locking.LEVEL_NODE] is set.
315 In the future it may grow parameters to lock just some instances' nodes, or
316 to lock just primary or secondary nodes, if needed.
318 It should be called from DeclareLocks in a way similar to::
320 if level == locking.LEVEL_NODE:
321 self._LockInstancesNodes()
323 @type primary_only: boolean
324 @param primary_only: only lock primary nodes of locked instances
327 assert locking.LEVEL_NODE in self.recalculate_locks, \
328 "_LockInstancesNodes helper function called with no nodes to recalculate"
330 # TODO: check whether we really have been called with the instance locks held
332 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
333 # future we might want to have different behaviors depending on the value
334 # of self.recalculate_locks[locking.LEVEL_NODE]
336 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
337 instance = self.context.cfg.GetInstanceInfo(instance_name)
338 wanted_nodes.append(instance.primary_node)
340 wanted_nodes.extend(instance.secondary_nodes)
342 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
343 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
344 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
345 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
347 del self.recalculate_locks[locking.LEVEL_NODE]
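# Illustrative sketch (not part of the original module): the usual pattern
# tying _ExpandAndLockInstance, recalculate_locks and _LockInstancesNodes
# together in a hypothetical instance LU.
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()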
350 class NoHooksLU(LogicalUnit):
351 """Simple LU which runs no hooks.
353 This LU is intended as a parent for other LogicalUnits which will
354 run no hooks, in order to reduce duplicate code.
362 """Tasklet base class.
364 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365 they can mix legacy code with tasklets. Locking needs to be done in the LU,
366 tasklets know nothing about locks.
368 Subclasses must follow these rules:
369 - Implement CheckPrereq
373 def __init__(self, lu):
380 def CheckPrereq(self):
381 """Check prerequisites for this tasklet.
383 This method should check whether the prerequisites for the execution of
384 this tasklet are fulfilled. It can do internode communication, but it
385 should be idempotent - no cluster or system changes are allowed.
387 The method should raise errors.OpPrereqError in case something is not
388 fulfilled. Its return value is ignored.
390 This method should also update all parameters to their canonical form if it
391 hasn't been done before.
394 raise NotImplementedError
396 def Exec(self, feedback_fn):
397 """Execute the tasklet.
399 This method should implement the actual work. It should raise
400 errors.OpExecError for failures that are somewhat dealt with in code, or
404 raise NotImplementedError
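# Illustrative sketch (not part of the original module): a minimal tasklet and
# how an LU could register it; the class name, attribute names and the
# self.op.instance_name field are assumptions made only for this example.
#
#   class _ExampleTasklet(Tasklet):
#     def __init__(self, lu, instance_name):
#       Tasklet.__init__(self, lu)
#       self.lu = lu
#       self.instance_name = instance_name
#
#     def CheckPrereq(self):
#       self.instance = self.lu.cfg.GetInstanceInfo(self.instance_name)
#       if self.instance is None:
#         raise errors.OpPrereqError("Instance '%s' not known" %
#                                    self.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Working on instance %s" % self.instance.name)
#
#   # ...and, inside the owning LU's ExpandNames:
#   #   self.tasklets = [_ExampleTasklet(self, self.op.instance_name)]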
407 def _GetWantedNodes(lu, nodes):
408 """Returns list of checked and expanded node names.
410 @type lu: L{LogicalUnit}
411 @param lu: the logical unit on whose behalf we execute
413 @param nodes: list of node names or None for all nodes
415 @return: the list of nodes, sorted
416 @raise errors.OpPrereqError: if the nodes parameter is of a wrong type
419 if not isinstance(nodes, list):
420 raise errors.OpPrereqError("Invalid argument type 'nodes'")
423 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
424 " non-empty list of nodes whose name is to be expanded.")
428 node = lu.cfg.ExpandNodeName(name)
430 raise errors.OpPrereqError("No such node name '%s'" % name)
433 return utils.NiceSort(wanted)
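# Illustrative usage sketch (not part of the original module), e.g. from a
# hypothetical node LU's CheckPrereq; self.op.nodes is an assumed opcode field:
#
#   self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)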
436 def _GetWantedInstances(lu, instances):
437 """Returns list of checked and expanded instance names.
439 @type lu: L{LogicalUnit}
440 @param lu: the logical unit on whose behalf we execute
441 @type instances: list
442 @param instances: list of instance names or None for all instances
444 @return: the list of instances, sorted
445 @raise errors.OpPrereqError: if the instances parameter is wrong type
446 @raise errors.OpPrereqError: if any of the passed instances is not found
449 if not isinstance(instances, list):
450 raise errors.OpPrereqError("Invalid argument type 'instances'")
455 for name in instances:
456 instance = lu.cfg.ExpandInstanceName(name)
458 raise errors.OpPrereqError("No such instance name '%s'" % name)
459 wanted.append(instance)
462 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
466 def _CheckOutputFields(static, dynamic, selected):
467 """Checks whether all selected fields are valid.
469 @type static: L{utils.FieldSet}
470 @param static: static fields set
471 @type dynamic: L{utils.FieldSet}
472 @param dynamic: dynamic fields set
479 delta = f.NonMatching(selected)
481 raise errors.OpPrereqError("Unknown output fields selected: %s"
485 def _CheckBooleanOpField(op, name):
486 """Validates boolean opcode parameters.
488 This will ensure that an opcode parameter is either a boolean value,
489 or None (but that it always exists).
492 val = getattr(op, name, None)
493 if not (val is None or isinstance(val, bool)):
494 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
496 setattr(op, name, val)
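# Illustrative usage sketch (not part of the original module): normalising a
# boolean opcode flag from CheckArguments; the field name is only an example:
#
#   _CheckBooleanOpField(self.op, "ignore_secondaries")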
499 def _CheckNodeOnline(lu, node):
500 """Ensure that a given node is online.
502 @param lu: the LU on behalf of which we make the check
503 @param node: the node to check
504 @raise errors.OpPrereqError: if the node is offline
507 if lu.cfg.GetNodeInfo(node).offline:
508 raise errors.OpPrereqError("Can't use offline node %s" % node)
511 def _CheckNodeNotDrained(lu, node):
512 """Ensure that a given node is not drained.
514 @param lu: the LU on behalf of which we make the check
515 @param node: the node to check
516 @raise errors.OpPrereqError: if the node is drained
519 if lu.cfg.GetNodeInfo(node).drained:
520 raise errors.OpPrereqError("Can't use drained node %s" % node)
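# Illustrative usage sketch (not part of the original module): LUs typically
# call these helpers from CheckPrereq once the target node is known;
# 'target_node' is an assumed local variable:
#
#   _CheckNodeOnline(self, self.instance.primary_node)
#   _CheckNodeNotDrained(self, target_node)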
523 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
524 memory, vcpus, nics, disk_template, disks,
525 bep, hvp, hypervisor_name):
526 """Builds instance-related env variables for hooks
528 This builds the hook environment from individual variables.
531 @param name: the name of the instance
532 @type primary_node: string
533 @param primary_node: the name of the instance's primary node
534 @type secondary_nodes: list
535 @param secondary_nodes: list of secondary nodes as strings
536 @type os_type: string
537 @param os_type: the name of the instance's OS
538 @type status: boolean
539 @param status: the should_run status of the instance
541 @param memory: the memory size of the instance
543 @param vcpus: the count of VCPUs the instance has
545 @param nics: list of tuples (ip, mac, mode, link) representing
546 the NICs the instance has
547 @type disk_template: string
548 @param disk_template: the disk template of the instance
550 @param disks: the list of (size, mode) pairs
552 @param bep: the backend parameters for the instance
554 @param hvp: the hypervisor parameters for the instance
555 @type hypervisor_name: string
556 @param hypervisor_name: the hypervisor for the instance
558 @return: the hook environment for this instance
567 "INSTANCE_NAME": name,
568 "INSTANCE_PRIMARY": primary_node,
569 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
570 "INSTANCE_OS_TYPE": os_type,
571 "INSTANCE_STATUS": str_status,
572 "INSTANCE_MEMORY": memory,
573 "INSTANCE_VCPUS": vcpus,
574 "INSTANCE_DISK_TEMPLATE": disk_template,
575 "INSTANCE_HYPERVISOR": hypervisor_name,
579 nic_count = len(nics)
580 for idx, (ip, mac, mode, link) in enumerate(nics):
583 env["INSTANCE_NIC%d_IP" % idx] = ip
584 env["INSTANCE_NIC%d_MAC" % idx] = mac
585 env["INSTANCE_NIC%d_MODE" % idx] = mode
586 env["INSTANCE_NIC%d_LINK" % idx] = link
587 if mode == constants.NIC_MODE_BRIDGED:
588 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
592 env["INSTANCE_NIC_COUNT"] = nic_count
595 disk_count = len(disks)
596 for idx, (size, mode) in enumerate(disks):
597 env["INSTANCE_DISK%d_SIZE" % idx] = size
598 env["INSTANCE_DISK%d_MODE" % idx] = mode
602 env["INSTANCE_DISK_COUNT"] = disk_count
604 for source, kind in [(bep, "BE"), (hvp, "HV")]:
605 for key, value in source.items():
606 env["INSTANCE_%s_%s" % (kind, key)] = value
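# Illustrative sketch (not part of the original module): for a hypothetical
# single-NIC, single-disk instance the resulting environment would contain
# entries along these lines (all values invented for the example; the hooks
# runner later adds the GANETI_ prefix):
#
#   INSTANCE_NAME=web1.example.tld
#   INSTANCE_PRIMARY=node1.example.tld
#   INSTANCE_NIC_COUNT=1
#   INSTANCE_NIC0_MODE=bridged
#   INSTANCE_NIC0_BRIDGE=xen-br0
#   INSTANCE_DISK_COUNT=1
#   INSTANCE_DISK0_SIZE=10240
#   INSTANCE_BE_memory=512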
611 def _NICListToTuple(lu, nics):
612 """Build a list of nic information tuples.
614 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
615 value in LUQueryInstanceData.
617 @type lu: L{LogicalUnit}
618 @param lu: the logical unit on whose behalf we execute
619 @type nics: list of L{objects.NIC}
620 @param nics: list of nics to convert to hooks tuples
624 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
628 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
629 mode = filled_params[constants.NIC_MODE]
630 link = filled_params[constants.NIC_LINK]
631 hooks_nics.append((ip, mac, mode, link))
635 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
636 """Builds instance-related env variables for hooks from an object.
638 @type lu: L{LogicalUnit}
639 @param lu: the logical unit on whose behalf we execute
640 @type instance: L{objects.Instance}
641 @param instance: the instance for which we should build the
644 @param override: dictionary with key/values that will override
647 @return: the hook environment dictionary
650 cluster = lu.cfg.GetClusterInfo()
651 bep = cluster.FillBE(instance)
652 hvp = cluster.FillHV(instance)
654 'name': instance.name,
655 'primary_node': instance.primary_node,
656 'secondary_nodes': instance.secondary_nodes,
657 'os_type': instance.os,
658 'status': instance.admin_up,
659 'memory': bep[constants.BE_MEMORY],
660 'vcpus': bep[constants.BE_VCPUS],
661 'nics': _NICListToTuple(lu, instance.nics),
662 'disk_template': instance.disk_template,
663 'disks': [(disk.size, disk.mode) for disk in instance.disks],
666 'hypervisor_name': instance.hypervisor,
669 args.update(override)
670 return _BuildInstanceHookEnv(**args)
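# Illustrative usage sketch (not part of the original module): an instance LU's
# BuildHooksEnv can usually be reduced to something like the following; note
# that 'override' replaces individual keyword arguments of
# _BuildInstanceHookEnv (here the reported memory size, an invented value):
#
#   def BuildHooksEnv(self):
#     env = _BuildInstanceHookEnvByObject(self, self.instance,
#                                         override={"memory": 1024})
#     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
#     return env, nl, nl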
673 def _AdjustCandidatePool(lu):
674 """Adjust the candidate pool after node operations.
677 mod_list = lu.cfg.MaintainCandidatePool()
679 lu.LogInfo("Promoted nodes to master candidate role: %s",
680 ", ".join(node.name for node in mod_list))
681 for name in mod_list:
682 lu.context.ReaddNode(name)
683 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
685 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
689 def _CheckNicsBridgesExist(lu, target_nics, target_node,
690 profile=constants.PP_DEFAULT):
691 """Check that the bridges needed by a list of nics exist.
694 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
695 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
696 for nic in target_nics]
697 brlist = [params[constants.NIC_LINK] for params in paramslist
698 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
700 result = lu.rpc.call_bridges_exist(target_node, brlist)
701 result.Raise("Error checking bridges on destination node '%s'" %
702 target_node, prereq=True)
705 def _CheckInstanceBridgesExist(lu, instance, node=None):
706 """Check that the bridges needed by an instance exist.
710 node = instance.primary_node
711 _CheckNicsBridgesExist(lu, instance.nics, node)
714 def _GetNodeInstancesInner(cfg, fn):
715 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
718 def _GetNodeInstances(cfg, node_name):
719 """Returns a list of all primary and secondary instances on a node.
723 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
726 def _GetNodePrimaryInstances(cfg, node_name):
727 """Returns primary instances on a node.
730 return _GetNodeInstancesInner(cfg,
731 lambda inst: node_name == inst.primary_node)
734 def _GetNodeSecondaryInstances(cfg, node_name):
735 """Returns secondary instances on a node.
738 return _GetNodeInstancesInner(cfg,
739 lambda inst: node_name in inst.secondary_nodes)
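# Illustrative usage sketch (not part of the original module): these helpers
# are handy when an LU must refuse to act on a node that still hosts
# instances; self.op.node_name is an assumed opcode field:
#
#   if _GetNodeInstances(self.cfg, self.op.node_name):
#     raise errors.OpPrereqError("Node %s still hosts instances" %
#                                self.op.node_name)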
742 def _GetStorageTypeArgs(cfg, storage_type):
743 """Returns the arguments for a storage type.
746 # Special case for file storage
747 if storage_type == constants.ST_FILE:
748 # storage.FileStorage wants a list of storage directories
749 return [[cfg.GetFileStorageDir()]]
754 def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
757 for dev in instance.disks:
758 cfg.SetDiskID(dev, node_name)
760 result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
761 result.Raise("Failed to get disk status from node %s" % node_name,
764 for idx, bdev_status in enumerate(result.payload):
765 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
771 class LUPostInitCluster(LogicalUnit):
772 """Logical unit for running hooks after cluster initialization.
775 HPATH = "cluster-init"
776 HTYPE = constants.HTYPE_CLUSTER
779 def BuildHooksEnv(self):
783 env = {"OP_TARGET": self.cfg.GetClusterName()}
784 mn = self.cfg.GetMasterNode()
787 def CheckPrereq(self):
788 """No prerequisites to check.
793 def Exec(self, feedback_fn):
800 class LUDestroyCluster(NoHooksLU):
801 """Logical unit for destroying the cluster.
806 def CheckPrereq(self):
807 """Check prerequisites.
809 This checks whether the cluster is empty.
811 Any errors are signaled by raising errors.OpPrereqError.
814 master = self.cfg.GetMasterNode()
816 nodelist = self.cfg.GetNodeList()
817 if len(nodelist) != 1 or nodelist[0] != master:
818 raise errors.OpPrereqError("There are still %d node(s) in"
819 " this cluster." % (len(nodelist) - 1))
820 instancelist = self.cfg.GetInstanceList()
822 raise errors.OpPrereqError("There are still %d instance(s) in"
823 " this cluster." % len(instancelist))
825 def Exec(self, feedback_fn):
826 """Destroys the cluster.
829 master = self.cfg.GetMasterNode()
830 result = self.rpc.call_node_stop_master(master, False)
831 result.Raise("Could not disable the master role")
832 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
833 utils.CreateBackup(priv_key)
834 utils.CreateBackup(pub_key)
838 class LUVerifyCluster(LogicalUnit):
839 """Verifies the cluster status.
842 HPATH = "cluster-verify"
843 HTYPE = constants.HTYPE_CLUSTER
844 _OP_REQP = ["skip_checks"]
847 def ExpandNames(self):
848 self.needed_locks = {
849 locking.LEVEL_NODE: locking.ALL_SET,
850 locking.LEVEL_INSTANCE: locking.ALL_SET,
852 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
854 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
855 node_result, feedback_fn, master_files,
857 """Run multiple tests against a node.
861 - compares ganeti version
862 - checks vg existence and size > 20G
863 - checks config file checksum
864 - checks ssh to other nodes
866 @type nodeinfo: L{objects.Node}
867 @param nodeinfo: the node to check
868 @param file_list: required list of files
869 @param local_cksum: dictionary of local files and their checksums
870 @param node_result: the results from the node
871 @param feedback_fn: function used to accumulate results
872 @param master_files: list of files that only masters should have
873 @param drbd_map: the used drbd minors for this node, in
874 the form of minor: (instance, must_exist), which correspond to instances
875 and their running status
876 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
881 # main result, node_result should be a non-empty dict
882 if not node_result or not isinstance(node_result, dict):
883 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
886 # compares ganeti version
887 local_version = constants.PROTOCOL_VERSION
888 remote_version = node_result.get('version', None)
889 if not (remote_version and isinstance(remote_version, (list, tuple)) and
890 len(remote_version) == 2):
891 feedback_fn(" - ERROR: connection to %s failed" % (node))
894 if local_version != remote_version[0]:
895 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
896 " node %s %s" % (local_version, node, remote_version[0]))
899 # node seems compatible, we can actually try to look into its results
903 # full package version
904 if constants.RELEASE_VERSION != remote_version[1]:
905 feedback_fn(" - WARNING: software version mismatch: master %s,"
907 (constants.RELEASE_VERSION, node, remote_version[1]))
909 # checks vg existence and size > 20G
910 if vg_name is not None:
911 vglist = node_result.get(constants.NV_VGLIST, None)
913 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
917 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
918 constants.MIN_VG_SIZE)
920 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
923 # checks config file checksum
925 remote_cksum = node_result.get(constants.NV_FILELIST, None)
926 if not isinstance(remote_cksum, dict):
928 feedback_fn(" - ERROR: node hasn't returned file checksum data")
930 for file_name in file_list:
931 node_is_mc = nodeinfo.master_candidate
932 must_have_file = file_name not in master_files
933 if file_name not in remote_cksum:
934 if node_is_mc or must_have_file:
936 feedback_fn(" - ERROR: file '%s' missing" % file_name)
937 elif remote_cksum[file_name] != local_cksum[file_name]:
938 if node_is_mc or must_have_file:
940 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
942 # not candidate and this is not a must-have file
944 feedback_fn(" - ERROR: file '%s' should not exist on non master"
945 " candidates (and the file is outdated)" % file_name)
947 # all good, except non-master/non-must have combination
948 if not node_is_mc and not must_have_file:
949 feedback_fn(" - ERROR: file '%s' should not exist on non master"
950 " candidates" % file_name)
954 if constants.NV_NODELIST not in node_result:
956 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
958 if node_result[constants.NV_NODELIST]:
960 for node in node_result[constants.NV_NODELIST]:
961 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
962 (node, node_result[constants.NV_NODELIST][node]))
964 if constants.NV_NODENETTEST not in node_result:
966 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
968 if node_result[constants.NV_NODENETTEST]:
970 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
972 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
973 (node, node_result[constants.NV_NODENETTEST][node]))
975 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
976 if isinstance(hyp_result, dict):
977 for hv_name, hv_result in hyp_result.iteritems():
978 if hv_result is not None:
979 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
980 (hv_name, hv_result))
982 # check used drbd list
983 if vg_name is not None:
984 used_minors = node_result.get(constants.NV_DRBDLIST, [])
985 if not isinstance(used_minors, (tuple, list)):
986 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
989 for minor, (iname, must_exist) in drbd_map.items():
990 if minor not in used_minors and must_exist:
991 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
992 " not active" % (minor, iname))
994 for minor in used_minors:
995 if minor not in drbd_map:
996 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
1002 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1003 node_instance, feedback_fn, n_offline):
1004 """Verify an instance.
1006 This function checks to see if the required block devices are
1007 available on the instance's node.
1012 node_current = instanceconfig.primary_node
1014 node_vol_should = {}
1015 instanceconfig.MapLVsByNode(node_vol_should)
1017 for node in node_vol_should:
1018 if node in n_offline:
1019 # ignore missing volumes on offline nodes
1021 for volume in node_vol_should[node]:
1022 if node not in node_vol_is or volume not in node_vol_is[node]:
1023 feedback_fn(" - ERROR: volume %s missing on node %s" %
1027 if instanceconfig.admin_up:
1028 if ((node_current not in node_instance or
1029 not instance in node_instance[node_current]) and
1030 node_current not in n_offline):
1031 feedback_fn(" - ERROR: instance %s not running on node %s" %
1032 (instance, node_current))
1035 for node in node_instance:
1036 if (not node == node_current):
1037 if instance in node_instance[node]:
1038 feedback_fn(" - ERROR: instance %s should not run on node %s" %
1044 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
1045 """Verify if there are any unknown volumes in the cluster.
1047 The .os, .swap and backup volumes are ignored. All other volumes are
1048 reported as unknown.
1053 for node in node_vol_is:
1054 for volume in node_vol_is[node]:
1055 if node not in node_vol_should or volume not in node_vol_should[node]:
1056 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
1061 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
1062 """Verify the list of running instances.
1064 This checks what instances are running but unknown to the cluster.
1068 for node in node_instance:
1069 for runninginstance in node_instance[node]:
1070 if runninginstance not in instancelist:
1071 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
1072 (runninginstance, node))
1076 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
1077 """Verify N+1 Memory Resilience.
1079 Check that if one single node dies we can still start all the instances it
1085 for node, nodeinfo in node_info.iteritems():
1086 # This code checks that every node which is now listed as a secondary has
1087 # enough memory to host all instances it is supposed to host, should a single
1088 # other node in the cluster fail.
1089 # FIXME: not ready for failover to an arbitrary node
1090 # FIXME: does not support file-backed instances
1091 # WARNING: we currently take into account down instances as well as up
1092 # ones, considering that even if they're down someone might want to start
1093 # them even in the event of a node failure.
1094 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1096 for instance in instances:
1097 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1098 if bep[constants.BE_AUTO_BALANCE]:
1099 needed_mem += bep[constants.BE_MEMORY]
1100 if nodeinfo['mfree'] < needed_mem:
1101 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
1102 " failovers should node %s fail" % (node, prinode))
1106 def CheckPrereq(self):
1107 """Check prerequisites.
1109 Transform the list of checks we're going to skip into a set and check that
1110 all its members are valid.
1113 self.skip_set = frozenset(self.op.skip_checks)
1114 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1115 raise errors.OpPrereqError("Invalid checks to be skipped specified")
1117 def BuildHooksEnv(self):
1120 Cluster-Verify hooks are run only in the post phase, and their failure causes
1121 the output to be logged in the verify output and the verification to fail.
1124 all_nodes = self.cfg.GetNodeList()
1126 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1128 for node in self.cfg.GetAllNodesInfo().values():
1129 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1131 return env, [], all_nodes
1133 def Exec(self, feedback_fn):
1134 """Verify integrity of cluster, performing various test on nodes.
1138 feedback_fn("* Verifying global settings")
1139 for msg in self.cfg.VerifyConfig():
1140 feedback_fn(" - ERROR: %s" % msg)
1142 vg_name = self.cfg.GetVGName()
1143 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1144 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1145 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1146 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1147 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1148 for iname in instancelist)
1149 i_non_redundant = [] # Non redundant instances
1150 i_non_a_balanced = [] # Non auto-balanced instances
1151 n_offline = [] # List of offline nodes
1152 n_drained = [] # List of nodes being drained
1158 # FIXME: verify OS list
1159 # do local checksums
1160 master_files = [constants.CLUSTER_CONF_FILE]
1162 file_names = ssconf.SimpleStore().GetFileList()
1163 file_names.append(constants.SSL_CERT_FILE)
1164 file_names.append(constants.RAPI_CERT_FILE)
1165 file_names.extend(master_files)
1167 local_checksums = utils.FingerprintFiles(file_names)
1169 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1170 node_verify_param = {
1171 constants.NV_FILELIST: file_names,
1172 constants.NV_NODELIST: [node.name for node in nodeinfo
1173 if not node.offline],
1174 constants.NV_HYPERVISOR: hypervisors,
1175 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1176 node.secondary_ip) for node in nodeinfo
1177 if not node.offline],
1178 constants.NV_INSTANCELIST: hypervisors,
1179 constants.NV_VERSION: None,
1180 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1182 if vg_name is not None:
1183 node_verify_param[constants.NV_VGLIST] = None
1184 node_verify_param[constants.NV_LVLIST] = vg_name
1185 node_verify_param[constants.NV_DRBDLIST] = None
1186 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1187 self.cfg.GetClusterName())
1189 cluster = self.cfg.GetClusterInfo()
1190 master_node = self.cfg.GetMasterNode()
1191 all_drbd_map = self.cfg.ComputeDRBDMap()
1193 for node_i in nodeinfo:
1197 feedback_fn("* Skipping offline node %s" % (node,))
1198 n_offline.append(node)
1201 if node == master_node:
1203 elif node_i.master_candidate:
1204 ntype = "master candidate"
1205 elif node_i.drained:
1207 n_drained.append(node)
1210 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1212 msg = all_nvinfo[node].fail_msg
1214 feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg))
1218 nresult = all_nvinfo[node].payload
1220 for minor, instance in all_drbd_map[node].items():
1221 if instance not in instanceinfo:
1222 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1224 # ghost instance should not be running, but otherwise we
1225 # don't give double warnings (both ghost instance and
1226 # unallocated minor in use)
1227 node_drbd[minor] = (instance, False)
1229 instance = instanceinfo[instance]
1230 node_drbd[minor] = (instance.name, instance.admin_up)
1231 result = self._VerifyNode(node_i, file_names, local_checksums,
1232 nresult, feedback_fn, master_files,
1236 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1238 node_volume[node] = {}
1239 elif isinstance(lvdata, basestring):
1240 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1241 (node, utils.SafeEncode(lvdata)))
1243 node_volume[node] = {}
1244 elif not isinstance(lvdata, dict):
1245 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1249 node_volume[node] = lvdata
1252 idata = nresult.get(constants.NV_INSTANCELIST, None)
1253 if not isinstance(idata, list):
1254 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1259 node_instance[node] = idata
1262 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1263 if not isinstance(nodeinfo, dict):
1264 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1270 "mfree": int(nodeinfo['memory_free']),
1273 # dictionary holding all instances this node is secondary for,
1274 # grouped by their primary node. Each key is a cluster node, and each
1275 # value is a list of instances which have the key as primary and the
1276 current node as secondary. This is handy to calculate N+1 memory
1277 # availability if you can only failover from a primary to its
1279 "sinst-by-pnode": {},
1281 # FIXME: devise a free space model for file based instances as well
1282 if vg_name is not None:
1283 if (constants.NV_VGLIST not in nresult or
1284 vg_name not in nresult[constants.NV_VGLIST]):
1285 feedback_fn(" - ERROR: node %s didn't return data for the"
1286 " volume group '%s' - it is either missing or broken" %
1290 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1291 except (ValueError, KeyError):
1292 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1293 " from node %s" % (node,))
1297 node_vol_should = {}
1299 for instance in instancelist:
1300 feedback_fn("* Verifying instance %s" % instance)
1301 inst_config = instanceinfo[instance]
1302 result = self._VerifyInstance(instance, inst_config, node_volume,
1303 node_instance, feedback_fn, n_offline)
1305 inst_nodes_offline = []
1307 inst_config.MapLVsByNode(node_vol_should)
1309 instance_cfg[instance] = inst_config
1311 pnode = inst_config.primary_node
1312 if pnode in node_info:
1313 node_info[pnode]['pinst'].append(instance)
1314 elif pnode not in n_offline:
1315 feedback_fn(" - ERROR: instance %s, connection to primary node"
1316 " %s failed" % (instance, pnode))
1319 if pnode in n_offline:
1320 inst_nodes_offline.append(pnode)
1322 # If the instance is non-redundant we cannot survive losing its primary
1323 # node, so we are not N+1 compliant. On the other hand we have no disk
1324 # templates with more than one secondary so that situation is not well
1326 # FIXME: does not support file-backed instances
1327 if len(inst_config.secondary_nodes) == 0:
1328 i_non_redundant.append(instance)
1329 elif len(inst_config.secondary_nodes) > 1:
1330 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1333 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1334 i_non_a_balanced.append(instance)
1336 for snode in inst_config.secondary_nodes:
1337 if snode in node_info:
1338 node_info[snode]['sinst'].append(instance)
1339 if pnode not in node_info[snode]['sinst-by-pnode']:
1340 node_info[snode]['sinst-by-pnode'][pnode] = []
1341 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1342 elif snode not in n_offline:
1343 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1344 " %s failed" % (instance, snode))
1346 if snode in n_offline:
1347 inst_nodes_offline.append(snode)
1349 if inst_nodes_offline:
1350 # warn that the instance lives on offline nodes, and set bad=True
1351 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1352 ", ".join(inst_nodes_offline))
1355 feedback_fn("* Verifying orphan volumes")
1356 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1360 feedback_fn("* Verifying remaining instances")
1361 result = self._VerifyOrphanInstances(instancelist, node_instance,
1365 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1366 feedback_fn("* Verifying N+1 Memory redundancy")
1367 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1370 feedback_fn("* Other Notes")
1372 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1373 % len(i_non_redundant))
1375 if i_non_a_balanced:
1376 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1377 % len(i_non_a_balanced))
1380 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1383 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1387 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1388 """Analyze the post-hooks' result
1390 This method analyses the hook result, handles it, and sends some
1391 nicely-formatted feedback back to the user.
1393 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1394 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1395 @param hooks_results: the results of the multi-node hooks rpc call
1396 @param feedback_fn: function used to send feedback back to the caller
1397 @param lu_result: previous Exec result
1398 @return: the new Exec result, based on the previous result
1402 # We only really run POST phase hooks, and are only interested in
1404 if phase == constants.HOOKS_PHASE_POST:
1405 # Used to change hooks' output to proper indentation
1406 indent_re = re.compile('^', re.M)
1407 feedback_fn("* Hooks Results")
1408 if not hooks_results:
1409 feedback_fn(" - ERROR: general communication failure")
1412 for node_name in hooks_results:
1413 show_node_header = True
1414 res = hooks_results[node_name]
1418 # no need to warn or set fail return value
1420 feedback_fn(" Communication failure in hooks execution: %s" %
1424 for script, hkr, output in res.payload:
1425 if hkr == constants.HKR_FAIL:
1426 # The node header is only shown once, if there are
1427 # failing hooks on that node
1428 if show_node_header:
1429 feedback_fn(" Node %s:" % node_name)
1430 show_node_header = False
1431 feedback_fn(" ERROR: Script %s failed, output:" % script)
1432 output = indent_re.sub(' ', output)
1433 feedback_fn("%s" % output)
1439 class LUVerifyDisks(NoHooksLU):
1440 """Verifies the cluster disks status.
1446 def ExpandNames(self):
1447 self.needed_locks = {
1448 locking.LEVEL_NODE: locking.ALL_SET,
1449 locking.LEVEL_INSTANCE: locking.ALL_SET,
1451 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1453 def CheckPrereq(self):
1454 """Check prerequisites.
1456 This has no prerequisites.
1461 def Exec(self, feedback_fn):
1462 """Verify integrity of cluster disks.
1464 @rtype: tuple of three items
1465 @return: a tuple of (dict of node-to-node_error, list of instances
1466 which need activate-disks, dict of instance: (node, volume) for
1470 result = res_nodes, res_instances, res_missing = {}, [], {}
1472 vg_name = self.cfg.GetVGName()
1473 nodes = utils.NiceSort(self.cfg.GetNodeList())
1474 instances = [self.cfg.GetInstanceInfo(name)
1475 for name in self.cfg.GetInstanceList()]
1478 for inst in instances:
1480 if (not inst.admin_up or
1481 inst.disk_template not in constants.DTS_NET_MIRROR):
1483 inst.MapLVsByNode(inst_lvs)
1484 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1485 for node, vol_list in inst_lvs.iteritems():
1486 for vol in vol_list:
1487 nv_dict[(node, vol)] = inst
1492 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1496 node_res = node_lvs[node]
1497 if node_res.offline:
1499 msg = node_res.fail_msg
1501 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1502 res_nodes[node] = msg
1505 lvs = node_res.payload
1506 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1507 inst = nv_dict.pop((node, lv_name), None)
1508 if (not lv_online and inst is not None
1509 and inst.name not in res_instances):
1510 res_instances.append(inst.name)
1512 # any leftover items in nv_dict are missing LVs, let's arrange the
1514 for key, inst in nv_dict.iteritems():
1515 if inst.name not in res_missing:
1516 res_missing[inst.name] = []
1517 res_missing[inst.name].append(key)
1522 class LURepairDiskSizes(NoHooksLU):
1523 """Verifies the cluster disks sizes.
1526 _OP_REQP = ["instances"]
1529 def ExpandNames(self):
1531 if not isinstance(self.op.instances, list):
1532 raise errors.OpPrereqError("Invalid argument type 'instances'")
1534 if self.op.instances:
1535 self.wanted_names = []
1536 for name in self.op.instances:
1537 full_name = self.cfg.ExpandInstanceName(name)
1538 if full_name is None:
1539 raise errors.OpPrereqError("Instance '%s' not known" % name)
1540 self.wanted_names.append(full_name)
1542 self.needed_locks = {
1543 locking.LEVEL_NODE: [],
1544 locking.LEVEL_INSTANCE: self.wanted_names,
1546 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1548 self.wanted_names = None
1549 self.needed_locks = {
1550 locking.LEVEL_NODE: locking.ALL_SET,
1551 locking.LEVEL_INSTANCE: locking.ALL_SET,
1553 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1555 def DeclareLocks(self, level):
1556 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1557 self._LockInstancesNodes(primary_only=True)
1559 def CheckPrereq(self):
1560 """Check prerequisites.
1562 This only checks the optional instance list against the existing names.
1565 if self.wanted_names is None:
1566 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1568 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1569 in self.wanted_names]
1571 def Exec(self, feedback_fn):
1572 """Verify the size of cluster disks.
1575 # TODO: check child disks too
1576 # TODO: check differences in size between primary/secondary nodes
1578 for instance in self.wanted_instances:
1579 pnode = instance.primary_node
1580 if pnode not in per_node_disks:
1581 per_node_disks[pnode] = []
1582 for idx, disk in enumerate(instance.disks):
1583 per_node_disks[pnode].append((instance, idx, disk))
1586 for node, dskl in per_node_disks.items():
1587 result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
1589 self.LogWarning("Failure in blockdev_getsizes call to node"
1590 " %s, ignoring", node)
1592 if len(result.data) != len(dskl):
1593 self.LogWarning("Invalid result from node %s, ignoring node results",
1596 for ((instance, idx, disk), size) in zip(dskl, result.data):
1598 self.LogWarning("Disk %d of instance %s did not return size"
1599 " information, ignoring", idx, instance.name)
1601 if not isinstance(size, (int, long)):
1602 self.LogWarning("Disk %d of instance %s did not return valid"
1603 " size information, ignoring", idx, instance.name)
1606 if size != disk.size:
1607 self.LogInfo("Disk %d of instance %s has mismatched size,"
1608 " correcting: recorded %d, actual %d", idx,
1609 instance.name, disk.size, size)
1611 self.cfg.Update(instance)
1612 changed.append((instance.name, idx, size))
1616 class LURenameCluster(LogicalUnit):
1617 """Rename the cluster.
1620 HPATH = "cluster-rename"
1621 HTYPE = constants.HTYPE_CLUSTER
1624 def BuildHooksEnv(self):
1629 "OP_TARGET": self.cfg.GetClusterName(),
1630 "NEW_NAME": self.op.name,
1632 mn = self.cfg.GetMasterNode()
1633 return env, [mn], [mn]
1635 def CheckPrereq(self):
1636 """Verify that the passed name is a valid one.
1639 hostname = utils.HostInfo(self.op.name)
1641 new_name = hostname.name
1642 self.ip = new_ip = hostname.ip
1643 old_name = self.cfg.GetClusterName()
1644 old_ip = self.cfg.GetMasterIP()
1645 if new_name == old_name and new_ip == old_ip:
1646 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1647 " cluster has changed")
1648 if new_ip != old_ip:
1649 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1650 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1651 " reachable on the network. Aborting." %
1654 self.op.name = new_name
1656 def Exec(self, feedback_fn):
1657 """Rename the cluster.
1660 clustername = self.op.name
1663 # shutdown the master IP
1664 master = self.cfg.GetMasterNode()
1665 result = self.rpc.call_node_stop_master(master, False)
1666 result.Raise("Could not disable the master role")
1669 cluster = self.cfg.GetClusterInfo()
1670 cluster.cluster_name = clustername
1671 cluster.master_ip = ip
1672 self.cfg.Update(cluster)
1674 # update the known hosts file
1675 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1676 node_list = self.cfg.GetNodeList()
1678 node_list.remove(master)
1681 result = self.rpc.call_upload_file(node_list,
1682 constants.SSH_KNOWN_HOSTS_FILE)
1683 for to_node, to_result in result.iteritems():
1684 msg = to_result.fail_msg
1686 msg = ("Copy of file %s to node %s failed: %s" %
1687 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1688 self.proc.LogWarning(msg)
1691 result = self.rpc.call_node_start_master(master, False, False)
1692 msg = result.fail_msg
1694 self.LogWarning("Could not re-enable the master role on"
1695 " the master, please restart manually: %s", msg)
1698 def _RecursiveCheckIfLVMBased(disk):
1699 """Check if the given disk or its children are lvm-based.
1701 @type disk: L{objects.Disk}
1702 @param disk: the disk to check
1704 @return: boolean indicating whether a LD_LV dev_type was found or not
1708 for chdisk in disk.children:
1709 if _RecursiveCheckIfLVMBased(chdisk):
1711 return disk.dev_type == constants.LD_LV
1714 class LUSetClusterParams(LogicalUnit):
1715 """Change the parameters of the cluster.
1718 HPATH = "cluster-modify"
1719 HTYPE = constants.HTYPE_CLUSTER
1723 def CheckArguments(self):
1727 if not hasattr(self.op, "candidate_pool_size"):
1728 self.op.candidate_pool_size = None
1729 if self.op.candidate_pool_size is not None:
1731 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1732 except (ValueError, TypeError), err:
1733 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1735 if self.op.candidate_pool_size < 1:
1736 raise errors.OpPrereqError("At least one master candidate needed")
1738 def ExpandNames(self):
1739 # FIXME: in the future maybe other cluster params won't require checking on
1740 # all nodes to be modified.
1741 self.needed_locks = {
1742 locking.LEVEL_NODE: locking.ALL_SET,
1744 self.share_locks[locking.LEVEL_NODE] = 1
1746 def BuildHooksEnv(self):
1751 "OP_TARGET": self.cfg.GetClusterName(),
1752 "NEW_VG_NAME": self.op.vg_name,
1754 mn = self.cfg.GetMasterNode()
1755 return env, [mn], [mn]
1757 def CheckPrereq(self):
1758 """Check prerequisites.
1760 This checks whether the given params don't conflict and
1761 if the given volume group is valid.
1764 if self.op.vg_name is not None and not self.op.vg_name:
1765 instances = self.cfg.GetAllInstancesInfo().values()
1766 for inst in instances:
1767 for disk in inst.disks:
1768 if _RecursiveCheckIfLVMBased(disk):
1769 raise errors.OpPrereqError("Cannot disable lvm storage while"
1770 " lvm-based instances exist")
1772 node_list = self.acquired_locks[locking.LEVEL_NODE]
1774 # if vg_name not None, checks given volume group on all nodes
1776 vglist = self.rpc.call_vg_list(node_list)
1777 for node in node_list:
1778 msg = vglist[node].fail_msg
1780 # ignoring down node
1781 self.LogWarning("Error while gathering data on node %s"
1782 " (ignoring node): %s", node, msg)
1784 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1786 constants.MIN_VG_SIZE)
1788 raise errors.OpPrereqError("Error on node '%s': %s" %
1791 self.cluster = cluster = self.cfg.GetClusterInfo()
1792 # validate params changes
1793 if self.op.beparams:
1794 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1795 self.new_beparams = objects.FillDict(
1796 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1798 if self.op.nicparams:
1799 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1800 self.new_nicparams = objects.FillDict(
1801 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1802 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1804 # hypervisor list/parameters
1805 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1806 if self.op.hvparams:
1807 if not isinstance(self.op.hvparams, dict):
1808 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1809 for hv_name, hv_dict in self.op.hvparams.items():
1810 if hv_name not in self.new_hvparams:
1811 self.new_hvparams[hv_name] = hv_dict
1813 self.new_hvparams[hv_name].update(hv_dict)
1815 if self.op.enabled_hypervisors is not None:
1816 self.hv_list = self.op.enabled_hypervisors
1817 if not self.hv_list:
1818 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1819 " least one member")
1820 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1822 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1823 " entries: %s" % invalid_hvs)
1825 self.hv_list = cluster.enabled_hypervisors
1827 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1828 # either the enabled list has changed, or the parameters have, validate
1829 for hv_name, hv_params in self.new_hvparams.items():
1830 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1831 (self.op.enabled_hypervisors and
1832 hv_name in self.op.enabled_hypervisors)):
1833 # either this is a new hypervisor, or its parameters have changed
1834 hv_class = hypervisor.GetHypervisor(hv_name)
1835 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1836 hv_class.CheckParameterSyntax(hv_params)
1837 _CheckHVParams(self, node_list, hv_name, hv_params)
1839 def Exec(self, feedback_fn):
1840 """Change the parameters of the cluster.
1843 if self.op.vg_name is not None:
1844 new_volume = self.op.vg_name
1847 if new_volume != self.cfg.GetVGName():
1848 self.cfg.SetVGName(new_volume)
1850 feedback_fn("Cluster LVM configuration already in desired"
1851 " state, not changing")
1852 if self.op.hvparams:
1853 self.cluster.hvparams = self.new_hvparams
1854 if self.op.enabled_hypervisors is not None:
1855 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1856 if self.op.beparams:
1857 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1858 if self.op.nicparams:
1859 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1861 if self.op.candidate_pool_size is not None:
1862 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1863 # we need to update the pool size here, otherwise the save will fail
1864 _AdjustCandidatePool(self)
1866 self.cfg.Update(self.cluster)
1869 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1870 """Distribute additional files which are part of the cluster configuration.
1872 ConfigWriter takes care of distributing the config and ssconf files, but
1873 there are more files which should be distributed to all nodes. This function
1874 makes sure those are copied.
1876 @param lu: calling logical unit
1877 @param additional_nodes: list of nodes not in the config to distribute to
1880 # 1. Gather target nodes
1881 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1882 dist_nodes = lu.cfg.GetNodeList()
1883 if additional_nodes is not None:
1884 dist_nodes.extend(additional_nodes)
1885 if myself.name in dist_nodes:
1886 dist_nodes.remove(myself.name)
1887 # 2. Gather files to distribute
1888 dist_files = set([constants.ETC_HOSTS,
1889 constants.SSH_KNOWN_HOSTS_FILE,
1890 constants.RAPI_CERT_FILE,
1891 constants.RAPI_USERS_FILE,
1892 constants.HMAC_CLUSTER_KEY,
1895 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1896 for hv_name in enabled_hypervisors:
1897 hv_class = hypervisor.GetHypervisor(hv_name)
1898 dist_files.update(hv_class.GetAncillaryFiles())
1900 # 3. Perform the files upload
1901 for fname in dist_files:
1902 if os.path.exists(fname):
1903 result = lu.rpc.call_upload_file(dist_nodes, fname)
1904 for to_node, to_result in result.items():
1905 msg = to_result.fail_msg
1907 msg = ("Copy of file %s to node %s failed: %s" %
1908 (fname, to_node, msg))
1909 lu.proc.LogWarning(msg)
1912 class LURedistributeConfig(NoHooksLU):
1913 """Force the redistribution of cluster configuration.
1915 This is a very simple LU.
1921 def ExpandNames(self):
1922 self.needed_locks = {
1923 locking.LEVEL_NODE: locking.ALL_SET,
1925 self.share_locks[locking.LEVEL_NODE] = 1
1927 def CheckPrereq(self):
1928 """Check prerequisites.
1932 def Exec(self, feedback_fn):
1933 """Redistribute the configuration.
1936 self.cfg.Update(self.cfg.GetClusterInfo())
1937 _RedistributeAncillaryFiles(self)
1940 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1941 """Sleep and poll for an instance's disk to sync.
1944 if not instance.disks:
1948 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1950 node = instance.primary_node
1952 for dev in instance.disks:
1953 lu.cfg.SetDiskID(dev, node)
1956 degr_retries = 10 # in seconds, as we sleep 1 second each time
1960 cumul_degraded = False
1961 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1962 msg = rstats.fail_msg
1964 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1967 raise errors.RemoteError("Can't contact node %s for mirror data,"
1968 " aborting." % node)
1971 rstats = rstats.payload
1973 for i, mstat in enumerate(rstats):
1975 lu.LogWarning("Can't compute data for node %s/%s",
1976 node, instance.disks[i].iv_name)
1979 cumul_degraded = (cumul_degraded or
1980 (mstat.is_degraded and mstat.sync_percent is None))
1981 if mstat.sync_percent is not None:
1983 if mstat.estimated_time is not None:
1984 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
1985 max_time = mstat.estimated_time
1987 rem_time = "no time estimate"
1988 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1989 (instance.disks[i].iv_name, mstat.sync_percent, rem_time))
1991 # if we're done but degraded, let's do a few small retries, to
1992 # make sure we see a stable and not transient situation; therefore
1993 # we force restart of the loop
1994 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1995 logging.info("Degraded disks found, %d retries left", degr_retries)
2003 time.sleep(min(60, max_time))
2006 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2007 return not cumul_degraded
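# Illustrative sketch only: a typical caller treats a degraded result from
# _WaitForSync as fatal; the wrapper name below is an assumption, while
# _WaitForSync and errors.OpExecError are used as elsewhere in this module.
def _ExampleWaitForCleanDisks(lu, instance):
  """Example only: block until the instance's disks are fully in sync."""
  if not _WaitForSync(lu, instance):
    raise errors.OpExecError("Disks of instance %s are degraded" %
                             instance.name)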
2010 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2011 """Check that mirrors are not degraded.
2013 The ldisk parameter, if True, will change the test from the
2014 is_degraded attribute (which represents overall non-ok status for
2015 the device(s)) to the ldisk (representing the local storage status).
2018 lu.cfg.SetDiskID(dev, node)
2022 if on_primary or dev.AssembleOnSecondary():
2023 rstats = lu.rpc.call_blockdev_find(node, dev)
2024 msg = rstats.fail_msg
2026 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2028 elif not rstats.payload:
2029 lu.LogWarning("Can't find disk on node %s", node)
2033 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2035 result = result and not rstats.payload.is_degraded
2038 for child in dev.children:
2039 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
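# Illustrative sketch only: checking every disk of an instance on its primary
# node with the helper above; the function name below is an assumption.
def _ExampleAllDisksConsistent(lu, instance):
  """Example only: True if no disk of the instance looks degraded."""
  node = instance.primary_node
  ok = True
  for disk in instance.disks:
    ok = ok and _CheckDiskConsistency(lu, disk, node, True)
  return ok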
2044 class LUDiagnoseOS(NoHooksLU):
2045 """Logical unit for OS diagnose/query.
2048 _OP_REQP = ["output_fields", "names"]
2050 _FIELDS_STATIC = utils.FieldSet()
2051 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
2053 def ExpandNames(self):
2055 raise errors.OpPrereqError("Selective OS query not supported")
2057 _CheckOutputFields(static=self._FIELDS_STATIC,
2058 dynamic=self._FIELDS_DYNAMIC,
2059 selected=self.op.output_fields)
2061 # Lock all nodes, in shared mode
2062 # Temporary removal of locks, should be reverted later
2063 # TODO: reintroduce locks when they are lighter-weight
2064 self.needed_locks = {}
2065 #self.share_locks[locking.LEVEL_NODE] = 1
2066 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2068 def CheckPrereq(self):
2069 """Check prerequisites.
2074 def _DiagnoseByOS(node_list, rlist):
2075 """Remaps a per-node return list into a per-os per-node dictionary.
2077 @param node_list: a list with the names of all nodes
2078 @param rlist: a map with node names as keys and OS objects as values
2081 @return: a dictionary with osnames as keys and as value another map, with
2082 nodes as keys and tuples of (path, status, diagnose) as values, eg::
2084 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2085 (/srv/..., False, "invalid api")],
2086 "node2": [(/srv/..., True, "")]}
2091 # we build here the list of nodes that didn't fail the RPC (at RPC
2092 # level), so that nodes with a non-responding node daemon don't
2093 # make all OSes invalid
2094 good_nodes = [node_name for node_name in rlist
2095 if not rlist[node_name].fail_msg]
2096 for node_name, nr in rlist.items():
2097 if nr.fail_msg or not nr.payload:
2099 for name, path, status, diagnose in nr.payload:
2100 if name not in all_os:
2101 # build a list of nodes for this os containing empty lists
2102 # for each node in node_list
2104 for nname in good_nodes:
2105 all_os[name][nname] = []
2106 all_os[name][node_name].append((path, status, diagnose))
2109 def Exec(self, feedback_fn):
2110 """Compute the list of OSes.
2113 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2114 node_data = self.rpc.call_os_diagnose(valid_nodes)
2115 pol = self._DiagnoseByOS(valid_nodes, node_data)
2117 for os_name, os_data in pol.items():
2119 for field in self.op.output_fields:
2122 elif field == "valid":
2123 val = utils.all([osl and osl[0][1] for osl in os_data.values()])
2124 elif field == "node_status":
2125 # this is just a copy of the dict
2127 for node_name, nos_list in os_data.items():
2128 val[node_name] = nos_list
2130 raise errors.ParameterError(field)
2137 class LURemoveNode(LogicalUnit):
2138 """Logical unit for removing a node.
2141 HPATH = "node-remove"
2142 HTYPE = constants.HTYPE_NODE
2143 _OP_REQP = ["node_name"]
2145 def BuildHooksEnv(self):
2148 This doesn't run on the target node in the pre phase as a failed
2149 node would then be impossible to remove.
2153 "OP_TARGET": self.op.node_name,
2154 "NODE_NAME": self.op.node_name,
2156 all_nodes = self.cfg.GetNodeList()
2157 all_nodes.remove(self.op.node_name)
2158 return env, all_nodes, all_nodes
2160 def CheckPrereq(self):
2161 """Check prerequisites.
2164 - the node exists in the configuration
2165 - it does not have primary or secondary instances
2166 - it's not the master
2168 Any errors are signaled by raising errors.OpPrereqError.
2171 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2173 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2175 instance_list = self.cfg.GetInstanceList()
2177 masternode = self.cfg.GetMasterNode()
2178 if node.name == masternode:
2179 raise errors.OpPrereqError("Node is the master node,"
2180 " you need to failover first.")
2182 for instance_name in instance_list:
2183 instance = self.cfg.GetInstanceInfo(instance_name)
2184 if node.name in instance.all_nodes:
2185 raise errors.OpPrereqError("Instance %s is still running on the node;"
2186 " please remove it first." % instance_name)
2187 self.op.node_name = node.name
2190 def Exec(self, feedback_fn):
2191 """Removes the node from the cluster.
2195 logging.info("Stopping the node daemon and removing configs from node %s",
2198 self.context.RemoveNode(node.name)
2200 result = self.rpc.call_node_leave_cluster(node.name)
2201 msg = result.fail_msg
2203 self.LogWarning("Errors encountered on the remote node while leaving"
2204 " the cluster: %s", msg)
2206 # Promote nodes to master candidate as needed
2207 _AdjustCandidatePool(self)
2210 class LUQueryNodes(NoHooksLU):
2211 """Logical unit for querying nodes.
2214 _OP_REQP = ["output_fields", "names", "use_locking"]
2216 _FIELDS_DYNAMIC = utils.FieldSet(
2218 "mtotal", "mnode", "mfree",
2220 "ctotal", "cnodes", "csockets",
2223 _FIELDS_STATIC = utils.FieldSet(
2224 "name", "pinst_cnt", "sinst_cnt",
2225 "pinst_list", "sinst_list",
2226 "pip", "sip", "tags",
2227 "serial_no", "ctime", "mtime",
2235 def ExpandNames(self):
2236 _CheckOutputFields(static=self._FIELDS_STATIC,
2237 dynamic=self._FIELDS_DYNAMIC,
2238 selected=self.op.output_fields)
2240 self.needed_locks = {}
2241 self.share_locks[locking.LEVEL_NODE] = 1
2244 self.wanted = _GetWantedNodes(self, self.op.names)
2246 self.wanted = locking.ALL_SET
2248 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2249 self.do_locking = self.do_node_query and self.op.use_locking
2251 # if we don't request only static fields, we need to lock the nodes
2252 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2255 def CheckPrereq(self):
2256 """Check prerequisites.
2259 # The validation of the node list is done in _GetWantedNodes if the
2260 # list is not empty; if it is empty, there is nothing to validate
2263 def Exec(self, feedback_fn):
2264 """Computes the list of nodes and their attributes.
2267 all_info = self.cfg.GetAllNodesInfo()
2269 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2270 elif self.wanted != locking.ALL_SET:
2271 nodenames = self.wanted
2272 missing = set(nodenames).difference(all_info.keys())
2274 raise errors.OpExecError(
2275 "Some nodes were removed before retrieving their data: %s" % missing)
2277 nodenames = all_info.keys()
2279 nodenames = utils.NiceSort(nodenames)
2280 nodelist = [all_info[name] for name in nodenames]
2282 # begin data gathering
2284 if self.do_node_query:
2286 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2287 self.cfg.GetHypervisorType())
2288 for name in nodenames:
2289 nodeinfo = node_data[name]
2290 if not nodeinfo.fail_msg and nodeinfo.payload:
2291 nodeinfo = nodeinfo.payload
2292 fn = utils.TryConvert
2294 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2295 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2296 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2297 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2298 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2299 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2300 "bootid": nodeinfo.get('bootid', None),
2301 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2302 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2305 live_data[name] = {}
2307 live_data = dict.fromkeys(nodenames, {})
2309 node_to_primary = dict([(name, set()) for name in nodenames])
2310 node_to_secondary = dict([(name, set()) for name in nodenames])
2312 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2313 "sinst_cnt", "sinst_list"))
2314 if inst_fields & frozenset(self.op.output_fields):
2315 instancelist = self.cfg.GetInstanceList()
2317 for instance_name in instancelist:
2318 inst = self.cfg.GetInstanceInfo(instance_name)
2319 if inst.primary_node in node_to_primary:
2320 node_to_primary[inst.primary_node].add(inst.name)
2321 for secnode in inst.secondary_nodes:
2322 if secnode in node_to_secondary:
2323 node_to_secondary[secnode].add(inst.name)
2325 master_node = self.cfg.GetMasterNode()
2327 # end data gathering
2330 for node in nodelist:
2332 for field in self.op.output_fields:
2335 elif field == "pinst_list":
2336 val = list(node_to_primary[node.name])
2337 elif field == "sinst_list":
2338 val = list(node_to_secondary[node.name])
2339 elif field == "pinst_cnt":
2340 val = len(node_to_primary[node.name])
2341 elif field == "sinst_cnt":
2342 val = len(node_to_secondary[node.name])
2343 elif field == "pip":
2344 val = node.primary_ip
2345 elif field == "sip":
2346 val = node.secondary_ip
2347 elif field == "tags":
2348 val = list(node.GetTags())
2349 elif field == "serial_no":
2350 val = node.serial_no
2351 elif field == "ctime":
2353 elif field == "mtime":
2355 elif field == "master_candidate":
2356 val = node.master_candidate
2357 elif field == "master":
2358 val = node.name == master_node
2359 elif field == "offline":
2361 elif field == "drained":
2363 elif self._FIELDS_DYNAMIC.Matches(field):
2364 val = live_data[node.name].get(field, None)
2365 elif field == "role":
2366 if node.name == master_node:
2368 elif node.master_candidate:
2377 raise errors.ParameterError(field)
2378 node_output.append(val)
2379 output.append(node_output)
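# Illustrative example only (values are made up): for
# output_fields == ["name", "pinst_cnt", "sip"], each node_output appended
# above would be a list such as ["node1.example.com", 2, "192.0.2.11"].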
2384 class LUQueryNodeVolumes(NoHooksLU):
2385 """Logical unit for getting volumes on node(s).
2388 _OP_REQP = ["nodes", "output_fields"]
2390 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2391 _FIELDS_STATIC = utils.FieldSet("node")
2393 def ExpandNames(self):
2394 _CheckOutputFields(static=self._FIELDS_STATIC,
2395 dynamic=self._FIELDS_DYNAMIC,
2396 selected=self.op.output_fields)
2398 self.needed_locks = {}
2399 self.share_locks[locking.LEVEL_NODE] = 1
2400 if not self.op.nodes:
2401 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2403 self.needed_locks[locking.LEVEL_NODE] = \
2404 _GetWantedNodes(self, self.op.nodes)
2406 def CheckPrereq(self):
2407 """Check prerequisites.
2409 This checks that the fields required are valid output fields.
2412 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2414 def Exec(self, feedback_fn):
2415 """Computes the list of volumes on the nodes.
2418 nodenames = self.nodes
2419 volumes = self.rpc.call_node_volumes(nodenames)
2421 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2422 in self.cfg.GetInstanceList()]
2424 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2427 for node in nodenames:
2428 nresult = volumes[node]
2431 msg = nresult.fail_msg
2433 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2436 node_vols = nresult.payload[:]
2437 node_vols.sort(key=lambda vol: vol['dev'])
2439 for vol in node_vols:
2441 for field in self.op.output_fields:
2444 elif field == "phys":
2448 elif field == "name":
2450 elif field == "size":
2451 val = int(float(vol['size']))
2452 elif field == "instance":
2454 if node not in lv_by_node[inst]:
2456 if vol['name'] in lv_by_node[inst][node]:
2462 raise errors.ParameterError(field)
2463 node_output.append(str(val))
2465 output.append(node_output)
2470 class LUQueryNodeStorage(NoHooksLU):
2471 """Logical unit for getting information on storage units on node(s).
2474 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2476 _FIELDS_STATIC = utils.FieldSet("node")
2478 def ExpandNames(self):
2479 storage_type = self.op.storage_type
2481 if storage_type not in constants.VALID_STORAGE_FIELDS:
2482 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2484 dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2486 _CheckOutputFields(static=self._FIELDS_STATIC,
2487 dynamic=utils.FieldSet(*dynamic_fields),
2488 selected=self.op.output_fields)
2490 self.needed_locks = {}
2491 self.share_locks[locking.LEVEL_NODE] = 1
2494 self.needed_locks[locking.LEVEL_NODE] = \
2495 _GetWantedNodes(self, self.op.nodes)
2497 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2499 def CheckPrereq(self):
2500 """Check prerequisites.
2502 This checks that the fields required are valid output fields.
2505 self.op.name = getattr(self.op, "name", None)
2507 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2509 def Exec(self, feedback_fn):
2510 """Computes the list of storage units on the nodes.
2513 # Always get name to sort by
2514 if constants.SF_NAME in self.op.output_fields:
2515 fields = self.op.output_fields[:]
2517 fields = [constants.SF_NAME] + self.op.output_fields
2519 # Never ask for node as it's only known to the LU
2520 while "node" in fields:
2521 fields.remove("node")
2523 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2524 name_idx = field_idx[constants.SF_NAME]
2526 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2527 data = self.rpc.call_storage_list(self.nodes,
2528 self.op.storage_type, st_args,
2529 self.op.name, fields)
2533 for node in utils.NiceSort(self.nodes):
2534 nresult = data[node]
2538 msg = nresult.fail_msg
2540 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2543 rows = dict([(row[name_idx], row) for row in nresult.payload])
2545 for name in utils.NiceSort(rows.keys()):
2550 for field in self.op.output_fields:
2553 elif field in field_idx:
2554 val = row[field_idx[field]]
2556 raise errors.ParameterError(field)
2565 class LUModifyNodeStorage(NoHooksLU):
2566 """Logical unit for modifying a storage volume on a node.
2569 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2572 def CheckArguments(self):
2573 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2574 if node_name is None:
2575 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2577 self.op.node_name = node_name
2579 storage_type = self.op.storage_type
2580 if storage_type not in constants.VALID_STORAGE_FIELDS:
2581 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2583 def ExpandNames(self):
2584 self.needed_locks = {
2585 locking.LEVEL_NODE: self.op.node_name,
2588 def CheckPrereq(self):
2589 """Check prerequisites.
2592 storage_type = self.op.storage_type
2595 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2597 raise errors.OpPrereqError("Storage units of type '%s' cannot be"
2598 " modified" % storage_type)
2600 diff = set(self.op.changes.keys()) - modifiable
2602 raise errors.OpPrereqError("The following fields cannot be modified for"
2603 " storage units of type '%s': %r" %
2604 (storage_type, list(diff)))
2606 def Exec(self, feedback_fn):
2607 """Modifies a storage volume on the node.
2610 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2611 result = self.rpc.call_storage_modify(self.op.node_name,
2612 self.op.storage_type, st_args,
2613 self.op.name, self.op.changes)
2614 result.Raise("Failed to modify storage unit '%s' on %s" %
2615 (self.op.name, self.op.node_name))
2618 class LUAddNode(LogicalUnit):
2619 """Logical unit for adding node to the cluster.
2623 HTYPE = constants.HTYPE_NODE
2624 _OP_REQP = ["node_name"]
2626 def BuildHooksEnv(self):
2629 This will run on all nodes before, and on all nodes + the new node after.
2633 "OP_TARGET": self.op.node_name,
2634 "NODE_NAME": self.op.node_name,
2635 "NODE_PIP": self.op.primary_ip,
2636 "NODE_SIP": self.op.secondary_ip,
2638 nodes_0 = self.cfg.GetNodeList()
2639 nodes_1 = nodes_0 + [self.op.node_name, ]
2640 return env, nodes_0, nodes_1
2642 def CheckPrereq(self):
2643 """Check prerequisites.
2646 - the new node is not already in the config
2648 - its parameters (single/dual homed) match the cluster
2650 Any errors are signaled by raising errors.OpPrereqError.
2653 node_name = self.op.node_name
2656 dns_data = utils.HostInfo(node_name)
2658 node = dns_data.name
2659 primary_ip = self.op.primary_ip = dns_data.ip
2660 secondary_ip = getattr(self.op, "secondary_ip", None)
2661 if secondary_ip is None:
2662 secondary_ip = primary_ip
2663 if not utils.IsValidIP(secondary_ip):
2664 raise errors.OpPrereqError("Invalid secondary IP given")
2665 self.op.secondary_ip = secondary_ip
2667 node_list = cfg.GetNodeList()
2668 if not self.op.readd and node in node_list:
2669 raise errors.OpPrereqError("Node %s is already in the configuration" %
2671 elif self.op.readd and node not in node_list:
2672 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2674 for existing_node_name in node_list:
2675 existing_node = cfg.GetNodeInfo(existing_node_name)
2677 if self.op.readd and node == existing_node_name:
2678 if (existing_node.primary_ip != primary_ip or
2679 existing_node.secondary_ip != secondary_ip):
2680 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2681 " address configuration as before")
2684 if (existing_node.primary_ip == primary_ip or
2685 existing_node.secondary_ip == primary_ip or
2686 existing_node.primary_ip == secondary_ip or
2687 existing_node.secondary_ip == secondary_ip):
2688 raise errors.OpPrereqError("New node ip address(es) conflict with"
2689 " existing node %s" % existing_node.name)
2691 # check that the type of the node (single versus dual homed) is the
2692 # same as for the master
2693 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2694 master_singlehomed = myself.secondary_ip == myself.primary_ip
2695 newbie_singlehomed = secondary_ip == primary_ip
2696 if master_singlehomed != newbie_singlehomed:
2697 if master_singlehomed:
2698 raise errors.OpPrereqError("The master has no private ip but the"
2699 " new node has one")
2701 raise errors.OpPrereqError("The master has a private ip but the"
2702 " new node doesn't have one")
2704 # checks reachability
2705 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2706 raise errors.OpPrereqError("Node not reachable by ping")
2708 if not newbie_singlehomed:
2709 # check reachability from my secondary ip to newbie's secondary ip
2710 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2711 source=myself.secondary_ip):
2712 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2713 " based ping to noded port")
2715 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2720 mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2721 # the new node will increase mc_max by one, so:
2722 mc_max = min(mc_max + 1, cp_size)
2723 self.master_candidate = mc_now < mc_max
2726 self.new_node = self.cfg.GetNodeInfo(node)
2727 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2729 self.new_node = objects.Node(name=node,
2730 primary_ip=primary_ip,
2731 secondary_ip=secondary_ip,
2732 master_candidate=self.master_candidate,
2733 offline=False, drained=False)
2735 def Exec(self, feedback_fn):
2736 """Adds the new node to the cluster.
2739 new_node = self.new_node
2740 node = new_node.name
2742 # for re-adds, reset the offline/drained/master-candidate flags;
2743 # we need to reset here, otherwise offline would prevent RPC calls
2744 # later in the procedure; this also means that if the re-add
2745 # fails, we are left with a non-offlined, broken node
2747 new_node.drained = new_node.offline = False
2748 self.LogInfo("Readding a node, the offline/drained flags were reset")
2749 # if we demote the node, we do cleanup later in the procedure
2750 new_node.master_candidate = self.master_candidate
2752 # notify the user about any possible mc promotion
2753 if new_node.master_candidate:
2754 self.LogInfo("Node will be a master candidate")
2756 # check connectivity
2757 result = self.rpc.call_version([node])[node]
2758 result.Raise("Can't get version information from node %s" % node)
2759 if constants.PROTOCOL_VERSION == result.payload:
2760 logging.info("Communication to node %s fine, sw version %s match",
2761 node, result.payload)
2763 raise errors.OpExecError("Version mismatch master version %s,"
2764 " node version %s" %
2765 (constants.PROTOCOL_VERSION, result.payload))
2768 logging.info("Copy ssh key to node %s", node)
2769 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2771 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2772 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2778 keyarray.append(f.read())
2782 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2784 keyarray[3], keyarray[4], keyarray[5])
2785 result.Raise("Cannot transfer ssh keys to the new node")
2787 # Add node to our /etc/hosts, and add key to known_hosts
2788 if self.cfg.GetClusterInfo().modify_etc_hosts:
2789 utils.AddHostToEtcHosts(new_node.name)
2791 if new_node.secondary_ip != new_node.primary_ip:
2792 result = self.rpc.call_node_has_ip_address(new_node.name,
2793 new_node.secondary_ip)
2794 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2796 if not result.payload:
2797 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2798 " you gave (%s). Please fix and re-run this"
2799 " command." % new_node.secondary_ip)
2801 node_verify_list = [self.cfg.GetMasterNode()]
2802 node_verify_param = {
2804 # TODO: do a node-net-test as well?
2807 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2808 self.cfg.GetClusterName())
2809 for verifier in node_verify_list:
2810 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2811 nl_payload = result[verifier].payload['nodelist']
2813 for failed in nl_payload:
2814 feedback_fn("ssh/hostname verification failed %s -> %s" %
2815 (verifier, nl_payload[failed]))
2816 raise errors.OpExecError("ssh/hostname verification failed.")
2819 _RedistributeAncillaryFiles(self)
2820 self.context.ReaddNode(new_node)
2821 # make sure we redistribute the config
2822 self.cfg.Update(new_node)
2823 # and make sure the new node will not have old files around
2824 if not new_node.master_candidate:
2825 result = self.rpc.call_node_demote_from_mc(new_node.name)
2826 msg = result.fail_msg
2828 self.LogWarning("Node failed to demote itself from master"
2829 " candidate status: %s" % msg)
2831 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2832 self.context.AddNode(new_node)
2835 class LUSetNodeParams(LogicalUnit):
2836 """Modifies the parameters of a node.
2839 HPATH = "node-modify"
2840 HTYPE = constants.HTYPE_NODE
2841 _OP_REQP = ["node_name"]
2844 def CheckArguments(self):
2845 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2846 if node_name is None:
2847 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2848 self.op.node_name = node_name
2849 _CheckBooleanOpField(self.op, 'master_candidate')
2850 _CheckBooleanOpField(self.op, 'offline')
2851 _CheckBooleanOpField(self.op, 'drained')
2852 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2853 if all_mods.count(None) == 3:
2854 raise errors.OpPrereqError("Please pass at least one modification")
2855 if all_mods.count(True) > 1:
2856 raise errors.OpPrereqError("Can't set the node into more than one"
2857 " state at the same time")
2859 def ExpandNames(self):
2860 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2862 def BuildHooksEnv(self):
2865 This runs on the master node.
2869 "OP_TARGET": self.op.node_name,
2870 "MASTER_CANDIDATE": str(self.op.master_candidate),
2871 "OFFLINE": str(self.op.offline),
2872 "DRAINED": str(self.op.drained),
2874 nl = [self.cfg.GetMasterNode(),
2878 def CheckPrereq(self):
2879 """Check prerequisites.
2881 This only checks the instance list against the existing names.
2884 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2886 if ((self.op.master_candidate == False or self.op.offline == True or
2887 self.op.drained == True) and node.master_candidate):
2888 # we will demote the node from master_candidate
2889 if self.op.node_name == self.cfg.GetMasterNode():
2890 raise errors.OpPrereqError("The master node has to be a"
2891 " master candidate, online and not drained")
2892 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2893 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2894 if num_candidates <= cp_size:
2895 msg = ("Not enough master candidates (desired"
2896 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2898 self.LogWarning(msg)
2900 raise errors.OpPrereqError(msg)
2902 if (self.op.master_candidate == True and
2903 ((node.offline and not self.op.offline == False) or
2904 (node.drained and not self.op.drained == False))):
2905 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2906 " to master_candidate" % node.name)
2910 def Exec(self, feedback_fn):
2919 if self.op.offline is not None:
2920 node.offline = self.op.offline
2921 result.append(("offline", str(self.op.offline)))
2922 if self.op.offline == True:
2923 if node.master_candidate:
2924 node.master_candidate = False
2926 result.append(("master_candidate", "auto-demotion due to offline"))
2928 node.drained = False
2929 result.append(("drained", "clear drained status due to offline"))
2931 if self.op.master_candidate is not None:
2932 node.master_candidate = self.op.master_candidate
2934 result.append(("master_candidate", str(self.op.master_candidate)))
2935 if self.op.master_candidate == False:
2936 rrc = self.rpc.call_node_demote_from_mc(node.name)
2939 self.LogWarning("Node failed to demote itself: %s" % msg)
2941 if self.op.drained is not None:
2942 node.drained = self.op.drained
2943 result.append(("drained", str(self.op.drained)))
2944 if self.op.drained == True:
2945 if node.master_candidate:
2946 node.master_candidate = False
2948 result.append(("master_candidate", "auto-demotion due to drain"))
2949 rrc = self.rpc.call_node_demote_from_mc(node.name)
2950 msg = rrc.fail_msg
2952 self.LogWarning("Node failed to demote itself: %s" % msg)
2954 node.offline = False
2955 result.append(("offline", "clear offline status due to drain"))
2957 # this will trigger configuration file update, if needed
2958 self.cfg.Update(node)
2959 # this will trigger job queue propagation or cleanup
2961 self.context.ReaddNode(node)
2966 class LUPowercycleNode(NoHooksLU):
2967 """Powercycles a node.
2970 _OP_REQP = ["node_name", "force"]
2973 def CheckArguments(self):
2974 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2975 if node_name is None:
2976 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2977 self.op.node_name = node_name
2978 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2979 raise errors.OpPrereqError("The node is the master and the force"
2980 " parameter was not set")
2982 def ExpandNames(self):
2983 """Locking for PowercycleNode.
2985 This is a last-resort option and shouldn't block on other
2986 jobs. Therefore, we grab no locks.
2989 self.needed_locks = {}
2991 def CheckPrereq(self):
2992 """Check prerequisites.
2994 This LU has no prereqs.
2999 def Exec(self, feedback_fn):
3003 result = self.rpc.call_node_powercycle(self.op.node_name,
3004 self.cfg.GetHypervisorType())
3005 result.Raise("Failed to schedule the reboot")
3006 return result.payload
3009 class LUQueryClusterInfo(NoHooksLU):
3010 """Query cluster configuration.
3016 def ExpandNames(self):
3017 self.needed_locks = {}
3019 def CheckPrereq(self):
3020 """No prerequisites needed for this LU.
3025 def Exec(self, feedback_fn):
3026 """Return cluster config.
3029 cluster = self.cfg.GetClusterInfo()
3031 "software_version": constants.RELEASE_VERSION,
3032 "protocol_version": constants.PROTOCOL_VERSION,
3033 "config_version": constants.CONFIG_VERSION,
3034 "os_api_version": max(constants.OS_API_VERSIONS),
3035 "export_version": constants.EXPORT_VERSION,
3036 "architecture": (platform.architecture()[0], platform.machine()),
3037 "name": cluster.cluster_name,
3038 "master": cluster.master_node,
3039 "default_hypervisor": cluster.enabled_hypervisors[0],
3040 "enabled_hypervisors": cluster.enabled_hypervisors,
3041 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3042 for hypervisor_name in cluster.enabled_hypervisors]),
3043 "beparams": cluster.beparams,
3044 "nicparams": cluster.nicparams,
3045 "candidate_pool_size": cluster.candidate_pool_size,
3046 "master_netdev": cluster.master_netdev,
3047 "volume_group_name": cluster.volume_group_name,
3048 "file_storage_dir": cluster.file_storage_dir,
3049 "ctime": cluster.ctime,
3050 "mtime": cluster.mtime,
3056 class LUQueryConfigValues(NoHooksLU):
3057 """Return configuration values.
3062 _FIELDS_DYNAMIC = utils.FieldSet()
3063 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
3065 def ExpandNames(self):
3066 self.needed_locks = {}
3068 _CheckOutputFields(static=self._FIELDS_STATIC,
3069 dynamic=self._FIELDS_DYNAMIC,
3070 selected=self.op.output_fields)
3072 def CheckPrereq(self):
3073 """No prerequisites.
3078 def Exec(self, feedback_fn):
3079 """Collect and return the requested configuration values.
3083 for field in self.op.output_fields:
3084 if field == "cluster_name":
3085 entry = self.cfg.GetClusterName()
3086 elif field == "master_node":
3087 entry = self.cfg.GetMasterNode()
3088 elif field == "drain_flag":
3089 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3091 raise errors.ParameterError(field)
3092 values.append(entry)
3096 class LUActivateInstanceDisks(NoHooksLU):
3097 """Bring up an instance's disks.
3100 _OP_REQP = ["instance_name"]
3103 def ExpandNames(self):
3104 self._ExpandAndLockInstance()
3105 self.needed_locks[locking.LEVEL_NODE] = []
3106 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3108 def DeclareLocks(self, level):
3109 if level == locking.LEVEL_NODE:
3110 self._LockInstancesNodes()
3112 def CheckPrereq(self):
3113 """Check prerequisites.
3115 This checks that the instance is in the cluster.
3118 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3119 assert self.instance is not None, \
3120 "Cannot retrieve locked instance %s" % self.op.instance_name
3121 _CheckNodeOnline(self, self.instance.primary_node)
3122 if not hasattr(self.op, "ignore_size"):
3123 self.op.ignore_size = False
3125 def Exec(self, feedback_fn):
3126 """Activate the disks.
3129 disks_ok, disks_info = \
3130 _AssembleInstanceDisks(self, self.instance,
3131 ignore_size=self.op.ignore_size)
3133 raise errors.OpExecError("Cannot activate block devices")
3138 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3140 """Prepare the block devices for an instance.
3142 This sets up the block devices on all nodes.
3144 @type lu: L{LogicalUnit}
3145 @param lu: the logical unit on whose behalf we execute
3146 @type instance: L{objects.Instance}
3147 @param instance: the instance for whose disks we assemble
3148 @type ignore_secondaries: boolean
3149 @param ignore_secondaries: if true, errors on secondary nodes
3150 won't result in an error return from the function
3151 @type ignore_size: boolean
3152 @param ignore_size: if true, the current known size of the disk
3153 will not be used during the disk activation, useful for cases
3154 when the size is wrong
3155 @return: False if the operation failed, otherwise a list of
3156 (host, instance_visible_name, node_visible_name)
3157 with the mapping from node devices to instance devices
3162 iname = instance.name
3163 # With the two-pass mechanism we try to reduce the window of
3164 # opportunity for the race condition of switching DRBD to primary
3165 # before handshaking occurred, but we do not eliminate it
3167 # The proper fix would be to wait (with some limits) until the
3168 # connection has been made and drbd transitions from WFConnection
3169 # into any other network-connected state (Connected, SyncTarget,
3172 # 1st pass, assemble on all nodes in secondary mode
3173 for inst_disk in instance.disks:
3174 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3176 node_disk = node_disk.Copy()
3177 node_disk.UnsetSize()
3178 lu.cfg.SetDiskID(node_disk, node)
3179 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3180 msg = result.fail_msg
3182 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3183 " (is_primary=False, pass=1): %s",
3184 inst_disk.iv_name, node, msg)
3185 if not ignore_secondaries:
3188 # FIXME: race condition on drbd migration to primary
3190 # 2nd pass, do only the primary node
3191 for inst_disk in instance.disks:
3192 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3193 if node != instance.primary_node:
3196 node_disk = node_disk.Copy()
3197 node_disk.UnsetSize()
3198 lu.cfg.SetDiskID(node_disk, node)
3199 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3200 msg = result.fail_msg
3202 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3203 " (is_primary=True, pass=2): %s",
3204 inst_disk.iv_name, node, msg)
3206 device_info.append((instance.primary_node, inst_disk.iv_name,
3209 # leave the disks configured for the primary node
3210 # this is a workaround that would be fixed better by
3211 # improving the logical/physical id handling
3212 for disk in instance.disks:
3213 lu.cfg.SetDiskID(disk, instance.primary_node)
3215 return disks_ok, device_info
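# Illustrative sketch only: a caller typically unpacks the pair returned above
# and shuts the disks back down on failure, as LUActivateInstanceDisks above
# and _StartInstanceDisks below do; the helper name is an assumption.
def _ExampleAssembleOrFail(lu, instance):
  """Example only: assemble the instance's disks or raise OpExecError."""
  disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    raise errors.OpExecError("Cannot activate block devices")
  return device_info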
3218 def _StartInstanceDisks(lu, instance, force):
3219 """Start the disks of an instance.
3222 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3223 ignore_secondaries=force)
3225 _ShutdownInstanceDisks(lu, instance)
3226 if force is not None and not force:
3227 lu.proc.LogWarning("", hint="If the message above refers to a"
3229 " you can retry the operation using '--force'.")
3230 raise errors.OpExecError("Disk consistency error")
3233 class LUDeactivateInstanceDisks(NoHooksLU):
3234 """Shutdown an instance's disks.
3237 _OP_REQP = ["instance_name"]
3240 def ExpandNames(self):
3241 self._ExpandAndLockInstance()
3242 self.needed_locks[locking.LEVEL_NODE] = []
3243 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3245 def DeclareLocks(self, level):
3246 if level == locking.LEVEL_NODE:
3247 self._LockInstancesNodes()
3249 def CheckPrereq(self):
3250 """Check prerequisites.
3252 This checks that the instance is in the cluster.
3255 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3256 assert self.instance is not None, \
3257 "Cannot retrieve locked instance %s" % self.op.instance_name
3259 def Exec(self, feedback_fn):
3260 """Deactivate the disks
3263 instance = self.instance
3264 _SafeShutdownInstanceDisks(self, instance)
3267 def _SafeShutdownInstanceDisks(lu, instance):
3268 """Shutdown block devices of an instance.
3270 This function checks if an instance is running, before calling
3271 _ShutdownInstanceDisks.
3274 pnode = instance.primary_node
3275 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3276 ins_l.Raise("Can't contact node %s" % pnode)
3278 if instance.name in ins_l.payload:
3279 raise errors.OpExecError("Instance is running, can't shutdown"
3282 _ShutdownInstanceDisks(lu, instance)
3285 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3286 """Shutdown block devices of an instance.
3288 This does the shutdown on all nodes of the instance.
3290 Unless ignore_primary is true, errors on the primary node cause the
3291 function to report failure.
3295 for disk in instance.disks:
3296 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3297 lu.cfg.SetDiskID(top_disk, node)
3298 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3299 msg = result.fail_msg
3301 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3302 disk.iv_name, node, msg)
3303 if not ignore_primary or node != instance.primary_node:
3308 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3309 """Checks if a node has enough free memory.
3311 This function checks if a given node has the needed amount of free
3312 memory. In case the node has less memory or we cannot get the
3313 information from the node, this function raises an OpPrereqError
3316 @type lu: C{LogicalUnit}
3317 @param lu: a logical unit from which we get configuration data
3319 @param node: the node to check
3320 @type reason: C{str}
3321 @param reason: string to use in the error message
3322 @type requested: C{int}
3323 @param requested: the amount of memory in MiB to check for
3324 @type hypervisor_name: C{str}
3325 @param hypervisor_name: the hypervisor to ask for memory stats
3326 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3327 we cannot check the node
3330 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3331 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
3332 free_mem = nodeinfo[node].payload.get('memory_free', None)
3333 if not isinstance(free_mem, int):
3334 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3335 " was '%s'" % (node, free_mem))
3336 if requested > free_mem:
3337 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3338 " needed %s MiB, available %s MiB" %
3339 (node, reason, requested, free_mem))
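# Illustrative sketch only: a memory pre-check for a hypothetical failover
# target, mirroring the call made in LUStartupInstance below; the helper name
# is an assumption.
def _ExampleCheckFailoverMemory(lu, instance, target_node):
  """Example only: ensure target_node can host the instance's memory."""
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  _CheckNodeFreeMemory(lu, target_node,
                       "failing over instance %s" % instance.name,
                       bep[constants.BE_MEMORY], instance.hypervisor)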
3342 class LUStartupInstance(LogicalUnit):
3343 """Starts an instance.
3346 HPATH = "instance-start"
3347 HTYPE = constants.HTYPE_INSTANCE
3348 _OP_REQP = ["instance_name", "force"]
3351 def ExpandNames(self):
3352 self._ExpandAndLockInstance()
3354 def BuildHooksEnv(self):
3357 This runs on master, primary and secondary nodes of the instance.
3361 "FORCE": self.op.force,
3363 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3364 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3367 def CheckPrereq(self):
3368 """Check prerequisites.
3370 This checks that the instance is in the cluster.
3373 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3374 assert self.instance is not None, \
3375 "Cannot retrieve locked instance %s" % self.op.instance_name
3378 self.beparams = getattr(self.op, "beparams", {})
3380 if not isinstance(self.beparams, dict):
3381 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3382 " dict" % (type(self.beparams), ))
3383 # fill the beparams dict
3384 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3385 self.op.beparams = self.beparams
3388 self.hvparams = getattr(self.op, "hvparams", {})
3390 if not isinstance(self.hvparams, dict):
3391 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3392 " dict" % (type(self.hvparams), ))
3394 # check hypervisor parameter syntax (locally)
3395 cluster = self.cfg.GetClusterInfo()
3396 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3397 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3399 filled_hvp.update(self.hvparams)
3400 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3401 hv_type.CheckParameterSyntax(filled_hvp)
3402 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3403 self.op.hvparams = self.hvparams
3405 _CheckNodeOnline(self, instance.primary_node)
3407 bep = self.cfg.GetClusterInfo().FillBE(instance)
3408 # check bridges existence
3409 _CheckInstanceBridgesExist(self, instance)
3411 remote_info = self.rpc.call_instance_info(instance.primary_node,
3413 instance.hypervisor)
3414 remote_info.Raise("Error checking node %s" % instance.primary_node,
3416 if not remote_info.payload: # not running already
3417 _CheckNodeFreeMemory(self, instance.primary_node,
3418 "starting instance %s" % instance.name,
3419 bep[constants.BE_MEMORY], instance.hypervisor)
3421 def Exec(self, feedback_fn):
3422 """Start the instance.
3425 instance = self.instance
3426 force = self.op.force
3428 self.cfg.MarkInstanceUp(instance.name)
3430 node_current = instance.primary_node
3432 _StartInstanceDisks(self, instance, force)
3434 result = self.rpc.call_instance_start(node_current, instance,
3435 self.hvparams, self.beparams)
3436 msg = result.fail_msg
3438 _ShutdownInstanceDisks(self, instance)
3439 raise errors.OpExecError("Could not start instance: %s" % msg)
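# Illustrative example only: the per-start overrides accepted by this LU are
# plain dicts keyed by parameter name; the concrete values below are made up.
#
#   beparams = {constants.BE_MEMORY: 512}
#   hvparams = {}   # no hypervisor-level overrides for this start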
3442 class LURebootInstance(LogicalUnit):
3443 """Reboot an instance.
3446 HPATH = "instance-reboot"
3447 HTYPE = constants.HTYPE_INSTANCE
3448 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3451 def ExpandNames(self):
3452 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3453 constants.INSTANCE_REBOOT_HARD,
3454 constants.INSTANCE_REBOOT_FULL]:
3455 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3456 (constants.INSTANCE_REBOOT_SOFT,
3457 constants.INSTANCE_REBOOT_HARD,
3458 constants.INSTANCE_REBOOT_FULL))
3459 self._ExpandAndLockInstance()
3461 def BuildHooksEnv(self):
3464 This runs on master, primary and secondary nodes of the instance.
3468 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3469 "REBOOT_TYPE": self.op.reboot_type,
3471 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3472 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3475 def CheckPrereq(self):
3476 """Check prerequisites.
3478 This checks that the instance is in the cluster.
3481 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3482 assert self.instance is not None, \
3483 "Cannot retrieve locked instance %s" % self.op.instance_name
3485 _CheckNodeOnline(self, instance.primary_node)
3487 # check bridges existence
3488 _CheckInstanceBridgesExist(self, instance)
3490 def Exec(self, feedback_fn):
3491 """Reboot the instance.
3494 instance = self.instance
3495 ignore_secondaries = self.op.ignore_secondaries
3496 reboot_type = self.op.reboot_type
3498 node_current = instance.primary_node
3500 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3501 constants.INSTANCE_REBOOT_HARD]:
3502 for disk in instance.disks:
3503 self.cfg.SetDiskID(disk, node_current)
3504 result = self.rpc.call_instance_reboot(node_current, instance,
3506 result.Raise("Could not reboot instance")
3508 result = self.rpc.call_instance_shutdown(node_current, instance)
3509 result.Raise("Could not shutdown instance for full reboot")
3510 _ShutdownInstanceDisks(self, instance)
3511 _StartInstanceDisks(self, instance, ignore_secondaries)
3512 result = self.rpc.call_instance_start(node_current, instance, None, None)
3513 msg = result.fail_msg
3515 _ShutdownInstanceDisks(self, instance)
3516 raise errors.OpExecError("Could not start instance for"
3517 " full reboot: %s" % msg)
3519 self.cfg.MarkInstanceUp(instance.name)
3522 class LUShutdownInstance(LogicalUnit):
3523 """Shutdown an instance.
3526 HPATH = "instance-stop"
3527 HTYPE = constants.HTYPE_INSTANCE
3528 _OP_REQP = ["instance_name"]
3531 def ExpandNames(self):
3532 self._ExpandAndLockInstance()
3534 def BuildHooksEnv(self):
3537 This runs on master, primary and secondary nodes of the instance.
3540 env = _BuildInstanceHookEnvByObject(self, self.instance)
3541 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3544 def CheckPrereq(self):
3545 """Check prerequisites.
3547 This checks that the instance is in the cluster.
3550 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3551 assert self.instance is not None, \
3552 "Cannot retrieve locked instance %s" % self.op.instance_name
3553 _CheckNodeOnline(self, self.instance.primary_node)
3555 def Exec(self, feedback_fn):
3556 """Shutdown the instance.
3559 instance = self.instance
3560 node_current = instance.primary_node
3561 self.cfg.MarkInstanceDown(instance.name)
3562 result = self.rpc.call_instance_shutdown(node_current, instance)
3563 msg = result.fail_msg
3565 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3567 _ShutdownInstanceDisks(self, instance)
3570 class LUReinstallInstance(LogicalUnit):
3571 """Reinstall an instance.
3574 HPATH = "instance-reinstall"
3575 HTYPE = constants.HTYPE_INSTANCE
3576 _OP_REQP = ["instance_name"]
3579 def ExpandNames(self):
3580 self._ExpandAndLockInstance()
3582 def BuildHooksEnv(self):
3585 This runs on master, primary and secondary nodes of the instance.
3588 env = _BuildInstanceHookEnvByObject(self, self.instance)
3589 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3592 def CheckPrereq(self):
3593 """Check prerequisites.
3595 This checks that the instance is in the cluster and is not running.
3598 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3599 assert instance is not None, \
3600 "Cannot retrieve locked instance %s" % self.op.instance_name
3601 _CheckNodeOnline(self, instance.primary_node)
3603 if instance.disk_template == constants.DT_DISKLESS:
3604 raise errors.OpPrereqError("Instance '%s' has no disks" %
3605 self.op.instance_name)
3606 if instance.admin_up:
3607 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3608 self.op.instance_name)
3609 remote_info = self.rpc.call_instance_info(instance.primary_node,
3611 instance.hypervisor)
3612 remote_info.Raise("Error checking node %s" % instance.primary_node,
3614 if remote_info.payload:
3615 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3616 (self.op.instance_name,
3617 instance.primary_node))
3619 self.op.os_type = getattr(self.op, "os_type", None)
3620 if self.op.os_type is not None:
3622 pnode = self.cfg.GetNodeInfo(
3623 self.cfg.ExpandNodeName(instance.primary_node))
3625 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3627 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3628 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3629 (self.op.os_type, pnode.name), prereq=True)
3631 self.instance = instance
3633 def Exec(self, feedback_fn):
3634 """Reinstall the instance.
3637 inst = self.instance
3639 if self.op.os_type is not None:
3640 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3641 inst.os = self.op.os_type
3642 self.cfg.Update(inst)
3644 _StartInstanceDisks(self, inst, None)
3646 feedback_fn("Running the instance OS create scripts...")
3647 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3648 result.Raise("Could not install OS for instance %s on node %s" %
3649 (inst.name, inst.primary_node))
3651 _ShutdownInstanceDisks(self, inst)
3654 class LURecreateInstanceDisks(LogicalUnit):
3655 """Recreate an instance's missing disks.
3658 HPATH = "instance-recreate-disks"
3659 HTYPE = constants.HTYPE_INSTANCE
3660 _OP_REQP = ["instance_name", "disks"]
3663 def CheckArguments(self):
3664 """Check the arguments.
3667 if not isinstance(self.op.disks, list):
3668 raise errors.OpPrereqError("Invalid disks parameter")
3669 for item in self.op.disks:
3670 if (not isinstance(item, int) or
3672 raise errors.OpPrereqError("Invalid disk specification '%s'" %
3675 def ExpandNames(self):
3676 self._ExpandAndLockInstance()
3678 def BuildHooksEnv(self):
3681 This runs on master, primary and secondary nodes of the instance.
3684 env = _BuildInstanceHookEnvByObject(self, self.instance)
3685 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3688 def CheckPrereq(self):
3689 """Check prerequisites.
3691 This checks that the instance is in the cluster and is not running.
3694 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3695 assert instance is not None, \
3696 "Cannot retrieve locked instance %s" % self.op.instance_name
3697 _CheckNodeOnline(self, instance.primary_node)
3699 if instance.disk_template == constants.DT_DISKLESS:
3700 raise errors.OpPrereqError("Instance '%s' has no disks" %
3701 self.op.instance_name)
3702 if instance.admin_up:
3703 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3704 self.op.instance_name)
3705 remote_info = self.rpc.call_instance_info(instance.primary_node,
3707 instance.hypervisor)
3708 remote_info.Raise("Error checking node %s" % instance.primary_node,
3710 if remote_info.payload:
3711 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3712 (self.op.instance_name,
3713 instance.primary_node))
3715 if not self.op.disks:
3716 self.op.disks = range(len(instance.disks))
3718 for idx in self.op.disks:
3719 if idx >= len(instance.disks):
3720 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)
3722 self.instance = instance
3724 def Exec(self, feedback_fn):
3725 """Recreate the disks.
3729 for idx, disk in enumerate(self.instance.disks):
3730 if idx not in self.op.disks: # disk idx has not been passed in
3734 _CreateDisks(self, self.instance, to_skip=to_skip)
3737 class LURenameInstance(LogicalUnit):
3738 """Rename an instance.
3741 HPATH = "instance-rename"
3742 HTYPE = constants.HTYPE_INSTANCE
3743 _OP_REQP = ["instance_name", "new_name"]
3745 def BuildHooksEnv(self):
3748 This runs on master, primary and secondary nodes of the instance.
3751 env = _BuildInstanceHookEnvByObject(self, self.instance)
3752 env["INSTANCE_NEW_NAME"] = self.op.new_name
3753 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3756 def CheckPrereq(self):
3757 """Check prerequisites.
3759 This checks that the instance is in the cluster and is not running.
3762 instance = self.cfg.GetInstanceInfo(
3763 self.cfg.ExpandInstanceName(self.op.instance_name))
3764 if instance is None:
3765 raise errors.OpPrereqError("Instance '%s' not known" %
3766 self.op.instance_name)
3767 _CheckNodeOnline(self, instance.primary_node)
3769 if instance.admin_up:
3770 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3771 self.op.instance_name)
3772 remote_info = self.rpc.call_instance_info(instance.primary_node,
3774 instance.hypervisor)
3775 remote_info.Raise("Error checking node %s" % instance.primary_node,
3777 if remote_info.payload:
3778 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3779 (self.op.instance_name,
3780 instance.primary_node))
3781 self.instance = instance
3783 # new name verification
3784 name_info = utils.HostInfo(self.op.new_name)
3786 self.op.new_name = new_name = name_info.name
3787 instance_list = self.cfg.GetInstanceList()
3788 if new_name in instance_list:
3789 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3792 if not getattr(self.op, "ignore_ip", False):
3793 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3794 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3795 (name_info.ip, new_name))
3798 def Exec(self, feedback_fn):
3799 """Rename the instance.
3802 inst = self.instance
3803 old_name = inst.name
3805 if inst.disk_template == constants.DT_FILE:
3806 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3808 self.cfg.RenameInstance(inst.name, self.op.new_name)
3809 # Change the instance lock. This is definitely safe while we hold the BGL
3810 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3811 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3813 # re-read the instance from the configuration after rename
3814 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3816 if inst.disk_template == constants.DT_FILE:
3817 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3818 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3819 old_file_storage_dir,
3820 new_file_storage_dir)
3821 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3822 " (but the instance has been renamed in Ganeti)" %
3823 (inst.primary_node, old_file_storage_dir,
3824 new_file_storage_dir))
3826 _StartInstanceDisks(self, inst, None)
3828 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3830 msg = result.fail_msg
3832 msg = ("Could not run OS rename script for instance %s on node %s"
3833 " (but the instance has been renamed in Ganeti): %s" %
3834 (inst.name, inst.primary_node, msg))
3835 self.proc.LogWarning(msg)
3837 _ShutdownInstanceDisks(self, inst)
3840 class LURemoveInstance(LogicalUnit):
3841 """Remove an instance.
3844 HPATH = "instance-remove"
3845 HTYPE = constants.HTYPE_INSTANCE
3846 _OP_REQP = ["instance_name", "ignore_failures"]
3849 def ExpandNames(self):
3850 self._ExpandAndLockInstance()
3851 self.needed_locks[locking.LEVEL_NODE] = []
3852 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3854 def DeclareLocks(self, level):
3855 if level == locking.LEVEL_NODE:
3856 self._LockInstancesNodes()
3858 def BuildHooksEnv(self):
3861 This runs on master, primary and secondary nodes of the instance.
3864 env = _BuildInstanceHookEnvByObject(self, self.instance)
3865 nl = [self.cfg.GetMasterNode()]
3868 def CheckPrereq(self):
3869 """Check prerequisites.
3871 This checks that the instance is in the cluster.
3874 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3875 assert self.instance is not None, \
3876 "Cannot retrieve locked instance %s" % self.op.instance_name
3878 def Exec(self, feedback_fn):
3879 """Remove the instance.
3882 instance = self.instance
3883 logging.info("Shutting down instance %s on node %s",
3884 instance.name, instance.primary_node)
3886 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3887 msg = result.fail_msg
3889 if self.op.ignore_failures:
3890 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3892 raise errors.OpExecError("Could not shutdown instance %s on"
3894 (instance.name, instance.primary_node, msg))
3896 logging.info("Removing block devices for instance %s", instance.name)
3898 if not _RemoveDisks(self, instance):
3899 if self.op.ignore_failures:
3900 feedback_fn("Warning: can't remove instance's disks")
3902 raise errors.OpExecError("Can't remove instance's disks")
3904 logging.info("Removing instance %s out of cluster config", instance.name)
3906 self.cfg.RemoveInstance(instance.name)
3907 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3910 class LUQueryInstances(NoHooksLU):
3911 """Logical unit for querying instances.
3914 _OP_REQP = ["output_fields", "names", "use_locking"]
3916 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3918 "disk_template", "ip", "mac", "bridge",
3919 "nic_mode", "nic_link",
3920 "sda_size", "sdb_size", "vcpus", "tags",
3921 "network_port", "beparams",
3922 r"(disk)\.(size)/([0-9]+)",
3923 r"(disk)\.(sizes)", "disk_usage",
3924 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3925 r"(nic)\.(bridge)/([0-9]+)",
3926 r"(nic)\.(macs|ips|modes|links|bridges)",
3927 r"(disk|nic)\.(count)",
3928 "serial_no", "hypervisor", "hvparams",
3932 for name in constants.HVS_PARAMETERS] +
3934 for name in constants.BES_PARAMETERS])
3935 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3938 def ExpandNames(self):
3939 _CheckOutputFields(static=self._FIELDS_STATIC,
3940 dynamic=self._FIELDS_DYNAMIC,
3941 selected=self.op.output_fields)
3943 self.needed_locks = {}
3944 self.share_locks[locking.LEVEL_INSTANCE] = 1
3945 self.share_locks[locking.LEVEL_NODE] = 1
3948 self.wanted = _GetWantedInstances(self, self.op.names)
3950 self.wanted = locking.ALL_SET
3952 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3953 self.do_locking = self.do_node_query and self.op.use_locking
3955 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3956 self.needed_locks[locking.LEVEL_NODE] = []
3957 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3959 def DeclareLocks(self, level):
3960 if level == locking.LEVEL_NODE and self.do_locking:
3961 self._LockInstancesNodes()
3963 def CheckPrereq(self):
3964 """Check prerequisites.
3969 def Exec(self, feedback_fn):
3970 """Computes the list of instances and their attributes.
3973 all_info = self.cfg.GetAllInstancesInfo()
3974 if self.wanted == locking.ALL_SET:
3975 # caller didn't specify instance names, so ordering is not important
3977 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3979 instance_names = all_info.keys()
3980 instance_names = utils.NiceSort(instance_names)
3982 # caller did specify names, so we must keep the ordering
3984 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3986 tgt_set = all_info.keys()
3987 missing = set(self.wanted).difference(tgt_set)
3989 raise errors.OpExecError("Some instances were removed before"
3990 " retrieving their data: %s" % missing)
3991 instance_names = self.wanted
3993 instance_list = [all_info[iname] for iname in instance_names]
3995 # begin data gathering
3997 nodes = frozenset([inst.primary_node for inst in instance_list])
3998 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4002 if self.do_node_query:
4004 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4006 result = node_data[name]
4008 # offline nodes will be in both lists
4009 off_nodes.append(name)
4010 if result.failed or result.fail_msg:
4011 bad_nodes.append(name)
4014 live_data.update(result.payload)
4015 # else no instance is alive
4017 live_data = dict([(name, {}) for name in instance_names])
4019 # end data gathering
4024 cluster = self.cfg.GetClusterInfo()
4025 for instance in instance_list:
4027 i_hv = cluster.FillHV(instance)
4028 i_be = cluster.FillBE(instance)
4029 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4030 nic.nicparams) for nic in instance.nics]
4031 for field in self.op.output_fields:
4032 st_match = self._FIELDS_STATIC.Matches(field)
4037 elif field == "pnode":
4038 val = instance.primary_node
4039 elif field == "snodes":
4040 val = list(instance.secondary_nodes)
4041 elif field == "admin_state":
4042 val = instance.admin_up
4043 elif field == "oper_state":
4044 if instance.primary_node in bad_nodes:
4047 val = bool(live_data.get(instance.name))
4048 elif field == "status":
4049 if instance.primary_node in off_nodes:
4050 val = "ERROR_nodeoffline"
4051 elif instance.primary_node in bad_nodes:
4052 val = "ERROR_nodedown"
4054 running = bool(live_data.get(instance.name))
4056 if instance.admin_up:
4061 if instance.admin_up:
4065 elif field == "oper_ram":
4066 if instance.primary_node in bad_nodes:
4068 elif instance.name in live_data:
4069 val = live_data[instance.name].get("memory", "?")
4072 elif field == "vcpus":
4073 val = i_be[constants.BE_VCPUS]
4074 elif field == "disk_template":
4075 val = instance.disk_template
4078 val = instance.nics[0].ip
4081 elif field == "nic_mode":
4083 val = i_nicp[0][constants.NIC_MODE]
4086 elif field == "nic_link":
4088 val = i_nicp[0][constants.NIC_LINK]
4091 elif field == "bridge":
4092 if (instance.nics and
4093 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4094 val = i_nicp[0][constants.NIC_LINK]
4097 elif field == "mac":
4099 val = instance.nics[0].mac
4102 elif field == "sda_size" or field == "sdb_size":
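# Added note: legacy field names, "sda" maps to disk index 0 and "sdb"
# to index 1 via ord(field[2]) - ord('a')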
4103 idx = ord(field[2]) - ord('a')
4105 val = instance.FindDisk(idx).size
4106 except errors.OpPrereqError:
4108 elif field == "disk_usage": # total disk usage per node
4109 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4110 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4111 elif field == "tags":
4112 val = list(instance.GetTags())
4113 elif field == "serial_no":
4114 val = instance.serial_no
4115 elif field == "ctime":
4116 val = instance.ctime
4117 elif field == "mtime":
4118 val = instance.mtime
4119 elif field == "network_port":
4120 val = instance.network_port
4121 elif field == "hypervisor":
4122 val = instance.hypervisor
4123 elif field == "hvparams":
4125 elif (field.startswith(HVPREFIX) and
4126 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
4127 val = i_hv.get(field[len(HVPREFIX):], None)
4128 elif field == "beparams":
4130 elif (field.startswith(BEPREFIX) and
4131 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4132 val = i_be.get(field[len(BEPREFIX):], None)
4133 elif st_match and st_match.groups():
4134 # matches a variable list
4135 st_groups = st_match.groups()
4136 if st_groups and st_groups[0] == "disk":
4137 if st_groups[1] == "count":
4138 val = len(instance.disks)
4139 elif st_groups[1] == "sizes":
4140 val = [disk.size for disk in instance.disks]
4141 elif st_groups[1] == "size":
4143 val = instance.FindDisk(st_groups[2]).size
4144 except errors.OpPrereqError:
4147 assert False, "Unhandled disk parameter"
4148 elif st_groups[0] == "nic":
4149 if st_groups[1] == "count":
4150 val = len(instance.nics)
4151 elif st_groups[1] == "macs":
4152 val = [nic.mac for nic in instance.nics]
4153 elif st_groups[1] == "ips":
4154 val = [nic.ip for nic in instance.nics]
4155 elif st_groups[1] == "modes":
4156 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4157 elif st_groups[1] == "links":
4158 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4159 elif st_groups[1] == "bridges":
4162 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4163 val.append(nicp[constants.NIC_LINK])
4168 nic_idx = int(st_groups[2])
4169 if nic_idx >= len(instance.nics):
4172 if st_groups[1] == "mac":
4173 val = instance.nics[nic_idx].mac
4174 elif st_groups[1] == "ip":
4175 val = instance.nics[nic_idx].ip
4176 elif st_groups[1] == "mode":
4177 val = i_nicp[nic_idx][constants.NIC_MODE]
4178 elif st_groups[1] == "link":
4179 val = i_nicp[nic_idx][constants.NIC_LINK]
4180 elif st_groups[1] == "bridge":
4181 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4182 if nic_mode == constants.NIC_MODE_BRIDGED:
4183 val = i_nicp[nic_idx][constants.NIC_LINK]
4187 assert False, "Unhandled NIC parameter"
4189 assert False, ("Declared but unhandled variable parameter '%s'" %
4192 assert False, "Declared but unhandled parameter '%s'" % field
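# Added note (hedged, the collecting code is not shown here): each value
# computed above is appended to a per-instance row, and Exec() presumably
# returns one row per instance with values in the same order as
# self.op.output_fields, e.g. output_fields=["name", "oper_ram"] would
# yield rows like ["inst1.example.com", 512].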
4199 class LUFailoverInstance(LogicalUnit):
4200 """Failover an instance.
4203 HPATH = "instance-failover"
4204 HTYPE = constants.HTYPE_INSTANCE
4205 _OP_REQP = ["instance_name", "ignore_consistency"]
4208 def ExpandNames(self):
4209 self._ExpandAndLockInstance()
4210 self.needed_locks[locking.LEVEL_NODE] = []
4211 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4213 def DeclareLocks(self, level):
4214 if level == locking.LEVEL_NODE:
4215 self._LockInstancesNodes()
4217 def BuildHooksEnv(self):
4220 This runs on master, primary and secondary nodes of the instance.
4224 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4226 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4227 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4230 def CheckPrereq(self):
4231 """Check prerequisites.
4233 This checks that the instance is in the cluster.
4236 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4237 assert self.instance is not None, \
4238 "Cannot retrieve locked instance %s" % self.op.instance_name
4240 bep = self.cfg.GetClusterInfo().FillBE(instance)
4241 if instance.disk_template not in constants.DTS_NET_MIRROR:
4242 raise errors.OpPrereqError("Instance's disk layout is not"
4243 " network mirrored, cannot failover.")
4245 secondary_nodes = instance.secondary_nodes
4246 if not secondary_nodes:
4247 raise errors.ProgrammerError("no secondary node but using "
4248 "a mirrored disk template")
4250 target_node = secondary_nodes[0]
4251 _CheckNodeOnline(self, target_node)
4252 _CheckNodeNotDrained(self, target_node)
4253 if instance.admin_up:
4254 # check memory requirements on the secondary node
4255 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4256 instance.name, bep[constants.BE_MEMORY],
4257 instance.hypervisor)
4259 self.LogInfo("Not checking memory on the secondary node as"
4260 " instance will not be started")
4262 # check bridge existence
4263 _CheckInstanceBridgesExist(self, instance, node=target_node)
4265 def Exec(self, feedback_fn):
4266 """Failover an instance.
4268 The failover is done by shutting it down on its present node and
4269 starting it on the secondary.
4272 instance = self.instance
4274 source_node = instance.primary_node
4275 target_node = instance.secondary_nodes[0]
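# Added summary of the steps below: check disk consistency, shut the
# instance down on the current primary, shut down its disks there, flip
# primary_node in the configuration and, if the instance was marked up,
# re-assemble the disks and start it on the old secondary.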
4277 feedback_fn("* checking disk consistency between source and target")
4278 for dev in instance.disks:
4279 # for drbd, these are drbd over lvm
4280 if not _CheckDiskConsistency(self, dev, target_node, False):
4281 if instance.admin_up and not self.op.ignore_consistency:
4282 raise errors.OpExecError("Disk %s is degraded on target node,"
4283 " aborting failover." % dev.iv_name)
4285 feedback_fn("* shutting down instance on source node")
4286 logging.info("Shutting down instance %s on node %s",
4287 instance.name, source_node)
4289 result = self.rpc.call_instance_shutdown(source_node, instance)
4290 msg = result.fail_msg
4292 if self.op.ignore_consistency:
4293 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4294 " Proceeding anyway. Please make sure node"
4295 " %s is down. Error details: %s",
4296 instance.name, source_node, source_node, msg)
4298 raise errors.OpExecError("Could not shutdown instance %s on"
4299 " node %s: %s" %
4300 (instance.name, source_node, msg))
4302 feedback_fn("* deactivating the instance's disks on source node")
4303 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4304 raise errors.OpExecError("Can't shut down the instance's disks.")
4306 instance.primary_node = target_node
4307 # distribute new instance config to the other nodes
4308 self.cfg.Update(instance)
4310 # Only start the instance if it's marked as up
4311 if instance.admin_up:
4312 feedback_fn("* activating the instance's disks on target node")
4313 logging.info("Starting instance %s on node %s",
4314 instance.name, target_node)
4316 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4317 ignore_secondaries=True)
4319 _ShutdownInstanceDisks(self, instance)
4320 raise errors.OpExecError("Can't activate the instance's disks")
4322 feedback_fn("* starting the instance on the target node")
4323 result = self.rpc.call_instance_start(target_node, instance, None, None)
4324 msg = result.fail_msg
4326 _ShutdownInstanceDisks(self, instance)
4327 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4328 (instance.name, target_node, msg))
4331 class LUMigrateInstance(LogicalUnit):
4332 """Migrate an instance.
4334 This is migration without shutting the instance down, in contrast to
4335 failover, which requires a shutdown.
4338 HPATH = "instance-migrate"
4339 HTYPE = constants.HTYPE_INSTANCE
4340 _OP_REQP = ["instance_name", "live", "cleanup"]
4344 def ExpandNames(self):
4345 self._ExpandAndLockInstance()
4347 self.needed_locks[locking.LEVEL_NODE] = []
4348 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4350 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4351 self.op.live, self.op.cleanup)
4352 self.tasklets = [self._migrater]
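# Added note: by assigning self.tasklets, this LU delegates CheckPrereq
# and Exec to the TLMigrateInstance tasklet defined further below; the LU
# itself only handles locking and the hooks environment.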
4354 def DeclareLocks(self, level):
4355 if level == locking.LEVEL_NODE:
4356 self._LockInstancesNodes()
4358 def BuildHooksEnv(self):
4361 This runs on master, primary and secondary nodes of the instance.
4364 instance = self._migrater.instance
4365 env = _BuildInstanceHookEnvByObject(self, instance)
4366 env["MIGRATE_LIVE"] = self.op.live
4367 env["MIGRATE_CLEANUP"] = self.op.cleanup
4368 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4372 class LUMigrateNode(LogicalUnit):
4373 """Migrate all instances from a node.
4376 HPATH = "node-migrate"
4377 HTYPE = constants.HTYPE_NODE
4378 _OP_REQP = ["node_name", "live"]
4381 def ExpandNames(self):
4382 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4383 if self.op.node_name is None:
4384 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
4386 self.needed_locks = {
4387 locking.LEVEL_NODE: [self.op.node_name],
4390 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4392 # Create tasklets for migrating instances for all instances on this node
4396 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4397 logging.debug("Migrating instance %s", inst.name)
4398 names.append(inst.name)
4400 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4402 self.tasklets = tasklets
4404 # Declare instance locks
4405 self.needed_locks[locking.LEVEL_INSTANCE] = names
4407 def DeclareLocks(self, level):
4408 if level == locking.LEVEL_NODE:
4409 self._LockInstancesNodes()
4411 def BuildHooksEnv(self):
4414 This runs on the master, the primary and all the secondaries.
4418 "NODE_NAME": self.op.node_name,
4421 nl = [self.cfg.GetMasterNode()]
4423 return (env, nl, nl)
4426 class TLMigrateInstance(Tasklet):
4427 def __init__(self, lu, instance_name, live, cleanup):
4428 """Initializes this class.
4431 Tasklet.__init__(self, lu)
4434 self.instance_name = instance_name
4436 self.cleanup = cleanup
4438 def CheckPrereq(self):
4439 """Check prerequisites.
4441 This checks that the instance is in the cluster.
4444 instance = self.cfg.GetInstanceInfo(
4445 self.cfg.ExpandInstanceName(self.instance_name))
4446 if instance is None:
4447 raise errors.OpPrereqError("Instance '%s' not known" %
4450 if instance.disk_template != constants.DT_DRBD8:
4451 raise errors.OpPrereqError("Instance's disk layout is not"
4452 " drbd8, cannot migrate.")
4454 secondary_nodes = instance.secondary_nodes
4455 if not secondary_nodes:
4456 raise errors.ConfigurationError("No secondary node but using"
4457 " drbd8 disk template")
4459 i_be = self.cfg.GetClusterInfo().FillBE(instance)
4461 target_node = secondary_nodes[0]
4462 # check memory requirements on the secondary node
4463 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4464 instance.name, i_be[constants.BE_MEMORY],
4465 instance.hypervisor)
4467 # check bridge existence
4468 _CheckInstanceBridgesExist(self, instance, node=target_node)
4470 if not self.cleanup:
4471 _CheckNodeNotDrained(self, target_node)
4472 result = self.rpc.call_instance_migratable(instance.primary_node,
4474 result.Raise("Can't migrate, please use failover", prereq=True)
4476 self.instance = instance
4478 def _WaitUntilSync(self):
4479 """Poll with custom rpc for disk sync.
4481 This uses our own step-based rpc call.
4484 self.feedback_fn("* wait until resync is done")
4488 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4490 self.instance.disks)
4492 for node, nres in result.items():
4493 nres.Raise("Cannot resync disks on node %s" % node)
4494 node_done, node_percent = nres.payload
4495 all_done = all_done and node_done
4496 if node_percent is not None:
4497 min_percent = min(min_percent, node_percent)
4499 if min_percent < 100:
4500 self.feedback_fn(" - progress: %.1f%%" % min_percent)
4503 def _EnsureSecondary(self, node):
4504 """Demote a node to secondary.
4507 self.feedback_fn("* switching node %s to secondary mode" % node)
4509 for dev in self.instance.disks:
4510 self.cfg.SetDiskID(dev, node)
4512 result = self.rpc.call_blockdev_close(node, self.instance.name,
4513 self.instance.disks)
4514 result.Raise("Cannot change disk to secondary on node %s" % node)
4516 def _GoStandalone(self):
4517 """Disconnect from the network.
4520 self.feedback_fn("* changing into standalone mode")
4521 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4522 self.instance.disks)
4523 for node, nres in result.items():
4524 nres.Raise("Cannot disconnect disks on node %s" % node)
4526 def _GoReconnect(self, multimaster):
4527 """Reconnect to the network.
4533 msg = "single-master"
4534 self.feedback_fn("* changing disks into %s mode" % msg)
4535 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4536 self.instance.disks,
4537 self.instance.name, multimaster)
4538 for node, nres in result.items():
4539 nres.Raise("Cannot change disks config on node %s" % node)
4541 def _ExecCleanup(self):
4542 """Try to cleanup after a failed migration.
4544 The cleanup is done by:
4545 - check that the instance is running only on one node
4546 (and update the config if needed)
4547 - change disks on its secondary node to secondary
4548 - wait until disks are fully synchronized
4549 - disconnect from the network
4550 - change disks into single-master mode
4551 - wait again until disks are fully synchronized
4554 instance = self.instance
4555 target_node = self.target_node
4556 source_node = self.source_node
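# Added note: the steps listed in the docstring map onto the helpers used
# below: _EnsureSecondary() demotes the stale node, _GoStandalone() and
# _GoReconnect(False) re-establish the DRBD network link in single-master
# mode, and _WaitUntilSync() polls until the disks are in sync again.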
4558 # check running on only one node
4559 self.feedback_fn("* checking where the instance actually runs"
4560 " (if this hangs, the hypervisor might be in"
4562 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4563 for node, result in ins_l.items():
4564 result.Raise("Can't contact node %s" % node)
4566 runningon_source = instance.name in ins_l[source_node].payload
4567 runningon_target = instance.name in ins_l[target_node].payload
4569 if runningon_source and runningon_target:
4570 raise errors.OpExecError("Instance seems to be running on two nodes,"
4571 " or the hypervisor is confused. You will have"
4572 " to ensure manually that it runs only on one"
4573 " and restart this operation.")
4575 if not (runningon_source or runningon_target):
4576 raise errors.OpExecError("Instance does not seem to be running at all."
4577 " In this case, it's safer to repair by"
4578 " running 'gnt-instance stop' to ensure disk"
4579 " shutdown, and then restarting it.")
4581 if runningon_target:
4582 # the migration has actually succeeded, we need to update the config
4583 self.feedback_fn("* instance running on secondary node (%s),"
4584 " updating config" % target_node)
4585 instance.primary_node = target_node
4586 self.cfg.Update(instance)
4587 demoted_node = source_node
4589 self.feedback_fn("* instance confirmed to be running on its"
4590 " primary node (%s)" % source_node)
4591 demoted_node = target_node
4593 self._EnsureSecondary(demoted_node)
4595 self._WaitUntilSync()
4596 except errors.OpExecError:
4597 # we ignore errors here, since if the device is standalone, it
4598 # won't be able to sync
4600 self._GoStandalone()
4601 self._GoReconnect(False)
4602 self._WaitUntilSync()
4604 self.feedback_fn("* done")
4606 def _RevertDiskStatus(self):
4607 """Try to revert the disk status after a failed migration.
4610 target_node = self.target_node
4612 self._EnsureSecondary(target_node)
4613 self._GoStandalone()
4614 self._GoReconnect(False)
4615 self._WaitUntilSync()
4616 except errors.OpExecError, err:
4617 self.lu.LogWarning("Migration failed and I can't reconnect the"
4618 " drives: error '%s'\n"
4619 "Please look and recover the instance status" %
4622 def _AbortMigration(self):
4623 """Call the hypervisor code to abort a started migration.
4626 instance = self.instance
4627 target_node = self.target_node
4628 migration_info = self.migration_info
4630 abort_result = self.rpc.call_finalize_migration(target_node,
4634 abort_msg = abort_result.fail_msg
4636 logging.error("Aborting migration failed on target node %s: %s" %
4637 (target_node, abort_msg))
4638 # Don't raise an exception here, as we still have to try to revert the
4639 # disk status, even if this step failed.
4641 def _ExecMigration(self):
4642 """Migrate an instance.
4644 The migration is done by:
4645 - change the disks into dual-master mode
4646 - wait until disks are fully synchronized again
4647 - migrate the instance
4648 - change disks on the new secondary node (the old primary) to secondary
4649 - wait until disks are fully synchronized
4650 - change disks into single-master mode
4653 instance = self.instance
4654 target_node = self.target_node
4655 source_node = self.source_node
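# Added note: the sequence below follows the docstring: fetch migration
# data from the source (call_migration_info), switch the DRBD pair to
# dual-master mode (_GoReconnect(True)), prepare the target
# (call_accept_instance), perform the migration (call_instance_migrate),
# finalize it on the target (call_finalize_migration), then demote the
# old primary and return the disks to single-master mode.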
4657 self.feedback_fn("* checking disk consistency between source and target")
4658 for dev in instance.disks:
4659 if not _CheckDiskConsistency(self, dev, target_node, False):
4660 raise errors.OpExecError("Disk %s is degraded or not fully"
4661 " synchronized on target node,"
4662 " aborting migrate." % dev.iv_name)
4664 # First get the migration information from the remote node
4665 result = self.rpc.call_migration_info(source_node, instance)
4666 msg = result.fail_msg
4668 log_err = ("Failed fetching source migration information from %s: %s" %
4670 logging.error(log_err)
4671 raise errors.OpExecError(log_err)
4673 self.migration_info = migration_info = result.payload
4675 # Then switch the disks to master/master mode
4676 self._EnsureSecondary(target_node)
4677 self._GoStandalone()
4678 self._GoReconnect(True)
4679 self._WaitUntilSync()
4681 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4682 result = self.rpc.call_accept_instance(target_node,
4685 self.nodes_ip[target_node])
4687 msg = result.fail_msg
4689 logging.error("Instance pre-migration failed, trying to revert"
4690 " disk status: %s", msg)
4691 self._AbortMigration()
4692 self._RevertDiskStatus()
4693 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4694 (instance.name, msg))
4696 self.feedback_fn("* migrating instance to %s" % target_node)
4698 result = self.rpc.call_instance_migrate(source_node, instance,
4699 self.nodes_ip[target_node],
4701 msg = result.fail_msg
4703 logging.error("Instance migration failed, trying to revert"
4704 " disk status: %s", msg)
4705 self._AbortMigration()
4706 self._RevertDiskStatus()
4707 raise errors.OpExecError("Could not migrate instance %s: %s" %
4708 (instance.name, msg))
4711 instance.primary_node = target_node
4712 # distribute new instance config to the other nodes
4713 self.cfg.Update(instance)
4715 result = self.rpc.call_finalize_migration(target_node,
4719 msg = result.fail_msg
4721 logging.error("Instance migration succeeded, but finalization failed:"
4723 raise errors.OpExecError("Could not finalize instance migration: %s" %
4726 self._EnsureSecondary(source_node)
4727 self._WaitUntilSync()
4728 self._GoStandalone()
4729 self._GoReconnect(False)
4730 self._WaitUntilSync()
4732 self.feedback_fn("* done")
4734 def Exec(self, feedback_fn):
4735 """Perform the migration.
4738 feedback_fn("Migrating instance %s" % self.instance.name)
4740 self.feedback_fn = feedback_fn
4742 self.source_node = self.instance.primary_node
4743 self.target_node = self.instance.secondary_nodes[0]
4744 self.all_nodes = [self.source_node, self.target_node]
4746 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4747 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4751 return self._ExecCleanup()
4753 return self._ExecMigration()
4756 def _CreateBlockDev(lu, node, instance, device, force_create,
4758 """Create a tree of block devices on a given node.
4760 If this device type has to be created on secondaries, create it and
4763 If not, just recurse into its children, keeping the same 'force' value.
4765 @param lu: the lu on whose behalf we execute
4766 @param node: the node on which to create the device
4767 @type instance: L{objects.Instance}
4768 @param instance: the instance which owns the device
4769 @type device: L{objects.Disk}
4770 @param device: the device to create
4771 @type force_create: boolean
4772 @param force_create: whether to force creation of this device; this
4773 will be changed to True whenever we find a device whose
4774 CreateOnSecondary() method returns True
4775 @param info: the extra 'metadata' we should attach to the device
4776 (this will be represented as a LVM tag)
4777 @type force_open: boolean
4778 @param force_open: this parameter will be passed to the
4779 L{backend.BlockdevCreate} function where it specifies
4780 whether we run on the primary or not, and it affects both
4781 the child assembly and the device's own Open() execution
4784 if device.CreateOnSecondary():
4788 for child in device.children:
4789 _CreateBlockDev(lu, node, instance, child, force_create,
4792 if not force_create:
4795 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
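# Added illustrative note (hedged): for a DRBD8-based disk the device
# tree is a DRBD device with two LV children (data and metadata), as
# built by _GenerateDRBD8Branch() below.  Assuming the DRBD device
# reports CreateOnSecondary() as True, force_create flips to True once
# that device is reached, so its LV children are created on the
# secondary node as well.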
4798 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4799 """Create a single block device on a given node.
4801 This will not recurse over children of the device, so they must be
4804 @param lu: the lu on whose behalf we execute
4805 @param node: the node on which to create the device
4806 @type instance: L{objects.Instance}
4807 @param instance: the instance which owns the device
4808 @type device: L{objects.Disk}
4809 @param device: the device to create
4810 @param info: the extra 'metadata' we should attach to the device
4811 (this will be represented as a LVM tag)
4812 @type force_open: boolean
4813 @param force_open: this parameter will be passed to the
4814 L{backend.BlockdevCreate} function where it specifies
4815 whether we run on the primary or not, and it affects both
4816 the child assembly and the device's own Open() execution
4819 lu.cfg.SetDiskID(device, node)
4820 result = lu.rpc.call_blockdev_create(node, device, device.size,
4821 instance.name, force_open, info)
4822 result.Raise("Can't create block device %s on"
4823 " node %s for instance %s" % (device, node, instance.name))
4824 if device.physical_id is None:
4825 device.physical_id = result.payload
4828 def _GenerateUniqueNames(lu, exts):
4829 """Generate a suitable LV name.
4831 This will generate a logical volume name for the given instance.
4836 new_id = lu.cfg.GenerateUniqueID()
4837 results.append("%s%s" % (new_id, val))
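# Added example (hypothetical values): with exts=[".disk0", ".disk1"]
# this yields names of the form "<unique-id>.disk0" and
# "<unique-id>.disk1"; for the DRBD8 template, _GenerateDiskTemplate()
# below appends "_data" and "_meta" to each of these prefixes.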
4841 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4843 """Generate a drbd8 device complete with its children.
4846 port = lu.cfg.AllocatePort()
4847 vgname = lu.cfg.GetVGName()
4848 shared_secret = lu.cfg.GenerateDRBDSecret()
4849 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4850 logical_id=(vgname, names[0]))
4851 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4852 logical_id=(vgname, names[1]))
4853 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4854 logical_id=(primary, secondary, port,
4857 children=[dev_data, dev_meta],
4862 def _GenerateDiskTemplate(lu, template_name,
4863 instance_name, primary_node,
4864 secondary_nodes, disk_info,
4865 file_storage_dir, file_driver,
4867 """Generate the entire disk layout for a given template type.
4870 #TODO: compute space requirements
4872 vgname = lu.cfg.GetVGName()
4873 disk_count = len(disk_info)
4875 if template_name == constants.DT_DISKLESS:
4877 elif template_name == constants.DT_PLAIN:
4878 if len(secondary_nodes) != 0:
4879 raise errors.ProgrammerError("Wrong template configuration")
4881 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4882 for i in range(disk_count)])
4883 for idx, disk in enumerate(disk_info):
4884 disk_index = idx + base_index
4885 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4886 logical_id=(vgname, names[idx]),
4887 iv_name="disk/%d" % disk_index,
4889 disks.append(disk_dev)
4890 elif template_name == constants.DT_DRBD8:
4891 if len(secondary_nodes) != 1:
4892 raise errors.ProgrammerError("Wrong template configuration")
4893 remote_node = secondary_nodes[0]
4894 minors = lu.cfg.AllocateDRBDMinor(
4895 [primary_node, remote_node] * len(disk_info), instance_name)
4898 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4899 for i in range(disk_count)]):
4900 names.append(lv_prefix + "_data")
4901 names.append(lv_prefix + "_meta")
4902 for idx, disk in enumerate(disk_info):
4903 disk_index = idx + base_index
4904 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4905 disk["size"], names[idx*2:idx*2+2],
4906 "disk/%d" % disk_index,
4907 minors[idx*2], minors[idx*2+1])
4908 disk_dev.mode = disk["mode"]
4909 disks.append(disk_dev)
4910 elif template_name == constants.DT_FILE:
4911 if len(secondary_nodes) != 0:
4912 raise errors.ProgrammerError("Wrong template configuration")
4914 for idx, disk in enumerate(disk_info):
4915 disk_index = idx + base_index
4916 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4917 iv_name="disk/%d" % disk_index,
4918 logical_id=(file_driver,
4919 "%s/disk%d" % (file_storage_dir,
4922 disks.append(disk_dev)
4924 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
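# Added usage sketch (hypothetical arguments, not from the original
# file): for a plain LVM instance with two disks the call would look
# roughly like
#
#   disks = _GenerateDiskTemplate(self, constants.DT_PLAIN,
#                                 "inst1.example.com", "node1", [],
#                                 [{"size": 1024, "mode": "rw"},
#                                  {"size": 2048, "mode": "rw"}],
#                                 None, None, 0)
#
# (the trailing 0 is assumed to be the base disk index), producing two
# LD_LV disk objects with iv_name "disk/0" and "disk/1";
# LUCreateInstance.Exec() below is the real caller.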
4928 def _GetInstanceInfoText(instance):
4929 """Compute that text that should be added to the disk's metadata.
4932 return "originstname+%s" % instance.name
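# Added example: for an instance named "inst1.example.com" this returns
# "originstname+inst1.example.com", which is attached to the instance's
# volumes as an LVM tag via the 'info' argument of _CreateBlockDev.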
4935 def _CreateDisks(lu, instance, to_skip=None):
4936 """Create all disks for an instance.
4938 This abstracts away some work from AddInstance.
4940 @type lu: L{LogicalUnit}
4941 @param lu: the logical unit on whose behalf we execute
4942 @type instance: L{objects.Instance}
4943 @param instance: the instance whose disks we should create
4945 @param to_skip: list of indices to skip
4947 @return: the success of the creation
4950 info = _GetInstanceInfoText(instance)
4951 pnode = instance.primary_node
4953 if instance.disk_template == constants.DT_FILE:
4954 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4955 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4957 result.Raise("Failed to create directory '%s' on"
4958 " node %s" % (file_storage_dir, pnode))
4960 # Note: this needs to be kept in sync with adding of disks in
4961 # LUSetInstanceParams
4962 for idx, device in enumerate(instance.disks):
4963 if to_skip and idx in to_skip:
4965 logging.info("Creating volume %s for instance %s",
4966 device.iv_name, instance.name)
4968 for node in instance.all_nodes:
4969 f_create = node == pnode
4970 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4973 def _RemoveDisks(lu, instance):
4974 """Remove all disks for an instance.
4976 This abstracts away some work from `AddInstance()` and
4977 `RemoveInstance()`. Note that in case some of the devices couldn't
4978 be removed, the removal will continue with the other ones (compare
4979 with `_CreateDisks()`).
4981 @type lu: L{LogicalUnit}
4982 @param lu: the logical unit on whose behalf we execute
4983 @type instance: L{objects.Instance}
4984 @param instance: the instance whose disks we should remove
4986 @return: the success of the removal
4989 logging.info("Removing block devices for instance %s", instance.name)
4992 for device in instance.disks:
4993 for node, disk in device.ComputeNodeTree(instance.primary_node):
4994 lu.cfg.SetDiskID(disk, node)
4995 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4997 lu.LogWarning("Could not remove block device %s on node %s,"
4998 " continuing anyway: %s", device.iv_name, node, msg)
5001 if instance.disk_template == constants.DT_FILE:
5002 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5003 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
5005 msg = result.fail_msg
5007 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5008 file_storage_dir, instance.primary_node, msg)
5014 def _ComputeDiskSize(disk_template, disks):
5015 """Compute disk size requirements in the volume group
5018 # Required free disk space as a function of disk template and disk sizes
5020 constants.DT_DISKLESS: None,
5021 constants.DT_PLAIN: sum(d["size"] for d in disks),
5022 # 128 MB are added for drbd metadata for each disk
5023 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5024 constants.DT_FILE: None,
5027 if disk_template not in req_size_dict:
5028 raise errors.ProgrammerError("Disk template '%s' size requirement"
5029 " is unknown" % disk_template)
5031 return req_size_dict[disk_template]
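# Added worked example: for DT_DRBD8 with disks [{"size": 1024},
# {"size": 2048}] the requirement is (1024 + 128) + (2048 + 128) =
# 3328 MB; DT_PLAIN would need 1024 + 2048 = 3072 MB, while the
# diskless and file templates need no volume group space at all (None).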
5034 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5035 """Hypervisor parameter validation.
5037 This function abstracts the hypervisor parameter validation to be
5038 used in both instance create and instance modify.
5040 @type lu: L{LogicalUnit}
5041 @param lu: the logical unit for which we check
5042 @type nodenames: list
5043 @param nodenames: the list of nodes on which we should check
5044 @type hvname: string
5045 @param hvname: the name of the hypervisor we should use
5046 @type hvparams: dict
5047 @param hvparams: the parameters which we need to check
5048 @raise errors.OpPrereqError: if the parameters are not valid
5051 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5054 for node in nodenames:
5058 info.Raise("Hypervisor parameter validation failed on node %s" % node)
5061 class LUCreateInstance(LogicalUnit):
5062 """Create an instance.
5065 HPATH = "instance-add"
5066 HTYPE = constants.HTYPE_INSTANCE
5067 _OP_REQP = ["instance_name", "disks", "disk_template",
5069 "wait_for_sync", "ip_check", "nics",
5070 "hvparams", "beparams"]
5073 def _ExpandNode(self, node):
5074 """Expands and checks one node name.
5077 node_full = self.cfg.ExpandNodeName(node)
5078 if node_full is None:
5079 raise errors.OpPrereqError("Unknown node %s" % node)
5082 def ExpandNames(self):
5083 """ExpandNames for CreateInstance.
5085 Figure out the right locks for instance creation.
5088 self.needed_locks = {}
5090 # set optional parameters to None if they don't exist
5091 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5092 if not hasattr(self.op, attr):
5093 setattr(self.op, attr, None)
5095 # cheap checks, mostly valid constants given
5097 # verify creation mode
5098 if self.op.mode not in (constants.INSTANCE_CREATE,
5099 constants.INSTANCE_IMPORT):
5100 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5103 # disk template and mirror node verification
5104 if self.op.disk_template not in constants.DISK_TEMPLATES:
5105 raise errors.OpPrereqError("Invalid disk template name")
5107 if self.op.hypervisor is None:
5108 self.op.hypervisor = self.cfg.GetHypervisorType()
5110 cluster = self.cfg.GetClusterInfo()
5111 enabled_hvs = cluster.enabled_hypervisors
5112 if self.op.hypervisor not in enabled_hvs:
5113 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5114 " cluster (%s)" % (self.op.hypervisor,
5115 ",".join(enabled_hvs)))
5117 # check hypervisor parameter syntax (locally)
5118 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5119 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5121 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5122 hv_type.CheckParameterSyntax(filled_hvp)
5123 self.hv_full = filled_hvp
5125 # fill and remember the beparams dict
5126 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5127 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5130 #### instance parameters check
5132 # instance name verification
5133 hostname1 = utils.HostInfo(self.op.instance_name)
5134 self.op.instance_name = instance_name = hostname1.name
5136 # this is just a preventive check, but someone might still add this
5137 # instance in the meantime, and creation will fail at lock-add time
5138 if instance_name in self.cfg.GetInstanceList():
5139 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5142 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5146 for idx, nic in enumerate(self.op.nics):
5147 nic_mode_req = nic.get("mode", None)
5148 nic_mode = nic_mode_req
5149 if nic_mode is None:
5150 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5152 # in routed mode, for the first nic, the default ip is 'auto'
5153 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5154 default_ip_mode = constants.VALUE_AUTO
5156 default_ip_mode = constants.VALUE_NONE
5158 # ip validity checks
5159 ip = nic.get("ip", default_ip_mode)
5160 if ip is None or ip.lower() == constants.VALUE_NONE:
5162 elif ip.lower() == constants.VALUE_AUTO:
5163 nic_ip = hostname1.ip
5165 if not utils.IsValidIP(ip):
5166 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5167 " like a valid IP" % ip)
5170 # TODO: check the ip for uniqueness !!
5171 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5172 raise errors.OpPrereqError("Routed nic mode requires an ip address")
5174 # MAC address verification
5175 mac = nic.get("mac", constants.VALUE_AUTO)
5176 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5177 if not utils.IsValidMac(mac.lower()):
5178 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5180 # bridge verification
5181 bridge = nic.get("bridge", None)
5182 link = nic.get("link", None)
5184 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5185 " at the same time")
5186 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5187 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
5193 nicparams[constants.NIC_MODE] = nic_mode_req
5195 nicparams[constants.NIC_LINK] = link
5197 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5199 objects.NIC.CheckParameterSyntax(check_params)
5200 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5202 # disk checks/pre-build
5204 for disk in self.op.disks:
5205 mode = disk.get("mode", constants.DISK_RDWR)
5206 if mode not in constants.DISK_ACCESS_SET:
5207 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5209 size = disk.get("size", None)
5211 raise errors.OpPrereqError("Missing disk size")
5215 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
5216 self.disks.append({"size": size, "mode": mode})
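# Added example (hypothetical opcode values): the nics and disks
# parameters checked above are lists of dicts, e.g.
#   nics=[{"mode": "bridged", "link": "br0", "mac": "auto", "ip": "none"}]
#   disks=[{"size": 1024, "mode": "rw"}]
# Missing NIC keys fall back to the cluster-level nicparams defaults,
# and a missing disk mode falls back to constants.DISK_RDWR.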
5218 # used in CheckPrereq for ip ping check
5219 self.check_ip = hostname1.ip
5221 # file storage checks
5222 if (self.op.file_driver and
5223 not self.op.file_driver in constants.FILE_DRIVER):
5224 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5225 self.op.file_driver)
5227 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5228 raise errors.OpPrereqError("File storage directory path not absolute")
5230 ### Node/iallocator related checks
5231 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5232 raise errors.OpPrereqError("One and only one of iallocator and primary"
5233 " node must be given")
5235 if self.op.iallocator:
5236 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5238 self.op.pnode = self._ExpandNode(self.op.pnode)
5239 nodelist = [self.op.pnode]
5240 if self.op.snode is not None:
5241 self.op.snode = self._ExpandNode(self.op.snode)
5242 nodelist.append(self.op.snode)
5243 self.needed_locks[locking.LEVEL_NODE] = nodelist
5245 # in case of import lock the source node too
5246 if self.op.mode == constants.INSTANCE_IMPORT:
5247 src_node = getattr(self.op, "src_node", None)
5248 src_path = getattr(self.op, "src_path", None)
5250 if src_path is None:
5251 self.op.src_path = src_path = self.op.instance_name
5253 if src_node is None:
5254 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5255 self.op.src_node = None
5256 if os.path.isabs(src_path):
5257 raise errors.OpPrereqError("Importing an instance from an absolute"
5258 " path requires a source node option.")
5260 self.op.src_node = src_node = self._ExpandNode(src_node)
5261 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5262 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5263 if not os.path.isabs(src_path):
5264 self.op.src_path = src_path = \
5265 os.path.join(constants.EXPORT_DIR, src_path)
5267 else: # INSTANCE_CREATE
5268 if getattr(self.op, "os_type", None) is None:
5269 raise errors.OpPrereqError("No guest OS specified")
5271 def _RunAllocator(self):
5272 """Run the allocator based on input opcode.
5275 nics = [n.ToDict() for n in self.nics]
5276 ial = IAllocator(self.cfg, self.rpc,
5277 mode=constants.IALLOCATOR_MODE_ALLOC,
5278 name=self.op.instance_name,
5279 disk_template=self.op.disk_template,
5282 vcpus=self.be_full[constants.BE_VCPUS],
5283 mem_size=self.be_full[constants.BE_MEMORY],
5286 hypervisor=self.op.hypervisor,
5289 ial.Run(self.op.iallocator)
5292 raise errors.OpPrereqError("Can't compute nodes using"
5293 " iallocator '%s': %s" % (self.op.iallocator,
5295 if len(ial.nodes) != ial.required_nodes:
5296 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5297 " of nodes (%s), required %s" %
5298 (self.op.iallocator, len(ial.nodes),
5299 ial.required_nodes))
5300 self.op.pnode = ial.nodes[0]
5301 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5302 self.op.instance_name, self.op.iallocator,
5303 ", ".join(ial.nodes))
5304 if ial.required_nodes == 2:
5305 self.op.snode = ial.nodes[1]
5307 def BuildHooksEnv(self):
5310 This runs on master, primary and secondary nodes of the instance.
5314 "ADD_MODE": self.op.mode,
5316 if self.op.mode == constants.INSTANCE_IMPORT:
5317 env["SRC_NODE"] = self.op.src_node
5318 env["SRC_PATH"] = self.op.src_path
5319 env["SRC_IMAGES"] = self.src_images
5321 env.update(_BuildInstanceHookEnv(
5322 name=self.op.instance_name,
5323 primary_node=self.op.pnode,
5324 secondary_nodes=self.secondaries,
5325 status=self.op.start,
5326 os_type=self.op.os_type,
5327 memory=self.be_full[constants.BE_MEMORY],
5328 vcpus=self.be_full[constants.BE_VCPUS],
5329 nics=_NICListToTuple(self, self.nics),
5330 disk_template=self.op.disk_template,
5331 disks=[(d["size"], d["mode"]) for d in self.disks],
5334 hypervisor_name=self.op.hypervisor,
5337 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5342 def CheckPrereq(self):
5343 """Check prerequisites.
5346 if (not self.cfg.GetVGName() and
5347 self.op.disk_template not in constants.DTS_NOT_LVM):
5348 raise errors.OpPrereqError("Cluster does not support lvm-based"
5351 if self.op.mode == constants.INSTANCE_IMPORT:
5352 src_node = self.op.src_node
5353 src_path = self.op.src_path
5355 if src_node is None:
5356 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5357 exp_list = self.rpc.call_export_list(locked_nodes)
5359 for node in exp_list:
5360 if exp_list[node].fail_msg:
5362 if src_path in exp_list[node].payload:
5364 self.op.src_node = src_node = node
5365 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5369 raise errors.OpPrereqError("No export found for relative path %s" %
5372 _CheckNodeOnline(self, src_node)
5373 result = self.rpc.call_export_info(src_node, src_path)
5374 result.Raise("No export or invalid export found in dir %s" % src_path)
5376 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5377 if not export_info.has_section(constants.INISECT_EXP):
5378 raise errors.ProgrammerError("Corrupted export config")
5380 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5381 if (int(ei_version) != constants.EXPORT_VERSION):
5382 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5383 (ei_version, constants.EXPORT_VERSION))
5385 # Check that the new instance doesn't have less disks than the export
5386 instance_disks = len(self.disks)
5387 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
5388 if instance_disks < export_disks:
5389 raise errors.OpPrereqError("Not enough disks to import."
5390 " (instance: %d, export: %d)" %
5391 (instance_disks, export_disks))
5393 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
5395 for idx in range(export_disks):
5396 option = 'disk%d_dump' % idx
5397 if export_info.has_option(constants.INISECT_INS, option):
5398 # FIXME: are the old os-es, disk sizes, etc. useful?
5399 export_name = export_info.get(constants.INISECT_INS, option)
5400 image = os.path.join(src_path, export_name)
5401 disk_images.append(image)
5403 disk_images.append(False)
5405 self.src_images = disk_images
5407 old_name = export_info.get(constants.INISECT_INS, 'name')
5408 # FIXME: int() here could throw a ValueError on broken exports
5409 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
5410 if self.op.instance_name == old_name:
5411 for idx, nic in enumerate(self.nics):
5412 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
5413 nic_mac_ini = 'nic%d_mac' % idx
5414 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
5416 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
5417 # ip ping checks (we use the same ip that was resolved in ExpandNames)
5418 if self.op.start and not self.op.ip_check:
5419 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
5420 " adding an instance in start mode")
5422 if self.op.ip_check:
5423 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
5424 raise errors.OpPrereqError("IP %s of instance %s already in use" %
5425 (self.check_ip, self.op.instance_name))
5427 #### mac address generation
5428 # By generating the MAC address here, both the allocator and the hooks get
5429 # the real, final MAC address rather than the 'auto' or 'generate' value.
5430 # There is a race condition between the generation and the instance object
5431 # creation, which means that we know the MAC is valid now, but we're not
5432 # sure it will still be when we actually add the instance. If things go
5433 # wrong, adding the instance will abort because of a duplicate MAC, and the
5434 # creation job will fail.
5435 for nic in self.nics:
5436 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5437 nic.mac = self.cfg.GenerateMAC()
5441 if self.op.iallocator is not None:
5442 self._RunAllocator()
5444 #### node related checks
5446 # check primary node
5447 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
5448 assert self.pnode is not None, \
5449 "Cannot retrieve locked node %s" % self.op.pnode
5451 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
5454 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
5457 self.secondaries = []
5459 # mirror node verification
5460 if self.op.disk_template in constants.DTS_NET_MIRROR:
5461 if self.op.snode is None:
5462 raise errors.OpPrereqError("The networked disk templates need"
5464 if self.op.snode == pnode.name:
5465 raise errors.OpPrereqError("The secondary node cannot be"
5466 " the primary node.")
5467 _CheckNodeOnline(self, self.op.snode)
5468 _CheckNodeNotDrained(self, self.op.snode)
5469 self.secondaries.append(self.op.snode)
5471 nodenames = [pnode.name] + self.secondaries
5473 req_size = _ComputeDiskSize(self.op.disk_template,
5476 # Check lv size requirements
5477 if req_size is not None:
5478 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5480 for node in nodenames:
5481 info = nodeinfo[node]
5482 info.Raise("Cannot get current information from node %s" % node)
5484 vg_free = info.get('vg_free', None)
5485 if not isinstance(vg_free, int):
5486 raise errors.OpPrereqError("Can't compute free disk space on"
5488 if req_size > vg_free:
5489 raise errors.OpPrereqError("Not enough disk space on target node %s."
5490 " %d MB available, %d MB required" %
5491 (node, vg_free, req_size))
5493 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5496 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5497 result.Raise("OS '%s' not in supported os list for primary node %s" %
5498 (self.op.os_type, pnode.name), prereq=True)
5500 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
5502 # memory check on primary node
5504 _CheckNodeFreeMemory(self, self.pnode.name,
5505 "creating instance %s" % self.op.instance_name,
5506 self.be_full[constants.BE_MEMORY],
5509 self.dry_run_result = list(nodenames)
5511 def Exec(self, feedback_fn):
5512 """Create and add the instance to the cluster.
5515 instance = self.op.instance_name
5516 pnode_name = self.pnode.name
5518 ht_kind = self.op.hypervisor
5519 if ht_kind in constants.HTS_REQ_PORT:
5520 network_port = self.cfg.AllocatePort()
5524 ##if self.op.vnc_bind_address is None:
5525 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
5527 # this is needed because os.path.join does not accept None arguments
5528 if self.op.file_storage_dir is None:
5529 string_file_storage_dir = ""
5531 string_file_storage_dir = self.op.file_storage_dir
5533 # build the full file storage dir path
5534 file_storage_dir = os.path.normpath(os.path.join(
5535 self.cfg.GetFileStorageDir(),
5536 string_file_storage_dir, instance))
5539 disks = _GenerateDiskTemplate(self,
5540 self.op.disk_template,
5541 instance, pnode_name,
5545 self.op.file_driver,
5548 iobj = objects.Instance(name=instance, os=self.op.os_type,
5549 primary_node=pnode_name,
5550 nics=self.nics, disks=disks,
5551 disk_template=self.op.disk_template,
5553 network_port=network_port,
5554 beparams=self.op.beparams,
5555 hvparams=self.op.hvparams,
5556 hypervisor=self.op.hypervisor,
5559 feedback_fn("* creating instance disks...")
5561 _CreateDisks(self, iobj)
5562 except errors.OpExecError:
5563 self.LogWarning("Device creation failed, reverting...")
5565 _RemoveDisks(self, iobj)
5567 self.cfg.ReleaseDRBDMinors(instance)
5570 feedback_fn("adding instance %s to cluster config" % instance)
5572 self.cfg.AddInstance(iobj)
5573 # Declare that we don't want to remove the instance lock anymore, as we've
5574 # added the instance to the config
5575 del self.remove_locks[locking.LEVEL_INSTANCE]
5576 # Unlock all the nodes
5577 if self.op.mode == constants.INSTANCE_IMPORT:
5578 nodes_keep = [self.op.src_node]
5579 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5580 if node != self.op.src_node]
5581 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5582 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5584 self.context.glm.release(locking.LEVEL_NODE)
5585 del self.acquired_locks[locking.LEVEL_NODE]
5587 if self.op.wait_for_sync:
5588 disk_abort = not _WaitForSync(self, iobj)
5589 elif iobj.disk_template in constants.DTS_NET_MIRROR:
5590 # make sure the disks are not degraded (still sync-ing is ok)
5592 feedback_fn("* checking mirrors status")
5593 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5598 _RemoveDisks(self, iobj)
5599 self.cfg.RemoveInstance(iobj.name)
5600 # Make sure the instance lock gets removed
5601 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5602 raise errors.OpExecError("There are some degraded disks for"
5605 feedback_fn("creating os for instance %s on node %s" %
5606 (instance, pnode_name))
5608 if iobj.disk_template != constants.DT_DISKLESS:
5609 if self.op.mode == constants.INSTANCE_CREATE:
5610 feedback_fn("* running the instance OS create scripts...")
5611 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5612 result.Raise("Could not add os for instance %s"
5613 " on node %s" % (instance, pnode_name))
5615 elif self.op.mode == constants.INSTANCE_IMPORT:
5616 feedback_fn("* running the instance OS import scripts...")
5617 src_node = self.op.src_node
5618 src_images = self.src_images
5619 cluster_name = self.cfg.GetClusterName()
5620 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5621 src_node, src_images,
5623 msg = import_result.fail_msg
5625 self.LogWarning("Error while importing the disk images for instance"
5626 " %s on node %s: %s" % (instance, pnode_name, msg))
5628 # also checked in the prereq part
5629 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5633 iobj.admin_up = True
5634 self.cfg.Update(iobj)
5635 logging.info("Starting instance %s on node %s", instance, pnode_name)
5636 feedback_fn("* starting instance...")
5637 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5638 result.Raise("Could not start instance")
5640 return list(iobj.all_nodes)
5643 class LUConnectConsole(NoHooksLU):
5644 """Connect to an instance's console.
5646 This is somewhat special in that it returns the command line that
5647 you need to run on the master node in order to connect to the console.
5651 _OP_REQP = ["instance_name"]
5654 def ExpandNames(self):
5655 self._ExpandAndLockInstance()
5657 def CheckPrereq(self):
5658 """Check prerequisites.
5660 This checks that the instance is in the cluster.
5663 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5664 assert self.instance is not None, \
5665 "Cannot retrieve locked instance %s" % self.op.instance_name
5666 _CheckNodeOnline(self, self.instance.primary_node)
5668 def Exec(self, feedback_fn):
5669 """Connect to the console of an instance
5672 instance = self.instance
5673 node = instance.primary_node
5675 node_insts = self.rpc.call_instance_list([node],
5676 [instance.hypervisor])[node]
5677 node_insts.Raise("Can't get node information from %s" % node)
5679 if instance.name not in node_insts.payload:
5680 raise errors.OpExecError("Instance %s is not running." % instance.name)
5682 logging.debug("Connecting to console of %s on %s", instance.name, node)
5684 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5685 cluster = self.cfg.GetClusterInfo()
5686 # beparams and hvparams are passed separately, to avoid editing the
5687 # instance and then saving the defaults in the instance itself.
5688 hvparams = cluster.FillHV(instance)
5689 beparams = cluster.FillBE(instance)
5690 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5693 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
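# Added note: the returned value is an SSH command line built by
# SshRunner.BuildCmd which, when run on the master node, executes the
# hypervisor-specific console command as root on the instance's primary
# node with a tty attached (the exact command comes from
# GetShellCommandForConsole and depends on the hypervisor).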
5696 class LUReplaceDisks(LogicalUnit):
5697 """Replace the disks of an instance.
5700 HPATH = "mirrors-replace"
5701 HTYPE = constants.HTYPE_INSTANCE
5702 _OP_REQP = ["instance_name", "mode", "disks"]
5705 def CheckArguments(self):
5706 if not hasattr(self.op, "remote_node"):
5707 self.op.remote_node = None
5708 if not hasattr(self.op, "iallocator"):
5709 self.op.iallocator = None
5711 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
5714 def ExpandNames(self):
5715 self._ExpandAndLockInstance()
5717 if self.op.iallocator is not None:
5718 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5720 elif self.op.remote_node is not None:
5721 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5722 if remote_node is None:
5723 raise errors.OpPrereqError("Node '%s' not known" %
5724 self.op.remote_node)
5726 self.op.remote_node = remote_node
5728 # Warning: do not remove the locking of the new secondary here
5729 # unless DRBD8.AddChildren is changed to work in parallel;
5730 # currently it doesn't since parallel invocations of
5731 # FindUnusedMinor will conflict
5732 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5733 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5736 self.needed_locks[locking.LEVEL_NODE] = []
5737 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5739 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
5740 self.op.iallocator, self.op.remote_node,
5743 self.tasklets = [self.replacer]
5745 def DeclareLocks(self, level):
5746 # If we're not already locking all nodes in the set we have to declare the
5747 # instance's primary/secondary nodes.
5748 if (level == locking.LEVEL_NODE and
5749 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5750 self._LockInstancesNodes()
5752 def BuildHooksEnv(self):
5755 This runs on the master, the primary and all the secondaries.
5758 instance = self.replacer.instance
5760 "MODE": self.op.mode,
5761 "NEW_SECONDARY": self.op.remote_node,
5762 "OLD_SECONDARY": instance.secondary_nodes[0],
5764 env.update(_BuildInstanceHookEnvByObject(self, instance))
5766 self.cfg.GetMasterNode(),
5767 instance.primary_node,
5769 if self.op.remote_node is not None:
5770 nl.append(self.op.remote_node)
5774 class LUEvacuateNode(LogicalUnit):
5775 """Relocate the secondary instances from a node.
5778 HPATH = "node-evacuate"
5779 HTYPE = constants.HTYPE_NODE
5780 _OP_REQP = ["node_name"]
5783 def CheckArguments(self):
5784 if not hasattr(self.op, "remote_node"):
5785 self.op.remote_node = None
5786 if not hasattr(self.op, "iallocator"):
5787 self.op.iallocator = None
5789 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
5790 self.op.remote_node,
5793 def ExpandNames(self):
5794 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
5795 if self.op.node_name is None:
5796 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
5798 self.needed_locks = {}
5800 # Declare node locks
5801 if self.op.iallocator is not None:
5802 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5804 elif self.op.remote_node is not None:
5805 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5806 if remote_node is None:
5807 raise errors.OpPrereqError("Node '%s' not known" %
5808 self.op.remote_node)
5810 self.op.remote_node = remote_node
5812 # Warning: do not remove the locking of the new secondary here
5813 # unless DRBD8.AddChildren is changed to work in parallel;
5814 # currently it doesn't since parallel invocations of
5815 # FindUnusedMinor will conflict
5816 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5817 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5820 raise errors.OpPrereqError("Invalid parameters")
5822 # Create tasklets for replacing disks for all secondary instances on this
5827 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
5828 logging.debug("Replacing disks for instance %s", inst.name)
5829 names.append(inst.name)
5831 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
5832 self.op.iallocator, self.op.remote_node, [])
5833 tasklets.append(replacer)
5835 self.tasklets = tasklets
5836 self.instance_names = names
5838 # Declare instance locks
5839 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
5841 def DeclareLocks(self, level):
5842 # If we're not already locking all nodes in the set we have to declare the
5843 # instance's primary/secondary nodes.
5844 if (level == locking.LEVEL_NODE and
5845 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5846 self._LockInstancesNodes()
5848 def BuildHooksEnv(self):
5851 This runs on the master, the primary and all the secondaries.
5855 "NODE_NAME": self.op.node_name,
5858 nl = [self.cfg.GetMasterNode()]
5860 if self.op.remote_node is not None:
5861 env["NEW_SECONDARY"] = self.op.remote_node
5862 nl.append(self.op.remote_node)
5864 return (env, nl, nl)
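
# Both LUReplaceDisks and LUEvacuateNode above do no disk work themselves;
# they build TLReplaceDisks tasklets (defined below) and store them in
# self.tasklets.  A minimal sketch of the driver loop the LU framework is
# assumed to run over that list -- illustrative only, nothing in this module
# calls it:
def _ExampleRunTasklets(lu, feedback_fn):
  """Runs a logical unit's tasklets: all checks first, then all Exec steps."""
  for tasklet in lu.tasklets:
    tasklet.CheckPrereq()
  for tasklet in lu.tasklets:
    tasklet.Exec(feedback_fn)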
5867 class TLReplaceDisks(Tasklet):
5868 """Replaces disks for an instance.
5870 Note: Locking is not within the scope of this class.
5873 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
5875 """Initializes this class.
5878 Tasklet.__init__(self, lu)
5881 self.instance_name = instance_name
5883 self.iallocator_name = iallocator_name
5884 self.remote_node = remote_node
5888 self.instance = None
5889 self.new_node = None
5890 self.target_node = None
5891 self.other_node = None
5892 self.remote_node_info = None
5893 self.node_secondary_ip = None
@staticmethod
5896 def CheckArguments(mode, remote_node, iallocator):
5897 """Helper function for users of this class.
5900 # check for valid parameter combination
5901 if mode == constants.REPLACE_DISK_CHG:
5902 if remote_node is None and iallocator is None:
5903 raise errors.OpPrereqError("When changing the secondary either an"
5904 " iallocator script must be used or the"
5907 if remote_node is not None and iallocator is not None:
5908 raise errors.OpPrereqError("Give either the iallocator or the new"
5909 " secondary, not both")
5911 elif remote_node is not None or iallocator is not None:
5912 # Not replacing the secondary
5913 raise errors.OpPrereqError("The iallocator and new node options can"
5914 " only be used when changing the"
@staticmethod
5918 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
5919 """Compute a new secondary node using an IAllocator.
5922 ial = IAllocator(lu.cfg, lu.rpc,
5923 mode=constants.IALLOCATOR_MODE_RELOC,
5925 relocate_from=relocate_from)
5927 ial.Run(iallocator_name)
5930 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
5931 " %s" % (iallocator_name, ial.info))
5933 if len(ial.nodes) != ial.required_nodes:
5934 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5935 " of nodes (%s), required %s" %
5936 (iallocator_name, len(ial.nodes), ial.required_nodes))
5938 remote_node_name = ial.nodes[0]
5940 lu.LogInfo("Selected new secondary for instance '%s': %s",
5941 instance_name, remote_node_name)
5943 return remote_node_name
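
# Illustrative examples of the argument combinations CheckArguments above
# accepts and rejects (node and allocator names are invented):
#
#   CheckArguments(constants.REPLACE_DISK_CHG, "node3.example.com", None)  # ok
#   CheckArguments(constants.REPLACE_DISK_CHG, None, "hail")               # ok
#   CheckArguments(constants.REPLACE_DISK_PRI, None, None)                 # ok
#
#   CheckArguments(constants.REPLACE_DISK_CHG, None, None)            # raises
#   CheckArguments(constants.REPLACE_DISK_CHG, "node3", "hail")       # raises
#   CheckArguments(constants.REPLACE_DISK_SEC, "node3", None)         # raises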
5945 def _FindFaultyDisks(self, node_name):
5946 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
5949 def CheckPrereq(self):
5950 """Check prerequisites.
5952 This checks that the instance is in the cluster.
5955 self.instance = self.cfg.GetInstanceInfo(self.instance_name)
5956 assert self.instance is not None, \
5957 "Cannot retrieve locked instance %s" % self.instance_name
5959 if self.instance.disk_template != constants.DT_DRBD8:
5960 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5963 if len(self.instance.secondary_nodes) != 1:
5964 raise errors.OpPrereqError("The instance has a strange layout,"
5965 " expected one secondary but found %d" %
5966 len(self.instance.secondary_nodes))
5968 secondary_node = self.instance.secondary_nodes[0]
5970 if self.iallocator_name is None:
5971 remote_node = self.remote_node
5973 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
5974 self.instance.name, secondary_node)
5976 if remote_node is not None:
5977 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5978 assert self.remote_node_info is not None, \
5979 "Cannot retrieve locked node %s" % remote_node
5981 self.remote_node_info = None
5983 if remote_node == self.instance.primary_node:
5984 raise errors.OpPrereqError("The specified node is the primary node of"
5987 if remote_node == secondary_node:
5988 raise errors.OpPrereqError("The specified node is already the"
5989 " secondary node of the instance.")
5991 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
5992 constants.REPLACE_DISK_CHG):
5993 raise errors.OpPrereqError("Cannot specify disks to be replaced")
5995 if self.mode == constants.REPLACE_DISK_AUTO:
5996 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
5997 faulty_secondary = self._FindFaultyDisks(secondary_node)
5999 if faulty_primary and faulty_secondary:
6000 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6001 " one node and can not be repaired"
6002 " automatically" % self.instance_name)
6005 self.disks = faulty_primary
6006 self.target_node = self.instance.primary_node
6007 self.other_node = secondary_node
6008 check_nodes = [self.target_node, self.other_node]
6009 elif faulty_secondary:
6010 self.disks = faulty_secondary
6011 self.target_node = secondary_node
6012 self.other_node = self.instance.primary_node
6013 check_nodes = [self.target_node, self.other_node]
6019 # Non-automatic modes
6020 if self.mode == constants.REPLACE_DISK_PRI:
6021 self.target_node = self.instance.primary_node
6022 self.other_node = secondary_node
6023 check_nodes = [self.target_node, self.other_node]
6025 elif self.mode == constants.REPLACE_DISK_SEC:
6026 self.target_node = secondary_node
6027 self.other_node = self.instance.primary_node
6028 check_nodes = [self.target_node, self.other_node]
6030 elif self.mode == constants.REPLACE_DISK_CHG:
6031 self.new_node = remote_node
6032 self.other_node = self.instance.primary_node
6033 self.target_node = secondary_node
6034 check_nodes = [self.new_node, self.other_node]
6036 _CheckNodeNotDrained(self.lu, remote_node)
6039 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
6042 # If not specified all disks should be replaced
6044 self.disks = range(len(self.instance.disks))
6046 for node in check_nodes:
6047 _CheckNodeOnline(self.lu, node)
6049 # Check whether disks are valid
6050 for disk_idx in self.disks:
6051 self.instance.FindDisk(disk_idx)
6053 # Get secondary node IP addresses
6056 for node_name in [self.target_node, self.other_node, self.new_node]:
6057 if node_name is not None:
6058 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6060 self.node_secondary_ip = node_2nd_ip
6062 def Exec(self, feedback_fn):
6063 """Execute disk replacement.
6065 This dispatches the disk replacement to the appropriate handler.
6069 feedback_fn("No disks need replacement")
6072 feedback_fn("Replacing disk(s) %s for %s" %
6073 (", ".join([str(i) for i in self.disks]), self.instance.name))
6075 activate_disks = (not self.instance.admin_up)
6077 # Activate the instance disks if we're replacing them on a down instance
6079 _StartInstanceDisks(self.lu, self.instance, True)
6082 # Should we replace the secondary node?
6083 if self.new_node is not None:
6084 return self._ExecDrbd8Secondary()
6086 return self._ExecDrbd8DiskOnly()
6089 # Deactivate the instance disks if we're replacing them on a down instance
6091 _SafeShutdownInstanceDisks(self.lu, self.instance)
6093 def _CheckVolumeGroup(self, nodes):
6094 self.lu.LogInfo("Checking volume groups")
6096 vgname = self.cfg.GetVGName()
6098 # Make sure volume group exists on all involved nodes
6099 results = self.rpc.call_vg_list(nodes)
6101 raise errors.OpExecError("Can't list volume groups on the nodes")
6105 res.Raise("Error checking node %s" % node)
6106 if vgname not in res.payload:
6107 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6110 def _CheckDisksExistence(self, nodes):
6111 # Check disk existence
6112 for idx, dev in enumerate(self.instance.disks):
6113 if idx not in self.disks:
6117 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6118 self.cfg.SetDiskID(dev, node)
6120 result = self.rpc.call_blockdev_find(node, dev)
6122 msg = result.fail_msg
6123 if msg or not result.payload:
6125 msg = "disk not found"
6126 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6129 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6130 for idx, dev in enumerate(self.instance.disks):
6131 if idx not in self.disks:
6134 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6137 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6139 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6140 " replace disks for instance %s" %
6141 (node_name, self.instance.name))
6143 def _CreateNewStorage(self, node_name):
6144 vgname = self.cfg.GetVGName()
6147 for idx, dev in enumerate(self.instance.disks):
6148 if idx not in self.disks:
6151 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6153 self.cfg.SetDiskID(dev, node_name)
6155 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6156 names = _GenerateUniqueNames(self.lu, lv_names)
6158 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6159 logical_id=(vgname, names[0]))
6160 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6161 logical_id=(vgname, names[1]))
6163 new_lvs = [lv_data, lv_meta]
6164 old_lvs = dev.children
6165 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6167 # we pass force_create=True to force the LVM creation
6168 for new_lv in new_lvs:
6169 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6170 _GetInstanceInfoText(self.instance), False)
6174 def _CheckDevices(self, node_name, iv_names):
6175 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
6176 self.cfg.SetDiskID(dev, node_name)
6178 result = self.rpc.call_blockdev_find(node_name, dev)
6180 msg = result.fail_msg
6181 if msg or not result.payload:
6183 msg = "disk not found"
6184 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6187 if result.payload.is_degraded:
6188 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6190 def _RemoveOldStorage(self, node_name, iv_names):
6191 for name, (dev, old_lvs, _) in iv_names.iteritems():
6192 self.lu.LogInfo("Remove logical volumes for %s" % name)
6195 self.cfg.SetDiskID(lv, node_name)
6197 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6199 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6200 hint="remove unused LVs manually")
6202 def _ExecDrbd8DiskOnly(self):
6203 """Replace a disk on the primary or secondary for DRBD 8.
6205 The algorithm for replace is quite complicated:
6207 1. for each disk to be replaced:
6209 1. create new LVs on the target node with unique names
6210 1. detach old LVs from the drbd device
6211 1. rename old LVs to name_replaced.<time_t>
6212 1. rename new LVs to old LVs
6213 1. attach the new LVs (with the old names now) to the drbd device
6215 1. wait for sync across all devices
6217 1. for each modified disk:
6219 1. remove old LVs (which have the name name_replaced.<time_t>)
6221 Failures are not very well handled.
6226 # Step: check device activation
6227 self.lu.LogStep(1, steps_total, "Check device existence")
6228 self._CheckDisksExistence([self.other_node, self.target_node])
6229 self._CheckVolumeGroup([self.target_node, self.other_node])
6231 # Step: check other node consistency
6232 self.lu.LogStep(2, steps_total, "Check peer consistency")
6233 self._CheckDisksConsistency(self.other_node,
6234 self.other_node == self.instance.primary_node,
6237 # Step: create new storage
6238 self.lu.LogStep(3, steps_total, "Allocate new storage")
6239 iv_names = self._CreateNewStorage(self.target_node)
6241 # Step: for each lv, detach+rename*2+attach
6242 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6243 for dev, old_lvs, new_lvs in iv_names.itervalues():
6244 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6246 result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs)
6247 result.Raise("Can't detach drbd from local storage on node"
6248 " %s for device %s" % (self.target_node, dev.iv_name))
6250 #cfg.Update(instance)
6252 # ok, we created the new LVs, so now we know we have the needed
6253 # storage; as such, we proceed on the target node to rename
6254 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6255 # using the assumption that logical_id == physical_id (which in
6256 # turn is the unique_id on that node)
6258 # FIXME(iustin): use a better name for the replaced LVs
6259 temp_suffix = int(time.time())
6260 ren_fn = lambda d, suff: (d.physical_id[0],
6261 d.physical_id[1] + "_replaced-%s" % suff)
6263 # Build the rename list based on what LVs exist on the node
6264 rename_old_to_new = []
6265 for to_ren in old_lvs:
6266 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6267 if not result.fail_msg and result.payload:
6269 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6271 self.lu.LogInfo("Renaming the old LVs on the target node")
6272 result = self.rpc.call_blockdev_rename(self.target_node, rename_old_to_new)
6273 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6275 # Now we rename the new LVs to the old LVs
6276 self.lu.LogInfo("Renaming the new LVs on the target node")
6277 rename_new_to_old = [(new, old.physical_id)
6278 for old, new in zip(old_lvs, new_lvs)]
6279 result = self.rpc.call_blockdev_rename(self.target_node, rename_new_to_old)
6280 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6282 for old, new in zip(old_lvs, new_lvs):
6283 new.logical_id = old.logical_id
6284 self.cfg.SetDiskID(new, self.target_node)
6286 for disk in old_lvs:
6287 disk.logical_id = ren_fn(disk, temp_suffix)
6288 self.cfg.SetDiskID(disk, self.target_node)
6290 # Now that the new lvs have the old name, we can add them to the device
6291 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6292 result = self.rpc.call_blockdev_addchildren(self.target_node, dev, new_lvs)
6293 msg = result.fail_msg
6295 for new_lv in new_lvs:
6296 msg2 = self.rpc.call_blockdev_remove(self.target_node, new_lv).fail_msg
6298 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
6299 hint=("cleanup manually the unused logical"
6301 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6303 dev.children = new_lvs
6305 self.cfg.Update(self.instance)
6308 # This can fail as the old devices are degraded and _WaitForSync
6309 # does a combined result over all disks, so we don't check its return value
6310 self.lu.LogStep(5, steps_total, "Sync devices")
6311 _WaitForSync(self.lu, self.instance, unlock=True)
6313 # Check all devices manually
6314 self._CheckDevices(self.instance.primary_node, iv_names)
6316 # Step: remove old storage
6317 self.lu.LogStep(6, steps_total, "Removing old storage")
6318 self._RemoveOldStorage(self.target_node, iv_names)
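
# The per-disk bookkeeping in _ExecDrbd8DiskOnly above boils down to two
# rename maps: old LVs move aside under a "_replaced-<time_t>" suffix, and the
# freshly created LVs take over the old names.  A self-contained sketch using
# plain LV name strings instead of Disk objects (illustrative only):
def _ExampleLvRenamePlan(old_names, new_names, temp_suffix):
  """Returns the (old -> temporary, new -> old) rename lists for one disk."""
  rename_old_to_new = [(old, "%s_replaced-%s" % (old, temp_suffix))
                       for old in old_names]
  rename_new_to_old = zip(new_names, old_names)
  return rename_old_to_new, rename_new_to_old

# e.g. _ExampleLvRenamePlan(["disk0_data"], ["new0_data"], 12345) gives
# ([("disk0_data", "disk0_data_replaced-12345")], [("new0_data", "disk0_data")])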
6320 def _ExecDrbd8Secondary(self):
6321 """Replace the secondary node for DRBD 8.
6323 The algorithm for replace is quite complicated:
6324 - for all disks of the instance:
6325 - create new LVs on the new node with same names
6326 - shutdown the drbd device on the old secondary
6327 - disconnect the drbd network on the primary
6328 - create the drbd device on the new secondary
6329 - network attach the drbd on the primary, using an artifice:
6330 the drbd code for Attach() will connect to the network if it
6331 finds a device which is connected to the good local disks but
6333 - wait for sync across all devices
6334 - remove all disks from the old secondary
6336 Failures are not very well handled.
6341 # Step: check device activation
6342 self.lu.LogStep(1, steps_total, "Check device existence")
6343 self._CheckDisksExistence([self.instance.primary_node])
6344 self._CheckVolumeGroup([self.instance.primary_node])
6346 # Step: check other node consistency
6347 self.lu.LogStep(2, steps_total, "Check peer consistency")
6348 self._CheckDisksConsistency(self.instance.primary_node, True, True)
6350 # Step: create new storage
6351 self.lu.LogStep(3, steps_total, "Allocate new storage")
6352 for idx, dev in enumerate(self.instance.disks):
6353 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
6354 (self.new_node, idx))
6355 # we pass force_create=True to force LVM creation
6356 for new_lv in dev.children:
6357 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
6358 _GetInstanceInfoText(self.instance), False)
6360 # Step 4: drbd minors and drbd setup changes
6361 # after this, we must manually remove the drbd minors on both the
6362 # error and the success paths
6363 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6364 minors = self.cfg.AllocateDRBDMinor([self.new_node for dev in self.instance.disks],
6366 logging.debug("Allocated minors %r" % (minors,))
6369 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
6370 self.lu.LogInfo("activating a new drbd on %s for disk/%d" % (self.new_node, idx))
6371 # create new devices on new_node; note that we create two IDs:
6372 # one without port, so the drbd will be activated without
6373 # networking information on the new node at this stage, and one
6374 # with network, for the later activation in step 4
6375 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
6376 if self.instance.primary_node == o_node1:
6381 new_alone_id = (self.instance.primary_node, self.new_node, None, p_minor, new_minor, o_secret)
6382 new_net_id = (self.instance.primary_node, self.new_node, o_port, p_minor, new_minor, o_secret)
6384 iv_names[idx] = (dev, dev.children, new_net_id)
6385 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
6387 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
6388 logical_id=new_alone_id,
6389 children=dev.children,
6392 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
6393 _GetInstanceInfoText(self.instance), False)
6394 except errors.GenericError:
6395 self.cfg.ReleaseDRBDMinors(self.instance.name)
6398 # We have new devices, shutdown the drbd on the old secondary
6399 for idx, dev in enumerate(self.instance.disks):
6400 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
6401 self.cfg.SetDiskID(dev, self.target_node)
6402 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
6404 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
6405 "node: %s" % (idx, msg),
6406 hint=("Please cleanup this device manually as"
6407 " soon as possible"))
6409 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
6410 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], self.node_secondary_ip,
6411 self.instance.disks)[self.instance.primary_node]
6413 msg = result.fail_msg
6415 # detaches didn't succeed (unlikely)
6416 self.cfg.ReleaseDRBDMinors(self.instance.name)
6417 raise errors.OpExecError("Can't detach the disks from the network on"
6418 " old node: %s" % (msg,))
6420 # if we managed to detach at least one, we update all the disks of
6421 # the instance to point to the new secondary
6422 self.lu.LogInfo("Updating instance configuration")
6423 for dev, _, new_logical_id in iv_names.itervalues():
6424 dev.logical_id = new_logical_id
6425 self.cfg.SetDiskID(dev, self.instance.primary_node)
6427 self.cfg.Update(self.instance)
6429 # and now perform the drbd attach
6430 self.lu.LogInfo("Attaching primary drbds to new secondary"
6431 " (standalone => connected)")
6432 result = self.rpc.call_drbd_attach_net([self.instance.primary_node, self.new_node], self.node_secondary_ip,
6433 self.instance.disks, self.instance.name,
6435 for to_node, to_result in result.items():
6436 msg = to_result.fail_msg
6438 self.lu.LogWarning("Can't attach drbd disks on node %s: %s", to_node, msg,
6439 hint=("please do a gnt-instance info to see the"
6440 " status of disks"))
6443 # This can fail as the old devices are degraded and _WaitForSync
6444 # does a combined result over all disks, so we don't check its return value
6445 self.lu.LogStep(5, steps_total, "Sync devices")
6446 _WaitForSync(self.lu, self.instance, unlock=True)
6448 # Check all devices manually
6449 self._CheckDevices(self.instance.primary_node, iv_names)
6451 # Step: remove old storage
6452 self.lu.LogStep(6, steps_total, "Removing old storage")
6453 self._RemoveOldStorage(self.target_node, iv_names)
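
# _ExecDrbd8Secondary above gives every disk two candidate logical_ids: one
# without a port, so the DRBD device can be created on the new node with no
# networking yet, and one with the port for the later re-attach.  A sketch of
# just that tuple construction, following the field order unpacked above
# (node1, node2, port, minor1, minor2, secret); illustrative only:
def _ExampleDrbd8LogicalIds(primary, new_node, port, p_minor, new_minor,
                            secret):
  """Returns the (standalone, networked) logical_id tuples for one disk."""
  alone_id = (primary, new_node, None, p_minor, new_minor, secret)
  net_id = (primary, new_node, port, p_minor, new_minor, secret)
  return alone_id, net_id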
6456 class LURepairNodeStorage(NoHooksLU):
6457 """Repairs the volume group on a node.
6460 _OP_REQP = ["node_name"]
6463 def CheckArguments(self):
6464 node_name = self.cfg.ExpandNodeName(self.op.node_name)
6465 if node_name is None:
6466 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
6468 self.op.node_name = node_name
6470 def ExpandNames(self):
6471 self.needed_locks = {
6472 locking.LEVEL_NODE: [self.op.node_name],
6475 def _CheckFaultyDisks(self, instance, node_name):
6476 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
6478 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
6479 " node '%s'" % (inst.name, node_name))
6481 def CheckPrereq(self):
6482 """Check prerequisites.
6485 storage_type = self.op.storage_type
6487 if (constants.SO_FIX_CONSISTENCY not in
6488 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
6489 raise errors.OpPrereqError("Storage units of type '%s' can not be"
6490 " repaired" % storage_type)
6492 # Check whether any instance on this node has faulty disks
6493 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
6494 check_nodes = set(inst.all_nodes)
6495 check_nodes.discard(self.op.node_name)
6496 for inst_node_name in check_nodes:
6497 self._CheckFaultyDisks(inst, inst_node_name)
6499 def Exec(self, feedback_fn):
6500 feedback_fn("Repairing storage unit '%s' on %s ..." %
6501 (self.op.name, self.op.node_name))
6503 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
6504 result = self.rpc.call_storage_execute(self.op.node_name,
6505 self.op.storage_type, st_args,
6507 constants.SO_FIX_CONSISTENCY)
6508 result.Raise("Failed to repair storage unit '%s' on %s" %
6509 (self.op.name, self.op.node_name))
6512 class LUGrowDisk(LogicalUnit):
6513 """Grow a disk of an instance.
6517 HTYPE = constants.HTYPE_INSTANCE
6518 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
6521 def ExpandNames(self):
6522 self._ExpandAndLockInstance()
6523 self.needed_locks[locking.LEVEL_NODE] = []
6524 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6526 def DeclareLocks(self, level):
6527 if level == locking.LEVEL_NODE:
6528 self._LockInstancesNodes()
6530 def BuildHooksEnv(self):
6533 This runs on the master, the primary and all the secondaries.
6537 "DISK": self.op.disk,
6538 "AMOUNT": self.op.amount,
6540 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6542 self.cfg.GetMasterNode(),
6543 self.instance.primary_node,
6547 def CheckPrereq(self):
6548 """Check prerequisites.
6550 This checks that the instance is in the cluster.
6553 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6554 assert instance is not None, \
6555 "Cannot retrieve locked instance %s" % self.op.instance_name
6556 nodenames = list(instance.all_nodes)
6557 for node in nodenames:
6558 _CheckNodeOnline(self, node)
6561 self.instance = instance
6563 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
6564 raise errors.OpPrereqError("Instance's disk layout does not support"
6567 self.disk = instance.FindDisk(self.op.disk)
6569 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6570 instance.hypervisor)
6571 for node in nodenames:
6572 info = nodeinfo[node]
6573 info.Raise("Cannot get current information from node %s" % node)
6574 vg_free = info.payload.get('vg_free', None)
6575 if not isinstance(vg_free, int):
6576 raise errors.OpPrereqError("Can't compute free disk space on"
6578 if self.op.amount > vg_free:
6579 raise errors.OpPrereqError("Not enough disk space on target node %s:"
6580 " %d MiB available, %d MiB required" %
6581 (node, vg_free, self.op.amount))
6583 def Exec(self, feedback_fn):
6584 """Execute disk grow.
6587 instance = self.instance
6589 for node in instance.all_nodes:
6590 self.cfg.SetDiskID(disk, node)
6591 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
6592 result.Raise("Grow request failed to node %s" % node)
6593 disk.RecordGrow(self.op.amount)
6594 self.cfg.Update(instance)
6595 if self.op.wait_for_sync:
6596 disk_abort = not _WaitForSync(self, instance)
6598 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
6599 " status.\nPlease check the instance.")
6602 class LUQueryInstanceData(NoHooksLU):
6603 """Query runtime instance data.
6606 _OP_REQP = ["instances", "static"]
6609 def ExpandNames(self):
6610 self.needed_locks = {}
6611 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
6613 if not isinstance(self.op.instances, list):
6614 raise errors.OpPrereqError("Invalid argument type 'instances'")
6616 if self.op.instances:
6617 self.wanted_names = []
6618 for name in self.op.instances:
6619 full_name = self.cfg.ExpandInstanceName(name)
6620 if full_name is None:
6621 raise errors.OpPrereqError("Instance '%s' not known" % name)
6622 self.wanted_names.append(full_name)
6623 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
6625 self.wanted_names = None
6626 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
6628 self.needed_locks[locking.LEVEL_NODE] = []
6629 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6631 def DeclareLocks(self, level):
6632 if level == locking.LEVEL_NODE:
6633 self._LockInstancesNodes()
6635 def CheckPrereq(self):
6636 """Check prerequisites.
6638 This only checks the optional instance list against the existing names.
6641 if self.wanted_names is None:
6642 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
6644 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
6645 in self.wanted_names]
6648 def _ComputeBlockdevStatus(self, node, instance_name, dev):
6649 """Returns the status of a block device
6652 if self.op.static or not node:
6655 self.cfg.SetDiskID(dev, node)
6657 result = self.rpc.call_blockdev_find(node, dev)
6661 result.Raise("Can't compute disk status for %s" % instance_name)
6663 status = result.payload
6667 return (status.dev_path, status.major, status.minor,
6668 status.sync_percent, status.estimated_time,
6669 status.is_degraded, status.ldisk_status)
6671 def _ComputeDiskStatus(self, instance, snode, dev):
6672 """Compute block device status.
6675 if dev.dev_type in constants.LDS_DRBD:
6676 # we change the snode then (otherwise we use the one passed in)
6677 if dev.logical_id[0] == instance.primary_node:
6678 snode = dev.logical_id[1]
6680 snode = dev.logical_id[0]
6682 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
6684 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
6687 dev_children = [self._ComputeDiskStatus(instance, snode, child)
6688 for child in dev.children]
6693 "iv_name": dev.iv_name,
6694 "dev_type": dev.dev_type,
6695 "logical_id": dev.logical_id,
6696 "physical_id": dev.physical_id,
6697 "pstatus": dev_pstatus,
6698 "sstatus": dev_sstatus,
6699 "children": dev_children,
6706 def Exec(self, feedback_fn):
6707 """Gather and return data"""
6710 cluster = self.cfg.GetClusterInfo()
6712 for instance in self.wanted_instances:
6713 if not self.op.static:
6714 remote_info = self.rpc.call_instance_info(instance.primary_node,
6716 instance.hypervisor)
6717 remote_info.Raise("Error checking node %s" % instance.primary_node)
6718 remote_info = remote_info.payload
6719 if remote_info and "state" in remote_info:
6722 remote_state = "down"
6725 if instance.admin_up:
6728 config_state = "down"
6730 disks = [self._ComputeDiskStatus(instance, None, device)
6731 for device in instance.disks]
6734 "name": instance.name,
6735 "config_state": config_state,
6736 "run_state": remote_state,
6737 "pnode": instance.primary_node,
6738 "snodes": instance.secondary_nodes,
6740 # this happens to be the same format used for hooks
6741 "nics": _NICListToTuple(self, instance.nics),
6743 "hypervisor": instance.hypervisor,
6744 "network_port": instance.network_port,
6745 "hv_instance": instance.hvparams,
6746 "hv_actual": cluster.FillHV(instance),
6747 "be_instance": instance.beparams,
6748 "be_actual": cluster.FillBE(instance),
6749 "serial_no": instance.serial_no,
6750 "mtime": instance.mtime,
6751 "ctime": instance.ctime,
6754 result[instance.name] = idict
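
# For reference, _ComputeDiskStatus above returns one nested dict per disk; a
# hand-written example of its rough shape for a healthy DRBD disk (values are
# invented, and the pstatus/sstatus tuples follow the field order returned by
# _ComputeBlockdevStatus: dev_path, major, minor, sync_percent, estimated_time,
# is_degraded, ldisk_status):
_EXAMPLE_DISK_STATUS = {
  "iv_name": "disk/0",
  "dev_type": constants.LD_DRBD8,
  "pstatus": ("/dev/drbd0", 147, 0, 100.0, 0, False, None),
  "sstatus": ("/dev/drbd0", 147, 0, 100.0, 0, False, None),
  "children": [],
  }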
6759 class LUSetInstanceParams(LogicalUnit):
6760 """Modifies an instances's parameters.
6763 HPATH = "instance-modify"
6764 HTYPE = constants.HTYPE_INSTANCE
6765 _OP_REQP = ["instance_name"]
6768 def CheckArguments(self):
6769 if not hasattr(self.op, 'nics'):
6771 if not hasattr(self.op, 'disks'):
6773 if not hasattr(self.op, 'beparams'):
6774 self.op.beparams = {}
6775 if not hasattr(self.op, 'hvparams'):
6776 self.op.hvparams = {}
6777 self.op.force = getattr(self.op, "force", False)
6778 if not (self.op.nics or self.op.disks or
6779 self.op.hvparams or self.op.beparams):
6780 raise errors.OpPrereqError("No changes submitted")
6784 for disk_op, disk_dict in self.op.disks:
6785 if disk_op == constants.DDM_REMOVE:
6788 elif disk_op == constants.DDM_ADD:
6791 if not isinstance(disk_op, int):
6792 raise errors.OpPrereqError("Invalid disk index")
6793 if not isinstance(disk_dict, dict):
6794 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
6795 raise errors.OpPrereqError(msg)
6797 if disk_op == constants.DDM_ADD:
6798 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
6799 if mode not in constants.DISK_ACCESS_SET:
6800 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
6801 size = disk_dict.get('size', None)
6803 raise errors.OpPrereqError("Required disk parameter size missing")
6806 except ValueError, err:
6807 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
6809 disk_dict['size'] = size
6811 # modification of disk
6812 if 'size' in disk_dict:
6813 raise errors.OpPrereqError("Disk size change not possible, use"
6816 if disk_addremove > 1:
6817 raise errors.OpPrereqError("Only one disk add or remove operation"
6818 " supported at a time")
6822 for nic_op, nic_dict in self.op.nics:
6823 if nic_op == constants.DDM_REMOVE:
6826 elif nic_op == constants.DDM_ADD:
6829 if not isinstance(nic_op, int):
6830 raise errors.OpPrereqError("Invalid nic index")
6831 if not isinstance(nic_dict, dict):
6832 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
6833 raise errors.OpPrereqError(msg)
6835 # nic_dict should be a dict
6836 nic_ip = nic_dict.get('ip', None)
6837 if nic_ip is not None:
6838 if nic_ip.lower() == constants.VALUE_NONE:
6839 nic_dict['ip'] = None
6841 if not utils.IsValidIP(nic_ip):
6842 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
6844 nic_bridge = nic_dict.get('bridge', None)
6845 nic_link = nic_dict.get('link', None)
6846 if nic_bridge and nic_link:
6847 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
6848 " at the same time")
6849 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
6850 nic_dict['bridge'] = None
6851 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
6852 nic_dict['link'] = None
6854 if nic_op == constants.DDM_ADD:
6855 nic_mac = nic_dict.get('mac', None)
6857 nic_dict['mac'] = constants.VALUE_AUTO
6859 if 'mac' in nic_dict:
6860 nic_mac = nic_dict['mac']
6861 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6862 if not utils.IsValidMac(nic_mac):
6863 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6864 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6865 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6866 " modifying an existing nic")
6868 if nic_addremove > 1:
6869 raise errors.OpPrereqError("Only one NIC add or remove operation"
6870 " supported at a time")
6872 def ExpandNames(self):
6873 self._ExpandAndLockInstance()
6874 self.needed_locks[locking.LEVEL_NODE] = []
6875 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6877 def DeclareLocks(self, level):
6878 if level == locking.LEVEL_NODE:
6879 self._LockInstancesNodes()
6881 def BuildHooksEnv(self):
6884 This runs on the master, primary and secondaries.
6888 if constants.BE_MEMORY in self.be_new:
6889 args['memory'] = self.be_new[constants.BE_MEMORY]
6890 if constants.BE_VCPUS in self.be_new:
6891 args['vcpus'] = self.be_new[constants.BE_VCPUS]
6892 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6893 # information at all.
6896 nic_override = dict(self.op.nics)
6897 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6898 for idx, nic in enumerate(self.instance.nics):
6899 if idx in nic_override:
6900 this_nic_override = nic_override[idx]
6902 this_nic_override = {}
6903 if 'ip' in this_nic_override:
6904 ip = this_nic_override['ip']
6907 if 'mac' in this_nic_override:
6908 mac = this_nic_override['mac']
6911 if idx in self.nic_pnew:
6912 nicparams = self.nic_pnew[idx]
6914 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6915 mode = nicparams[constants.NIC_MODE]
6916 link = nicparams[constants.NIC_LINK]
6917 args['nics'].append((ip, mac, mode, link))
6918 if constants.DDM_ADD in nic_override:
6919 ip = nic_override[constants.DDM_ADD].get('ip', None)
6920 mac = nic_override[constants.DDM_ADD]['mac']
6921 nicparams = self.nic_pnew[constants.DDM_ADD]
6922 mode = nicparams[constants.NIC_MODE]
6923 link = nicparams[constants.NIC_LINK]
6924 args['nics'].append((ip, mac, mode, link))
6925 elif constants.DDM_REMOVE in nic_override:
6926 del args['nics'][-1]
6928 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6929 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6932 def _GetUpdatedParams(self, old_params, update_dict,
6933 default_values, parameter_types):
6934 """Return the new params dict for the given params.
6936 @type old_params: dict
6937 @param old_params: old parameters
6938 @type update_dict: dict
6939 @param update_dict: dict containing new parameter values,
6940 or constants.VALUE_DEFAULT to reset the
6941 parameter to its default value
6942 @type default_values: dict
6943 @param default_values: default values for the filled parameters
6944 @type parameter_types: dict
6945 @param parameter_types: dict mapping target dict keys to types
6946 in constants.ENFORCEABLE_TYPES
6947 @rtype: (dict, dict)
6948 @return: (new_parameters, filled_parameters)
6951 params_copy = copy.deepcopy(old_params)
6952 for key, val in update_dict.iteritems():
6953 if val == constants.VALUE_DEFAULT:
6955 del params_copy[key]
6959 params_copy[key] = val
6960 utils.ForceDictType(params_copy, parameter_types)
6961 params_filled = objects.FillDict(default_values, params_copy)
6962 return (params_copy, params_filled)
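
# Worked example for _GetUpdatedParams above, using the backend parameter
# names seen elsewhere in this file (the values are invented).  With
#   old_params     = {"memory": 512, "vcpus": 2}
#   update_dict    = {"memory": constants.VALUE_DEFAULT, "vcpus": 4}
#   default_values = {"memory": 128, "vcpus": 1, "auto_balance": True}
# the "memory" key is dropped from the per-instance dict (reset to default)
# and "vcpus" is overridden, so the method returns roughly
#   new_parameters    = {"vcpus": 4}
#   filled_parameters = {"memory": 128, "vcpus": 4, "auto_balance": True}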
6964 def CheckPrereq(self):
6965 """Check prerequisites.
6967 This only checks the instance list against the existing names.
6970 self.force = self.op.force
6972 # checking the new params on the primary/secondary nodes
6974 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6975 cluster = self.cluster = self.cfg.GetClusterInfo()
6976 assert self.instance is not None, \
6977 "Cannot retrieve locked instance %s" % self.op.instance_name
6978 pnode = instance.primary_node
6979 nodelist = list(instance.all_nodes)
6981 # hvparams processing
6982 if self.op.hvparams:
6983 i_hvdict, hv_new = self._GetUpdatedParams(
6984 instance.hvparams, self.op.hvparams,
6985 cluster.hvparams[instance.hypervisor],
6986 constants.HVS_PARAMETER_TYPES)
6988 hypervisor.GetHypervisor(
6989 instance.hypervisor).CheckParameterSyntax(hv_new)
6990 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6991 self.hv_new = hv_new # the new actual values
6992 self.hv_inst = i_hvdict # the new dict (without defaults)
6994 self.hv_new = self.hv_inst = {}
6996 # beparams processing
6997 if self.op.beparams:
6998 i_bedict, be_new = self._GetUpdatedParams(
6999 instance.beparams, self.op.beparams,
7000 cluster.beparams[constants.PP_DEFAULT],
7001 constants.BES_PARAMETER_TYPES)
7002 self.be_new = be_new # the new actual values
7003 self.be_inst = i_bedict # the new dict (without defaults)
7005 self.be_new = self.be_inst = {}
7009 if constants.BE_MEMORY in self.op.beparams and not self.force:
7010 mem_check_list = [pnode]
7011 if be_new[constants.BE_AUTO_BALANCE]:
7012 # either we changed auto_balance to yes or it was from before
7013 mem_check_list.extend(instance.secondary_nodes)
7014 instance_info = self.rpc.call_instance_info(pnode, instance.name,
7015 instance.hypervisor)
7016 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
7017 instance.hypervisor)
7018 pninfo = nodeinfo[pnode]
7019 msg = pninfo.fail_msg
7021 # Assume the primary node is unreachable and go ahead
7022 self.warn.append("Can't get info from primary node %s: %s" %
7024 elif not isinstance(pninfo.payload.get('memory_free', None), int):
7025 self.warn.append("Node data from primary node %s doesn't contain"
7026 " free memory information" % pnode)
7027 elif instance_info.fail_msg:
7028 self.warn.append("Can't get instance runtime information: %s" %
7029 instance_info.fail_msg)
7031 if instance_info.payload:
7032 current_mem = int(instance_info.payload['memory'])
7034 # Assume instance not running
7035 # (there is a slight race condition here, but it's not very probable,
7036 # and we have no other way to check)
7038 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
7039 pninfo.payload['memory_free'])
7041 raise errors.OpPrereqError("This change will prevent the instance"
7042 " from starting, due to %d MB of memory"
7043 " missing on its primary node" % miss_mem)
7045 if be_new[constants.BE_AUTO_BALANCE]:
7046 for node, nres in nodeinfo.items():
7047 if node not in instance.secondary_nodes:
7051 self.warn.append("Can't get info from secondary node %s: %s" %
7053 elif not isinstance(nres.payload.get('memory_free', None), int):
7054 self.warn.append("Secondary node %s didn't return free"
7055 " memory information" % node)
7056 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
7057 self.warn.append("Not enough memory to failover instance to"
7058 " secondary node %s" % node)
7063 for nic_op, nic_dict in self.op.nics:
7064 if nic_op == constants.DDM_REMOVE:
7065 if not instance.nics:
7066 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
7068 if nic_op != constants.DDM_ADD:
7070 if nic_op < 0 or nic_op >= len(instance.nics):
7071 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
7073 (nic_op, len(instance.nics)))
7074 old_nic_params = instance.nics[nic_op].nicparams
7075 old_nic_ip = instance.nics[nic_op].ip
7080 update_params_dict = dict([(key, nic_dict[key])
7081 for key in constants.NICS_PARAMETERS
7082 if key in nic_dict])
7084 if 'bridge' in nic_dict:
7085 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
7087 new_nic_params, new_filled_nic_params = \
7088 self._GetUpdatedParams(old_nic_params, update_params_dict,
7089 cluster.nicparams[constants.PP_DEFAULT],
7090 constants.NICS_PARAMETER_TYPES)
7091 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
7092 self.nic_pinst[nic_op] = new_nic_params
7093 self.nic_pnew[nic_op] = new_filled_nic_params
7094 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
7096 if new_nic_mode == constants.NIC_MODE_BRIDGED:
7097 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
7098 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
7100 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
7102 self.warn.append(msg)
7104 raise errors.OpPrereqError(msg)
7105 if new_nic_mode == constants.NIC_MODE_ROUTED:
7106 if 'ip' in nic_dict:
7107 nic_ip = nic_dict['ip']
7111 raise errors.OpPrereqError('Cannot set the nic ip to None'
7113 if 'mac' in nic_dict:
7114 nic_mac = nic_dict['mac']
7116 raise errors.OpPrereqError('Cannot set the nic mac to None')
7117 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7118 # otherwise generate the mac
7119 nic_dict['mac'] = self.cfg.GenerateMAC()
7121 # or validate/reserve the current one
7122 if self.cfg.IsMacInUse(nic_mac):
7123 raise errors.OpPrereqError("MAC address %s already in use"
7124 " in cluster" % nic_mac)
7127 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
7128 raise errors.OpPrereqError("Disk operations not supported for"
7129 " diskless instances")
7130 for disk_op, disk_dict in self.op.disks:
7131 if disk_op == constants.DDM_REMOVE:
7132 if len(instance.disks) == 1:
7133 raise errors.OpPrereqError("Cannot remove the last disk of"
7135 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
7136 ins_l = ins_l[pnode]
7137 msg = ins_l.fail_msg
7139 raise errors.OpPrereqError("Can't contact node %s: %s" %
7141 if instance.name in ins_l.payload:
7142 raise errors.OpPrereqError("Instance is running, can't remove"
7145 if (disk_op == constants.DDM_ADD and
7146 len(instance.disks) >= constants.MAX_DISKS):
7147 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
7148 " add more" % constants.MAX_DISKS)
7149 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
7151 if disk_op < 0 or disk_op >= len(instance.disks):
7152 raise errors.OpPrereqError("Invalid disk index %s, valid values"
7154 (disk_op, len(instance.disks)))
7158 def Exec(self, feedback_fn):
7159 """Modifies an instance.
7161 All parameters take effect only at the next restart of the instance.
7164 # Process here the warnings from CheckPrereq, as we don't have a
7165 # feedback_fn there.
7166 for warn in self.warn:
7167 feedback_fn("WARNING: %s" % warn)
7170 instance = self.instance
7171 cluster = self.cluster
7173 for disk_op, disk_dict in self.op.disks:
7174 if disk_op == constants.DDM_REMOVE:
7175 # remove the last disk
7176 device = instance.disks.pop()
7177 device_idx = len(instance.disks)
7178 for node, disk in device.ComputeNodeTree(instance.primary_node):
7179 self.cfg.SetDiskID(disk, node)
7180 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
7182 self.LogWarning("Could not remove disk/%d on node %s: %s,"
7183 " continuing anyway", device_idx, node, msg)
7184 result.append(("disk/%d" % device_idx, "remove"))
7185 elif disk_op == constants.DDM_ADD:
7187 if instance.disk_template == constants.DT_FILE:
7188 file_driver, file_path = instance.disks[0].logical_id
7189 file_path = os.path.dirname(file_path)
7191 file_driver = file_path = None
7192 disk_idx_base = len(instance.disks)
7193 new_disk = _GenerateDiskTemplate(self,
7194 instance.disk_template,
7195 instance.name, instance.primary_node,
7196 instance.secondary_nodes,
7201 instance.disks.append(new_disk)
7202 info = _GetInstanceInfoText(instance)
7204 logging.info("Creating volume %s for instance %s",
7205 new_disk.iv_name, instance.name)
7206 # Note: this needs to be kept in sync with _CreateDisks
7208 for node in instance.all_nodes:
7209 f_create = node == instance.primary_node
7211 _CreateBlockDev(self, node, instance, new_disk,
7212 f_create, info, f_create)
7213 except errors.OpExecError, err:
7214 self.LogWarning("Failed to create volume %s (%s) on"
7216 new_disk.iv_name, new_disk, node, err)
7217 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
7218 (new_disk.size, new_disk.mode)))
7220 # change a given disk
7221 instance.disks[disk_op].mode = disk_dict['mode']
7222 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
7224 for nic_op, nic_dict in self.op.nics:
7225 if nic_op == constants.DDM_REMOVE:
7226 # remove the last nic
7227 del instance.nics[-1]
7228 result.append(("nic.%d" % len(instance.nics), "remove"))
7229 elif nic_op == constants.DDM_ADD:
7230 # mac and bridge should be set by now
7231 mac = nic_dict['mac']
7232 ip = nic_dict.get('ip', None)
7233 nicparams = self.nic_pinst[constants.DDM_ADD]
7234 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
7235 instance.nics.append(new_nic)
7236 result.append(("nic.%d" % (len(instance.nics) - 1),
7237 "add:mac=%s,ip=%s,mode=%s,link=%s" %
7238 (new_nic.mac, new_nic.ip,
7239 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
7240 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
7243 for key in 'mac', 'ip':
7245 setattr(instance.nics[nic_op], key, nic_dict[key])
7246 if nic_op in self.nic_pnew:
7247 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
7248 for key, val in nic_dict.iteritems():
7249 result.append(("nic.%s/%d" % (key, nic_op), val))
7252 if self.op.hvparams:
7253 instance.hvparams = self.hv_inst
7254 for key, val in self.op.hvparams.iteritems():
7255 result.append(("hv/%s" % key, val))
7258 if self.op.beparams:
7259 instance.beparams = self.be_inst
7260 for key, val in self.op.beparams.iteritems():
7261 result.append(("be/%s" % key, val))
7263 self.cfg.Update(instance)
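
# LUSetInstanceParams.Exec above collects and returns a list of
# (field, value) pairs describing each applied change.  An invented example
# for a modification touching one NIC field and one backend parameter:
_EXAMPLE_MODIFY_RESULT = [
  ("nic.ip/0", "192.0.2.10"),
  ("be/memory", 512),
  ]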
7268 class LUQueryExports(NoHooksLU):
7269 """Query the exports list
7272 _OP_REQP = ['nodes']
7275 def ExpandNames(self):
7276 self.needed_locks = {}
7277 self.share_locks[locking.LEVEL_NODE] = 1
7278 if not self.op.nodes:
7279 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7281 self.needed_locks[locking.LEVEL_NODE] = \
7282 _GetWantedNodes(self, self.op.nodes)
7284 def CheckPrereq(self):
7285 """Check prerequisites.
7288 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
7290 def Exec(self, feedback_fn):
7291 """Compute the list of all the exported system images.
7294 @return: a dictionary with the structure node->(export-list)
7295 where export-list is a list of the instances exported on
7299 rpcresult = self.rpc.call_export_list(self.nodes)
7301 for node in rpcresult:
7302 if rpcresult[node].fail_msg:
7303 result[node] = False
7305 result[node] = rpcresult[node].payload
7310 class LUExportInstance(LogicalUnit):
7311 """Export an instance to an image in the cluster.
7314 HPATH = "instance-export"
7315 HTYPE = constants.HTYPE_INSTANCE
7316 _OP_REQP = ["instance_name", "target_node", "shutdown"]
7319 def ExpandNames(self):
7320 self._ExpandAndLockInstance()
7321 # FIXME: lock only instance primary and destination node
7323 # Sad but true, for now we have to lock all nodes, as we don't know where
7324 # the previous export might be, and in this LU we search for it and
7325 # remove it from its current node. In the future we could fix this by:
7326 # - making a tasklet to search (share-lock all), then create the new one,
7327 # then one to remove, after
7328 # - removing the removal operation altogether
7329 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7331 def DeclareLocks(self, level):
7332 """Last minute lock declaration."""
7333 # All nodes are locked anyway, so nothing to do here.
7335 def BuildHooksEnv(self):
7338 This will run on the master, primary node and target node.
7342 "EXPORT_NODE": self.op.target_node,
7343 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
7345 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7346 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
7347 self.op.target_node]
7350 def CheckPrereq(self):
7351 """Check prerequisites.
7353 This checks that the instance and node names are valid.
7356 instance_name = self.op.instance_name
7357 self.instance = self.cfg.GetInstanceInfo(instance_name)
7358 assert self.instance is not None, \
7359 "Cannot retrieve locked instance %s" % self.op.instance_name
7360 _CheckNodeOnline(self, self.instance.primary_node)
7362 self.dst_node = self.cfg.GetNodeInfo(
7363 self.cfg.ExpandNodeName(self.op.target_node))
7365 if self.dst_node is None:
7366 # This is a wrong node name, not a non-locked node
7367 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
7368 _CheckNodeOnline(self, self.dst_node.name)
7369 _CheckNodeNotDrained(self, self.dst_node.name)
7371 # instance disk type verification
7372 for disk in self.instance.disks:
7373 if disk.dev_type == constants.LD_FILE:
7374 raise errors.OpPrereqError("Export not supported for instances with"
7375 " file-based disks")
7377 def Exec(self, feedback_fn):
7378 """Export an instance to an image in the cluster.
7381 instance = self.instance
7382 dst_node = self.dst_node
7383 src_node = instance.primary_node
7384 if self.op.shutdown:
7385 # shutdown the instance, but not the disks
7386 result = self.rpc.call_instance_shutdown(src_node, instance)
7387 result.Raise("Could not shutdown instance %s on"
7388 " node %s" % (instance.name, src_node))
7390 vgname = self.cfg.GetVGName()
7394 # set the disks ID correctly since call_instance_start needs the
7395 # correct drbd minor to create the symlinks
7396 for disk in instance.disks:
7397 self.cfg.SetDiskID(disk, src_node)
7402 for idx, disk in enumerate(instance.disks):
7403 # result.payload will be a snapshot of an lvm leaf of the one we passed
7404 result = self.rpc.call_blockdev_snapshot(src_node, disk)
7405 msg = result.fail_msg
7407 self.LogWarning("Could not snapshot disk/%s on node %s: %s",
7409 snap_disks.append(False)
7411 disk_id = (vgname, result.payload)
7412 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
7413 logical_id=disk_id, physical_id=disk_id,
7414 iv_name=disk.iv_name)
7415 snap_disks.append(new_dev)
7418 if self.op.shutdown and instance.admin_up:
7419 result = self.rpc.call_instance_start(src_node, instance, None, None)
7420 msg = result.fail_msg
7422 _ShutdownInstanceDisks(self, instance)
7423 raise errors.OpExecError("Could not start instance: %s" % msg)
7425 # TODO: check for size
7427 cluster_name = self.cfg.GetClusterName()
7428 for idx, dev in enumerate(snap_disks):
7430 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
7431 instance, cluster_name, idx)
7432 msg = result.fail_msg
7434 self.LogWarning("Could not export disk/%s from node %s to"
7435 " node %s: %s", idx, src_node, dst_node.name, msg)
7436 dresults.append(False)
7438 dresults.append(True)
7439 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
7441 self.LogWarning("Could not remove snapshot for disk/%d from node"
7442 " %s: %s", idx, src_node, msg)
7444 dresults.append(False)
7446 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
7448 msg = result.fail_msg
7450 self.LogWarning("Could not finalize export for instance %s"
7451 " on node %s: %s", instance.name, dst_node.name, msg)
7454 nodelist = self.cfg.GetNodeList()
7455 nodelist.remove(dst_node.name)
7457 # on one-node clusters nodelist will be empty after the removal
7458 # if we proceed, the backup would be removed because OpQueryExports
7459 # substitutes an empty list with the full cluster node list.
7460 iname = instance.name
7462 exportlist = self.rpc.call_export_list(nodelist)
7463 for node in exportlist:
7464 if exportlist[node].fail_msg:
7466 if iname in exportlist[node].payload:
7467 msg = self.rpc.call_export_remove(node, iname).fail_msg
7469 self.LogWarning("Could not remove older export for instance %s"
7470 " on node %s: %s", iname, node, msg)
7471 return fin_resu, dresults
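
# LUExportInstance.Exec above returns a pair: whether finalize_export
# succeeded, plus one boolean per disk saying whether that disk's snapshot was
# exported successfully.  An invented example for a two-disk instance whose
# second disk failed to export:
_EXAMPLE_EXPORT_RESULT = (True, [True, False])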
7474 class LURemoveExport(NoHooksLU):
7475 """Remove exports related to the named instance.
7478 _OP_REQP = ["instance_name"]
7481 def ExpandNames(self):
7482 self.needed_locks = {}
7483 # We need all nodes to be locked in order for RemoveExport to work, but we
7484 # don't need to lock the instance itself, as nothing will happen to it (and
7485 # we can remove exports also for a removed instance)
7486 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7488 def CheckPrereq(self):
7489 """Check prerequisites.
7493 def Exec(self, feedback_fn):
7494 """Remove any export.
7497 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
7498 # If the instance was not found we'll try with the name that was passed in.
7499 # This will only work if it was an FQDN, though.
7501 if not instance_name:
7503 instance_name = self.op.instance_name
7505 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
7506 exportlist = self.rpc.call_export_list(locked_nodes)
7508 for node in exportlist:
7509 msg = exportlist[node].fail_msg
7511 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
7513 if instance_name in exportlist[node].payload:
7515 result = self.rpc.call_export_remove(node, instance_name)
7516 msg = result.fail_msg
7518 logging.error("Could not remove export for instance %s"
7519 " on node %s: %s", instance_name, node, msg)
7521 if fqdn_warn and not found:
7522 feedback_fn("Export not found. If trying to remove an export belonging"
7523 " to a deleted instance please use its Fully Qualified"
7527 class TagsLU(NoHooksLU):
7530 This is an abstract class which is the parent of all the other tags LUs.
7534 def ExpandNames(self):
7535 self.needed_locks = {}
7536 if self.op.kind == constants.TAG_NODE:
7537 name = self.cfg.ExpandNodeName(self.op.name)
7539 raise errors.OpPrereqError("Invalid node name (%s)" %
7542 self.needed_locks[locking.LEVEL_NODE] = name
7543 elif self.op.kind == constants.TAG_INSTANCE:
7544 name = self.cfg.ExpandInstanceName(self.op.name)
7546 raise errors.OpPrereqError("Invalid instance name (%s)" %
7549 self.needed_locks[locking.LEVEL_INSTANCE] = name
7551 def CheckPrereq(self):
7552 """Check prerequisites.
7555 if self.op.kind == constants.TAG_CLUSTER:
7556 self.target = self.cfg.GetClusterInfo()
7557 elif self.op.kind == constants.TAG_NODE:
7558 self.target = self.cfg.GetNodeInfo(self.op.name)
7559 elif self.op.kind == constants.TAG_INSTANCE:
7560 self.target = self.cfg.GetInstanceInfo(self.op.name)
7562 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
7566 class LUGetTags(TagsLU):
7567 """Returns the tags of a given object.
7570 _OP_REQP = ["kind", "name"]
7573 def Exec(self, feedback_fn):
7574 """Returns the tag list.
7577 return list(self.target.GetTags())
7580 class LUSearchTags(NoHooksLU):
7581 """Searches the tags for a given pattern.
7584 _OP_REQP = ["pattern"]
7587 def ExpandNames(self):
7588 self.needed_locks = {}
7590 def CheckPrereq(self):
7591 """Check prerequisites.
7593 This checks the pattern passed for validity by compiling it.
7597 self.re = re.compile(self.op.pattern)
7598 except re.error, err:
7599 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
7600 (self.op.pattern, err))
7602 def Exec(self, feedback_fn):
7603 """Returns the tag list.
7607 tgts = [("/cluster", cfg.GetClusterInfo())]
7608 ilist = cfg.GetAllInstancesInfo().values()
7609 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
7610 nlist = cfg.GetAllNodesInfo().values()
7611 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
7613 for path, target in tgts:
7614 for tag in target.GetTags():
7615 if self.re.search(tag):
7616 results.append((path, tag))
7620 class LUAddTags(TagsLU):
7621 """Sets a tag on a given object.
7624 _OP_REQP = ["kind", "name", "tags"]
7627 def CheckPrereq(self):
7628 """Check prerequisites.
7630 This checks the type and length of the tag name and value.
7633 TagsLU.CheckPrereq(self)
7634 for tag in self.op.tags:
7635 objects.TaggableObject.ValidateTag(tag)
7637 def Exec(self, feedback_fn):
7642 for tag in self.op.tags:
7643 self.target.AddTag(tag)
7644 except errors.TagError, err:
7645 raise errors.OpExecError("Error while setting tag: %s" % str(err))
7647 self.cfg.Update(self.target)
7648 except errors.ConfigurationError:
7649 raise errors.OpRetryError("There has been a modification to the"
7650 " config file and the operation has been"
7651 " aborted. Please retry.")
7654 class LUDelTags(TagsLU):
7655 """Delete a list of tags from a given object.
7658 _OP_REQP = ["kind", "name", "tags"]
7661 def CheckPrereq(self):
7662 """Check prerequisites.
7664 This checks that we have the given tag.
7667 TagsLU.CheckPrereq(self)
7668 for tag in self.op.tags:
7669 objects.TaggableObject.ValidateTag(tag)
7670 del_tags = frozenset(self.op.tags)
7671 cur_tags = self.target.GetTags()
7672 if not del_tags <= cur_tags:
7673 diff_tags = del_tags - cur_tags
7674 diff_names = ["'%s'" % tag for tag in diff_tags]
7676 raise errors.OpPrereqError("Tag(s) %s not found" %
7677 (",".join(diff_names)))
7679 def Exec(self, feedback_fn):
7680 """Remove the tag from the object.
7683 for tag in self.op.tags:
7684 self.target.RemoveTag(tag)
7686 self.cfg.Update(self.target)
7687 except errors.ConfigurationError:
7688 raise errors.OpRetryError("There has been a modification to the"
7689 " config file and the operation has been"
7690 " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      for node, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" % node)
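
# Illustrative usage sketch (assumption: the matching opcode is
# opcodes.OpTestDelay, as submitted by gnt-debug). Such an opcode would make
# this LU sleep for ten seconds on the master only:
#
#   op = opcodes.OpTestDelay(duration=10.0, on_master=True, on_nodes=[])
#
# With on_nodes set to a list of node names, the delay is additionally run on
# those nodes through the test_delay RPC call shown in Exec above.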


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - the cfg object, needed to query the cluster
    - input data (all members of the _KEYS class attributes are required)
    - four buffer attributes ((in|out)_(data|text)), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, rpc, mode, name, **kwargs):
    self.cfg = cfg
    self.rpc = rpc
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
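
  # Illustrative construction sketch (assumptions: an LU providing self.cfg
  # and self.rpc, plus made-up instance parameters). In allocation mode every
  # key in _ALLO_KEYS must be passed, otherwise __init__ raises
  # ProgrammerError:
  #
  #   ial = IAllocator(self.cfg, self.rpc,
  #                    mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com",
  #                    mem_size=512,
  #                    disks=[{"size": 1024, "mode": "w"}],
  #                    disk_template=constants.DT_DRBD8,
  #                    os="debootstrap", tags=[], nics=[], vcpus=1,
  #                    hypervisor=constants.HT_XEN_PVM)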

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                        hypervisor_name)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload

        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
            cluster_info.nicparams[constants.PP_DEFAULT],
            nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                   }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
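
  # Illustrative sketch of the structure assembled above (heavily trimmed,
  # values made up). The full dictionary is what eventually gets serialized
  # and fed to the external iallocator script:
  #
  #   {"version": 2,
  #    "cluster_name": "cluster.example.com",
  #    "enabled_hypervisors": ["xen-pvm"],
  #    "nodes": {"node1.example.com": {"total_memory": 4096,
  #                                    "free_memory": 2048, ...}},
  #    "instances": {"inst1.example.com": {"memory": 512, "vcpus": 1, ...}}}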

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _AllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _IAllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
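
  # Illustrative sketch (made-up values) of the "request" entry built by the
  # two methods above; the external script dispatches on the "type" field:
  #
  #   {"type": "relocate",
  #    "name": "inst1.example.com",
  #    "disk_space_total": 2176,
  #    "required_nodes": 1,
  #    "relocate_from": ["node2.example.com"]}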

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner
    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
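
# Illustrative sketch (not part of the original module) of the reply document
# that _ValidateResult expects from an external iallocator script, and of how
# an LU would typically consume it; the script name "hail" and the node names
# are assumptions:
#
#   ial.Run("hail")
#   # ial.out_text held, for example:
#   #   '{"success": true, "info": "ok", "nodes": ["node1.example.com"]}'
#   if not ial.success:
#     raise errors.OpPrereqError("Can't compute nodes: %s" % ial.info)
#   target_nodes = ial.nodes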


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result