4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
45 class LogicalUnit(object):
46 """Logical Unit base class.
48 Subclasses must follow these rules:
49 - implement ExpandNames
50 - implement CheckPrereq (except when tasklets are used)
51 - implement Exec (except when tasklets are used)
52 - implement BuildHooksEnv
53 - redefine HPATH and HTYPE
54 - optionally redefine their run requirements:
55 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
57 Note that all commands require root permissions.
59 @ivar dry_run_result: the value (if any) that will be returned to the caller
60 in dry-run mode (signalled by opcode dry_run parameter)
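A minimal, purely illustrative subclass sketch (the class name, hook path and
messages below are hypothetical, not part of this module)::

  class LUExampleNoop(LogicalUnit):
    HPATH = "example-noop"
    HTYPE = constants.HTYPE_CLUSTER
    _OP_REQP = []

    def ExpandNames(self):
      self.needed_locks = {}

    def BuildHooksEnv(self):
      env = {"OP_TARGET": self.cfg.GetClusterName()}
      mn = self.cfg.GetMasterNode()
      return env, [mn], [mn]

    def CheckPrereq(self):
      pass

    def Exec(self, feedback_fn):
      feedback_fn("example LU executed")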
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overridden in derived classes in order to check op
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
92 self.LogStep = processor.LogStep
94 self.dry_run_result = None
99 for attr_name in self._OP_REQP:
100 attr_val = getattr(op, attr_name, None)
102 raise errors.OpPrereqError("Required parameter '%s' missing" %
105 self.CheckArguments()
108 """Returns the SshRunner object
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115 ssh = property(fget=__GetSSH)
117 def CheckArguments(self):
118 """Check syntactic validity for the opcode arguments.
120 This method is for doing a simple syntactic check and ensuring the
121 validity of opcode parameters, without any cluster-related
122 checks. While the same can be accomplished in ExpandNames and/or
123 CheckPrereq, doing these separately is better because:
125 - ExpandNames is left as purely a lock-related function
126 - CheckPrereq is run after we have acquired locks (and possible
129 The function is allowed to change the self.op attribute so that
130 later methods no longer need to worry about missing parameters.
135 def ExpandNames(self):
136 """Expand names for this LU.
138 This method is called before starting to execute the opcode, and it should
139 update all the parameters of the opcode to their canonical form (e.g. a
140 short node name must be fully expanded after this method has successfully
141 completed). This way locking, hooks, logging, etc. can work correctly.
143 LUs which implement this method must also populate the self.needed_locks
144 member, as a dict with lock levels as keys, and a list of needed lock names
147 - use an empty dict if you don't need any lock
148 - if you don't need any lock at a particular level omit that level
149 - don't put anything for the BGL level
150 - if you want all locks at a level use locking.ALL_SET as a value
152 If you need to share locks (rather than acquire them exclusively) at one
153 level you can modify self.share_locks, setting a true value (usually 1) for
154 that level. By default locks are not shared.
156 This function can also define a list of tasklets, which then will be
157 executed in order instead of the usual LU-level CheckPrereq and Exec
158 functions, if those are not defined by the LU.
162 # Acquire all nodes and one instance
163 self.needed_locks = {
164 locking.LEVEL_NODE: locking.ALL_SET,
165 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
167 # Acquire just two nodes
168 self.needed_locks = {
169 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
172 self.needed_locks = {} # No, you can't leave it to the default value None
175 # The implementation of this method is mandatory only if the new LU is
176 # concurrent, so that old LUs don't need to be changed all at the same
179 self.needed_locks = {} # Exclusive LUs don't need locks.
181 raise NotImplementedError
183 def DeclareLocks(self, level):
184 """Declare LU locking needs for a level
186 While most LUs can just declare their locking needs at ExpandNames time,
187 sometimes there's the need to calculate some locks after having acquired
188 the ones before. This function is called just before acquiring locks at a
189 particular level, but after acquiring the ones at lower levels, and permits
190 such calculations. It can be used to modify self.needed_locks, and by
191 default it does nothing.
193 This function is only called if you have something already set in
194 self.needed_locks for the level.
196 @param level: Locking level which is going to be locked
197 @type level: member of ganeti.locking.LEVELS
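A typical override, mirroring the _LockInstancesNodes helper documented
below, looks like this (illustrative sketch)::

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()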
201 def CheckPrereq(self):
202 """Check prerequisites for this LU.
204 This method should check that the prerequisites for the execution
205 of this LU are fulfilled. It can do internode communication, but
206 it should be idempotent - no cluster or system changes are
209 The method should raise errors.OpPrereqError in case something is
210 not fulfilled. Its return value is ignored.
212 This method should also update all the parameters of the opcode to
213 their canonical form if it hasn't been done by ExpandNames before.
216 if self.tasklets is not None:
217 for (idx, tl) in enumerate(self.tasklets):
218 logging.debug("Checking prerequisites for tasklet %s/%s",
219 idx + 1, len(self.tasklets))
222 raise NotImplementedError
224 def Exec(self, feedback_fn):
227 This method should implement the actual work. It should raise
228 errors.OpExecError for failures that are somewhat dealt with in
232 if self.tasklets is not None:
233 for (idx, tl) in enumerate(self.tasklets):
234 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
237 raise NotImplementedError
239 def BuildHooksEnv(self):
240 """Build hooks environment for this LU.
242 This method should return a three-element tuple consisting of: a dict
243 containing the environment that will be used for running the
244 specific hook for this LU, a list of node names on which the hook
245 should run before the execution, and a list of node names on which
246 the hook should run after the execution.
248 The keys of the dict must not be prefixed with 'GANETI_', as this will
249 be handled by the hooks runner. Also note that additional keys will be
250 added by the hooks runner. If the LU doesn't define any
251 environment, an empty dict (and not None) should be returned.
253 No nodes should be returned as an empty list (and not None).
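As an illustration, a cluster-level LU in this module typically returns
something like (values shown only as an example)::

  env = {"OP_TARGET": self.cfg.GetClusterName()}
  mn = self.cfg.GetMasterNode()
  return env, [mn], [mn]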
255 Note that if the HPATH for a LU class is None, this function will
259 raise NotImplementedError
261 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
262 """Notify the LU about the results of its hooks.
264 This method is called every time a hooks phase is executed, and notifies
265 the Logical Unit about the hooks' result. The LU can then use it to alter
266 its result based on the hooks. By default the method does nothing and the
267 previous result is passed back unchanged, but any LU can override it if it
268 wants to use the local cluster hook-scripts somehow.
270 @param phase: one of L{constants.HOOKS_PHASE_POST} or
271 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
272 @param hook_results: the results of the multi-node hooks rpc call
273 @param feedback_fn: function used to send feedback back to the caller
274 @param lu_result: the previous Exec result this LU had, or None
276 @return: the new Exec result, based on the previous result
282 def _ExpandAndLockInstance(self):
283 """Helper function to expand and lock an instance.
285 Many LUs that work on an instance take its name in self.op.instance_name
286 and need to expand it and then declare the expanded name for locking. This
287 function does it, and then updates self.op.instance_name to the expanded
288 name. It also initializes needed_locks as a dict, if this hasn't been done
292 if self.needed_locks is None:
293 self.needed_locks = {}
295 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
296 "_ExpandAndLockInstance called with instance-level locks set"
297 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
298 if expanded_name is None:
299 raise errors.OpPrereqError("Instance '%s' not known" %
300 self.op.instance_name)
301 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
302 self.op.instance_name = expanded_name
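# Illustrative usage (a sketch, not a specific LU from this module): an
# instance-level LU would typically call this from ExpandNames and then have
# its node locks recalculated in DeclareLocks via _LockInstancesNodes:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE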
304 def _LockInstancesNodes(self, primary_only=False):
305 """Helper function to declare instances' nodes for locking.
307 This function should be called after locking one or more instances to lock
308 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
309 with all primary or secondary nodes for instances already locked and
310 present in self.needed_locks[locking.LEVEL_INSTANCE].
312 It should be called from DeclareLocks, and for safety only works if
313 self.recalculate_locks[locking.LEVEL_NODE] is set.
315 In the future it may grow parameters to lock only some instances' nodes, or
316 to lock only primary or secondary nodes, if needed.
318 It should be called in DeclareLocks in a way similar to::
320 if level == locking.LEVEL_NODE:
321 self._LockInstancesNodes()
323 @type primary_only: boolean
324 @param primary_only: only lock primary nodes of locked instances
327 assert locking.LEVEL_NODE in self.recalculate_locks, \
328 "_LockInstancesNodes helper function called with no nodes to recalculate"
330 # TODO: check whether we have really been called with the instance locks held
332 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
333 # future we might want to have different behaviors depending on the value
334 # of self.recalculate_locks[locking.LEVEL_NODE]
336 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
337 instance = self.context.cfg.GetInstanceInfo(instance_name)
338 wanted_nodes.append(instance.primary_node)
340 wanted_nodes.extend(instance.secondary_nodes)
342 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
343 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
344 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
345 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
347 del self.recalculate_locks[locking.LEVEL_NODE]
350 class NoHooksLU(LogicalUnit):
351 """Simple LU which runs no hooks.
353 This LU is intended as a parent for other LogicalUnits which will
354 run no hooks, in order to reduce duplicate code.
362 """Tasklet base class.
364 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365 they can mix legacy code with tasklets. Locking needs to be done in the LU;
366 tasklets know nothing about locks.
368 Subclasses must follow these rules:
369 - Implement CheckPrereq
373 def __init__(self, lu):
380 def CheckPrereq(self):
381 """Check prerequisites for this tasklets.
383 This method should check whether the prerequisites for the execution of
384 this tasklet are fulfilled. It can do internode communication, but it
385 should be idempotent - no cluster or system changes are allowed.
387 The method should raise errors.OpPrereqError in case something is not
388 fulfilled. Its return value is ignored.
390 This method should also update all parameters to their canonical form if it
391 hasn't been done before.
394 raise NotImplementedError
396 def Exec(self, feedback_fn):
397 """Execute the tasklet.
399 This method should implement the actual work. It should raise
400 errors.OpExecError for failures that are somewhat dealt with in code, or
404 raise NotImplementedError
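# Illustrative sketch (hypothetical names): a tasklet implements CheckPrereq
# and Exec, and the owning LU lists its tasklets in ExpandNames, after which
# the LogicalUnit base class runs them instead of the LU-level
# CheckPrereq/Exec:
#
#   class _ExampleTasklet(Tasklet):
#     def CheckPrereq(self):
#       pass
#
#     def Exec(self, feedback_fn):
#       feedback_fn("example tasklet executed")
#
#   # ...and inside the owning LU's ExpandNames:
#   self.tasklets = [_ExampleTasklet(self)]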
407 def _GetWantedNodes(lu, nodes):
408 """Returns list of checked and expanded node names.
410 @type lu: L{LogicalUnit}
411 @param lu: the logical unit on whose behalf we execute
413 @param nodes: list of node names or None for all nodes
415 @return: the list of nodes, sorted
416 @raise errors.OpPrereqError: if the nodes parameter is wrong type
419 if not isinstance(nodes, list):
420 raise errors.OpPrereqError("Invalid argument type 'nodes'")
423 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
424 " non-empty list of nodes whose name is to be expanded.")
428 node = lu.cfg.ExpandNodeName(name)
430 raise errors.OpPrereqError("No such node name '%s'" % name)
433 return utils.NiceSort(wanted)
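# Illustrative usage (a sketch; the attribute name is arbitrary): node-query
# style LUs usually expand their "nodes" opcode parameter with this helper,
# e.g. in CheckPrereq:
#
#   self.wanted = _GetWantedNodes(self, self.op.nodes)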
436 def _GetWantedInstances(lu, instances):
437 """Returns list of checked and expanded instance names.
439 @type lu: L{LogicalUnit}
440 @param lu: the logical unit on whose behalf we execute
441 @type instances: list
442 @param instances: list of instance names or None for all instances
444 @return: the list of instances, sorted
445 @raise errors.OpPrereqError: if the instances parameter is wrong type
446 @raise errors.OpPrereqError: if any of the passed instances is not found
449 if not isinstance(instances, list):
450 raise errors.OpPrereqError("Invalid argument type 'instances'")
455 for name in instances:
456 instance = lu.cfg.ExpandInstanceName(name)
458 raise errors.OpPrereqError("No such instance name '%s'" % name)
459 wanted.append(instance)
462 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
466 def _CheckOutputFields(static, dynamic, selected):
467 """Checks whether all selected fields are valid.
469 @type static: L{utils.FieldSet}
470 @param static: static fields set
471 @type dynamic: L{utils.FieldSet}
472 @param dynamic: dynamic fields set
479 delta = f.NonMatching(selected)
481 raise errors.OpPrereqError("Unknown output fields selected: %s"
485 def _CheckBooleanOpField(op, name):
486 """Validates boolean opcode parameters.
488 This will ensure that an opcode parameter is either a boolean value,
489 or None (but that it always exists).
492 val = getattr(op, name, None)
493 if not (val is None or isinstance(val, bool)):
494 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
496 setattr(op, name, val)
499 def _CheckNodeOnline(lu, node):
500 """Ensure that a given node is online.
502 @param lu: the LU on behalf of which we make the check
503 @param node: the node to check
504 @raise errors.OpPrereqError: if the node is offline
507 if lu.cfg.GetNodeInfo(node).offline:
508 raise errors.OpPrereqError("Can't use offline node %s" % node)
511 def _CheckNodeNotDrained(lu, node):
512 """Ensure that a given node is not drained.
514 @param lu: the LU on behalf of which we make the check
515 @param node: the node to check
516 @raise errors.OpPrereqError: if the node is drained
519 if lu.cfg.GetNodeInfo(node).drained:
520 raise errors.OpPrereqError("Can't use drained node %s" % node)
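# Illustrative usage (a sketch; "target_node" is a hypothetical variable):
# LUs typically guard node operations with these helpers in CheckPrereq:
#
#   _CheckNodeOnline(self, instance.primary_node)
#   _CheckNodeNotDrained(self, target_node)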
523 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
524 memory, vcpus, nics, disk_template, disks,
525 bep, hvp, hypervisor_name):
526 """Builds instance related env variables for hooks
528 This builds the hook environment from individual variables.
531 @param name: the name of the instance
532 @type primary_node: string
533 @param primary_node: the name of the instance's primary node
534 @type secondary_nodes: list
535 @param secondary_nodes: list of secondary nodes as strings
536 @type os_type: string
537 @param os_type: the name of the instance's OS
538 @type status: boolean
539 @param status: the should_run status of the instance
541 @param memory: the memory size of the instance
543 @param vcpus: the count of VCPUs the instance has
545 @param nics: list of tuples (ip, mac, mode, link) representing
546 the NICs the instance has
547 @type disk_template: string
548 @param disk_template: the disk template of the instance
550 @param disks: the list of (size, mode) pairs
552 @param bep: the backend parameters for the instance
554 @param hvp: the hypervisor parameters for the instance
555 @type hypervisor_name: string
556 @param hypervisor_name: the hypervisor for the instance
558 @return: the hook environment for this instance
567 "INSTANCE_NAME": name,
568 "INSTANCE_PRIMARY": primary_node,
569 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
570 "INSTANCE_OS_TYPE": os_type,
571 "INSTANCE_STATUS": str_status,
572 "INSTANCE_MEMORY": memory,
573 "INSTANCE_VCPUS": vcpus,
574 "INSTANCE_DISK_TEMPLATE": disk_template,
575 "INSTANCE_HYPERVISOR": hypervisor_name,
579 nic_count = len(nics)
580 for idx, (ip, mac, mode, link) in enumerate(nics):
583 env["INSTANCE_NIC%d_IP" % idx] = ip
584 env["INSTANCE_NIC%d_MAC" % idx] = mac
585 env["INSTANCE_NIC%d_MODE" % idx] = mode
586 env["INSTANCE_NIC%d_LINK" % idx] = link
587 if mode == constants.NIC_MODE_BRIDGED:
588 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
592 env["INSTANCE_NIC_COUNT"] = nic_count
595 disk_count = len(disks)
596 for idx, (size, mode) in enumerate(disks):
597 env["INSTANCE_DISK%d_SIZE" % idx] = size
598 env["INSTANCE_DISK%d_MODE" % idx] = mode
602 env["INSTANCE_DISK_COUNT"] = disk_count
604 for source, kind in [(bep, "BE"), (hvp, "HV")]:
605 for key, value in source.items():
606 env["INSTANCE_%s_%s" % (kind, key)] = value
611 def _NICListToTuple(lu, nics):
612 """Build a list of nic information tuples.
614 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
615 value in LUQueryInstanceData.
617 @type lu: L{LogicalUnit}
618 @param lu: the logical unit on whose behalf we execute
619 @type nics: list of L{objects.NIC}
620 @param nics: list of nics to convert to hooks tuples
624 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
628 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
629 mode = filled_params[constants.NIC_MODE]
630 link = filled_params[constants.NIC_LINK]
631 hooks_nics.append((ip, mac, mode, link))
635 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
636 """Builds instance related env variables for hooks from an object.
638 @type lu: L{LogicalUnit}
639 @param lu: the logical unit on whose behalf we execute
640 @type instance: L{objects.Instance}
641 @param instance: the instance for which we should build the
644 @param override: dictionary with key/values that will override
647 @return: the hook environment dictionary
650 cluster = lu.cfg.GetClusterInfo()
651 bep = cluster.FillBE(instance)
652 hvp = cluster.FillHV(instance)
654 'name': instance.name,
655 'primary_node': instance.primary_node,
656 'secondary_nodes': instance.secondary_nodes,
657 'os_type': instance.os,
658 'status': instance.admin_up,
659 'memory': bep[constants.BE_MEMORY],
660 'vcpus': bep[constants.BE_VCPUS],
661 'nics': _NICListToTuple(lu, instance.nics),
662 'disk_template': instance.disk_template,
663 'disks': [(disk.size, disk.mode) for disk in instance.disks],
666 'hypervisor_name': instance.hypervisor,
669 args.update(override)
670 return _BuildInstanceHookEnv(**args)
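# Illustrative usage (a sketch, not a specific LU from this module): instance
# LUs typically build their hook environment from the instance object and run
# the hooks on the master plus the instance's nodes:
#
#   def BuildHooksEnv(self):
#     env = _BuildInstanceHookEnvByObject(self, self.instance)
#     nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
#     nl += list(self.instance.secondary_nodes)
#     return env, nl, nl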
673 def _AdjustCandidatePool(lu):
674 """Adjust the candidate pool after node operations.
677 mod_list = lu.cfg.MaintainCandidatePool()
679 lu.LogInfo("Promoted nodes to master candidate role: %s",
680 ", ".join(node.name for node in mod_list))
681 for name in mod_list:
682 lu.context.ReaddNode(name)
683 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
685 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
689 def _CheckNicsBridgesExist(lu, target_nics, target_node,
690 profile=constants.PP_DEFAULT):
691 """Check that the brigdes needed by a list of nics exist.
694 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
695 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
696 for nic in target_nics]
697 brlist = [params[constants.NIC_LINK] for params in paramslist
698 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
700 result = lu.rpc.call_bridges_exist(target_node, brlist)
701 result.Raise("Error checking bridges on destination node '%s'" %
702 target_node, prereq=True)
705 def _CheckInstanceBridgesExist(lu, instance, node=None):
706 """Check that the brigdes needed by an instance exist.
710 node = instance.primary_node
711 _CheckNicsBridgesExist(lu, instance.nics, node)
714 def _GetNodeInstancesInner(cfg, fn):
715 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
718 def _GetNodeInstances(cfg, node_name):
719 """Returns a list of all primary and secondary instances on a node.
723 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
726 def _GetNodePrimaryInstances(cfg, node_name):
727 """Returns primary instances on a node.
730 return _GetNodeInstancesInner(cfg,
731 lambda inst: node_name == inst.primary_node)
734 def _GetNodeSecondaryInstances(cfg, node_name):
735 """Returns secondary instances on a node.
738 return _GetNodeInstancesInner(cfg,
739 lambda inst: node_name in inst.secondary_nodes)
742 def _GetStorageTypeArgs(cfg, storage_type):
743 """Returns the arguments for a storage type.
746 # Special case for file storage
747 if storage_type == constants.ST_FILE:
748 # storage.FileStorage wants a list of storage directories
749 return [[cfg.GetFileStorageDir()]]
754 def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
757 for dev in instance.disks:
758 cfg.SetDiskID(dev, node_name)
760 result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
761 result.Raise("Failed to get disk status from node %s" % node_name,
764 for idx, bdev_status in enumerate(result.payload):
765 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
771 class LUPostInitCluster(LogicalUnit):
772 """Logical unit for running hooks after cluster initialization.
775 HPATH = "cluster-init"
776 HTYPE = constants.HTYPE_CLUSTER
779 def BuildHooksEnv(self):
783 env = {"OP_TARGET": self.cfg.GetClusterName()}
784 mn = self.cfg.GetMasterNode()
787 def CheckPrereq(self):
788 """No prerequisites to check.
793 def Exec(self, feedback_fn):
800 class LUDestroyCluster(NoHooksLU):
801 """Logical unit for destroying the cluster.
806 def CheckPrereq(self):
807 """Check prerequisites.
809 This checks whether the cluster is empty.
811 Any errors are signaled by raising errors.OpPrereqError.
814 master = self.cfg.GetMasterNode()
816 nodelist = self.cfg.GetNodeList()
817 if len(nodelist) != 1 or nodelist[0] != master:
818 raise errors.OpPrereqError("There are still %d node(s) in"
819 " this cluster." % (len(nodelist) - 1))
820 instancelist = self.cfg.GetInstanceList()
822 raise errors.OpPrereqError("There are still %d instance(s) in"
823 " this cluster." % len(instancelist))
825 def Exec(self, feedback_fn):
826 """Destroys the cluster.
829 master = self.cfg.GetMasterNode()
830 result = self.rpc.call_node_stop_master(master, False)
831 result.Raise("Could not disable the master role")
832 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
833 utils.CreateBackup(priv_key)
834 utils.CreateBackup(pub_key)
838 class LUVerifyCluster(LogicalUnit):
839 """Verifies the cluster status.
842 HPATH = "cluster-verify"
843 HTYPE = constants.HTYPE_CLUSTER
844 _OP_REQP = ["skip_checks"]
847 def ExpandNames(self):
848 self.needed_locks = {
849 locking.LEVEL_NODE: locking.ALL_SET,
850 locking.LEVEL_INSTANCE: locking.ALL_SET,
852 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
854 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
855 node_result, feedback_fn, master_files,
857 """Run multiple tests against a node.
861 - compares ganeti version
862 - checks vg existence and size > 20G
863 - checks config file checksum
864 - checks ssh to other nodes
866 @type nodeinfo: L{objects.Node}
867 @param nodeinfo: the node to check
868 @param file_list: required list of files
869 @param local_cksum: dictionary of local files and their checksums
870 @param node_result: the results from the node
871 @param feedback_fn: function used to accumulate results
872 @param master_files: list of files that only masters should have
873 @param drbd_map: the used DRBD minors for this node, in
874 the form of minor: (instance, must_exist), which correspond to instances
875 and their running status
876 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
881 # main result, node_result should be a non-empty dict
882 if not node_result or not isinstance(node_result, dict):
883 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
886 # compares ganeti version
887 local_version = constants.PROTOCOL_VERSION
888 remote_version = node_result.get('version', None)
889 if not (remote_version and isinstance(remote_version, (list, tuple)) and
890 len(remote_version) == 2):
891 feedback_fn(" - ERROR: connection to %s failed" % (node))
894 if local_version != remote_version[0]:
895 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
896 " node %s %s" % (local_version, node, remote_version[0]))
899 # node seems compatible, we can actually try to look into its results
903 # full package version
904 if constants.RELEASE_VERSION != remote_version[1]:
905 feedback_fn(" - WARNING: software version mismatch: master %s,"
907 (constants.RELEASE_VERSION, node, remote_version[1]))
909 # checks vg existence and size > 20G
910 if vg_name is not None:
911 vglist = node_result.get(constants.NV_VGLIST, None)
913 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
917 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
918 constants.MIN_VG_SIZE)
920 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
923 # checks config file checksum
925 remote_cksum = node_result.get(constants.NV_FILELIST, None)
926 if not isinstance(remote_cksum, dict):
928 feedback_fn(" - ERROR: node hasn't returned file checksum data")
930 for file_name in file_list:
931 node_is_mc = nodeinfo.master_candidate
932 must_have_file = file_name not in master_files
933 if file_name not in remote_cksum:
934 if node_is_mc or must_have_file:
936 feedback_fn(" - ERROR: file '%s' missing" % file_name)
937 elif remote_cksum[file_name] != local_cksum[file_name]:
938 if node_is_mc or must_have_file:
940 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
942 # not candidate and this is not a must-have file
944 feedback_fn(" - ERROR: file '%s' should not exist on non master"
945 " candidates (and the file is outdated)" % file_name)
947 # all good, except non-master/non-must have combination
948 if not node_is_mc and not must_have_file:
949 feedback_fn(" - ERROR: file '%s' should not exist on non master"
950 " candidates" % file_name)
954 if constants.NV_NODELIST not in node_result:
956 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
958 if node_result[constants.NV_NODELIST]:
960 for node in node_result[constants.NV_NODELIST]:
961 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
962 (node, node_result[constants.NV_NODELIST][node]))
964 if constants.NV_NODENETTEST not in node_result:
966 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
968 if node_result[constants.NV_NODENETTEST]:
970 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
972 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
973 (node, node_result[constants.NV_NODENETTEST][node]))
975 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
976 if isinstance(hyp_result, dict):
977 for hv_name, hv_result in hyp_result.iteritems():
978 if hv_result is not None:
979 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
980 (hv_name, hv_result))
982 # check used drbd list
983 if vg_name is not None:
984 used_minors = node_result.get(constants.NV_DRBDLIST, [])
985 if not isinstance(used_minors, (tuple, list)):
986 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
989 for minor, (iname, must_exist) in drbd_map.items():
990 if minor not in used_minors and must_exist:
991 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
992 " not active" % (minor, iname))
994 for minor in used_minors:
995 if minor not in drbd_map:
996 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
1002 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1003 node_instance, feedback_fn, n_offline):
1004 """Verify an instance.
1006 This function checks to see if the required block devices are
1007 available on the instance's node.
1012 node_current = instanceconfig.primary_node
1014 node_vol_should = {}
1015 instanceconfig.MapLVsByNode(node_vol_should)
1017 for node in node_vol_should:
1018 if node in n_offline:
1019 # ignore missing volumes on offline nodes
1021 for volume in node_vol_should[node]:
1022 if node not in node_vol_is or volume not in node_vol_is[node]:
1023 feedback_fn(" - ERROR: volume %s missing on node %s" %
1027 if instanceconfig.admin_up:
1028 if ((node_current not in node_instance or
1029 not instance in node_instance[node_current]) and
1030 node_current not in n_offline):
1031 feedback_fn(" - ERROR: instance %s not running on node %s" %
1032 (instance, node_current))
1035 for node in node_instance:
1036 if node != node_current:
1037 if instance in node_instance[node]:
1038 feedback_fn(" - ERROR: instance %s should not run on node %s" %
1044 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
1045 """Verify if there are any unknown volumes in the cluster.
1047 The .os, .swap and backup volumes are ignored. All other volumes are
1048 reported as unknown.
1053 for node in node_vol_is:
1054 for volume in node_vol_is[node]:
1055 if node not in node_vol_should or volume not in node_vol_should[node]:
1056 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
1061 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
1062 """Verify the list of running instances.
1064 This checks what instances are running but unknown to the cluster.
1068 for node in node_instance:
1069 for runninginstance in node_instance[node]:
1070 if runninginstance not in instancelist:
1071 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
1072 (runninginstance, node))
1076 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
1077 """Verify N+1 Memory Resilience.
1079 Check that if one single node dies we can still start all the instances it
1085 for node, nodeinfo in node_info.iteritems():
1086 # This code checks that every node which is now listed as a secondary has
1087 # enough memory to host all the instances it would have to take over,
1088 # should a single other node in the cluster fail.
1089 # FIXME: not ready for failover to an arbitrary node
1090 # FIXME: does not support file-backed instances
1091 # WARNING: we currently take into account down instances as well as up
1092 # ones, considering that even if they're down someone might want to start
1093 # them even in the event of a node failure.
1094 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1096 for instance in instances:
1097 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1098 if bep[constants.BE_AUTO_BALANCE]:
1099 needed_mem += bep[constants.BE_MEMORY]
1100 if nodeinfo['mfree'] < needed_mem:
1101 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
1102 " failovers should node %s fail" % (node, prinode))
1106 def CheckPrereq(self):
1107 """Check prerequisites.
1109 Transform the list of checks we're going to skip into a set and check that
1110 all its members are valid.
1113 self.skip_set = frozenset(self.op.skip_checks)
1114 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1115 raise errors.OpPrereqError("Invalid checks to be skipped specified")
1117 def BuildHooksEnv(self):
1120 Cluster-Verify hooks are run only in the post phase, and their failure causes
1121 the output to be logged in the verify output and the verification to fail.
1124 all_nodes = self.cfg.GetNodeList()
1126 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1128 for node in self.cfg.GetAllNodesInfo().values():
1129 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1131 return env, [], all_nodes
1133 def Exec(self, feedback_fn):
1134 """Verify integrity of cluster, performing various test on nodes.
1138 feedback_fn("* Verifying global settings")
1139 for msg in self.cfg.VerifyConfig():
1140 feedback_fn(" - ERROR: %s" % msg)
1142 vg_name = self.cfg.GetVGName()
1143 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1144 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1145 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1146 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1147 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1148 for iname in instancelist)
1149 i_non_redundant = [] # Non redundant instances
1150 i_non_a_balanced = [] # Non auto-balanced instances
1151 n_offline = [] # List of offline nodes
1152 n_drained = [] # List of nodes being drained
1158 # FIXME: verify OS list
1159 # do local checksums
1160 master_files = [constants.CLUSTER_CONF_FILE]
1162 file_names = ssconf.SimpleStore().GetFileList()
1163 file_names.append(constants.SSL_CERT_FILE)
1164 file_names.append(constants.RAPI_CERT_FILE)
1165 file_names.extend(master_files)
1167 local_checksums = utils.FingerprintFiles(file_names)
1169 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1170 node_verify_param = {
1171 constants.NV_FILELIST: file_names,
1172 constants.NV_NODELIST: [node.name for node in nodeinfo
1173 if not node.offline],
1174 constants.NV_HYPERVISOR: hypervisors,
1175 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1176 node.secondary_ip) for node in nodeinfo
1177 if not node.offline],
1178 constants.NV_INSTANCELIST: hypervisors,
1179 constants.NV_VERSION: None,
1180 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1182 if vg_name is not None:
1183 node_verify_param[constants.NV_VGLIST] = None
1184 node_verify_param[constants.NV_LVLIST] = vg_name
1185 node_verify_param[constants.NV_DRBDLIST] = None
1186 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1187 self.cfg.GetClusterName())
1189 cluster = self.cfg.GetClusterInfo()
1190 master_node = self.cfg.GetMasterNode()
1191 all_drbd_map = self.cfg.ComputeDRBDMap()
1193 for node_i in nodeinfo:
1197 feedback_fn("* Skipping offline node %s" % (node,))
1198 n_offline.append(node)
1201 if node == master_node:
1203 elif node_i.master_candidate:
1204 ntype = "master candidate"
1205 elif node_i.drained:
1207 n_drained.append(node)
1210 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1212 msg = all_nvinfo[node].fail_msg
1214 feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg))
1218 nresult = all_nvinfo[node].payload
1220 for minor, instance in all_drbd_map[node].items():
1221 if instance not in instanceinfo:
1222 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1224 # ghost instance should not be running, but otherwise we
1225 # don't give double warnings (both ghost instance and
1226 # unallocated minor in use)
1227 node_drbd[minor] = (instance, False)
1229 instance = instanceinfo[instance]
1230 node_drbd[minor] = (instance.name, instance.admin_up)
1231 result = self._VerifyNode(node_i, file_names, local_checksums,
1232 nresult, feedback_fn, master_files,
1236 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1238 node_volume[node] = {}
1239 elif isinstance(lvdata, basestring):
1240 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1241 (node, utils.SafeEncode(lvdata)))
1243 node_volume[node] = {}
1244 elif not isinstance(lvdata, dict):
1245 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1249 node_volume[node] = lvdata
1252 idata = nresult.get(constants.NV_INSTANCELIST, None)
1253 if not isinstance(idata, list):
1254 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1259 node_instance[node] = idata
1262 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1263 if not isinstance(nodeinfo, dict):
1264 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1270 "mfree": int(nodeinfo['memory_free']),
1273 # dictionary holding all instances this node is secondary for,
1274 # grouped by their primary node. Each key is a cluster node, and each
1275 # value is a list of instances which have the key as primary and the
1276 # current node as secondary. This is handy to calculate N+1 memory
1277 # availability if you can only failover from a primary to its
1279 "sinst-by-pnode": {},
1281 # FIXME: devise a free space model for file based instances as well
1282 if vg_name is not None:
1283 if (constants.NV_VGLIST not in nresult or
1284 vg_name not in nresult[constants.NV_VGLIST]):
1285 feedback_fn(" - ERROR: node %s didn't return data for the"
1286 " volume group '%s' - it is either missing or broken" %
1290 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1291 except (ValueError, KeyError):
1292 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1293 " from node %s" % (node,))
1297 node_vol_should = {}
1299 for instance in instancelist:
1300 feedback_fn("* Verifying instance %s" % instance)
1301 inst_config = instanceinfo[instance]
1302 result = self._VerifyInstance(instance, inst_config, node_volume,
1303 node_instance, feedback_fn, n_offline)
1305 inst_nodes_offline = []
1307 inst_config.MapLVsByNode(node_vol_should)
1309 instance_cfg[instance] = inst_config
1311 pnode = inst_config.primary_node
1312 if pnode in node_info:
1313 node_info[pnode]['pinst'].append(instance)
1314 elif pnode not in n_offline:
1315 feedback_fn(" - ERROR: instance %s, connection to primary node"
1316 " %s failed" % (instance, pnode))
1319 if pnode in n_offline:
1320 inst_nodes_offline.append(pnode)
1322 # If the instance is non-redundant we cannot survive losing its primary
1323 # node, so we are not N+1 compliant. On the other hand we have no disk
1324 # templates with more than one secondary so that situation is not well
1326 # FIXME: does not support file-backed instances
1327 if len(inst_config.secondary_nodes) == 0:
1328 i_non_redundant.append(instance)
1329 elif len(inst_config.secondary_nodes) > 1:
1330 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1333 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1334 i_non_a_balanced.append(instance)
1336 for snode in inst_config.secondary_nodes:
1337 if snode in node_info:
1338 node_info[snode]['sinst'].append(instance)
1339 if pnode not in node_info[snode]['sinst-by-pnode']:
1340 node_info[snode]['sinst-by-pnode'][pnode] = []
1341 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1342 elif snode not in n_offline:
1343 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1344 " %s failed" % (instance, snode))
1346 if snode in n_offline:
1347 inst_nodes_offline.append(snode)
1349 if inst_nodes_offline:
1350 # warn that the instance lives on offline nodes, and set bad=True
1351 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1352 ", ".join(inst_nodes_offline))
1355 feedback_fn("* Verifying orphan volumes")
1356 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1360 feedback_fn("* Verifying remaining instances")
1361 result = self._VerifyOrphanInstances(instancelist, node_instance,
1365 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1366 feedback_fn("* Verifying N+1 Memory redundancy")
1367 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1370 feedback_fn("* Other Notes")
1372 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1373 % len(i_non_redundant))
1375 if i_non_a_balanced:
1376 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1377 % len(i_non_a_balanced))
1380 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1383 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1387 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1388 """Analyze the post-hooks' result
1390 This method analyses the hook result, handles it, and sends some
1391 nicely-formatted feedback back to the user.
1393 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1394 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1395 @param hooks_results: the results of the multi-node hooks rpc call
1396 @param feedback_fn: function used to send feedback back to the caller
1397 @param lu_result: previous Exec result
1398 @return: the new Exec result, based on the previous result
1402 # We only really run POST phase hooks, and are only interested in
1404 if phase == constants.HOOKS_PHASE_POST:
1405 # Used to change hooks' output to proper indentation
1406 indent_re = re.compile('^', re.M)
1407 feedback_fn("* Hooks Results")
1408 if not hooks_results:
1409 feedback_fn(" - ERROR: general communication failure")
1412 for node_name in hooks_results:
1413 show_node_header = True
1414 res = hooks_results[node_name]
1418 # no need to warn or set fail return value
1420 feedback_fn(" Communication failure in hooks execution: %s" %
1424 for script, hkr, output in res.payload:
1425 if hkr == constants.HKR_FAIL:
1426 # The node header is only shown once, if there are
1427 # failing hooks on that node
1428 if show_node_header:
1429 feedback_fn(" Node %s:" % node_name)
1430 show_node_header = False
1431 feedback_fn(" ERROR: Script %s failed, output:" % script)
1432 output = indent_re.sub(' ', output)
1433 feedback_fn("%s" % output)
1439 class LUVerifyDisks(NoHooksLU):
1440 """Verifies the cluster disks status.
1446 def ExpandNames(self):
1447 self.needed_locks = {
1448 locking.LEVEL_NODE: locking.ALL_SET,
1449 locking.LEVEL_INSTANCE: locking.ALL_SET,
1451 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1453 def CheckPrereq(self):
1454 """Check prerequisites.
1456 This has no prerequisites.
1461 def Exec(self, feedback_fn):
1462 """Verify integrity of cluster disks.
1464 @rtype: tuple of three items
1465 @return: a tuple of (dict of node-to-node_error, list of instances
1466 which need activate-disks, dict of instance: (node, volume) for
1470 result = res_nodes, res_instances, res_missing = {}, [], {}
1472 vg_name = self.cfg.GetVGName()
1473 nodes = utils.NiceSort(self.cfg.GetNodeList())
1474 instances = [self.cfg.GetInstanceInfo(name)
1475 for name in self.cfg.GetInstanceList()]
1478 for inst in instances:
1480 if (not inst.admin_up or
1481 inst.disk_template not in constants.DTS_NET_MIRROR):
1483 inst.MapLVsByNode(inst_lvs)
1484 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1485 for node, vol_list in inst_lvs.iteritems():
1486 for vol in vol_list:
1487 nv_dict[(node, vol)] = inst
1492 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1496 node_res = node_lvs[node]
1497 if node_res.offline:
1499 msg = node_res.fail_msg
1501 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1502 res_nodes[node] = msg
1505 lvs = node_res.payload
1506 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1507 inst = nv_dict.pop((node, lv_name), None)
1508 if (not lv_online and inst is not None
1509 and inst.name not in res_instances):
1510 res_instances.append(inst.name)
1512 # any leftover items in nv_dict are missing LVs, let's arrange the
1514 for key, inst in nv_dict.iteritems():
1515 if inst.name not in res_missing:
1516 res_missing[inst.name] = []
1517 res_missing[inst.name].append(key)
1522 class LURepairDiskSizes(NoHooksLU):
1523 """Verifies the cluster disks sizes.
1526 _OP_REQP = ["instances"]
1529 def ExpandNames(self):
1531 if not isinstance(self.op.instances, list):
1532 raise errors.OpPrereqError("Invalid argument type 'instances'")
1534 if self.op.instances:
1535 self.wanted_names = []
1536 for name in self.op.instances:
1537 full_name = self.cfg.ExpandInstanceName(name)
1538 if full_name is None:
1539 raise errors.OpPrereqError("Instance '%s' not known" % name)
1540 self.wanted_names.append(full_name)
1542 self.needed_locks = {
1543 locking.LEVEL_NODE: [],
1544 locking.LEVEL_INSTANCE: self.wanted_names,
1546 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1548 self.wanted_names = None
1549 self.needed_locks = {
1550 locking.LEVEL_NODE: locking.ALL_SET,
1551 locking.LEVEL_INSTANCE: locking.ALL_SET,
1553 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1555 def DeclareLocks(self, level):
1556 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1557 self._LockInstancesNodes(primary_only=True)
1559 def CheckPrereq(self):
1560 """Check prerequisites.
1562 This only checks the optional instance list against the existing names.
1565 if self.wanted_names is None:
1566 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1568 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1569 in self.wanted_names]
1571 def Exec(self, feedback_fn):
1572 """Verify the size of cluster disks.
1575 # TODO: check child disks too
1576 # TODO: check differences in size between primary/secondary nodes
1578 for instance in self.wanted_instances:
1579 pnode = instance.primary_node
1580 if pnode not in per_node_disks:
1581 per_node_disks[pnode] = []
1582 for idx, disk in enumerate(instance.disks):
1583 per_node_disks[pnode].append((instance, idx, disk))
1586 for node, dskl in per_node_disks.items():
1587 result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
1589 self.LogWarning("Failure in blockdev_getsizes call to node"
1590 " %s, ignoring", node)
1592 if len(result.data) != len(dskl):
1593 self.LogWarning("Invalid result from node %s, ignoring node results",
1596 for ((instance, idx, disk), size) in zip(dskl, result.data):
1598 self.LogWarning("Disk %d of instance %s did not return size"
1599 " information, ignoring", idx, instance.name)
1601 if not isinstance(size, (int, long)):
1602 self.LogWarning("Disk %d of instance %s did not return valid"
1603 " size information, ignoring", idx, instance.name)
1606 if size != disk.size:
1607 self.LogInfo("Disk %d of instance %s has mismatched size,"
1608 " correcting: recorded %d, actual %d", idx,
1609 instance.name, disk.size, size)
1611 self.cfg.Update(instance)
1612 changed.append((instance.name, idx, size))
1616 class LURenameCluster(LogicalUnit):
1617 """Rename the cluster.
1620 HPATH = "cluster-rename"
1621 HTYPE = constants.HTYPE_CLUSTER
1624 def BuildHooksEnv(self):
1629 "OP_TARGET": self.cfg.GetClusterName(),
1630 "NEW_NAME": self.op.name,
1632 mn = self.cfg.GetMasterNode()
1633 return env, [mn], [mn]
1635 def CheckPrereq(self):
1636 """Verify that the passed name is a valid one.
1639 hostname = utils.HostInfo(self.op.name)
1641 new_name = hostname.name
1642 self.ip = new_ip = hostname.ip
1643 old_name = self.cfg.GetClusterName()
1644 old_ip = self.cfg.GetMasterIP()
1645 if new_name == old_name and new_ip == old_ip:
1646 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1647 " cluster has changed")
1648 if new_ip != old_ip:
1649 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1650 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1651 " reachable on the network. Aborting." %
1654 self.op.name = new_name
1656 def Exec(self, feedback_fn):
1657 """Rename the cluster.
1660 clustername = self.op.name
1663 # shutdown the master IP
1664 master = self.cfg.GetMasterNode()
1665 result = self.rpc.call_node_stop_master(master, False)
1666 result.Raise("Could not disable the master role")
1669 cluster = self.cfg.GetClusterInfo()
1670 cluster.cluster_name = clustername
1671 cluster.master_ip = ip
1672 self.cfg.Update(cluster)
1674 # update the known hosts file
1675 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1676 node_list = self.cfg.GetNodeList()
1678 node_list.remove(master)
1681 result = self.rpc.call_upload_file(node_list,
1682 constants.SSH_KNOWN_HOSTS_FILE)
1683 for to_node, to_result in result.iteritems():
1684 msg = to_result.fail_msg
1686 msg = ("Copy of file %s to node %s failed: %s" %
1687 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1688 self.proc.LogWarning(msg)
1691 result = self.rpc.call_node_start_master(master, False, False)
1692 msg = result.fail_msg
1694 self.LogWarning("Could not re-enable the master role on"
1695 " the master, please restart manually: %s", msg)
1698 def _RecursiveCheckIfLVMBased(disk):
1699 """Check if the given disk or its children are lvm-based.
1701 @type disk: L{objects.Disk}
1702 @param disk: the disk to check
1704 @return: boolean indicating whether a LD_LV dev_type was found or not
1708 for chdisk in disk.children:
1709 if _RecursiveCheckIfLVMBased(chdisk):
1711 return disk.dev_type == constants.LD_LV
1714 class LUSetClusterParams(LogicalUnit):
1715 """Change the parameters of the cluster.
1718 HPATH = "cluster-modify"
1719 HTYPE = constants.HTYPE_CLUSTER
1723 def CheckArguments(self):
1727 if not hasattr(self.op, "candidate_pool_size"):
1728 self.op.candidate_pool_size = None
1729 if self.op.candidate_pool_size is not None:
1731 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1732 except (ValueError, TypeError), err:
1733 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1735 if self.op.candidate_pool_size < 1:
1736 raise errors.OpPrereqError("At least one master candidate needed")
1738 def ExpandNames(self):
1739 # FIXME: in the future maybe other cluster params won't require checking on
1740 # all nodes to be modified.
1741 self.needed_locks = {
1742 locking.LEVEL_NODE: locking.ALL_SET,
1744 self.share_locks[locking.LEVEL_NODE] = 1
1746 def BuildHooksEnv(self):
1751 "OP_TARGET": self.cfg.GetClusterName(),
1752 "NEW_VG_NAME": self.op.vg_name,
1754 mn = self.cfg.GetMasterNode()
1755 return env, [mn], [mn]
1757 def CheckPrereq(self):
1758 """Check prerequisites.
1760 This checks that the given parameters do not conflict and
1761 that the given volume group is valid.
1764 if self.op.vg_name is not None and not self.op.vg_name:
1765 instances = self.cfg.GetAllInstancesInfo().values()
1766 for inst in instances:
1767 for disk in inst.disks:
1768 if _RecursiveCheckIfLVMBased(disk):
1769 raise errors.OpPrereqError("Cannot disable lvm storage while"
1770 " lvm-based instances exist")
1772 node_list = self.acquired_locks[locking.LEVEL_NODE]
1774 # if vg_name not None, checks given volume group on all nodes
1776 vglist = self.rpc.call_vg_list(node_list)
1777 for node in node_list:
1778 msg = vglist[node].fail_msg
1780 # ignoring down node
1781 self.LogWarning("Error while gathering data on node %s"
1782 " (ignoring node): %s", node, msg)
1784 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1786 constants.MIN_VG_SIZE)
1788 raise errors.OpPrereqError("Error on node '%s': %s" %
1791 self.cluster = cluster = self.cfg.GetClusterInfo()
1792 # validate params changes
1793 if self.op.beparams:
1794 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1795 self.new_beparams = objects.FillDict(
1796 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1798 if self.op.nicparams:
1799 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1800 self.new_nicparams = objects.FillDict(
1801 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1802 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1804 # hypervisor list/parameters
1805 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1806 if self.op.hvparams:
1807 if not isinstance(self.op.hvparams, dict):
1808 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1809 for hv_name, hv_dict in self.op.hvparams.items():
1810 if hv_name not in self.new_hvparams:
1811 self.new_hvparams[hv_name] = hv_dict
1813 self.new_hvparams[hv_name].update(hv_dict)
1815 if self.op.enabled_hypervisors is not None:
1816 self.hv_list = self.op.enabled_hypervisors
1817 if not self.hv_list:
1818 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1819 " least one member")
1820 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1822 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1824 utils.CommaJoin(invalid_hvs))
1826 self.hv_list = cluster.enabled_hypervisors
1828 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1829 # either the enabled list has changed, or the parameters have, validate
1830 for hv_name, hv_params in self.new_hvparams.items():
1831 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1832 (self.op.enabled_hypervisors and
1833 hv_name in self.op.enabled_hypervisors)):
1834 # either this is a new hypervisor, or its parameters have changed
1835 hv_class = hypervisor.GetHypervisor(hv_name)
1836 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1837 hv_class.CheckParameterSyntax(hv_params)
1838 _CheckHVParams(self, node_list, hv_name, hv_params)
1840 def Exec(self, feedback_fn):
1841 """Change the parameters of the cluster.
1844 if self.op.vg_name is not None:
1845 new_volume = self.op.vg_name
1848 if new_volume != self.cfg.GetVGName():
1849 self.cfg.SetVGName(new_volume)
1851 feedback_fn("Cluster LVM configuration already in desired"
1852 " state, not changing")
1853 if self.op.hvparams:
1854 self.cluster.hvparams = self.new_hvparams
1855 if self.op.enabled_hypervisors is not None:
1856 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1857 if self.op.beparams:
1858 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1859 if self.op.nicparams:
1860 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1862 if self.op.candidate_pool_size is not None:
1863 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1864 # we need to update the pool size here, otherwise the save will fail
1865 _AdjustCandidatePool(self)
1867 self.cfg.Update(self.cluster)
1870 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1871 """Distribute additional files which are part of the cluster configuration.
1873 ConfigWriter takes care of distributing the config and ssconf files, but
1874 there are more files which should be distributed to all nodes. This function
1875 makes sure those are copied.
1877 @param lu: calling logical unit
1878 @param additional_nodes: list of nodes not in the config to distribute to
1881 # 1. Gather target nodes
1882 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1883 dist_nodes = lu.cfg.GetNodeList()
1884 if additional_nodes is not None:
1885 dist_nodes.extend(additional_nodes)
1886 if myself.name in dist_nodes:
1887 dist_nodes.remove(myself.name)
1888 # 2. Gather files to distribute
1889 dist_files = set([constants.ETC_HOSTS,
1890 constants.SSH_KNOWN_HOSTS_FILE,
1891 constants.RAPI_CERT_FILE,
1892 constants.RAPI_USERS_FILE,
1893 constants.HMAC_CLUSTER_KEY,
1896 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1897 for hv_name in enabled_hypervisors:
1898 hv_class = hypervisor.GetHypervisor(hv_name)
1899 dist_files.update(hv_class.GetAncillaryFiles())
1901 # 3. Perform the files upload
1902 for fname in dist_files:
1903 if os.path.exists(fname):
1904 result = lu.rpc.call_upload_file(dist_nodes, fname)
1905 for to_node, to_result in result.items():
1906 msg = to_result.fail_msg
1908 msg = ("Copy of file %s to node %s failed: %s" %
1909 (fname, to_node, msg))
1910 lu.proc.LogWarning(msg)
1913 class LURedistributeConfig(NoHooksLU):
1914 """Force the redistribution of cluster configuration.
1916 This is a very simple LU.
1922 def ExpandNames(self):
1923 self.needed_locks = {
1924 locking.LEVEL_NODE: locking.ALL_SET,
1926 self.share_locks[locking.LEVEL_NODE] = 1
1928 def CheckPrereq(self):
1929 """Check prerequisites.
1933 def Exec(self, feedback_fn):
1934 """Redistribute the configuration.
1937 self.cfg.Update(self.cfg.GetClusterInfo())
1938 _RedistributeAncillaryFiles(self)
1941 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1942 """Sleep and poll for an instance's disk to sync.
1945 if not instance.disks:
1949 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1951 node = instance.primary_node
1953 for dev in instance.disks:
1954 lu.cfg.SetDiskID(dev, node)
1957 degr_retries = 10 # in seconds, as we sleep 1 second each time
1961 cumul_degraded = False
1962 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1963 msg = rstats.fail_msg
1965 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1968 raise errors.RemoteError("Can't contact node %s for mirror data,"
1969 " aborting." % node)
1972 rstats = rstats.payload
1974 for i, mstat in enumerate(rstats):
1976 lu.LogWarning("Can't compute data for node %s/%s",
1977 node, instance.disks[i].iv_name)
1980 cumul_degraded = (cumul_degraded or
1981 (mstat.is_degraded and mstat.sync_percent is None))
1982 if mstat.sync_percent is not None:
1984 if mstat.estimated_time is not None:
1985 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
1986 max_time = mstat.estimated_time
1988 rem_time = "no time estimate"
1989 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1990 (instance.disks[i].iv_name, mstat.sync_percent, rem_time))
1992 # if we're done but degraded, let's do a few small retries, to
1993 # make sure we see a stable and not transient situation; therefore
1994 # we force restart of the loop
1995 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1996 logging.info("Degraded disks found, %d retries left", degr_retries)
2004 time.sleep(min(60, max_time))
2007 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
2008 return not cumul_degraded
2011 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2012 """Check that mirrors are not degraded.
2014 The ldisk parameter, if True, will change the test from the
2015 is_degraded attribute (which represents overall non-ok status for
2016 the device(s)) to the ldisk (representing the local storage status).
2019 lu.cfg.SetDiskID(dev, node)
2023 if on_primary or dev.AssembleOnSecondary():
2024 rstats = lu.rpc.call_blockdev_find(node, dev)
2025 msg = rstats.fail_msg
2027 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2029 elif not rstats.payload:
2030 lu.LogWarning("Can't find disk on node %s", node)
2034 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2036 result = result and not rstats.payload.is_degraded
2039 for child in dev.children:
2040 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
2045 class LUDiagnoseOS(NoHooksLU):
2046 """Logical unit for OS diagnose/query.
2049 _OP_REQP = ["output_fields", "names"]
2051 _FIELDS_STATIC = utils.FieldSet()
2052 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
2054 def ExpandNames(self):
2056 raise errors.OpPrereqError("Selective OS query not supported")
2058 _CheckOutputFields(static=self._FIELDS_STATIC,
2059 dynamic=self._FIELDS_DYNAMIC,
2060 selected=self.op.output_fields)
2062 # Lock all nodes, in shared mode
2063 # Temporary removal of locks, should be reverted later
2064 # TODO: reintroduce locks when they are lighter-weight
2065 self.needed_locks = {}
2066 #self.share_locks[locking.LEVEL_NODE] = 1
2067 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2069 def CheckPrereq(self):
2070 """Check prerequisites.
2075 def _DiagnoseByOS(node_list, rlist):
2076 """Remaps a per-node return list into an a per-os per-node dictionary
2078 @param node_list: a list with the names of all nodes
2079 @param rlist: a map with node names as keys and OS objects as values
2082 @return: a dictionary with osnames as keys and as value another map, with
2083 nodes as keys and tuples of (path, status, diagnose) as values, eg::
2085 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2086 (/srv/..., False, "invalid api")],
2087 "node2": [(/srv/..., True, "")]}
2092 # we build here the list of nodes that didn't fail the RPC (at RPC
2093 # level), so that nodes with a non-responding node daemon don't
2094 # make all OSes invalid
2095 good_nodes = [node_name for node_name in rlist
2096 if not rlist[node_name].fail_msg]
2097 for node_name, nr in rlist.items():
2098 if nr.fail_msg or not nr.payload:
2100 for name, path, status, diagnose in nr.payload:
2101 if name not in all_os:
2102 # build a list of nodes for this os containing empty lists
2103 # for each node in node_list
2105 for nname in good_nodes:
2106 all_os[name][nname] = []
2107 all_os[name][node_name].append((path, status, diagnose))
2110 def Exec(self, feedback_fn):
2111 """Compute the list of OSes.
2114 valid_nodes = list(self.cfg.GetOnlineNodeList())
2115 node_data = self.rpc.call_os_diagnose(valid_nodes)
2116 pol = self._DiagnoseByOS(valid_nodes, node_data)
2118 for os_name, os_data in pol.items():
2120 for field in self.op.output_fields:
2123 elif field == "valid":
2124 val = utils.all([osl and osl[0][1] for osl in os_data.values()])
2125 elif field == "node_status":
2126 # this is just a copy of the dict
2128 for node_name, nos_list in os_data.items():
2129 val[node_name] = nos_list
2131 raise errors.ParameterError(field)
2138 class LURemoveNode(LogicalUnit):
2139 """Logical unit for removing a node.
2142 HPATH = "node-remove"
2143 HTYPE = constants.HTYPE_NODE
2144 _OP_REQP = ["node_name"]
2146 def BuildHooksEnv(self):
2149 This doesn't run on the target node in the pre phase as a failed
2150 node would then be impossible to remove.
2154 "OP_TARGET": self.op.node_name,
2155 "NODE_NAME": self.op.node_name,
2157 all_nodes = self.cfg.GetNodeList()
2158 all_nodes.remove(self.op.node_name)
2159 return env, all_nodes, all_nodes
2161 def CheckPrereq(self):
2162 """Check prerequisites.
2165 - the node exists in the configuration
2166 - it does not have primary or secondary instances
2167 - it's not the master
2169 Any errors are signaled by raising errors.OpPrereqError.
2172 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2174 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2176 instance_list = self.cfg.GetInstanceList()
2178 masternode = self.cfg.GetMasterNode()
2179 if node.name == masternode:
2180 raise errors.OpPrereqError("Node is the master node,"
2181 " you need to failover first.")
2183 for instance_name in instance_list:
2184 instance = self.cfg.GetInstanceInfo(instance_name)
2185 if node.name in instance.all_nodes:
2186 raise errors.OpPrereqError("Instance %s is still running on the node,"
2187 " please remove first." % instance_name)
2188 self.op.node_name = node.name
2191 def Exec(self, feedback_fn):
2192 """Removes the node from the cluster.
2196 logging.info("Stopping the node daemon and removing configs from node %s",
2199 self.context.RemoveNode(node.name)
2201 result = self.rpc.call_node_leave_cluster(node.name)
2202 msg = result.fail_msg
2204 self.LogWarning("Errors encountered on the remote node while leaving"
2205 " the cluster: %s", msg)
2207 # Promote nodes to master candidate as needed
2208 _AdjustCandidatePool(self)
2211 class LUQueryNodes(NoHooksLU):
2212 """Logical unit for querying nodes.
2215 _OP_REQP = ["output_fields", "names", "use_locking"]
2217 _FIELDS_DYNAMIC = utils.FieldSet(
2219 "mtotal", "mnode", "mfree",
2221 "ctotal", "cnodes", "csockets",
2224 _FIELDS_STATIC = utils.FieldSet(
2225 "name", "pinst_cnt", "sinst_cnt",
2226 "pinst_list", "sinst_list",
2227 "pip", "sip", "tags",
2228 "serial_no", "ctime", "mtime",
2236 def ExpandNames(self):
2237 _CheckOutputFields(static=self._FIELDS_STATIC,
2238 dynamic=self._FIELDS_DYNAMIC,
2239 selected=self.op.output_fields)
2241 self.needed_locks = {}
2242 self.share_locks[locking.LEVEL_NODE] = 1
2245 self.wanted = _GetWantedNodes(self, self.op.names)
2247 self.wanted = locking.ALL_SET
2249 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2250 self.do_locking = self.do_node_query and self.op.use_locking
2252 # if we don't request only static fields, we need to lock the nodes
2253 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2256 def CheckPrereq(self):
2257 """Check prerequisites.
2260 # The validation of the node list is done in _GetWantedNodes if the
2261 # list is non-empty; if it's empty, there's no validation to do
2264 def Exec(self, feedback_fn):
2265 """Computes the list of nodes and their attributes.
2268 all_info = self.cfg.GetAllNodesInfo()
2270 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2271 elif self.wanted != locking.ALL_SET:
2272 nodenames = self.wanted
2273 missing = set(nodenames).difference(all_info.keys())
2275 raise errors.OpExecError(
2276 "Some nodes were removed before retrieving their data: %s" % missing)
2278 nodenames = all_info.keys()
2280 nodenames = utils.NiceSort(nodenames)
2281 nodelist = [all_info[name] for name in nodenames]
2283 # begin data gathering
2285 if self.do_node_query:
2287 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2288 self.cfg.GetHypervisorType())
2289 for name in nodenames:
2290 nodeinfo = node_data[name]
2291 if not nodeinfo.fail_msg and nodeinfo.payload:
2292 nodeinfo = nodeinfo.payload
2293 fn = utils.TryConvert
2295 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2296 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2297 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2298 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2299 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2300 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2301 "bootid": nodeinfo.get('bootid', None),
2302 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2303 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2306 live_data[name] = {}
2308 live_data = dict.fromkeys(nodenames, {})
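# Note: dict.fromkeys() makes every node name map to the *same* empty dict;
# that is fine here because live_data is only read below, never mutated
# per-node.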
2310 node_to_primary = dict([(name, set()) for name in nodenames])
2311 node_to_secondary = dict([(name, set()) for name in nodenames])
2313 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2314 "sinst_cnt", "sinst_list"))
2315 if inst_fields & frozenset(self.op.output_fields):
2316 instancelist = self.cfg.GetInstanceList()
2318 for instance_name in instancelist:
2319 inst = self.cfg.GetInstanceInfo(instance_name)
2320 if inst.primary_node in node_to_primary:
2321 node_to_primary[inst.primary_node].add(inst.name)
2322 for secnode in inst.secondary_nodes:
2323 if secnode in node_to_secondary:
2324 node_to_secondary[secnode].add(inst.name)
2326 master_node = self.cfg.GetMasterNode()
2328 # end data gathering
2331 for node in nodelist:
2333 for field in self.op.output_fields:
2336 elif field == "pinst_list":
2337 val = list(node_to_primary[node.name])
2338 elif field == "sinst_list":
2339 val = list(node_to_secondary[node.name])
2340 elif field == "pinst_cnt":
2341 val = len(node_to_primary[node.name])
2342 elif field == "sinst_cnt":
2343 val = len(node_to_secondary[node.name])
2344 elif field == "pip":
2345 val = node.primary_ip
2346 elif field == "sip":
2347 val = node.secondary_ip
2348 elif field == "tags":
2349 val = list(node.GetTags())
2350 elif field == "serial_no":
2351 val = node.serial_no
2352 elif field == "ctime":
2354 elif field == "mtime":
2356 elif field == "master_candidate":
2357 val = node.master_candidate
2358 elif field == "master":
2359 val = node.name == master_node
2360 elif field == "offline":
2362 elif field == "drained":
2364 elif self._FIELDS_DYNAMIC.Matches(field):
2365 val = live_data[node.name].get(field, None)
2366 elif field == "role":
2367 if node.name == master_node:
2369 elif node.master_candidate:
2378 raise errors.ParameterError(field)
2379 node_output.append(val)
2380 output.append(node_output)
2385 class LUQueryNodeVolumes(NoHooksLU):
2386 """Logical unit for getting volumes on node(s).
2389 _OP_REQP = ["nodes", "output_fields"]
2391 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2392 _FIELDS_STATIC = utils.FieldSet("node")
2394 def ExpandNames(self):
2395 _CheckOutputFields(static=self._FIELDS_STATIC,
2396 dynamic=self._FIELDS_DYNAMIC,
2397 selected=self.op.output_fields)
2399 self.needed_locks = {}
2400 self.share_locks[locking.LEVEL_NODE] = 1
2401 if not self.op.nodes:
2402 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2404 self.needed_locks[locking.LEVEL_NODE] = \
2405 _GetWantedNodes(self, self.op.nodes)
2407 def CheckPrereq(self):
2408 """Check prerequisites.
2410 This checks that the fields required are valid output fields.
2413 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2415 def Exec(self, feedback_fn):
2416 """Computes the list of nodes and their attributes.
2419 nodenames = self.nodes
2420 volumes = self.rpc.call_node_volumes(nodenames)
2422 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2423 in self.cfg.GetInstanceList()]
2425 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2428 for node in nodenames:
2429 nresult = volumes[node]
2432 msg = nresult.fail_msg
2434 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2437 node_vols = nresult.payload[:]
2438 node_vols.sort(key=lambda vol: vol['dev'])
2440 for vol in node_vols:
2442 for field in self.op.output_fields:
2445 elif field == "phys":
2449 elif field == "name":
2451 elif field == "size":
2452 val = int(float(vol['size']))
2453 elif field == "instance":
2455 if node not in lv_by_node[inst]:
2457 if vol['name'] in lv_by_node[inst][node]:
2463 raise errors.ParameterError(field)
2464 node_output.append(str(val))
2466 output.append(node_output)
2471 class LUQueryNodeStorage(NoHooksLU):
2472 """Logical unit for getting information on storage units on node(s).
2475 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2477 _FIELDS_STATIC = utils.FieldSet("node")
2479 def ExpandNames(self):
2480 storage_type = self.op.storage_type
2482 if storage_type not in constants.VALID_STORAGE_FIELDS:
2483 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2485 dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2487 _CheckOutputFields(static=self._FIELDS_STATIC,
2488 dynamic=utils.FieldSet(*dynamic_fields),
2489 selected=self.op.output_fields)
2491 self.needed_locks = {}
2492 self.share_locks[locking.LEVEL_NODE] = 1
2495 self.needed_locks[locking.LEVEL_NODE] = \
2496 _GetWantedNodes(self, self.op.nodes)
2498 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2500 def CheckPrereq(self):
2501 """Check prerequisites.
2503 This checks that the fields required are valid output fields.
2506 self.op.name = getattr(self.op, "name", None)
2508 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2510 def Exec(self, feedback_fn):
2511 """Computes the list of nodes and their attributes.
2514 # Always get name to sort by
2515 if constants.SF_NAME in self.op.output_fields:
2516 fields = self.op.output_fields[:]
2518 fields = [constants.SF_NAME] + self.op.output_fields
2520 # Never ask for node as it's only known to the LU
2521 while "node" in fields:
2522 fields.remove("node")
2524 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2525 name_idx = field_idx[constants.SF_NAME]
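# field_idx maps each requested storage field to its column in the rows
# returned by the storage_list RPC; e.g. with fields == ["name", "size"]
# (illustrative values only) name_idx is 0 and field_idx["size"] is 1.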
2527 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2528 data = self.rpc.call_storage_list(self.nodes,
2529 self.op.storage_type, st_args,
2530 self.op.name, fields)
2534 for node in utils.NiceSort(self.nodes):
2535 nresult = data[node]
2539 msg = nresult.fail_msg
2541 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2544 rows = dict([(row[name_idx], row) for row in nresult.payload])
2546 for name in utils.NiceSort(rows.keys()):
2551 for field in self.op.output_fields:
2554 elif field in field_idx:
2555 val = row[field_idx[field]]
2557 raise errors.ParameterError(field)
2566 class LUModifyNodeStorage(NoHooksLU):
2567 """Logical unit for modifying a storage volume on a node.
2570 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2573 def CheckArguments(self):
2574 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2575 if node_name is None:
2576 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2578 self.op.node_name = node_name
2580 storage_type = self.op.storage_type
2581 if storage_type not in constants.VALID_STORAGE_FIELDS:
2582 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2584 def ExpandNames(self):
2585 self.needed_locks = {
2586 locking.LEVEL_NODE: self.op.node_name,
2589 def CheckPrereq(self):
2590 """Check prerequisites.
2593 storage_type = self.op.storage_type
2596 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2598 raise errors.OpPrereqError("Storage units of type '%s' cannot be"
2599 " modified" % storage_type)
2601 diff = set(self.op.changes.keys()) - modifiable
2603 raise errors.OpPrereqError("The following fields cannot be modified for"
2604 " storage units of type '%s': %r" %
2605 (storage_type, list(diff)))
2607 def Exec(self, feedback_fn):
2608 """Computes the list of nodes and their attributes.
2611 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2612 result = self.rpc.call_storage_modify(self.op.node_name,
2613 self.op.storage_type, st_args,
2614 self.op.name, self.op.changes)
2615 result.Raise("Failed to modify storage unit '%s' on %s" %
2616 (self.op.name, self.op.node_name))
2619 class LUAddNode(LogicalUnit):
2620 """Logical unit for adding node to the cluster.
2624 HTYPE = constants.HTYPE_NODE
2625 _OP_REQP = ["node_name"]
2627 def BuildHooksEnv(self):
2630 This will run on all nodes before, and on all nodes + the new node after.
2634 "OP_TARGET": self.op.node_name,
2635 "NODE_NAME": self.op.node_name,
2636 "NODE_PIP": self.op.primary_ip,
2637 "NODE_SIP": self.op.secondary_ip,
2639 nodes_0 = self.cfg.GetNodeList()
2640 nodes_1 = nodes_0 + [self.op.node_name, ]
2641 return env, nodes_0, nodes_1
2643 def CheckPrereq(self):
2644 """Check prerequisites.
2647 - the new node is not already in the config
2649 - its parameters (single/dual homed) match the cluster
2651 Any errors are signaled by raising errors.OpPrereqError.
2654 node_name = self.op.node_name
2657 dns_data = utils.HostInfo(node_name)
2659 node = dns_data.name
2660 primary_ip = self.op.primary_ip = dns_data.ip
2661 secondary_ip = getattr(self.op, "secondary_ip", None)
2662 if secondary_ip is None:
2663 secondary_ip = primary_ip
2664 if not utils.IsValidIP(secondary_ip):
2665 raise errors.OpPrereqError("Invalid secondary IP given")
2666 self.op.secondary_ip = secondary_ip
2668 node_list = cfg.GetNodeList()
2669 if not self.op.readd and node in node_list:
2670 raise errors.OpPrereqError("Node %s is already in the configuration" %
2672 elif self.op.readd and node not in node_list:
2673 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2675 for existing_node_name in node_list:
2676 existing_node = cfg.GetNodeInfo(existing_node_name)
2678 if self.op.readd and node == existing_node_name:
2679 if (existing_node.primary_ip != primary_ip or
2680 existing_node.secondary_ip != secondary_ip):
2681 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2682 " address configuration as before")
2685 if (existing_node.primary_ip == primary_ip or
2686 existing_node.secondary_ip == primary_ip or
2687 existing_node.primary_ip == secondary_ip or
2688 existing_node.secondary_ip == secondary_ip):
2689 raise errors.OpPrereqError("New node ip address(es) conflict with"
2690 " existing node %s" % existing_node.name)
2692 # check that the type of the node (single versus dual homed) is the
2693 # same as for the master
2694 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2695 master_singlehomed = myself.secondary_ip == myself.primary_ip
2696 newbie_singlehomed = secondary_ip == primary_ip
2697 if master_singlehomed != newbie_singlehomed:
2698 if master_singlehomed:
2699 raise errors.OpPrereqError("The master has no private ip but the"
2700 " new node has one")
2702 raise errors.OpPrereqError("The master has a private ip but the"
2703 " new node doesn't have one")
2705 # checks reachability
2706 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2707 raise errors.OpPrereqError("Node not reachable by ping")
2709 if not newbie_singlehomed:
2710 # check reachability from my secondary ip to newbie's secondary ip
2711 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2712 source=myself.secondary_ip):
2713 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2714 " based ping to noded port")
2716 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2721 mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2722 # the new node will increase mc_max by one, so:
2723 mc_max = min(mc_max + 1, cp_size)
2724 self.master_candidate = mc_now < mc_max
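# Illustrative arithmetic: with candidate_pool_size == 5, mc_now == 3 and
# mc_max == 4, the new node raises mc_max to min(4 + 1, 5) == 5, and since
# 3 < 5 the node will be added as a master candidate.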
2727 self.new_node = self.cfg.GetNodeInfo(node)
2728 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2730 self.new_node = objects.Node(name=node,
2731 primary_ip=primary_ip,
2732 secondary_ip=secondary_ip,
2733 master_candidate=self.master_candidate,
2734 offline=False, drained=False)
2736 def Exec(self, feedback_fn):
2737 """Adds the new node to the cluster.
2740 new_node = self.new_node
2741 node = new_node.name
2743 # for re-adds, reset the offline/drained/master-candidate flags;
2744 # we need to reset here, otherwise offline would prevent RPC calls
2745 # later in the procedure; this also means that if the re-add
2746 # fails, we are left with a non-offlined, broken node
2748 new_node.drained = new_node.offline = False
2749 self.LogInfo("Readding a node, the offline/drained flags were reset")
2750 # if we demote the node, we do cleanup later in the procedure
2751 new_node.master_candidate = self.master_candidate
2753 # notify the user about any possible mc promotion
2754 if new_node.master_candidate:
2755 self.LogInfo("Node will be a master candidate")
2757 # check connectivity
2758 result = self.rpc.call_version([node])[node]
2759 result.Raise("Can't get version information from node %s" % node)
2760 if constants.PROTOCOL_VERSION == result.payload:
2761 logging.info("Communication to node %s fine, sw version %s match",
2762 node, result.payload)
2764 raise errors.OpExecError("Version mismatch master version %s,"
2765 " node version %s" %
2766 (constants.PROTOCOL_VERSION, result.payload))
2769 logging.info("Copy ssh key to node %s", node)
2770 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2772 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2773 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2779 keyarray.append(f.read())
2783 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2785 keyarray[3], keyarray[4], keyarray[5])
2786 result.Raise("Cannot transfer ssh keys to the new node")
2788 # Add node to our /etc/hosts, and add key to known_hosts
2789 if self.cfg.GetClusterInfo().modify_etc_hosts:
2790 utils.AddHostToEtcHosts(new_node.name)
2792 if new_node.secondary_ip != new_node.primary_ip:
2793 result = self.rpc.call_node_has_ip_address(new_node.name,
2794 new_node.secondary_ip)
2795 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2797 if not result.payload:
2798 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2799 " you gave (%s). Please fix and re-run this"
2800 " command." % new_node.secondary_ip)
2802 node_verify_list = [self.cfg.GetMasterNode()]
2803 node_verify_param = {
2805 # TODO: do a node-net-test as well?
2808 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2809 self.cfg.GetClusterName())
2810 for verifier in node_verify_list:
2811 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2812 nl_payload = result[verifier].payload['nodelist']
2814 for failed in nl_payload:
2815 feedback_fn("ssh/hostname verification failed %s -> %s" %
2816 (verifier, nl_payload[failed]))
2817 raise errors.OpExecError("ssh/hostname verification failed.")
2820 _RedistributeAncillaryFiles(self)
2821 self.context.ReaddNode(new_node)
2822 # make sure we redistribute the config
2823 self.cfg.Update(new_node)
2824 # and make sure the new node will not have old files around
2825 if not new_node.master_candidate:
2826 result = self.rpc.call_node_demote_from_mc(new_node.name)
2827 msg = result.RemoteFailMsg()
2829 self.LogWarning("Node failed to demote itself from master"
2830 " candidate status: %s" % msg)
2832 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2833 self.context.AddNode(new_node)
2836 class LUSetNodeParams(LogicalUnit):
2837 """Modifies the parameters of a node.
2840 HPATH = "node-modify"
2841 HTYPE = constants.HTYPE_NODE
2842 _OP_REQP = ["node_name"]
2845 def CheckArguments(self):
2846 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2847 if node_name is None:
2848 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2849 self.op.node_name = node_name
2850 _CheckBooleanOpField(self.op, 'master_candidate')
2851 _CheckBooleanOpField(self.op, 'offline')
2852 _CheckBooleanOpField(self.op, 'drained')
2853 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2854 if all_mods.count(None) == 3:
2855 raise errors.OpPrereqError("Please pass at least one modification")
2856 if all_mods.count(True) > 1:
2857 raise errors.OpPrereqError("Can't set the node into more than one"
2858 " state at the same time")
2860 def ExpandNames(self):
2861 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2863 def BuildHooksEnv(self):
2866 This runs on the master node.
2870 "OP_TARGET": self.op.node_name,
2871 "MASTER_CANDIDATE": str(self.op.master_candidate),
2872 "OFFLINE": str(self.op.offline),
2873 "DRAINED": str(self.op.drained),
2875 nl = [self.cfg.GetMasterNode(),
2879 def CheckPrereq(self):
2880 """Check prerequisites.
2882 This checks the requested modifications against the node's current state.
2885 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2887 if ((self.op.master_candidate == False or self.op.offline == True or
2888 self.op.drained == True) and node.master_candidate):
2889 # we will demote the node from master_candidate
2890 if self.op.node_name == self.cfg.GetMasterNode():
2891 raise errors.OpPrereqError("The master node has to be a"
2892 " master candidate, online and not drained")
2893 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2894 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2895 if num_candidates <= cp_size:
2896 msg = ("Not enough master candidates (desired"
2897 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2899 self.LogWarning(msg)
2901 raise errors.OpPrereqError(msg)
2903 if (self.op.master_candidate == True and
2904 ((node.offline and not self.op.offline == False) or
2905 (node.drained and not self.op.drained == False))):
2906 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2907 " to master_candidate" % node.name)
2911 def Exec(self, feedback_fn):
2920 if self.op.offline is not None:
2921 node.offline = self.op.offline
2922 result.append(("offline", str(self.op.offline)))
2923 if self.op.offline == True:
2924 if node.master_candidate:
2925 node.master_candidate = False
2927 result.append(("master_candidate", "auto-demotion due to offline"))
2929 node.drained = False
2930 result.append(("drained", "clear drained status due to offline"))
2932 if self.op.master_candidate is not None:
2933 node.master_candidate = self.op.master_candidate
2935 result.append(("master_candidate", str(self.op.master_candidate)))
2936 if self.op.master_candidate == False:
2937 rrc = self.rpc.call_node_demote_from_mc(node.name)
2940 self.LogWarning("Node failed to demote itself: %s" % msg)
2942 if self.op.drained is not None:
2943 node.drained = self.op.drained
2944 result.append(("drained", str(self.op.drained)))
2945 if self.op.drained == True:
2946 if node.master_candidate:
2947 node.master_candidate = False
2949 result.append(("master_candidate", "auto-demotion due to drain"))
2950 rrc = self.rpc.call_node_demote_from_mc(node.name)
2951 msg = rrc.RemoteFailMsg()
2953 self.LogWarning("Node failed to demote itself: %s" % msg)
2955 node.offline = False
2956 result.append(("offline", "clear offline status due to drain"))
2958 # this will trigger configuration file update, if needed
2959 self.cfg.Update(node)
2960 # this will trigger job queue propagation or cleanup
2962 self.context.ReaddNode(node)
2967 class LUPowercycleNode(NoHooksLU):
2968 """Powercycles a node.
2971 _OP_REQP = ["node_name", "force"]
2974 def CheckArguments(self):
2975 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2976 if node_name is None:
2977 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2978 self.op.node_name = node_name
2979 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2980 raise errors.OpPrereqError("The node is the master and the force"
2981 " parameter was not set")
2983 def ExpandNames(self):
2984 """Locking for PowercycleNode.
2986 This is a last-resort option and shouldn't block on other
2987 jobs. Therefore, we grab no locks.
2990 self.needed_locks = {}
2992 def CheckPrereq(self):
2993 """Check prerequisites.
2995 This LU has no prereqs.
3000 def Exec(self, feedback_fn):
3004 result = self.rpc.call_node_powercycle(self.op.node_name,
3005 self.cfg.GetHypervisorType())
3006 result.Raise("Failed to schedule the reboot")
3007 return result.payload
3010 class LUQueryClusterInfo(NoHooksLU):
3011 """Query cluster configuration.
3017 def ExpandNames(self):
3018 self.needed_locks = {}
3020 def CheckPrereq(self):
3021 """No prerequsites needed for this LU.
3026 def Exec(self, feedback_fn):
3027 """Return cluster config.
3030 cluster = self.cfg.GetClusterInfo()
3032 "software_version": constants.RELEASE_VERSION,
3033 "protocol_version": constants.PROTOCOL_VERSION,
3034 "config_version": constants.CONFIG_VERSION,
3035 "os_api_version": max(constants.OS_API_VERSIONS),
3036 "export_version": constants.EXPORT_VERSION,
3037 "architecture": (platform.architecture()[0], platform.machine()),
3038 "name": cluster.cluster_name,
3039 "master": cluster.master_node,
3040 "default_hypervisor": cluster.enabled_hypervisors[0],
3041 "enabled_hypervisors": cluster.enabled_hypervisors,
3042 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3043 for hypervisor_name in cluster.enabled_hypervisors]),
3044 "beparams": cluster.beparams,
3045 "nicparams": cluster.nicparams,
3046 "candidate_pool_size": cluster.candidate_pool_size,
3047 "master_netdev": cluster.master_netdev,
3048 "volume_group_name": cluster.volume_group_name,
3049 "file_storage_dir": cluster.file_storage_dir,
3050 "ctime": cluster.ctime,
3051 "mtime": cluster.mtime,
3057 class LUQueryConfigValues(NoHooksLU):
3058 """Return configuration values.
3063 _FIELDS_DYNAMIC = utils.FieldSet()
3064 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
3066 def ExpandNames(self):
3067 self.needed_locks = {}
3069 _CheckOutputFields(static=self._FIELDS_STATIC,
3070 dynamic=self._FIELDS_DYNAMIC,
3071 selected=self.op.output_fields)
3073 def CheckPrereq(self):
3074 """No prerequisites.
3079 def Exec(self, feedback_fn):
3080 """Dump a representation of the cluster config to the standard output.
3084 for field in self.op.output_fields:
3085 if field == "cluster_name":
3086 entry = self.cfg.GetClusterName()
3087 elif field == "master_node":
3088 entry = self.cfg.GetMasterNode()
3089 elif field == "drain_flag":
3090 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3092 raise errors.ParameterError(field)
3093 values.append(entry)
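# For output_fields == ["cluster_name", "master_node"] this loop would
# collect something like ["cluster1.example.com", "node1.example.com"]
# (illustrative values only).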
3097 class LUActivateInstanceDisks(NoHooksLU):
3098 """Bring up an instance's disks.
3101 _OP_REQP = ["instance_name"]
3104 def ExpandNames(self):
3105 self._ExpandAndLockInstance()
3106 self.needed_locks[locking.LEVEL_NODE] = []
3107 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3109 def DeclareLocks(self, level):
3110 if level == locking.LEVEL_NODE:
3111 self._LockInstancesNodes()
3113 def CheckPrereq(self):
3114 """Check prerequisites.
3116 This checks that the instance is in the cluster.
3119 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3120 assert self.instance is not None, \
3121 "Cannot retrieve locked instance %s" % self.op.instance_name
3122 _CheckNodeOnline(self, self.instance.primary_node)
3123 if not hasattr(self.op, "ignore_size"):
3124 self.op.ignore_size = False
3126 def Exec(self, feedback_fn):
3127 """Activate the disks.
3130 disks_ok, disks_info = \
3131 _AssembleInstanceDisks(self, self.instance,
3132 ignore_size=self.op.ignore_size)
3134 raise errors.OpExecError("Cannot activate block devices")
3139 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3141 """Prepare the block devices for an instance.
3143 This sets up the block devices on all nodes.
3145 @type lu: L{LogicalUnit}
3146 @param lu: the logical unit on whose behalf we execute
3147 @type instance: L{objects.Instance}
3148 @param instance: the instance for whose disks we assemble
3149 @type ignore_secondaries: boolean
3150 @param ignore_secondaries: if true, errors on secondary nodes
3151 won't result in an error return from the function
3152 @type ignore_size: boolean
3153 @param ignore_size: if true, the current known size of the disk
3154 will not be used during the disk activation, useful for cases
3155 when the size is wrong
3156 @return: a tuple of (disks_ok, device_info), where device_info is a list of
3157 (host, instance_visible_name, node_visible_name) tuples
3158 with the mapping from node devices to instance devices
3163 iname = instance.name
3164 # With the two-pass mechanism we try to reduce the window of
3165 # opportunity for the race condition of switching DRBD to primary
3166 # before the handshake has occurred, but we do not eliminate it
3168 # The proper fix would be to wait (with some limits) until the
3169 # connection has been made and drbd transitions from WFConnection
3170 # into any other network-connected state (Connected, SyncTarget,
3173 # 1st pass, assemble on all nodes in secondary mode
3174 for inst_disk in instance.disks:
3175 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3177 node_disk = node_disk.Copy()
3178 node_disk.UnsetSize()
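# Dropping the recorded size lets the backend assemble the device even if
# the size stored in the configuration is stale (see the ignore_size
# parameter above).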
3179 lu.cfg.SetDiskID(node_disk, node)
3180 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3181 msg = result.fail_msg
3183 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3184 " (is_primary=False, pass=1): %s",
3185 inst_disk.iv_name, node, msg)
3186 if not ignore_secondaries:
3189 # FIXME: race condition on drbd migration to primary
3191 # 2nd pass, do only the primary node
3192 for inst_disk in instance.disks:
3193 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3194 if node != instance.primary_node:
3197 node_disk = node_disk.Copy()
3198 node_disk.UnsetSize()
3199 lu.cfg.SetDiskID(node_disk, node)
3200 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3201 msg = result.fail_msg
3203 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3204 " (is_primary=True, pass=2): %s",
3205 inst_disk.iv_name, node, msg)
3207 device_info.append((instance.primary_node, inst_disk.iv_name,
3210 # leave the disks configured for the primary node
3211 # this is a workaround that would be fixed better by
3212 # improving the logical/physical id handling
3213 for disk in instance.disks:
3214 lu.cfg.SetDiskID(disk, instance.primary_node)
3216 return disks_ok, device_info
3219 def _StartInstanceDisks(lu, instance, force):
3220 """Start the disks of an instance.
3223 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3224 ignore_secondaries=force)
3226 _ShutdownInstanceDisks(lu, instance)
3227 if force is not None and not force:
3228 lu.proc.LogWarning("", hint="If the message above refers to a"
3230 " you can retry the operation using '--force'.")
3231 raise errors.OpExecError("Disk consistency error")
3234 class LUDeactivateInstanceDisks(NoHooksLU):
3235 """Shutdown an instance's disks.
3238 _OP_REQP = ["instance_name"]
3241 def ExpandNames(self):
3242 self._ExpandAndLockInstance()
3243 self.needed_locks[locking.LEVEL_NODE] = []
3244 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3246 def DeclareLocks(self, level):
3247 if level == locking.LEVEL_NODE:
3248 self._LockInstancesNodes()
3250 def CheckPrereq(self):
3251 """Check prerequisites.
3253 This checks that the instance is in the cluster.
3256 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3257 assert self.instance is not None, \
3258 "Cannot retrieve locked instance %s" % self.op.instance_name
3260 def Exec(self, feedback_fn):
3261 """Deactivate the disks
3264 instance = self.instance
3265 _SafeShutdownInstanceDisks(self, instance)
3268 def _SafeShutdownInstanceDisks(lu, instance):
3269 """Shutdown block devices of an instance.
3271 This function checks if the instance is running before calling
3272 _ShutdownInstanceDisks.
3275 pnode = instance.primary_node
3276 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3277 ins_l.Raise("Can't contact node %s" % pnode)
3279 if instance.name in ins_l.payload:
3280 raise errors.OpExecError("Instance is running, can't shutdown"
3283 _ShutdownInstanceDisks(lu, instance)
3286 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3287 """Shutdown block devices of an instance.
3289 This does the shutdown on all nodes of the instance.
3291 If ignore_primary is false, errors on the primary node are
3296 for disk in instance.disks:
3297 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3298 lu.cfg.SetDiskID(top_disk, node)
3299 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3300 msg = result.fail_msg
3302 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3303 disk.iv_name, node, msg)
3304 if not ignore_primary or node != instance.primary_node:
3309 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3310 """Checks if a node has enough free memory.
3312 This function checks if a given node has the needed amount of free
3313 memory. In case the node has less memory or we cannot get the
3314 information from the node, this function raises an OpPrereqError
3317 @type lu: C{LogicalUnit}
3318 @param lu: a logical unit from which we get configuration data
3320 @param node: the node to check
3321 @type reason: C{str}
3322 @param reason: string to use in the error message
3323 @type requested: C{int}
3324 @param requested: the amount of memory in MiB to check for
3325 @type hypervisor_name: C{str}
3326 @param hypervisor_name: the hypervisor to ask for memory stats
3327 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3328 we cannot check the node
3331 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3332 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
3333 free_mem = nodeinfo[node].payload.get('memory_free', None)
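# memory_free is reported by the node in MiB, matching the unit of the
# requested parameter; a missing or non-integer value is treated as an
# error below rather than as "no memory".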
3334 if not isinstance(free_mem, int):
3335 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3336 " was '%s'" % (node, free_mem))
3337 if requested > free_mem:
3338 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3339 " needed %s MiB, available %s MiB" %
3340 (node, reason, requested, free_mem))
3343 class LUStartupInstance(LogicalUnit):
3344 """Starts an instance.
3347 HPATH = "instance-start"
3348 HTYPE = constants.HTYPE_INSTANCE
3349 _OP_REQP = ["instance_name", "force"]
3352 def ExpandNames(self):
3353 self._ExpandAndLockInstance()
3355 def BuildHooksEnv(self):
3358 This runs on master, primary and secondary nodes of the instance.
3362 "FORCE": self.op.force,
3364 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3365 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3368 def CheckPrereq(self):
3369 """Check prerequisites.
3371 This checks that the instance is in the cluster.
3374 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3375 assert self.instance is not None, \
3376 "Cannot retrieve locked instance %s" % self.op.instance_name
3379 self.beparams = getattr(self.op, "beparams", {})
3381 if not isinstance(self.beparams, dict):
3382 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3383 " dict" % (type(self.beparams), ))
3384 # fill the beparams dict
3385 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3386 self.op.beparams = self.beparams
3389 self.hvparams = getattr(self.op, "hvparams", {})
3391 if not isinstance(self.hvparams, dict):
3392 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3393 " dict" % (type(self.hvparams), ))
3395 # check hypervisor parameter syntax (locally)
3396 cluster = self.cfg.GetClusterInfo()
3397 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3398 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3400 filled_hvp.update(self.hvparams)
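# Parameter layering: start from the cluster-level defaults for this
# hypervisor and overlay the per-startup overrides, so only the overridden
# keys differ from the cluster configuration.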
3401 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3402 hv_type.CheckParameterSyntax(filled_hvp)
3403 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3404 self.op.hvparams = self.hvparams
3406 _CheckNodeOnline(self, instance.primary_node)
3408 bep = self.cfg.GetClusterInfo().FillBE(instance)
3409 # check bridges existence
3410 _CheckInstanceBridgesExist(self, instance)
3412 remote_info = self.rpc.call_instance_info(instance.primary_node,
3414 instance.hypervisor)
3415 remote_info.Raise("Error checking node %s" % instance.primary_node,
3417 if not remote_info.payload: # not running already
3418 _CheckNodeFreeMemory(self, instance.primary_node,
3419 "starting instance %s" % instance.name,
3420 bep[constants.BE_MEMORY], instance.hypervisor)
3422 def Exec(self, feedback_fn):
3423 """Start the instance.
3426 instance = self.instance
3427 force = self.op.force
3429 self.cfg.MarkInstanceUp(instance.name)
3431 node_current = instance.primary_node
3433 _StartInstanceDisks(self, instance, force)
3435 result = self.rpc.call_instance_start(node_current, instance,
3436 self.hvparams, self.beparams)
3437 msg = result.fail_msg
3439 _ShutdownInstanceDisks(self, instance)
3440 raise errors.OpExecError("Could not start instance: %s" % msg)
3443 class LURebootInstance(LogicalUnit):
3444 """Reboot an instance.
3447 HPATH = "instance-reboot"
3448 HTYPE = constants.HTYPE_INSTANCE
3449 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3452 def ExpandNames(self):
3453 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3454 constants.INSTANCE_REBOOT_HARD,
3455 constants.INSTANCE_REBOOT_FULL]:
3456 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3457 (constants.INSTANCE_REBOOT_SOFT,
3458 constants.INSTANCE_REBOOT_HARD,
3459 constants.INSTANCE_REBOOT_FULL))
3460 self._ExpandAndLockInstance()
3462 def BuildHooksEnv(self):
3465 This runs on master, primary and secondary nodes of the instance.
3469 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3470 "REBOOT_TYPE": self.op.reboot_type,
3472 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3473 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3476 def CheckPrereq(self):
3477 """Check prerequisites.
3479 This checks that the instance is in the cluster.
3482 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3483 assert self.instance is not None, \
3484 "Cannot retrieve locked instance %s" % self.op.instance_name
3486 _CheckNodeOnline(self, instance.primary_node)
3488 # check bridges existence
3489 _CheckInstanceBridgesExist(self, instance)
3491 def Exec(self, feedback_fn):
3492 """Reboot the instance.
3495 instance = self.instance
3496 ignore_secondaries = self.op.ignore_secondaries
3497 reboot_type = self.op.reboot_type
3499 node_current = instance.primary_node
3501 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3502 constants.INSTANCE_REBOOT_HARD]:
3503 for disk in instance.disks:
3504 self.cfg.SetDiskID(disk, node_current)
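# Refresh the node-specific disk IDs before the reboot RPC; the remote
# node presumably needs up-to-date device information to restart the
# instance in place.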
3505 result = self.rpc.call_instance_reboot(node_current, instance,
3507 result.Raise("Could not reboot instance")
3509 result = self.rpc.call_instance_shutdown(node_current, instance)
3510 result.Raise("Could not shutdown instance for full reboot")
3511 _ShutdownInstanceDisks(self, instance)
3512 _StartInstanceDisks(self, instance, ignore_secondaries)
3513 result = self.rpc.call_instance_start(node_current, instance, None, None)
3514 msg = result.fail_msg
3516 _ShutdownInstanceDisks(self, instance)
3517 raise errors.OpExecError("Could not start instance for"
3518 " full reboot: %s" % msg)
3520 self.cfg.MarkInstanceUp(instance.name)
3523 class LUShutdownInstance(LogicalUnit):
3524 """Shutdown an instance.
3527 HPATH = "instance-stop"
3528 HTYPE = constants.HTYPE_INSTANCE
3529 _OP_REQP = ["instance_name"]
3532 def ExpandNames(self):
3533 self._ExpandAndLockInstance()
3535 def BuildHooksEnv(self):
3538 This runs on master, primary and secondary nodes of the instance.
3541 env = _BuildInstanceHookEnvByObject(self, self.instance)
3542 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3545 def CheckPrereq(self):
3546 """Check prerequisites.
3548 This checks that the instance is in the cluster.
3551 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3552 assert self.instance is not None, \
3553 "Cannot retrieve locked instance %s" % self.op.instance_name
3554 _CheckNodeOnline(self, self.instance.primary_node)
3556 def Exec(self, feedback_fn):
3557 """Shutdown the instance.
3560 instance = self.instance
3561 node_current = instance.primary_node
3562 self.cfg.MarkInstanceDown(instance.name)
3563 result = self.rpc.call_instance_shutdown(node_current, instance)
3564 msg = result.fail_msg
3566 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3568 _ShutdownInstanceDisks(self, instance)
3571 class LUReinstallInstance(LogicalUnit):
3572 """Reinstall an instance.
3575 HPATH = "instance-reinstall"
3576 HTYPE = constants.HTYPE_INSTANCE
3577 _OP_REQP = ["instance_name"]
3580 def ExpandNames(self):
3581 self._ExpandAndLockInstance()
3583 def BuildHooksEnv(self):
3586 This runs on master, primary and secondary nodes of the instance.
3589 env = _BuildInstanceHookEnvByObject(self, self.instance)
3590 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3593 def CheckPrereq(self):
3594 """Check prerequisites.
3596 This checks that the instance is in the cluster and is not running.
3599 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3600 assert instance is not None, \
3601 "Cannot retrieve locked instance %s" % self.op.instance_name
3602 _CheckNodeOnline(self, instance.primary_node)
3604 if instance.disk_template == constants.DT_DISKLESS:
3605 raise errors.OpPrereqError("Instance '%s' has no disks" %
3606 self.op.instance_name)
3607 if instance.admin_up:
3608 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3609 self.op.instance_name)
3610 remote_info = self.rpc.call_instance_info(instance.primary_node,
3612 instance.hypervisor)
3613 remote_info.Raise("Error checking node %s" % instance.primary_node,
3615 if remote_info.payload:
3616 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3617 (self.op.instance_name,
3618 instance.primary_node))
3620 self.op.os_type = getattr(self.op, "os_type", None)
3621 if self.op.os_type is not None:
3623 pnode = self.cfg.GetNodeInfo(
3624 self.cfg.ExpandNodeName(instance.primary_node))
3626 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3628 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3629 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3630 (self.op.os_type, pnode.name), prereq=True)
3632 self.instance = instance
3634 def Exec(self, feedback_fn):
3635 """Reinstall the instance.
3638 inst = self.instance
3640 if self.op.os_type is not None:
3641 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3642 inst.os = self.op.os_type
3643 self.cfg.Update(inst)
3645 _StartInstanceDisks(self, inst, None)
3647 feedback_fn("Running the instance OS create scripts...")
3648 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3649 result.Raise("Could not install OS for instance %s on node %s" %
3650 (inst.name, inst.primary_node))
3652 _ShutdownInstanceDisks(self, inst)
3655 class LURecreateInstanceDisks(LogicalUnit):
3656 """Recreate an instance's missing disks.
3659 HPATH = "instance-recreate-disks"
3660 HTYPE = constants.HTYPE_INSTANCE
3661 _OP_REQP = ["instance_name", "disks"]
3664 def CheckArguments(self):
3665 """Check the arguments.
3668 if not isinstance(self.op.disks, list):
3669 raise errors.OpPrereqError("Invalid disks parameter")
3670 for item in self.op.disks:
3671 if (not isinstance(item, int) or
3673 raise errors.OpPrereqError("Invalid disk specification '%s'" %
3676 def ExpandNames(self):
3677 self._ExpandAndLockInstance()
3679 def BuildHooksEnv(self):
3682 This runs on master, primary and secondary nodes of the instance.
3685 env = _BuildInstanceHookEnvByObject(self, self.instance)
3686 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3689 def CheckPrereq(self):
3690 """Check prerequisites.
3692 This checks that the instance is in the cluster and is not running.
3695 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3696 assert instance is not None, \
3697 "Cannot retrieve locked instance %s" % self.op.instance_name
3698 _CheckNodeOnline(self, instance.primary_node)
3700 if instance.disk_template == constants.DT_DISKLESS:
3701 raise errors.OpPrereqError("Instance '%s' has no disks" %
3702 self.op.instance_name)
3703 if instance.admin_up:
3704 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3705 self.op.instance_name)
3706 remote_info = self.rpc.call_instance_info(instance.primary_node,
3708 instance.hypervisor)
3709 remote_info.Raise("Error checking node %s" % instance.primary_node,
3711 if remote_info.payload:
3712 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3713 (self.op.instance_name,
3714 instance.primary_node))
3716 if not self.op.disks:
3717 self.op.disks = range(len(instance.disks))
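# An empty disks list means "recreate every disk"; explicit indexes are
# validated against the instance's disk count just below.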
3719 for idx in self.op.disks:
3720 if idx >= len(instance.disks):
3721 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)
3723 self.instance = instance
3725 def Exec(self, feedback_fn):
3726 """Recreate the disks.
3730 for idx, disk in enumerate(self.instance.disks):
3731 if idx not in self.op.disks: # disk idx has not been passed in
3735 _CreateDisks(self, self.instance, to_skip=to_skip)
3738 class LURenameInstance(LogicalUnit):
3739 """Rename an instance.
3742 HPATH = "instance-rename"
3743 HTYPE = constants.HTYPE_INSTANCE
3744 _OP_REQP = ["instance_name", "new_name"]
3746 def BuildHooksEnv(self):
3749 This runs on master, primary and secondary nodes of the instance.
3752 env = _BuildInstanceHookEnvByObject(self, self.instance)
3753 env["INSTANCE_NEW_NAME"] = self.op.new_name
3754 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3757 def CheckPrereq(self):
3758 """Check prerequisites.
3760 This checks that the instance is in the cluster and is not running.
3763 instance = self.cfg.GetInstanceInfo(
3764 self.cfg.ExpandInstanceName(self.op.instance_name))
3765 if instance is None:
3766 raise errors.OpPrereqError("Instance '%s' not known" %
3767 self.op.instance_name)
3768 _CheckNodeOnline(self, instance.primary_node)
3770 if instance.admin_up:
3771 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3772 self.op.instance_name)
3773 remote_info = self.rpc.call_instance_info(instance.primary_node,
3775 instance.hypervisor)
3776 remote_info.Raise("Error checking node %s" % instance.primary_node,
3778 if remote_info.payload:
3779 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3780 (self.op.instance_name,
3781 instance.primary_node))
3782 self.instance = instance
3784 # new name verification
3785 name_info = utils.HostInfo(self.op.new_name)
3787 self.op.new_name = new_name = name_info.name
3788 instance_list = self.cfg.GetInstanceList()
3789 if new_name in instance_list:
3790 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3793 if not getattr(self.op, "ignore_ip", False):
3794 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3795 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3796 (name_info.ip, new_name))
3799 def Exec(self, feedback_fn):
3800 """Reinstall the instance.
3803 inst = self.instance
3804 old_name = inst.name
3806 if inst.disk_template == constants.DT_FILE:
3807 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3809 self.cfg.RenameInstance(inst.name, self.op.new_name)
3810 # Change the instance lock. This is definitely safe while we hold the BGL
3811 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3812 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3814 # re-read the instance from the configuration after rename
3815 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3817 if inst.disk_template == constants.DT_FILE:
3818 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3819 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3820 old_file_storage_dir,
3821 new_file_storage_dir)
3822 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3823 " (but the instance has been renamed in Ganeti)" %
3824 (inst.primary_node, old_file_storage_dir,
3825 new_file_storage_dir))
3827 _StartInstanceDisks(self, inst, None)
3829 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3831 msg = result.fail_msg
3833 msg = ("Could not run OS rename script for instance %s on node %s"
3834 " (but the instance has been renamed in Ganeti): %s" %
3835 (inst.name, inst.primary_node, msg))
3836 self.proc.LogWarning(msg)
3838 _ShutdownInstanceDisks(self, inst)
3841 class LURemoveInstance(LogicalUnit):
3842 """Remove an instance.
3845 HPATH = "instance-remove"
3846 HTYPE = constants.HTYPE_INSTANCE
3847 _OP_REQP = ["instance_name", "ignore_failures"]
3850 def ExpandNames(self):
3851 self._ExpandAndLockInstance()
3852 self.needed_locks[locking.LEVEL_NODE] = []
3853 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3855 def DeclareLocks(self, level):
3856 if level == locking.LEVEL_NODE:
3857 self._LockInstancesNodes()
3859 def BuildHooksEnv(self):
3862 This runs on master, primary and secondary nodes of the instance.
3865 env = _BuildInstanceHookEnvByObject(self, self.instance)
3866 nl = [self.cfg.GetMasterNode()]
3869 def CheckPrereq(self):
3870 """Check prerequisites.
3872 This checks that the instance is in the cluster.
3875 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3876 assert self.instance is not None, \
3877 "Cannot retrieve locked instance %s" % self.op.instance_name
3879 def Exec(self, feedback_fn):
3880 """Remove the instance.
3883 instance = self.instance
3884 logging.info("Shutting down instance %s on node %s",
3885 instance.name, instance.primary_node)
3887 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3888 msg = result.fail_msg
3890 if self.op.ignore_failures:
3891 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3893 raise errors.OpExecError("Could not shutdown instance %s on"
3895 (instance.name, instance.primary_node, msg))
3897 logging.info("Removing block devices for instance %s", instance.name)
3899 if not _RemoveDisks(self, instance):
3900 if self.op.ignore_failures:
3901 feedback_fn("Warning: can't remove instance's disks")
3903 raise errors.OpExecError("Can't remove instance's disks")
3905 logging.info("Removing instance %s out of cluster config", instance.name)
3907 self.cfg.RemoveInstance(instance.name)
3908 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3911 class LUQueryInstances(NoHooksLU):
3912 """Logical unit for querying instances.
3915 _OP_REQP = ["output_fields", "names", "use_locking"]
3917 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3919 "disk_template", "ip", "mac", "bridge",
3920 "nic_mode", "nic_link",
3921 "sda_size", "sdb_size", "vcpus", "tags",
3922 "network_port", "beparams",
3923 r"(disk)\.(size)/([0-9]+)",
3924 r"(disk)\.(sizes)", "disk_usage",
3925 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3926 r"(nic)\.(bridge)/([0-9]+)",
3927 r"(nic)\.(macs|ips|modes|links|bridges)",
3928 r"(disk|nic)\.(count)",
3929 "serial_no", "hypervisor", "hvparams",
3933 for name in constants.HVS_PARAMETERS] +
3935 for name in constants.BES_PARAMETERS])
3936 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3939 def ExpandNames(self):
3940 _CheckOutputFields(static=self._FIELDS_STATIC,
3941 dynamic=self._FIELDS_DYNAMIC,
3942 selected=self.op.output_fields)
3944 self.needed_locks = {}
3945 self.share_locks[locking.LEVEL_INSTANCE] = 1
3946 self.share_locks[locking.LEVEL_NODE] = 1
3949 self.wanted = _GetWantedInstances(self, self.op.names)
3951 self.wanted = locking.ALL_SET
3953 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3954 self.do_locking = self.do_node_query and self.op.use_locking
3956 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3957 self.needed_locks[locking.LEVEL_NODE] = []
3958 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3960 def DeclareLocks(self, level):
3961 if level == locking.LEVEL_NODE and self.do_locking:
3962 self._LockInstancesNodes()
3964 def CheckPrereq(self):
3965 """Check prerequisites.
3970 def Exec(self, feedback_fn):
3971 """Computes the list of nodes and their attributes.
3974 all_info = self.cfg.GetAllInstancesInfo()
3975 if self.wanted == locking.ALL_SET:
3976 # caller didn't specify instance names, so ordering is not important
3977 if self.do_locking:
3978 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3979 else:
3980 instance_names = all_info.keys()
3981 instance_names = utils.NiceSort(instance_names)
3982 else:
3983 # caller did specify names, so we must keep the ordering
3984 if self.do_locking:
3985 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3986 else:
3987 tgt_set = all_info.keys()
3988 missing = set(self.wanted).difference(tgt_set)
3989 if missing:
3990 raise errors.OpExecError("Some instances were removed before"
3991 " retrieving their data: %s" % missing)
3992 instance_names = self.wanted
3994 instance_list = [all_info[iname] for iname in instance_names]
3996 # begin data gathering
3998 nodes = frozenset([inst.primary_node for inst in instance_list])
3999 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4001 bad_nodes = []
4002 off_nodes = []
4003 if self.do_node_query:
4004 live_data = {}
4005 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4006 for name in nodes:
4007 result = node_data[name]
4008 if result.offline:
4009 # offline nodes will be in both lists
4010 off_nodes.append(name)
4011 if result.failed or result.fail_msg:
4012 bad_nodes.append(name)
4013 else:
4014 if result.payload:
4015 live_data.update(result.payload)
4016 # else no instance is alive
4017 else:
4018 live_data = dict([(name, {}) for name in instance_names])
4020 # end data gathering
4022 HVPREFIX = "hv/"
4023 BEPREFIX = "be/"
4024 output = []
4025 cluster = self.cfg.GetClusterInfo()
4026 for instance in instance_list:
4027 iout = []
4028 i_hv = cluster.FillHV(instance)
4029 i_be = cluster.FillBE(instance)
4030 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4031 nic.nicparams) for nic in instance.nics]
4032 for field in self.op.output_fields:
4033 st_match = self._FIELDS_STATIC.Matches(field)
4034 if field == "name":
4035 val = instance.name
4036 elif field == "os":
4037 val = instance.os
4038 elif field == "pnode":
4039 val = instance.primary_node
4040 elif field == "snodes":
4041 val = list(instance.secondary_nodes)
4042 elif field == "admin_state":
4043 val = instance.admin_up
4044 elif field == "oper_state":
4045 if instance.primary_node in bad_nodes:
4046 val = None
4047 else:
4048 val = bool(live_data.get(instance.name))
4049 elif field == "status":
4050 if instance.primary_node in off_nodes:
4051 val = "ERROR_nodeoffline"
4052 elif instance.primary_node in bad_nodes:
4053 val = "ERROR_nodedown"
4054 else:
4055 running = bool(live_data.get(instance.name))
4056 if running:
4057 if instance.admin_up:
4058 val = "running"
4059 else:
4060 val = "ERROR_up"
4061 else:
4062 if instance.admin_up:
4063 val = "ERROR_down"
4064 else:
4065 val = "ADMIN_down"
4066 elif field == "oper_ram":
4067 if instance.primary_node in bad_nodes:
4068 val = None
4069 elif instance.name in live_data:
4070 val = live_data[instance.name].get("memory", "?")
4071 else:
4072 val = "-"
4073 elif field == "vcpus":
4074 val = i_be[constants.BE_VCPUS]
4075 elif field == "disk_template":
4076 val = instance.disk_template
4079 val = instance.nics[0].ip
4082 elif field == "nic_mode":
4084 val = i_nicp[0][constants.NIC_MODE]
4087 elif field == "nic_link":
4089 val = i_nicp[0][constants.NIC_LINK]
4092 elif field == "bridge":
4093 if (instance.nics and
4094 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4095 val = i_nicp[0][constants.NIC_LINK]
4098 elif field == "mac":
4100 val = instance.nics[0].mac
4103 elif field == "sda_size" or field == "sdb_size":
4104 idx = ord(field[2]) - ord('a')
4105 try:
4106 val = instance.FindDisk(idx).size
4107 except errors.OpPrereqError:
4108 val = None
4109 elif field == "disk_usage": # total disk usage per node
4110 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4111 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4112 elif field == "tags":
4113 val = list(instance.GetTags())
4114 elif field == "serial_no":
4115 val = instance.serial_no
4116 elif field == "ctime":
4117 val = instance.ctime
4118 elif field == "mtime":
4119 val = instance.mtime
4120 elif field == "network_port":
4121 val = instance.network_port
4122 elif field == "hypervisor":
4123 val = instance.hypervisor
4124 elif field == "hvparams":
4126 elif (field.startswith(HVPREFIX) and
4127 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
4128 val = i_hv.get(field[len(HVPREFIX):], None)
4129 elif field == "beparams":
4131 elif (field.startswith(BEPREFIX) and
4132 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4133 val = i_be.get(field[len(BEPREFIX):], None)
4134 elif st_match and st_match.groups():
4135 # matches a variable list
4136 st_groups = st_match.groups()
4137 if st_groups and st_groups[0] == "disk":
4138 if st_groups[1] == "count":
4139 val = len(instance.disks)
4140 elif st_groups[1] == "sizes":
4141 val = [disk.size for disk in instance.disks]
4142 elif st_groups[1] == "size":
4143 try:
4144 val = instance.FindDisk(st_groups[2]).size
4145 except errors.OpPrereqError:
4146 val = None
4147 else:
4148 assert False, "Unhandled disk parameter"
4149 elif st_groups[0] == "nic":
4150 if st_groups[1] == "count":
4151 val = len(instance.nics)
4152 elif st_groups[1] == "macs":
4153 val = [nic.mac for nic in instance.nics]
4154 elif st_groups[1] == "ips":
4155 val = [nic.ip for nic in instance.nics]
4156 elif st_groups[1] == "modes":
4157 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4158 elif st_groups[1] == "links":
4159 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4160 elif st_groups[1] == "bridges":
4163 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4164 val.append(nicp[constants.NIC_LINK])
4169 nic_idx = int(st_groups[2])
4170 if nic_idx >= len(instance.nics):
4173 if st_groups[1] == "mac":
4174 val = instance.nics[nic_idx].mac
4175 elif st_groups[1] == "ip":
4176 val = instance.nics[nic_idx].ip
4177 elif st_groups[1] == "mode":
4178 val = i_nicp[nic_idx][constants.NIC_MODE]
4179 elif st_groups[1] == "link":
4180 val = i_nicp[nic_idx][constants.NIC_LINK]
4181 elif st_groups[1] == "bridge":
4182 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4183 if nic_mode == constants.NIC_MODE_BRIDGED:
4184 val = i_nicp[nic_idx][constants.NIC_LINK]
4185 else:
4186 val = None
4187 else:
4188 assert False, "Unhandled NIC parameter"
4189 else:
4190 assert False, ("Declared but unhandled variable parameter '%s'" %
4191 field)
4192 else:
4193 assert False, "Declared but unhandled parameter '%s'" % field
4194 iout.append(val)
4195 output.append(iout)
4197 return output
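# Editorial sketch (assumption, not from the original source): Exec() returns
# one row per selected instance, each row holding the values of
# self.op.output_fields in order; e.g. for output_fields=["name", "oper_ram"]
# something like [["inst1.example.com", 512], ["inst2.example.com", "-"]],
# with "-"/None used where live data is unavailable.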
4200 class LUFailoverInstance(LogicalUnit):
4201 """Failover an instance.
4204 HPATH = "instance-failover"
4205 HTYPE = constants.HTYPE_INSTANCE
4206 _OP_REQP = ["instance_name", "ignore_consistency"]
4209 def ExpandNames(self):
4210 self._ExpandAndLockInstance()
4211 self.needed_locks[locking.LEVEL_NODE] = []
4212 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4214 def DeclareLocks(self, level):
4215 if level == locking.LEVEL_NODE:
4216 self._LockInstancesNodes()
4218 def BuildHooksEnv(self):
4219 """Build hooks env.
4221 This runs on master, primary and secondary nodes of the instance.
4223 """
4224 env = {
4225 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4226 }
4227 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4228 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4229 return env, nl, nl
4231 def CheckPrereq(self):
4232 """Check prerequisites.
4234 This checks that the instance is in the cluster.
4237 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4238 assert self.instance is not None, \
4239 "Cannot retrieve locked instance %s" % self.op.instance_name
4241 bep = self.cfg.GetClusterInfo().FillBE(instance)
4242 if instance.disk_template not in constants.DTS_NET_MIRROR:
4243 raise errors.OpPrereqError("Instance's disk layout is not"
4244 " network mirrored, cannot failover.")
4246 secondary_nodes = instance.secondary_nodes
4247 if not secondary_nodes:
4248 raise errors.ProgrammerError("no secondary node but using "
4249 "a mirrored disk template")
4251 target_node = secondary_nodes[0]
4252 _CheckNodeOnline(self, target_node)
4253 _CheckNodeNotDrained(self, target_node)
4254 if instance.admin_up:
4255 # check memory requirements on the secondary node
4256 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4257 instance.name, bep[constants.BE_MEMORY],
4258 instance.hypervisor)
4260 self.LogInfo("Not checking memory on the secondary node as"
4261 " instance will not be started")
4263 # check bridge existance
4264 _CheckInstanceBridgesExist(self, instance, node=target_node)
4266 def Exec(self, feedback_fn):
4267 """Failover an instance.
4269 The failover is done by shutting it down on its present node and
4270 starting it on the secondary.
4273 instance = self.instance
4275 source_node = instance.primary_node
4276 target_node = instance.secondary_nodes[0]
4278 feedback_fn("* checking disk consistency between source and target")
4279 for dev in instance.disks:
4280 # for drbd, these are drbd over lvm
4281 if not _CheckDiskConsistency(self, dev, target_node, False):
4282 if instance.admin_up and not self.op.ignore_consistency:
4283 raise errors.OpExecError("Disk %s is degraded on target node,"
4284 " aborting failover." % dev.iv_name)
4286 feedback_fn("* shutting down instance on source node")
4287 logging.info("Shutting down instance %s on node %s",
4288 instance.name, source_node)
4290 result = self.rpc.call_instance_shutdown(source_node, instance)
4291 msg = result.fail_msg
4292 if msg:
4293 if self.op.ignore_consistency:
4294 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4295 " Proceeding anyway. Please make sure node"
4296 " %s is down. Error details: %s",
4297 instance.name, source_node, source_node, msg)
4298 else:
4299 raise errors.OpExecError("Could not shutdown instance %s on"
4300 " node %s: %s" %
4301 (instance.name, source_node, msg))
4303 feedback_fn("* deactivating the instance's disks on source node")
4304 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4305 raise errors.OpExecError("Can't shut down the instance's disks.")
4307 instance.primary_node = target_node
4308 # distribute new instance config to the other nodes
4309 self.cfg.Update(instance)
4311 # Only start the instance if it's marked as up
4312 if instance.admin_up:
4313 feedback_fn("* activating the instance's disks on target node")
4314 logging.info("Starting instance %s on node %s",
4315 instance.name, target_node)
4317 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4318 ignore_secondaries=True)
4319 if not disks_ok:
4320 _ShutdownInstanceDisks(self, instance)
4321 raise errors.OpExecError("Can't activate the instance's disks")
4323 feedback_fn("* starting the instance on the target node")
4324 result = self.rpc.call_instance_start(target_node, instance, None, None)
4325 msg = result.fail_msg
4326 if msg:
4327 _ShutdownInstanceDisks(self, instance)
4328 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4329 (instance.name, target_node, msg))
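# Editorial summary (not in the original source): failover is a shutdown on
# the current primary followed by a start on the DRBD secondary, so it always
# implies downtime; LUMigrateInstance below keeps the instance running
# instead. A typical invocation is roughly
# "gnt-instance failover inst1.example.com" (CLI spelling assumed, see the
# gnt-instance documentation).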
4332 class LUMigrateInstance(LogicalUnit):
4333 """Migrate an instance.
4335 This is migration without shutting down, compared to the failover,
4336 which is done with shutdown.
4339 HPATH = "instance-migrate"
4340 HTYPE = constants.HTYPE_INSTANCE
4341 _OP_REQP = ["instance_name", "live", "cleanup"]
4345 def ExpandNames(self):
4346 self._ExpandAndLockInstance()
4348 self.needed_locks[locking.LEVEL_NODE] = []
4349 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4351 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4352 self.op.live, self.op.cleanup)
4353 self.tasklets = [self._migrater]
4355 def DeclareLocks(self, level):
4356 if level == locking.LEVEL_NODE:
4357 self._LockInstancesNodes()
4359 def BuildHooksEnv(self):
4362 This runs on master, primary and secondary nodes of the instance.
4365 instance = self._migrater.instance
4366 env = _BuildInstanceHookEnvByObject(self, instance)
4367 env["MIGRATE_LIVE"] = self.op.live
4368 env["MIGRATE_CLEANUP"] = self.op.cleanup
4369 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4373 class LUMoveInstance(LogicalUnit):
4374 """Move an instance by data-copying.
4377 HPATH = "instance-move"
4378 HTYPE = constants.HTYPE_INSTANCE
4379 _OP_REQP = ["instance_name", "target_node"]
4382 def ExpandNames(self):
4383 self._ExpandAndLockInstance()
4384 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4385 if target_node is None:
4386 raise errors.OpPrereqError("Node '%s' not known" %
4387 self.op.target_node)
4388 self.op.target_node = target_node
4389 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4390 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4392 def DeclareLocks(self, level):
4393 if level == locking.LEVEL_NODE:
4394 self._LockInstancesNodes(primary_only=True)
4396 def BuildHooksEnv(self):
4399 This runs on master, primary and secondary nodes of the instance.
4403 "TARGET_NODE": self.op.target_node,
4405 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4406 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4407 self.op.target_node]
4410 def CheckPrereq(self):
4411 """Check prerequisites.
4413 This checks that the instance is in the cluster.
4416 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4417 assert self.instance is not None, \
4418 "Cannot retrieve locked instance %s" % self.op.instance_name
4420 node = self.cfg.GetNodeInfo(self.op.target_node)
4421 assert node is not None, \
4422 "Cannot retrieve locked node %s" % self.op.target_node
4424 self.target_node = target_node = node.name
4426 if target_node == instance.primary_node:
4427 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4428 (instance.name, target_node))
4430 bep = self.cfg.GetClusterInfo().FillBE(instance)
4432 for idx, dsk in enumerate(instance.disks):
4433 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4434 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4437 _CheckNodeOnline(self, target_node)
4438 _CheckNodeNotDrained(self, target_node)
4440 if instance.admin_up:
4441 # check memory requirements on the target node
4442 _CheckNodeFreeMemory(self, target_node, "moving instance %s" %
4443 instance.name, bep[constants.BE_MEMORY],
4444 instance.hypervisor)
4445 else:
4446 self.LogInfo("Not checking memory on the target node as"
4447 " instance will not be started")
4449 # check bridge existence
4450 _CheckInstanceBridgesExist(self, instance, node=target_node)
4452 def Exec(self, feedback_fn):
4453 """Move an instance.
4455 The move is done by shutting it down on its present node, copying
4456 the data over (slow) and starting it on the new node.
4459 instance = self.instance
4461 source_node = instance.primary_node
4462 target_node = self.target_node
4464 self.LogInfo("Shutting down instance %s on source node %s",
4465 instance.name, source_node)
4467 result = self.rpc.call_instance_shutdown(source_node, instance)
4468 msg = result.fail_msg
4469 if msg:
4470 if self.op.ignore_consistency:
4471 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4472 " Proceeding anyway. Please make sure node"
4473 " %s is down. Error details: %s",
4474 instance.name, source_node, source_node, msg)
4475 else:
4476 raise errors.OpExecError("Could not shutdown instance %s on"
4477 " node %s: %s" %
4478 (instance.name, source_node, msg))
4480 # create the target disks
4481 try:
4482 _CreateDisks(self, instance, target_node=target_node)
4483 except errors.OpExecError:
4484 self.LogWarning("Device creation failed, reverting...")
4485 try:
4486 _RemoveDisks(self, instance, target_node=target_node)
4487 finally:
4488 self.cfg.ReleaseDRBDMinors(instance.name)
4489 raise
4491 cluster_name = self.cfg.GetClusterInfo().cluster_name
4494 # activate, get path, copy the data over
4495 for idx, disk in enumerate(instance.disks):
4496 self.LogInfo("Copying data for disk %d", idx)
4497 result = self.rpc.call_blockdev_assemble(target_node, disk,
4498 instance.name, True)
4500 self.LogWarning("Can't assemble newly created disk %d: %s",
4501 idx, result.fail_msg)
4502 errs.append(result.fail_msg)
4504 dev_path = result.payload
4505 result = self.rpc.call_blockdev_export(source_node, disk,
4506 target_node, dev_path,
4509 self.LogWarning("Can't copy data over for disk %d: %s",
4510 idx, result.fail_msg)
4511 errs.append(result.fail_msg)
4515 self.LogWarning("Some disks failed to copy, aborting")
4517 _RemoveDisks(self, instance, target_node=target_node)
4519 self.cfg.ReleaseDRBDMinors(instance.name)
4520 raise errors.OpExecError("Errors during disk copy: %s" %
4523 instance.primary_node = target_node
4524 self.cfg.Update(instance)
4526 self.LogInfo("Removing the disks on the original node")
4527 _RemoveDisks(self, instance, target_node=source_node)
4529 # Only start the instance if it's marked as up
4530 if instance.admin_up:
4531 self.LogInfo("Starting instance %s on node %s",
4532 instance.name, target_node)
4534 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4535 ignore_secondaries=True)
4536 if not disks_ok:
4537 _ShutdownInstanceDisks(self, instance)
4538 raise errors.OpExecError("Can't activate the instance's disks")
4540 result = self.rpc.call_instance_start(target_node, instance, None, None)
4541 msg = result.fail_msg
4542 if msg:
4543 _ShutdownInstanceDisks(self, instance)
4544 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4545 (instance.name, target_node, msg))
4548 class LUMigrateNode(LogicalUnit):
4549 """Migrate all instances from a node.
4552 HPATH = "node-migrate"
4553 HTYPE = constants.HTYPE_NODE
4554 _OP_REQP = ["node_name", "live"]
4557 def ExpandNames(self):
4558 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4559 if self.op.node_name is None:
4560 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
4562 self.needed_locks = {
4563 locking.LEVEL_NODE: [self.op.node_name],
4566 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4568 # Create tasklets for migrating instances for all instances on this node
4569 names = []
4570 tasklets = []
4572 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4573 logging.debug("Migrating instance %s", inst.name)
4574 names.append(inst.name)
4576 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4578 self.tasklets = tasklets
4580 # Declare instance locks
4581 self.needed_locks[locking.LEVEL_INSTANCE] = names
4583 def DeclareLocks(self, level):
4584 if level == locking.LEVEL_NODE:
4585 self._LockInstancesNodes()
4587 def BuildHooksEnv(self):
4588 """Build hooks env.
4590 This runs on the master, the primary and all the secondaries.
4592 """
4593 env = {
4594 "NODE_NAME": self.op.node_name,
4595 }
4597 nl = [self.cfg.GetMasterNode()]
4599 return (env, nl, nl)
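# Editorial note: as built in ExpandNames above, this LU simply fans out one
# TLMigrateInstance tasklet per primary instance of the node, so a node
# migration behaves like running an instance migration for every primary
# instance, under a single node-level lock set (the "gnt-node migrate" CLI
# spelling is an assumption).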
4602 class TLMigrateInstance(Tasklet):
4603 def __init__(self, lu, instance_name, live, cleanup):
4604 """Initializes this class.
4607 Tasklet.__init__(self, lu)
4610 self.instance_name = instance_name
4612 self.cleanup = cleanup
4614 def CheckPrereq(self):
4615 """Check prerequisites.
4617 This checks that the instance is in the cluster.
4620 instance = self.cfg.GetInstanceInfo(
4621 self.cfg.ExpandInstanceName(self.instance_name))
4622 if instance is None:
4623 raise errors.OpPrereqError("Instance '%s' not known" %
4626 if instance.disk_template != constants.DT_DRBD8:
4627 raise errors.OpPrereqError("Instance's disk layout is not"
4628 " drbd8, cannot migrate.")
4630 secondary_nodes = instance.secondary_nodes
4631 if not secondary_nodes:
4632 raise errors.ConfigurationError("No secondary node but using"
4633 " drbd8 disk template")
4635 i_be = self.cfg.GetClusterInfo().FillBE(instance)
4637 target_node = secondary_nodes[0]
4638 # check memory requirements on the secondary node
4639 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4640 instance.name, i_be[constants.BE_MEMORY],
4641 instance.hypervisor)
4643 # check bridge existance
4644 _CheckInstanceBridgesExist(self, instance, node=target_node)
4646 if not self.cleanup:
4647 _CheckNodeNotDrained(self, target_node)
4648 result = self.rpc.call_instance_migratable(instance.primary_node,
4650 result.Raise("Can't migrate, please use failover", prereq=True)
4652 self.instance = instance
4654 def _WaitUntilSync(self):
4655 """Poll with custom rpc for disk sync.
4657 This uses our own step-based rpc call.
4660 self.feedback_fn("* wait until resync is done")
4664 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4666 self.instance.disks)
4668 for node, nres in result.items():
4669 nres.Raise("Cannot resync disks on node %s" % node)
4670 node_done, node_percent = nres.payload
4671 all_done = all_done and node_done
4672 if node_percent is not None:
4673 min_percent = min(min_percent, node_percent)
4675 if min_percent < 100:
4676 self.feedback_fn(" - progress: %.1f%%" % min_percent)
4679 def _EnsureSecondary(self, node):
4680 """Demote a node to secondary.
4683 self.feedback_fn("* switching node %s to secondary mode" % node)
4685 for dev in self.instance.disks:
4686 self.cfg.SetDiskID(dev, node)
4688 result = self.rpc.call_blockdev_close(node, self.instance.name,
4689 self.instance.disks)
4690 result.Raise("Cannot change disk to secondary on node %s" % node)
4692 def _GoStandalone(self):
4693 """Disconnect from the network.
4696 self.feedback_fn("* changing into standalone mode")
4697 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4698 self.instance.disks)
4699 for node, nres in result.items():
4700 nres.Raise("Cannot disconnect disks node %s" % node)
4702 def _GoReconnect(self, multimaster):
4703 """Reconnect to the network.
4709 msg = "single-master"
4710 self.feedback_fn("* changing disks into %s mode" % msg)
4711 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4712 self.instance.disks,
4713 self.instance.name, multimaster)
4714 for node, nres in result.items():
4715 nres.Raise("Cannot change disks config on node %s" % node)
4717 def _ExecCleanup(self):
4718 """Try to cleanup after a failed migration.
4720 The cleanup is done by:
4721 - check that the instance is running only on one node
4722 (and update the config if needed)
4723 - change disks on its secondary node to secondary
4724 - wait until disks are fully synchronized
4725 - disconnect from the network
4726 - change disks into single-master mode
4727 - wait again until disks are fully synchronized
4730 instance = self.instance
4731 target_node = self.target_node
4732 source_node = self.source_node
4734 # check running on only one node
4735 self.feedback_fn("* checking where the instance actually runs"
4736 " (if this hangs, the hypervisor might be in"
4738 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4739 for node, result in ins_l.items():
4740 result.Raise("Can't contact node %s" % node)
4742 runningon_source = instance.name in ins_l[source_node].payload
4743 runningon_target = instance.name in ins_l[target_node].payload
4745 if runningon_source and runningon_target:
4746 raise errors.OpExecError("Instance seems to be running on two nodes,"
4747 " or the hypervisor is confused. You will have"
4748 " to ensure manually that it runs only on one"
4749 " and restart this operation.")
4751 if not (runningon_source or runningon_target):
4752 raise errors.OpExecError("Instance does not seem to be running at all."
4753 " In this case, it's safer to repair by"
4754 " running 'gnt-instance stop' to ensure disk"
4755 " shutdown, and then restarting it.")
4757 if runningon_target:
4758 # the migration has actually succeeded, we need to update the config
4759 self.feedback_fn("* instance running on secondary node (%s),"
4760 " updating config" % target_node)
4761 instance.primary_node = target_node
4762 self.cfg.Update(instance)
4763 demoted_node = source_node
4765 self.feedback_fn("* instance confirmed to be running on its"
4766 " primary node (%s)" % source_node)
4767 demoted_node = target_node
4769 self._EnsureSecondary(demoted_node)
4770 try:
4771 self._WaitUntilSync()
4772 except errors.OpExecError:
4773 # we ignore here errors, since if the device is standalone, it
4774 # won't be able to sync
4775 pass
4776 self._GoStandalone()
4777 self._GoReconnect(False)
4778 self._WaitUntilSync()
4780 self.feedback_fn("* done")
4782 def _RevertDiskStatus(self):
4783 """Try to revert the disk status after a failed migration.
4786 target_node = self.target_node
4788 self._EnsureSecondary(target_node)
4789 self._GoStandalone()
4790 self._GoReconnect(False)
4791 self._WaitUntilSync()
4792 except errors.OpExecError, err:
4793 self.lu.LogWarning("Migration failed and I can't reconnect the"
4794 " drives: error '%s'\n"
4795 "Please look and recover the instance status" %
4798 def _AbortMigration(self):
4799 """Call the hypervisor code to abort a started migration.
4802 instance = self.instance
4803 target_node = self.target_node
4804 migration_info = self.migration_info
4806 abort_result = self.rpc.call_finalize_migration(target_node,
4807 instance,
4808 migration_info,
4809 False)
4810 abort_msg = abort_result.fail_msg
4811 if abort_msg:
4812 logging.error("Aborting migration failed on target node %s: %s" %
4813 (target_node, abort_msg))
4814 # Don't raise an exception here, as we still have to try to revert the
4815 # disk status, even if this step failed.
4817 def _ExecMigration(self):
4818 """Migrate an instance.
4820 The migrate is done by:
4821 - change the disks into dual-master mode
4822 - wait until disks are fully synchronized again
4823 - migrate the instance
4824 - change disks on the new secondary node (the old primary) to secondary
4825 - wait until disks are fully synchronized
4826 - change disks into single-master mode
4829 instance = self.instance
4830 target_node = self.target_node
4831 source_node = self.source_node
4833 self.feedback_fn("* checking disk consistency between source and target")
4834 for dev in instance.disks:
4835 if not _CheckDiskConsistency(self, dev, target_node, False):
4836 raise errors.OpExecError("Disk %s is degraded or not fully"
4837 " synchronized on target node,"
4838 " aborting migrate." % dev.iv_name)
4840 # First get the migration information from the remote node
4841 result = self.rpc.call_migration_info(source_node, instance)
4842 msg = result.fail_msg
4843 if msg:
4844 log_err = ("Failed fetching source migration information from %s: %s" %
4845 (source_node, msg))
4846 logging.error(log_err)
4847 raise errors.OpExecError(log_err)
4849 self.migration_info = migration_info = result.payload
4851 # Then switch the disks to master/master mode
4852 self._EnsureSecondary(target_node)
4853 self._GoStandalone()
4854 self._GoReconnect(True)
4855 self._WaitUntilSync()
4857 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4858 result = self.rpc.call_accept_instance(target_node,
4859 instance,
4860 migration_info,
4861 self.nodes_ip[target_node])
4863 msg = result.fail_msg
4864 if msg:
4865 logging.error("Instance pre-migration failed, trying to revert"
4866 " disk status: %s", msg)
4867 self._AbortMigration()
4868 self._RevertDiskStatus()
4869 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4870 (instance.name, msg))
4872 self.feedback_fn("* migrating instance to %s" % target_node)
4874 result = self.rpc.call_instance_migrate(source_node, instance,
4875 self.nodes_ip[target_node],
4876 self.live)
4877 msg = result.fail_msg
4878 if msg:
4879 logging.error("Instance migration failed, trying to revert"
4880 " disk status: %s", msg)
4881 self._AbortMigration()
4882 self._RevertDiskStatus()
4883 raise errors.OpExecError("Could not migrate instance %s: %s" %
4884 (instance.name, msg))
4887 instance.primary_node = target_node
4888 # distribute new instance config to the other nodes
4889 self.cfg.Update(instance)
4891 result = self.rpc.call_finalize_migration(target_node,
4895 msg = result.fail_msg
4897 logging.error("Instance migration succeeded, but finalization failed:"
4899 raise errors.OpExecError("Could not finalize instance migration: %s" %
4902 self._EnsureSecondary(source_node)
4903 self._WaitUntilSync()
4904 self._GoStandalone()
4905 self._GoReconnect(False)
4906 self._WaitUntilSync()
4908 self.feedback_fn("* done")
4910 def Exec(self, feedback_fn):
4911 """Perform the migration.
4914 feedback_fn("Migrating instance %s" % self.instance.name)
4916 self.feedback_fn = feedback_fn
4918 self.source_node = self.instance.primary_node
4919 self.target_node = self.instance.secondary_nodes[0]
4920 self.all_nodes = [self.source_node, self.target_node]
4921 self.nodes_ip = {
4922 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4923 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4924 }
4926 if self.cleanup:
4927 return self._ExecCleanup()
4928 else:
4929 return self._ExecMigration()
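# Editorial note: this tasklet is shared by LUMigrateInstance (one instance)
# and LUMigrateNode (every primary instance of a node); Exec() only chooses
# between the cleanup path and the real migration based on the 'cleanup' flag
# given at construction time, with 'live' selecting live vs. non-live
# migration.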
4932 def _CreateBlockDev(lu, node, instance, device, force_create,
4934 """Create a tree of block devices on a given node.
4936 If this device type has to be created on secondaries, create it and
4939 If not, just recurse to children keeping the same 'force' value.
4941 @param lu: the lu on whose behalf we execute
4942 @param node: the node on which to create the device
4943 @type instance: L{objects.Instance}
4944 @param instance: the instance which owns the device
4945 @type device: L{objects.Disk}
4946 @param device: the device to create
4947 @type force_create: boolean
4948 @param force_create: whether to force creation of this device; this
4949 will be changed to True whenever we find a device which has
4950 the CreateOnSecondary() attribute
4951 @param info: the extra 'metadata' we should attach to the device
4952 (this will be represented as a LVM tag)
4953 @type force_open: boolean
4954 @param force_open: this parameter will be passed to the
4955 L{backend.BlockdevCreate} function where it specifies
4956 whether we run on primary or not, and it affects both
4957 the child assembly and the device's own Open() execution
4960 if device.CreateOnSecondary():
4961 force_create = True
4963 if device.children:
4964 for child in device.children:
4965 _CreateBlockDev(lu, node, instance, child, force_create,
4966 info, force_open)
4968 if not force_create:
4969 return
4971 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4974 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4975 """Create a single block device on a given node.
4977 This will not recurse over children of the device, so they must be
4980 @param lu: the lu on whose behalf we execute
4981 @param node: the node on which to create the device
4982 @type instance: L{objects.Instance}
4983 @param instance: the instance which owns the device
4984 @type device: L{objects.Disk}
4985 @param device: the device to create
4986 @param info: the extra 'metadata' we should attach to the device
4987 (this will be represented as a LVM tag)
4988 @type force_open: boolean
4989 @param force_open: this parameter will be passed to the
4990 L{backend.BlockdevCreate} function where it specifies
4991 whether we run on primary or not, and it affects both
4992 the child assembly and the device's own Open() execution
4995 lu.cfg.SetDiskID(device, node)
4996 result = lu.rpc.call_blockdev_create(node, device, device.size,
4997 instance.name, force_open, info)
4998 result.Raise("Can't create block device %s on"
4999 " node %s for instance %s" % (device, node, instance.name))
5000 if device.physical_id is None:
5001 device.physical_id = result.payload
5004 def _GenerateUniqueNames(lu, exts):
5005 """Generate a suitable LV name.
5007 This will generate a logical volume name for the given instance.
5009 """
5010 results = []
5011 for val in exts:
5012 new_id = lu.cfg.GenerateUniqueID()
5013 results.append("%s%s" % (new_id, val))
5014 return results
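# Example (hypothetical values, not from the original source): with
# exts=[".disk0", ".disk1"] this returns something like
# ["<unique-id>.disk0", "<unique-id>.disk1"], where <unique-id> comes from
# lu.cfg.GenerateUniqueID(); the DRBD8 template below then derives
# "<prefix>_data"/"<prefix>_meta" LV names from these prefixes.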
5017 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5019 """Generate a drbd8 device complete with its children.
5022 port = lu.cfg.AllocatePort()
5023 vgname = lu.cfg.GetVGName()
5024 shared_secret = lu.cfg.GenerateDRBDSecret()
5025 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5026 logical_id=(vgname, names[0]))
5027 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5028 logical_id=(vgname, names[1]))
5029 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5030 logical_id=(primary, secondary, port,
5033 children=[dev_data, dev_meta],
5038 def _GenerateDiskTemplate(lu, template_name,
5039 instance_name, primary_node,
5040 secondary_nodes, disk_info,
5041 file_storage_dir, file_driver,
5043 """Generate the entire disk layout for a given template type.
5046 #TODO: compute space requirements
5048 vgname = lu.cfg.GetVGName()
5049 disk_count = len(disk_info)
5050 disks = []
5051 if template_name == constants.DT_DISKLESS:
5052 pass
5053 elif template_name == constants.DT_PLAIN:
5054 if len(secondary_nodes) != 0:
5055 raise errors.ProgrammerError("Wrong template configuration")
5057 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5058 for i in range(disk_count)])
5059 for idx, disk in enumerate(disk_info):
5060 disk_index = idx + base_index
5061 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5062 logical_id=(vgname, names[idx]),
5063 iv_name="disk/%d" % disk_index,
5065 disks.append(disk_dev)
5066 elif template_name == constants.DT_DRBD8:
5067 if len(secondary_nodes) != 1:
5068 raise errors.ProgrammerError("Wrong template configuration")
5069 remote_node = secondary_nodes[0]
5070 minors = lu.cfg.AllocateDRBDMinor(
5071 [primary_node, remote_node] * len(disk_info), instance_name)
5074 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5075 for i in range(disk_count)]):
5076 names.append(lv_prefix + "_data")
5077 names.append(lv_prefix + "_meta")
5078 for idx, disk in enumerate(disk_info):
5079 disk_index = idx + base_index
5080 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5081 disk["size"], names[idx*2:idx*2+2],
5082 "disk/%d" % disk_index,
5083 minors[idx*2], minors[idx*2+1])
5084 disk_dev.mode = disk["mode"]
5085 disks.append(disk_dev)
5086 elif template_name == constants.DT_FILE:
5087 if len(secondary_nodes) != 0:
5088 raise errors.ProgrammerError("Wrong template configuration")
5090 for idx, disk in enumerate(disk_info):
5091 disk_index = idx + base_index
5092 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5093 iv_name="disk/%d" % disk_index,
5094 logical_id=(file_driver,
5095 "%s/disk%d" % (file_storage_dir,
5098 disks.append(disk_dev)
5100 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
5104 def _GetInstanceInfoText(instance):
5105 """Compute that text that should be added to the disk's metadata.
5108 return "originstname+%s" % instance.name
5111 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5112 """Create all disks for an instance.
5114 This abstracts away some work from AddInstance.
5116 @type lu: L{LogicalUnit}
5117 @param lu: the logical unit on whose behalf we execute
5118 @type instance: L{objects.Instance}
5119 @param instance: the instance whose disks we should create
5121 @param to_skip: list of indices to skip
5122 @type target_node: string
5123 @param target_node: if passed, overrides the target node for creation
5125 @return: the success of the creation
5128 info = _GetInstanceInfoText(instance)
5129 if target_node is None:
5130 pnode = instance.primary_node
5131 all_nodes = instance.all_nodes
5132 else:
5133 pnode = target_node
5134 all_nodes = [pnode]
5136 if instance.disk_template == constants.DT_FILE:
5137 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5138 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5140 result.Raise("Failed to create directory '%s' on"
5141 " node %s: %s" % (file_storage_dir, pnode))
5143 # Note: this needs to be kept in sync with adding of disks in
5144 # LUSetInstanceParams
5145 for idx, device in enumerate(instance.disks):
5146 if to_skip and idx in to_skip:
5147 continue
5148 logging.info("Creating volume %s for instance %s",
5149 device.iv_name, instance.name)
5151 for node in all_nodes:
5152 f_create = node == pnode
5153 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5156 def _RemoveDisks(lu, instance, target_node=None):
5157 """Remove all disks for an instance.
5159 This abstracts away some work from `AddInstance()` and
5160 `RemoveInstance()`. Note that in case some of the devices couldn't
5161 be removed, the removal will continue with the other ones (compare
5162 with `_CreateDisks()`).
5164 @type lu: L{LogicalUnit}
5165 @param lu: the logical unit on whose behalf we execute
5166 @type instance: L{objects.Instance}
5167 @param instance: the instance whose disks we should remove
5168 @type target_node: string
5169 @param target_node: used to override the node on which to remove the disks
5171 @return: the success of the removal
5174 logging.info("Removing block devices for instance %s", instance.name)
5177 for device in instance.disks:
5179 edata = [(target_node, device)]
5181 edata = device.ComputeNodeTree(instance.primary_node)
5182 for node, disk in edata:
5183 lu.cfg.SetDiskID(disk, node)
5184 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5185 if msg:
5186 lu.LogWarning("Could not remove block device %s on node %s,"
5187 " continuing anyway: %s", device.iv_name, node, msg)
5188 all_result = False
5190 if instance.disk_template == constants.DT_FILE:
5191 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5192 if target_node is None:
5193 tgt = instance.primary_node
5194 else:
5195 tgt = target_node
5196 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5198 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5199 file_storage_dir, instance.primary_node, result.fail_msg)
5205 def _ComputeDiskSize(disk_template, disks):
5206 """Compute disk size requirements in the volume group
5209 # Required free disk space as a function of disk and swap space
5210 req_size_dict = {
5211 constants.DT_DISKLESS: None,
5212 constants.DT_PLAIN: sum(d["size"] for d in disks),
5213 # 128 MB are added for drbd metadata for each disk
5214 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5215 constants.DT_FILE: None,
5216 }
5218 if disk_template not in req_size_dict:
5219 raise errors.ProgrammerError("Disk template '%s' size requirement"
5220 " is unknown" % disk_template)
5222 return req_size_dict[disk_template]
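# Worked example (sizes in MiB, hypothetical): for two disks of 1024 and 2048,
# DT_PLAIN requires 1024 + 2048 = 3072 of free VG space, DT_DRBD8 requires
# (1024 + 128) + (2048 + 128) = 3328 because of the per-disk DRBD metadata
# volume, and DT_DISKLESS/DT_FILE return None since no VG space is consumed.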
5225 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5226 """Hypervisor parameter validation.
5228 This function abstract the hypervisor parameter validation to be
5229 used in both instance create and instance modify.
5231 @type lu: L{LogicalUnit}
5232 @param lu: the logical unit for which we check
5233 @type nodenames: list
5234 @param nodenames: the list of nodes on which we should check
5235 @type hvname: string
5236 @param hvname: the name of the hypervisor we should use
5237 @type hvparams: dict
5238 @param hvparams: the parameters which we need to check
5239 @raise errors.OpPrereqError: if the parameters are not valid
5242 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5243 hvname,
5244 hvparams)
5245 for node in nodenames:
5246 info = hvinfo[node]
5247 if info.offline:
5248 continue
5249 info.Raise("Hypervisor parameter validation failed on node %s" % node)
5252 class LUCreateInstance(LogicalUnit):
5253 """Create an instance.
5256 HPATH = "instance-add"
5257 HTYPE = constants.HTYPE_INSTANCE
5258 _OP_REQP = ["instance_name", "disks", "disk_template",
5260 "wait_for_sync", "ip_check", "nics",
5261 "hvparams", "beparams"]
5264 def _ExpandNode(self, node):
5265 """Expands and checks one node name.
5268 node_full = self.cfg.ExpandNodeName(node)
5269 if node_full is None:
5270 raise errors.OpPrereqError("Unknown node %s" % node)
5273 def ExpandNames(self):
5274 """ExpandNames for CreateInstance.
5276 Figure out the right locks for instance creation.
5279 self.needed_locks = {}
5281 # set optional parameters to none if they don't exist
5282 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5283 if not hasattr(self.op, attr):
5284 setattr(self.op, attr, None)
5286 # cheap checks, mostly valid constants given
5288 # verify creation mode
5289 if self.op.mode not in (constants.INSTANCE_CREATE,
5290 constants.INSTANCE_IMPORT):
5291 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5294 # disk template and mirror node verification
5295 if self.op.disk_template not in constants.DISK_TEMPLATES:
5296 raise errors.OpPrereqError("Invalid disk template name")
5298 if self.op.hypervisor is None:
5299 self.op.hypervisor = self.cfg.GetHypervisorType()
5301 cluster = self.cfg.GetClusterInfo()
5302 enabled_hvs = cluster.enabled_hypervisors
5303 if self.op.hypervisor not in enabled_hvs:
5304 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5305 " cluster (%s)" % (self.op.hypervisor,
5306 ",".join(enabled_hvs)))
5308 # check hypervisor parameter syntax (locally)
5309 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5310 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5312 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5313 hv_type.CheckParameterSyntax(filled_hvp)
5314 self.hv_full = filled_hvp
5316 # fill and remember the beparams dict
5317 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5318 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5321 #### instance parameters check
5323 # instance name verification
5324 hostname1 = utils.HostInfo(self.op.instance_name)
5325 self.op.instance_name = instance_name = hostname1.name
5327 # this is just a preventive check, but someone might still add this
5328 # instance in the meantime, and creation will fail at lock-add time
5329 if instance_name in self.cfg.GetInstanceList():
5330 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5333 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5337 for idx, nic in enumerate(self.op.nics):
5338 nic_mode_req = nic.get("mode", None)
5339 nic_mode = nic_mode_req
5340 if nic_mode is None:
5341 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5343 # in routed mode, for the first nic, the default ip is 'auto'
5344 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5345 default_ip_mode = constants.VALUE_AUTO
5347 default_ip_mode = constants.VALUE_NONE
5349 # ip validity checks
5350 ip = nic.get("ip", default_ip_mode)
5351 if ip is None or ip.lower() == constants.VALUE_NONE:
5352 nic_ip = None
5353 elif ip.lower() == constants.VALUE_AUTO:
5354 nic_ip = hostname1.ip
5355 else:
5356 if not utils.IsValidIP(ip):
5357 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5358 " like a valid IP" % ip)
5359 nic_ip = ip
5361 # TODO: check the ip for uniqueness !!
5362 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5363 raise errors.OpPrereqError("Routed nic mode requires an ip address")
5365 # MAC address verification
5366 mac = nic.get("mac", constants.VALUE_AUTO)
5367 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5368 if not utils.IsValidMac(mac.lower()):
5369 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5371 # bridge verification
5372 bridge = nic.get("bridge", None)
5373 link = nic.get("link", None)
5375 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5376 " at the same time")
5377 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5378 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
5384 nicparams[constants.NIC_MODE] = nic_mode_req
5386 nicparams[constants.NIC_LINK] = link
5388 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5390 objects.NIC.CheckParameterSyntax(check_params)
5391 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5393 # disk checks/pre-build
5394 self.disks = []
5395 for disk in self.op.disks:
5396 mode = disk.get("mode", constants.DISK_RDWR)
5397 if mode not in constants.DISK_ACCESS_SET:
5398 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5399 mode)
5400 size = disk.get("size", None)
5401 if size is None:
5402 raise errors.OpPrereqError("Missing disk size")
5403 try:
5404 size = int(size)
5405 except ValueError:
5406 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
5407 self.disks.append({"size": size, "mode": mode})
5409 # used in CheckPrereq for ip ping check
5410 self.check_ip = hostname1.ip
5412 # file storage checks
5413 if (self.op.file_driver and
5414 not self.op.file_driver in constants.FILE_DRIVER):
5415 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5416 self.op.file_driver)
5418 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5419 raise errors.OpPrereqError("File storage directory path not absolute")
5421 ### Node/iallocator related checks
5422 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5423 raise errors.OpPrereqError("One and only one of iallocator and primary"
5424 " node must be given")
5426 if self.op.iallocator:
5427 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5429 self.op.pnode = self._ExpandNode(self.op.pnode)
5430 nodelist = [self.op.pnode]
5431 if self.op.snode is not None:
5432 self.op.snode = self._ExpandNode(self.op.snode)
5433 nodelist.append(self.op.snode)
5434 self.needed_locks[locking.LEVEL_NODE] = nodelist
5436 # in case of import lock the source node too
5437 if self.op.mode == constants.INSTANCE_IMPORT:
5438 src_node = getattr(self.op, "src_node", None)
5439 src_path = getattr(self.op, "src_path", None)
5441 if src_path is None:
5442 self.op.src_path = src_path = self.op.instance_name
5444 if src_node is None:
5445 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5446 self.op.src_node = None
5447 if os.path.isabs(src_path):
5448 raise errors.OpPrereqError("Importing an instance from an absolute"
5449 " path requires a source node option.")
5451 self.op.src_node = src_node = self._ExpandNode(src_node)
5452 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5453 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5454 if not os.path.isabs(src_path):
5455 self.op.src_path = src_path = \
5456 os.path.join(constants.EXPORT_DIR, src_path)
5458 else: # INSTANCE_CREATE
5459 if getattr(self.op, "os_type", None) is None:
5460 raise errors.OpPrereqError("No guest OS specified")
5462 def _RunAllocator(self):
5463 """Run the allocator based on input opcode.
5466 nics = [n.ToDict() for n in self.nics]
5467 ial = IAllocator(self.cfg, self.rpc,
5468 mode=constants.IALLOCATOR_MODE_ALLOC,
5469 name=self.op.instance_name,
5470 disk_template=self.op.disk_template,
5473 vcpus=self.be_full[constants.BE_VCPUS],
5474 mem_size=self.be_full[constants.BE_MEMORY],
5477 hypervisor=self.op.hypervisor,
5480 ial.Run(self.op.iallocator)
5483 raise errors.OpPrereqError("Can't compute nodes using"
5484 " iallocator '%s': %s" % (self.op.iallocator,
5486 if len(ial.nodes) != ial.required_nodes:
5487 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5488 " of nodes (%s), required %s" %
5489 (self.op.iallocator, len(ial.nodes),
5490 ial.required_nodes))
5491 self.op.pnode = ial.nodes[0]
5492 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5493 self.op.instance_name, self.op.iallocator,
5494 ", ".join(ial.nodes))
5495 if ial.required_nodes == 2:
5496 self.op.snode = ial.nodes[1]
5498 def BuildHooksEnv(self):
5499 """Build hooks env.
5501 This runs on master, primary and secondary nodes of the instance.
5503 """
5504 env = {
5505 "ADD_MODE": self.op.mode,
5506 }
5507 if self.op.mode == constants.INSTANCE_IMPORT:
5508 env["SRC_NODE"] = self.op.src_node
5509 env["SRC_PATH"] = self.op.src_path
5510 env["SRC_IMAGES"] = self.src_images
5512 env.update(_BuildInstanceHookEnv(
5513 name=self.op.instance_name,
5514 primary_node=self.op.pnode,
5515 secondary_nodes=self.secondaries,
5516 status=self.op.start,
5517 os_type=self.op.os_type,
5518 memory=self.be_full[constants.BE_MEMORY],
5519 vcpus=self.be_full[constants.BE_VCPUS],
5520 nics=_NICListToTuple(self, self.nics),
5521 disk_template=self.op.disk_template,
5522 disks=[(d["size"], d["mode"]) for d in self.disks],
5525 hypervisor_name=self.op.hypervisor,
5528 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5533 def CheckPrereq(self):
5534 """Check prerequisites.
5537 if (not self.cfg.GetVGName() and
5538 self.op.disk_template not in constants.DTS_NOT_LVM):
5539 raise errors.OpPrereqError("Cluster does not support lvm-based"
5542 if self.op.mode == constants.INSTANCE_IMPORT:
5543 src_node = self.op.src_node
5544 src_path = self.op.src_path
5546 if src_node is None:
5547 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5548 exp_list = self.rpc.call_export_list(locked_nodes)
5550 for node in exp_list:
5551 if exp_list[node].fail_msg:
5553 if src_path in exp_list[node].payload:
5555 self.op.src_node = src_node = node
5556 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5560 raise errors.OpPrereqError("No export found for relative path %s" %
5563 _CheckNodeOnline(self, src_node)
5564 result = self.rpc.call_export_info(src_node, src_path)
5565 result.Raise("No export or invalid export found in dir %s" % src_path)
5567 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5568 if not export_info.has_section(constants.INISECT_EXP):
5569 raise errors.ProgrammerError("Corrupted export config")
5571 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5572 if (int(ei_version) != constants.EXPORT_VERSION):
5573 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5574 (ei_version, constants.EXPORT_VERSION))
5576 # Check that the new instance doesn't have less disks than the export
5577 instance_disks = len(self.disks)
5578 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
5579 if instance_disks < export_disks:
5580 raise errors.OpPrereqError("Not enough disks to import."
5581 " (instance: %d, export: %d)" %
5582 (instance_disks, export_disks))
5584 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
5586 for idx in range(export_disks):
5587 option = 'disk%d_dump' % idx
5588 if export_info.has_option(constants.INISECT_INS, option):
5589 # FIXME: are the old os-es, disk sizes, etc. useful?
5590 export_name = export_info.get(constants.INISECT_INS, option)
5591 image = os.path.join(src_path, export_name)
5592 disk_images.append(image)
5594 disk_images.append(False)
5596 self.src_images = disk_images
5598 old_name = export_info.get(constants.INISECT_INS, 'name')
5599 # FIXME: int() here could throw a ValueError on broken exports
5600 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
5601 if self.op.instance_name == old_name:
5602 for idx, nic in enumerate(self.nics):
5603 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
5604 nic_mac_ini = 'nic%d_mac' % idx
5605 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
5607 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
5608 # ip ping checks (we use the same ip that was resolved in ExpandNames)
5609 if self.op.start and not self.op.ip_check:
5610 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
5611 " adding an instance in start mode")
5613 if self.op.ip_check:
5614 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
5615 raise errors.OpPrereqError("IP %s of instance %s already in use" %
5616 (self.check_ip, self.op.instance_name))
5618 #### mac address generation
5619 # By generating here the mac address both the allocator and the hooks get
5620 # the real final mac address rather than the 'auto' or 'generate' value.
5621 # There is a race condition between the generation and the instance object
5622 # creation, which means that we know the mac is valid now, but we're not
5623 # sure it will be when we actually add the instance. If things go bad
5624 # adding the instance will abort because of a duplicate mac, and the
5625 # creation job will fail.
5626 for nic in self.nics:
5627 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5628 nic.mac = self.cfg.GenerateMAC()
5632 if self.op.iallocator is not None:
5633 self._RunAllocator()
5635 #### node related checks
5637 # check primary node
5638 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
5639 assert self.pnode is not None, \
5640 "Cannot retrieve locked node %s" % self.op.pnode
5642 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
5645 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
5648 self.secondaries = []
5650 # mirror node verification
5651 if self.op.disk_template in constants.DTS_NET_MIRROR:
5652 if self.op.snode is None:
5653 raise errors.OpPrereqError("The networked disk templates need"
5655 if self.op.snode == pnode.name:
5656 raise errors.OpPrereqError("The secondary node cannot be"
5657 " the primary node.")
5658 _CheckNodeOnline(self, self.op.snode)
5659 _CheckNodeNotDrained(self, self.op.snode)
5660 self.secondaries.append(self.op.snode)
5662 nodenames = [pnode.name] + self.secondaries
5664 req_size = _ComputeDiskSize(self.op.disk_template,
5667 # Check lv size requirements
5668 if req_size is not None:
5669 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5671 for node in nodenames:
5672 info = nodeinfo[node]
5673 info.Raise("Cannot get current information from node %s" % node)
5675 vg_free = info.get('vg_free', None)
5676 if not isinstance(vg_free, int):
5677 raise errors.OpPrereqError("Can't compute free disk space on"
5679 if req_size > vg_free:
5680 raise errors.OpPrereqError("Not enough disk space on target node %s."
5681 " %d MB available, %d MB required" %
5682 (node, vg_free, req_size))
5684 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5687 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5688 result.Raise("OS '%s' not in supported os list for primary node %s" %
5689 (self.op.os_type, pnode.name), prereq=True)
5691 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
5693 # memory check on primary node
5694 if self.op.start:
5695 _CheckNodeFreeMemory(self, self.pnode.name,
5696 "creating instance %s" % self.op.instance_name,
5697 self.be_full[constants.BE_MEMORY],
5698 self.op.hypervisor)
5700 self.dry_run_result = list(nodenames)
5702 def Exec(self, feedback_fn):
5703 """Create and add the instance to the cluster.
5706 instance = self.op.instance_name
5707 pnode_name = self.pnode.name
5709 ht_kind = self.op.hypervisor
5710 if ht_kind in constants.HTS_REQ_PORT:
5711 network_port = self.cfg.AllocatePort()
5712 else:
5713 network_port = None
5715 ##if self.op.vnc_bind_address is None:
5716 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
5718 # this is needed because os.path.join does not accept None arguments
5719 if self.op.file_storage_dir is None:
5720 string_file_storage_dir = ""
5722 string_file_storage_dir = self.op.file_storage_dir
5724 # build the full file storage dir path
5725 file_storage_dir = os.path.normpath(os.path.join(
5726 self.cfg.GetFileStorageDir(),
5727 string_file_storage_dir, instance))
5730 disks = _GenerateDiskTemplate(self,
5731 self.op.disk_template,
5732 instance, pnode_name,
5736 self.op.file_driver,
5739 iobj = objects.Instance(name=instance, os=self.op.os_type,
5740 primary_node=pnode_name,
5741 nics=self.nics, disks=disks,
5742 disk_template=self.op.disk_template,
5744 network_port=network_port,
5745 beparams=self.op.beparams,
5746 hvparams=self.op.hvparams,
5747 hypervisor=self.op.hypervisor,
5750 feedback_fn("* creating instance disks...")
5751 try:
5752 _CreateDisks(self, iobj)
5753 except errors.OpExecError:
5754 self.LogWarning("Device creation failed, reverting...")
5755 try:
5756 _RemoveDisks(self, iobj)
5757 finally:
5758 self.cfg.ReleaseDRBDMinors(instance)
5759 raise
5761 feedback_fn("adding instance %s to cluster config" % instance)
5763 self.cfg.AddInstance(iobj)
5764 # Declare that we don't want to remove the instance lock anymore, as we've
5765 # added the instance to the config
5766 del self.remove_locks[locking.LEVEL_INSTANCE]
5767 # Unlock all the nodes
5768 if self.op.mode == constants.INSTANCE_IMPORT:
5769 nodes_keep = [self.op.src_node]
5770 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5771 if node != self.op.src_node]
5772 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5773 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5775 self.context.glm.release(locking.LEVEL_NODE)
5776 del self.acquired_locks[locking.LEVEL_NODE]
5778 if self.op.wait_for_sync:
5779 disk_abort = not _WaitForSync(self, iobj)
5780 elif iobj.disk_template in constants.DTS_NET_MIRROR:
5781 # make sure the disks are not degraded (still sync-ing is ok)
5783 feedback_fn("* checking mirrors status")
5784 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5789 _RemoveDisks(self, iobj)
5790 self.cfg.RemoveInstance(iobj.name)
5791 # Make sure the instance lock gets removed
5792 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5793 raise errors.OpExecError("There are some degraded disks for"
5796 feedback_fn("creating os for instance %s on node %s" %
5797 (instance, pnode_name))
5799 if iobj.disk_template != constants.DT_DISKLESS:
5800 if self.op.mode == constants.INSTANCE_CREATE:
5801 feedback_fn("* running the instance OS create scripts...")
5802 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5803 result.Raise("Could not add os for instance %s"
5804 " on node %s" % (instance, pnode_name))
5806 elif self.op.mode == constants.INSTANCE_IMPORT:
5807 feedback_fn("* running the instance OS import scripts...")
5808 src_node = self.op.src_node
5809 src_images = self.src_images
5810 cluster_name = self.cfg.GetClusterName()
5811 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5812 src_node, src_images,
5814 msg = import_result.fail_msg
5816 self.LogWarning("Error while importing the disk images for instance"
5817 " %s on node %s: %s" % (instance, pnode_name, msg))
5819 # also checked in the prereq part
5820 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5824 iobj.admin_up = True
5825 self.cfg.Update(iobj)
5826 logging.info("Starting instance %s on node %s", instance, pnode_name)
5827 feedback_fn("* starting instance...")
5828 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5829 result.Raise("Could not start instance")
5831 return list(iobj.all_nodes)
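# Editorial sketch (argument names assumed from the op attributes checked in
# ExpandNames/CheckPrereq, not copied from opcodes.py): a creation handled by
# this LU is typically submitted as something like
#
#   op = opcodes.OpCreateInstance(instance_name="inst1.example.com",
#                                 mode=constants.INSTANCE_CREATE,
#                                 disk_template=constants.DT_DRBD8,
#                                 disks=[{"size": 10240}], nics=[{}],
#                                 os_type="debootstrap", pnode="node1",
#                                 snode="node2", start=True,
#                                 wait_for_sync=True, ip_check=True,
#                                 hvparams={}, beparams={})
#
# which "gnt-instance add" builds from its command line.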
5834 class LUConnectConsole(NoHooksLU):
5835 """Connect to an instance's console.
5837 This is somewhat special in that it returns the command line that
5838 you need to run on the master node in order to connect to the
5839 console.
5841 """
5842 _OP_REQP = ["instance_name"]
5843 REQ_BGL = False
5845 def ExpandNames(self):
5846 self._ExpandAndLockInstance()
5848 def CheckPrereq(self):
5849 """Check prerequisites.
5851 This checks that the instance is in the cluster.
5854 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5855 assert self.instance is not None, \
5856 "Cannot retrieve locked instance %s" % self.op.instance_name
5857 _CheckNodeOnline(self, self.instance.primary_node)
5859 def Exec(self, feedback_fn):
5860 """Connect to the console of an instance
5863 instance = self.instance
5864 node = instance.primary_node
5866 node_insts = self.rpc.call_instance_list([node],
5867 [instance.hypervisor])[node]
5868 node_insts.Raise("Can't get node information from %s" % node)
5870 if instance.name not in node_insts.payload:
5871 raise errors.OpExecError("Instance %s is not running." % instance.name)
5873 logging.debug("Connecting to console of %s on %s", instance.name, node)
5875 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5876 cluster = self.cfg.GetClusterInfo()
5877 # beparams and hvparams are passed separately, to avoid editing the
5878 # instance and then saving the defaults in the instance itself.
5879 hvparams = cluster.FillHV(instance)
5880 beparams = cluster.FillBE(instance)
5881 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5884 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
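# Editor's note (illustrative, not from the original source): the value
# returned above is the SSH command line built by SshRunner.BuildCmd that the
# caller has to execute on the master node, conceptually something like the
# hypothetical
#   ssh -t root@<primary node> '<hypervisor console command>'
# where the inner command comes from GetShellCommandForConsole and depends on
# the hypervisor and the instance's hv/be parameters.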
5887 class LUReplaceDisks(LogicalUnit):
5888 """Replace the disks of an instance.
5891 HPATH = "mirrors-replace"
5892 HTYPE = constants.HTYPE_INSTANCE
5893 _OP_REQP = ["instance_name", "mode", "disks"]
5896 def CheckArguments(self):
5897 if not hasattr(self.op, "remote_node"):
5898 self.op.remote_node = None
5899 if not hasattr(self.op, "iallocator"):
5900 self.op.iallocator = None
5902 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node, self.op.iallocator)
5905 def ExpandNames(self):
5906 self._ExpandAndLockInstance()
5908 if self.op.iallocator is not None:
5909 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5911 elif self.op.remote_node is not None:
5912 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5913 if remote_node is None:
5914 raise errors.OpPrereqError("Node '%s' not known" %
5915 self.op.remote_node)
5917 self.op.remote_node = remote_node
5919 # Warning: do not remove the locking of the new secondary here
5920 # unless DRBD8.AddChildren is changed to work in parallel;
5921 # currently it doesn't since parallel invocations of
5922 # FindUnusedMinor will conflict
5923 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5924 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5927 self.needed_locks[locking.LEVEL_NODE] = []
5928 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5930 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
5931 self.op.iallocator, self.op.remote_node,
self.op.disks)
5934 self.tasklets = [self.replacer]
5936 def DeclareLocks(self, level):
5937 # If we're not already locking all nodes in the set we have to declare the
5938 # instance's primary/secondary nodes.
5939 if (level == locking.LEVEL_NODE and
5940 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5941 self._LockInstancesNodes()
5943 def BuildHooksEnv(self):
5946 This runs on the master, the primary and all the secondaries.
5949 instance = self.replacer.instance
env = {
5951 "MODE": self.op.mode,
5952 "NEW_SECONDARY": self.op.remote_node,
5953 "OLD_SECONDARY": instance.secondary_nodes[0],
}
5955 env.update(_BuildInstanceHookEnvByObject(self, instance))
nl = [
5957 self.cfg.GetMasterNode(),
5958 instance.primary_node,
]
5960 if self.op.remote_node is not None:
5961 nl.append(self.op.remote_node)
return env, nl, nl
5965 class LUEvacuateNode(LogicalUnit):
5966 """Relocate the secondary instances from a node.
5969 HPATH = "node-evacuate"
5970 HTYPE = constants.HTYPE_NODE
5971 _OP_REQP = ["node_name"]
5974 def CheckArguments(self):
5975 if not hasattr(self.op, "remote_node"):
5976 self.op.remote_node = None
5977 if not hasattr(self.op, "iallocator"):
5978 self.op.iallocator = None
5980 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
5981 self.op.remote_node,
self.op.iallocator)
5984 def ExpandNames(self):
5985 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
5986 if self.op.node_name is None:
5987 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
5989 self.needed_locks = {}
5991 # Declare node locks
5992 if self.op.iallocator is not None:
5993 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5995 elif self.op.remote_node is not None:
5996 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5997 if remote_node is None:
5998 raise errors.OpPrereqError("Node '%s' not known" %
5999 self.op.remote_node)
6001 self.op.remote_node = remote_node
6003 # Warning: do not remove the locking of the new secondary here
6004 # unless DRBD8.AddChildren is changed to work in parallel;
6005 # currently it doesn't since parallel invocations of
6006 # FindUnusedMinor will conflict
6007 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6008 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6011 raise errors.OpPrereqError("Invalid parameters")
6013 # Create tasklets for replacing disks for all secondary instances on this
# node
names = []
tasklets = []
6018 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6019 logging.debug("Replacing disks for instance %s", inst.name)
6020 names.append(inst.name)
6022 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6023 self.op.iallocator, self.op.remote_node, [])
6024 tasklets.append(replacer)
6026 self.tasklets = tasklets
6027 self.instance_names = names
6029 # Declare instance locks
6030 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6032 def DeclareLocks(self, level):
6033 # If we're not already locking all nodes in the set we have to declare the
6034 # instance's primary/secondary nodes.
6035 if (level == locking.LEVEL_NODE and
6036 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6037 self._LockInstancesNodes()
6039 def BuildHooksEnv(self):
6042 This runs on the master, the primary and all the secondaries.
6046 "NODE_NAME": self.op.node_name,
6049 nl = [self.cfg.GetMasterNode()]
6051 if self.op.remote_node is not None:
6052 env["NEW_SECONDARY"] = self.op.remote_node
6053 nl.append(self.op.remote_node)
6055 return (env, nl, nl)
6058 class TLReplaceDisks(Tasklet):
6059 """Replaces disks for an instance.
6061 Note: Locking is not within the scope of this class.
6064 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node, disks):
6066 """Initializes this class.
6069 Tasklet.__init__(self, lu)
6072 self.instance_name = instance_name
self.mode = mode
6074 self.iallocator_name = iallocator_name
6075 self.remote_node = remote_node
self.disks = disks
6079 self.instance = None
6080 self.new_node = None
6081 self.target_node = None
6082 self.other_node = None
6083 self.remote_node_info = None
6084 self.node_secondary_ip = None
6087 def CheckArguments(mode, remote_node, iallocator):
6088 """Helper function for users of this class.
6091 # check for valid parameter combination
6092 if mode == constants.REPLACE_DISK_CHG:
6093 if remote_node is None and iallocator is None:
6094 raise errors.OpPrereqError("When changing the secondary either an"
6095 " iallocator script must be used or the"
6098 if remote_node is not None and iallocator is not None:
6099 raise errors.OpPrereqError("Give either the iallocator or the new"
6100 " secondary, not both")
6102 elif remote_node is not None or iallocator is not None:
6103 # Not replacing the secondary
6104 raise errors.OpPrereqError("The iallocator and new node options can"
6105 " only be used when changing the"
6109 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6110 """Compute a new secondary node using an IAllocator.
6113 ial = IAllocator(lu.cfg, lu.rpc,
6114 mode=constants.IALLOCATOR_MODE_RELOC,
6116 relocate_from=relocate_from)
6118 ial.Run(iallocator_name)
if not ial.success:
6121 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6122 " %s" % (iallocator_name, ial.info))
6124 if len(ial.nodes) != ial.required_nodes:
6125 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6126 " of nodes (%s), required %s" %
6127 (iallocator_name, len(ial.nodes), ial.required_nodes))
6129 remote_node_name = ial.nodes[0]
6131 lu.LogInfo("Selected new secondary for instance '%s': %s",
6132 instance_name, remote_node_name)
6134 return remote_node_name
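# Sketch of the interaction (assumption based on the IAllocator usage above,
# hypothetical values): the request is sent in IALLOCATOR_MODE_RELOC with the
# instance name and the node to relocate from, roughly
#   {"type": "relocate", "name": "inst1.example.com",
#    "relocate_from": ["node2.example.com"], "required_nodes": 1}
# and the plugin's answer is a node-name list; ial.nodes[0] above becomes the
# new secondary.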
6136 def _FindFaultyDisks(self, node_name):
6137 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
node_name, True)
6140 def CheckPrereq(self):
6141 """Check prerequisites.
6143 This checks that the instance is in the cluster.
6146 self.instance = self.cfg.GetInstanceInfo(self.instance_name)
6147 assert self.instance is not None, \
6148 "Cannot retrieve locked instance %s" % self.instance_name
6150 if self.instance.disk_template != constants.DT_DRBD8:
6151 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6154 if len(self.instance.secondary_nodes) != 1:
6155 raise errors.OpPrereqError("The instance has a strange layout,"
6156 " expected one secondary but found %d" %
6157 len(self.instance.secondary_nodes))
6159 secondary_node = self.instance.secondary_nodes[0]
6161 if self.iallocator_name is None:
6162 remote_node = self.remote_node
else:
6164 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6165 self.instance.name, secondary_node)
6167 if remote_node is not None:
6168 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6169 assert self.remote_node_info is not None, \
6170 "Cannot retrieve locked node %s" % remote_node
6172 self.remote_node_info = None
6174 if remote_node == self.instance.primary_node:
6175 raise errors.OpPrereqError("The specified node is the primary node of"
6178 if remote_node == secondary_node:
6179 raise errors.OpPrereqError("The specified node is already the"
6180 " secondary node of the instance.")
6182 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6183 constants.REPLACE_DISK_CHG):
6184 raise errors.OpPrereqError("Cannot specify disks to be replaced")
6186 if self.mode == constants.REPLACE_DISK_AUTO:
6187 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
6188 faulty_secondary = self._FindFaultyDisks(secondary_node)
6190 if faulty_primary and faulty_secondary:
6191 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6192 " one node and can not be repaired"
6193 " automatically" % self.instance_name)
if faulty_primary:
6196 self.disks = faulty_primary
6197 self.target_node = self.instance.primary_node
6198 self.other_node = secondary_node
6199 check_nodes = [self.target_node, self.other_node]
6200 elif faulty_secondary:
6201 self.disks = faulty_secondary
6202 self.target_node = secondary_node
6203 self.other_node = self.instance.primary_node
6204 check_nodes = [self.target_node, self.other_node]
6210 # Non-automatic modes
6211 if self.mode == constants.REPLACE_DISK_PRI:
6212 self.target_node = self.instance.primary_node
6213 self.other_node = secondary_node
6214 check_nodes = [self.target_node, self.other_node]
6216 elif self.mode == constants.REPLACE_DISK_SEC:
6217 self.target_node = secondary_node
6218 self.other_node = self.instance.primary_node
6219 check_nodes = [self.target_node, self.other_node]
6221 elif self.mode == constants.REPLACE_DISK_CHG:
6222 self.new_node = remote_node
6223 self.other_node = self.instance.primary_node
6224 self.target_node = secondary_node
6225 check_nodes = [self.new_node, self.other_node]
6227 _CheckNodeNotDrained(self.lu, remote_node)
else:
6230 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
self.mode)
6233 # If not specified all disks should be replaced
if not self.disks:
6235 self.disks = range(len(self.instance.disks))
6237 for node in check_nodes:
6238 _CheckNodeOnline(self.lu, node)
6240 # Check whether disks are valid
6241 for disk_idx in self.disks:
6242 self.instance.FindDisk(disk_idx)
6244 # Get secondary node IP addresses
node_2nd_ip = {}
6247 for node_name in [self.target_node, self.other_node, self.new_node]:
6248 if node_name is not None:
6249 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6251 self.node_secondary_ip = node_2nd_ip
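# Summary of the node roles computed above (derived from this method, added
# for reference): REPLACE_DISK_PRI -> target_node=primary, other_node=old
# secondary; REPLACE_DISK_SEC -> target_node=old secondary, other_node=primary;
# REPLACE_DISK_CHG -> target_node=old secondary, other_node=primary,
# new_node=the chosen remote node; REPLACE_DISK_AUTO -> target_node is
# whichever node holds the faulty disks.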
6253 def Exec(self, feedback_fn):
6254 """Execute disk replacement.
6256 This dispatches the disk replacement to the appropriate handler.
6260 feedback_fn("No disks need replacement")
6263 feedback_fn("Replacing disk(s) %s for %s" %
6264 (", ".join([str(i) for i in self.disks]), self.instance.name))
6266 activate_disks = (not self.instance.admin_up)
6268 # Activate the instance disks if we're replacing them on a down instance
if activate_disks:
6270 _StartInstanceDisks(self.lu, self.instance, True)

try:
6273 # Should we replace the secondary node?
6274 if self.new_node is not None:
6275 return self._ExecDrbd8Secondary()
else:
6277 return self._ExecDrbd8DiskOnly()

finally:
6280 # Deactivate the instance disks if we're replacing them on a down instance
if activate_disks:
6282 _SafeShutdownInstanceDisks(self.lu, self.instance)
6284 def _CheckVolumeGroup(self, nodes):
6285 self.lu.LogInfo("Checking volume groups")
6287 vgname = self.cfg.GetVGName()
6289 # Make sure volume group exists on all involved nodes
6290 results = self.rpc.call_vg_list(nodes)
6292 raise errors.OpExecError("Can't list volume groups on the nodes")
for node in nodes:
res = results[node]
6296 res.Raise("Error checking node %s" % node)
6297 if vgname not in res.payload:
6298 raise errors.OpExecError("Volume group '%s' not found on node %s" %
(vgname, node))
6301 def _CheckDisksExistence(self, nodes):
6302 # Check disk existence
6303 for idx, dev in enumerate(self.instance.disks):
6304 if idx not in self.disks:
continue

for node in nodes:
6308 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6309 self.cfg.SetDiskID(dev, node)
6311 result = self.rpc.call_blockdev_find(node, dev)
6313 msg = result.fail_msg
6314 if msg or not result.payload:
6316 msg = "disk not found"
6317 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6320 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6321 for idx, dev in enumerate(self.instance.disks):
6322 if idx not in self.disks:
6325 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6328 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6330 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6331 " replace disks for instance %s" %
6332 (node_name, self.instance.name))
6334 def _CreateNewStorage(self, node_name):
6335 vgname = self.cfg.GetVGName()
iv_names = {}
6338 for idx, dev in enumerate(self.instance.disks):
6339 if idx not in self.disks:
6342 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6344 self.cfg.SetDiskID(dev, node_name)
6346 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6347 names = _GenerateUniqueNames(self.lu, lv_names)
6349 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6350 logical_id=(vgname, names[0]))
6351 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6352 logical_id=(vgname, names[1]))
6354 new_lvs = [lv_data, lv_meta]
6355 old_lvs = dev.children
6356 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6358 # we pass force_create=True to force the LVM creation
6359 for new_lv in new_lvs:
6360 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6361 _GetInstanceInfoText(self.instance), False)
return iv_names
6365 def _CheckDevices(self, node_name, iv_names):
6366 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
6367 self.cfg.SetDiskID(dev, node_name)
6369 result = self.rpc.call_blockdev_find(node_name, dev)
6371 msg = result.fail_msg
6372 if msg or not result.payload:
6374 msg = "disk not found"
6375 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6378 if result.payload.is_degraded:
6379 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6381 def _RemoveOldStorage(self, node_name, iv_names):
6382 for name, (dev, old_lvs, _) in iv_names.iteritems():
6383 self.lu.LogInfo("Remove logical volumes for %s" % name)
for lv in old_lvs:
6386 self.cfg.SetDiskID(lv, node_name)
6388 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6390 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6391 hint="remove unused LVs manually")
6393 def _ExecDrbd8DiskOnly(self):
6394 """Replace a disk on the primary or secondary for DRBD 8.
6396 The algorithm for replace is quite complicated:
6398 1. for each disk to be replaced:
6400 1. create new LVs on the target node with unique names
6401 1. detach old LVs from the drbd device
6402 1. rename old LVs to name_replaced.<time_t>
6403 1. rename new LVs to old LVs
6404 1. attach the new LVs (with the old names now) to the drbd device
6406 1. wait for sync across all devices
6408 1. for each modified disk:
6410 1. remove old LVs (which have the name name_replaces.<time_t>)
6412 Failures are not very well handled.
steps_total = 6
6417 # Step: check device activation
6418 self.lu.LogStep(1, steps_total, "Check device existence")
6419 self._CheckDisksExistence([self.other_node, self.target_node])
6420 self._CheckVolumeGroup([self.target_node, self.other_node])
6422 # Step: check other node consistency
6423 self.lu.LogStep(2, steps_total, "Check peer consistency")
6424 self._CheckDisksConsistency(self.other_node,
6425 self.other_node == self.instance.primary_node,
6428 # Step: create new storage
6429 self.lu.LogStep(3, steps_total, "Allocate new storage")
6430 iv_names = self._CreateNewStorage(self.target_node)
6432 # Step: for each lv, detach+rename*2+attach
6433 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6434 for dev, old_lvs, new_lvs in iv_names.itervalues():
6435 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6437 result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs)
6438 result.Raise("Can't detach drbd from local storage on node"
6439 " %s for device %s" % (self.target_node, dev.iv_name))
6441 #cfg.Update(instance)
6443 # ok, we created the new LVs, so now we know we have the needed
6444 # storage; as such, we proceed on the target node to rename
6445 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6446 # using the assumption that logical_id == physical_id (which in
6447 # turn is the unique_id on that node)
6449 # FIXME(iustin): use a better name for the replaced LVs
6450 temp_suffix = int(time.time())
6451 ren_fn = lambda d, suff: (d.physical_id[0],
6452 d.physical_id[1] + "_replaced-%s" % suff)
6454 # Build the rename list based on what LVs exist on the node
6455 rename_old_to_new = []
6456 for to_ren in old_lvs:
6457 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6458 if not result.fail_msg and result.payload:
6460 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6462 self.lu.LogInfo("Renaming the old LVs on the target node")
6463 result = self.rpc.call_blockdev_rename(self.target_node, rename_old_to_new)
6464 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6466 # Now we rename the new LVs to the old LVs
6467 self.lu.LogInfo("Renaming the new LVs on the target node")
6468 rename_new_to_old = [(new, old.physical_id)
6469 for old, new in zip(old_lvs, new_lvs)]
6470 result = self.rpc.call_blockdev_rename(self.target_node, rename_new_to_old)
6471 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6473 for old, new in zip(old_lvs, new_lvs):
6474 new.logical_id = old.logical_id
6475 self.cfg.SetDiskID(new, self.target_node)
6477 for disk in old_lvs:
6478 disk.logical_id = ren_fn(disk, temp_suffix)
6479 self.cfg.SetDiskID(disk, self.target_node)
6481 # Now that the new lvs have the old name, we can add them to the device
6482 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6483 result = self.rpc.call_blockdev_addchildren(self.target_node, dev, new_lvs)
6484 msg = result.fail_msg
if msg:
6486 for new_lv in new_lvs:
6487 msg2 = self.rpc.call_blockdev_remove(self.target_node, new_lv).fail_msg
if msg2:
6489 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
6490 hint=("cleanup manually the unused logical"
" volumes"))
6492 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6494 dev.children = new_lvs
6496 self.cfg.Update(self.instance)
6499 # This can fail as the old devices are degraded and _WaitForSync
6500 # does a combined result over all disks, so we don't check its return value
6501 self.lu.LogStep(5, steps_total, "Sync devices")
6502 _WaitForSync(self.lu, self.instance, unlock=True)
6504 # Check all devices manually
6505 self._CheckDevices(self.instance.primary_node, iv_names)
6507 # Step: remove old storage
6508 self.lu.LogStep(6, steps_total, "Removing old storage")
6509 self._RemoveOldStorage(self.target_node, iv_names)
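# Illustrative example of the rename dance above (hypothetical LV names): for
# disk/0 the old data LV, say "xenvg/abc.disk0_data", is first renamed to
# "xenvg/abc.disk0_data_replaced-<time_t>", the freshly created LV then takes
# over the old name and is re-attached to the DRBD device, and the
# "*_replaced-*" volumes are what step 6 finally removes.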
6511 def _ExecDrbd8Secondary(self):
6512 """Replace the secondary node for DRBD 8.
6514 The algorithm for replace is quite complicated:
6515 - for all disks of the instance:
6516 - create new LVs on the new node with same names
6517 - shutdown the drbd device on the old secondary
6518 - disconnect the drbd network on the primary
6519 - create the drbd device on the new secondary
6520 - network attach the drbd on the primary, using an artifice:
6521 the drbd code for Attach() will connect to the network if it
6522 finds a device which is connected to the good local disks but
6524 - wait for sync across all devices
6525 - remove all disks from the old secondary
6527 Failures are not very well handled.
steps_total = 6
6532 # Step: check device activation
6533 self.lu.LogStep(1, steps_total, "Check device existence")
6534 self._CheckDisksExistence([self.instance.primary_node])
6535 self._CheckVolumeGroup([self.instance.primary_node])
6537 # Step: check other node consistency
6538 self.lu.LogStep(2, steps_total, "Check peer consistency")
6539 self._CheckDisksConsistency(self.instance.primary_node, True, True)
6541 # Step: create new storage
6542 self.lu.LogStep(3, steps_total, "Allocate new storage")
6543 for idx, dev in enumerate(self.instance.disks):
6544 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
6545 (self.new_node, idx))
6546 # we pass force_create=True to force LVM creation
6547 for new_lv in dev.children:
6548 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
6549 _GetInstanceInfoText(self.instance), False)
6551 # Step 4: dbrd minors and drbd setups changes
6552 # after this, we must manually remove the drbd minors on both the
6553 # error and the success paths
6554 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6555 minors = self.cfg.AllocateDRBDMinor([self.new_node for dev in self.instance.disks],
self.instance.name)
6557 logging.debug("Allocated minors %r" % (minors,))
iv_names = {}
6560 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
6561 self.lu.LogInfo("activating a new drbd on %s for disk/%d" % (self.new_node, idx))
6562 # create new devices on new_node; note that we create two IDs:
6563 # one without port, so the drbd will be activated without
6564 # networking information on the new node at this stage, and one
6565 # with network, for the latter activation in step 4
6566 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
6567 if self.instance.primary_node == o_node1:
p_minor = o_minor1
else:
p_minor = o_minor2
6572 new_alone_id = (self.instance.primary_node, self.new_node, None, p_minor, new_minor, o_secret)
6573 new_net_id = (self.instance.primary_node, self.new_node, o_port, p_minor, new_minor, o_secret)
6575 iv_names[idx] = (dev, dev.children, new_net_id)
6576 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
6578 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
6579 logical_id=new_alone_id,
6580 children=dev.children,
size=dev.size)
try:
6583 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
6584 _GetInstanceInfoText(self.instance), False)
6585 except errors.GenericError:
6586 self.cfg.ReleaseDRBDMinors(self.instance.name)
raise
6589 # We have new devices, shutdown the drbd on the old secondary
6590 for idx, dev in enumerate(self.instance.disks):
6591 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
6592 self.cfg.SetDiskID(dev, self.target_node)
6593 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
6595 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
6596 "node: %s" % (idx, msg),
6597 hint=("Please cleanup this device manually as"
6598 " soon as possible"))
6600 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
6601 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], self.node_secondary_ip,
6602 self.instance.disks)[self.instance.primary_node]
6604 msg = result.fail_msg
if msg:
6606 # detaches didn't succeed (unlikely)
6607 self.cfg.ReleaseDRBDMinors(self.instance.name)
6608 raise errors.OpExecError("Can't detach the disks from the network on"
6609 " old node: %s" % (msg,))
6611 # if we managed to detach at least one, we update all the disks of
6612 # the instance to point to the new secondary
6613 self.lu.LogInfo("Updating instance configuration")
6614 for dev, _, new_logical_id in iv_names.itervalues():
6615 dev.logical_id = new_logical_id
6616 self.cfg.SetDiskID(dev, self.instance.primary_node)
6618 self.cfg.Update(self.instance)
6620 # and now perform the drbd attach
6621 self.lu.LogInfo("Attaching primary drbds to new secondary"
6622 " (standalone => connected)")
6623 result = self.rpc.call_drbd_attach_net([self.instance.primary_node, self.new_node], self.node_secondary_ip,
6624 self.instance.disks, self.instance.name,
False)
6626 for to_node, to_result in result.items():
6627 msg = to_result.fail_msg
6629 self.lu.LogWarning("Can't attach drbd disks on node %s: %s", to_node, msg,
6630 hint=("please do a gnt-instance info to see the"
6631 " status of disks"))
6634 # This can fail as the old devices are degraded and _WaitForSync
6635 # does a combined result over all disks, so we don't check its return value
6636 self.lu.LogStep(5, steps_total, "Sync devices")
6637 _WaitForSync(self.lu, self.instance, unlock=True)
6639 # Check all devices manually
6640 self._CheckDevices(self.instance.primary_node, iv_names)
6642 # Step: remove old storage
6643 self.lu.LogStep(6, steps_total, "Removing old storage")
6644 self._RemoveOldStorage(self.target_node, iv_names)
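# Note added for clarity: new_alone_id and new_net_id built in step 4 differ
# only in the port field (None versus the instance's DRBD port), e.g.
# (pnode, new_node, None, p_minor, new_minor, secret) versus
# (pnode, new_node, <port>, p_minor, new_minor, secret); creating the device
# with the port-less ID keeps it standalone until call_drbd_attach_net
# reconnects it, which is the "artifice" mentioned in the docstring.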
6647 class LURepairNodeStorage(NoHooksLU):
6648 """Repairs the volume group on a node.
6651 _OP_REQP = ["node_name"]
6654 def CheckArguments(self):
6655 node_name = self.cfg.ExpandNodeName(self.op.node_name)
6656 if node_name is None:
6657 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
6659 self.op.node_name = node_name
6661 def ExpandNames(self):
6662 self.needed_locks = {
6663 locking.LEVEL_NODE: [self.op.node_name],
6666 def _CheckFaultyDisks(self, instance, node_name):
6667 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
6669 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
6670 " node '%s'" % (inst.name, node_name))
6672 def CheckPrereq(self):
6673 """Check prerequisites.
6676 storage_type = self.op.storage_type
6678 if (constants.SO_FIX_CONSISTENCY not in
6679 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
6680 raise errors.OpPrereqError("Storage units of type '%s' can not be"
6681 " repaired" % storage_type)
6683 # Check whether any instance on this node has faulty disks
6684 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
6685 check_nodes = set(inst.all_nodes)
6686 check_nodes.discard(self.op.node_name)
6687 for inst_node_name in check_nodes:
6688 self._CheckFaultyDisks(inst, inst_node_name)
6690 def Exec(self, feedback_fn):
6691 feedback_fn("Repairing storage unit '%s' on %s ..." %
6692 (self.op.name, self.op.node_name))
6694 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
6695 result = self.rpc.call_storage_execute(self.op.node_name,
6696 self.op.storage_type, st_args,
6698 constants.SO_FIX_CONSISTENCY)
6699 result.Raise("Failed to repair storage unit '%s' on %s" %
6700 (self.op.name, self.op.node_name))
6703 class LUGrowDisk(LogicalUnit):
6704 """Grow a disk of an instance.
6708 HTYPE = constants.HTYPE_INSTANCE
6709 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
6712 def ExpandNames(self):
6713 self._ExpandAndLockInstance()
6714 self.needed_locks[locking.LEVEL_NODE] = []
6715 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6717 def DeclareLocks(self, level):
6718 if level == locking.LEVEL_NODE:
6719 self._LockInstancesNodes()
6721 def BuildHooksEnv(self):
6724 This runs on the master, the primary and all the secondaries.
6728 "DISK": self.op.disk,
6729 "AMOUNT": self.op.amount,
6731 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6733 self.cfg.GetMasterNode(),
6734 self.instance.primary_node,
6738 def CheckPrereq(self):
6739 """Check prerequisites.
6741 This checks that the instance is in the cluster.
6744 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6745 assert instance is not None, \
6746 "Cannot retrieve locked instance %s" % self.op.instance_name
6747 nodenames = list(instance.all_nodes)
6748 for node in nodenames:
6749 _CheckNodeOnline(self, node)
6752 self.instance = instance
6754 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
6755 raise errors.OpPrereqError("Instance's disk layout does not support"
6758 self.disk = instance.FindDisk(self.op.disk)
6760 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6761 instance.hypervisor)
6762 for node in nodenames:
6763 info = nodeinfo[node]
6764 info.Raise("Cannot get current information from node %s" % node)
6765 vg_free = info.payload.get('vg_free', None)
6766 if not isinstance(vg_free, int):
6767 raise errors.OpPrereqError("Can't compute free disk space on"
6769 if self.op.amount > vg_free:
6770 raise errors.OpPrereqError("Not enough disk space on target node %s:"
6771 " %d MiB available, %d MiB required" %
6772 (node, vg_free, self.op.amount))
6774 def Exec(self, feedback_fn):
6775 """Execute disk grow.
6778 instance = self.instance
disk = self.disk
6780 for node in instance.all_nodes:
6781 self.cfg.SetDiskID(disk, node)
6782 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
6783 result.Raise("Grow request failed to node %s" % node)
6784 disk.RecordGrow(self.op.amount)
6785 self.cfg.Update(instance)
6786 if self.op.wait_for_sync:
6787 disk_abort = not _WaitForSync(self, instance)
if disk_abort:
6789 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
6790 " status.\nPlease check the instance.")
6793 class LUQueryInstanceData(NoHooksLU):
6794 """Query runtime instance data.
6797 _OP_REQP = ["instances", "static"]
6800 def ExpandNames(self):
6801 self.needed_locks = {}
6802 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
6804 if not isinstance(self.op.instances, list):
6805 raise errors.OpPrereqError("Invalid argument type 'instances'")
6807 if self.op.instances:
6808 self.wanted_names = []
6809 for name in self.op.instances:
6810 full_name = self.cfg.ExpandInstanceName(name)
6811 if full_name is None:
6812 raise errors.OpPrereqError("Instance '%s' not known" % name)
6813 self.wanted_names.append(full_name)
6814 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
6816 self.wanted_names = None
6817 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
6819 self.needed_locks[locking.LEVEL_NODE] = []
6820 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6822 def DeclareLocks(self, level):
6823 if level == locking.LEVEL_NODE:
6824 self._LockInstancesNodes()
6826 def CheckPrereq(self):
6827 """Check prerequisites.
6829 This only checks the optional instance list against the existing names.
6832 if self.wanted_names is None:
6833 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
6835 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
6836 in self.wanted_names]
6839 def _ComputeBlockdevStatus(self, node, instance_name, dev):
6840 """Returns the status of a block device
6843 if self.op.static or not node:
6846 self.cfg.SetDiskID(dev, node)
6848 result = self.rpc.call_blockdev_find(node, dev)
6852 result.Raise("Can't compute disk status for %s" % instance_name)
6854 status = result.payload
6858 return (status.dev_path, status.major, status.minor,
6859 status.sync_percent, status.estimated_time,
6860 status.is_degraded, status.ldisk_status)
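# For reference (derived from the return statement above): the tuple is
# (dev_path, major, minor, sync_percent, estimated_time, is_degraded,
# ldisk_status); a healthy, fully synced device might yield the hypothetical
# value ("/dev/drbd0", 147, 0, None, None, False, <ldisk status>).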
6862 def _ComputeDiskStatus(self, instance, snode, dev):
6863 """Compute block device status.
6866 if dev.dev_type in constants.LDS_DRBD:
6867 # we change the snode then (otherwise we use the one passed in)
6868 if dev.logical_id[0] == instance.primary_node:
6869 snode = dev.logical_id[1]
6871 snode = dev.logical_id[0]
6873 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
6875 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
6878 dev_children = [self._ComputeDiskStatus(instance, snode, child)
6879 for child in dev.children]
6884 "iv_name": dev.iv_name,
6885 "dev_type": dev.dev_type,
6886 "logical_id": dev.logical_id,
6887 "physical_id": dev.physical_id,
6888 "pstatus": dev_pstatus,
6889 "sstatus": dev_sstatus,
6890 "children": dev_children,
6897 def Exec(self, feedback_fn):
6898 """Gather and return data"""
result = {}
6901 cluster = self.cfg.GetClusterInfo()
6903 for instance in self.wanted_instances:
6904 if not self.op.static:
6905 remote_info = self.rpc.call_instance_info(instance.primary_node,
6907 instance.hypervisor)
6908 remote_info.Raise("Error checking node %s" % instance.primary_node)
6909 remote_info = remote_info.payload
6910 if remote_info and "state" in remote_info:
remote_state = "up"
else:
6913 remote_state = "down"
6916 if instance.admin_up:
config_state = "up"
else:
6919 config_state = "down"
6921 disks = [self._ComputeDiskStatus(instance, None, device)
6922 for device in instance.disks]
6925 "name": instance.name,
6926 "config_state": config_state,
6927 "run_state": remote_state,
6928 "pnode": instance.primary_node,
6929 "snodes": instance.secondary_nodes,
6931 # this happens to be the same format used for hooks
6932 "nics": _NICListToTuple(self, instance.nics),
6934 "hypervisor": instance.hypervisor,
6935 "network_port": instance.network_port,
6936 "hv_instance": instance.hvparams,
6937 "hv_actual": cluster.FillHV(instance),
6938 "be_instance": instance.beparams,
6939 "be_actual": cluster.FillBE(instance),
6940 "serial_no": instance.serial_no,
6941 "mtime": instance.mtime,
6942 "ctime": instance.ctime,
}
6945 result[instance.name] = idict
return result
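# Illustrative shape of the returned mapping (hypothetical values): one idict
# per queried instance, e.g.
#   {"inst1.example.com": {"name": "inst1.example.com", "config_state": "up",
#    "run_state": "up", "pnode": "node1.example.com",
#    "snodes": ["node2.example.com"], ...}}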
6950 class LUSetInstanceParams(LogicalUnit):
6951 """Modifies an instances's parameters.
6954 HPATH = "instance-modify"
6955 HTYPE = constants.HTYPE_INSTANCE
6956 _OP_REQP = ["instance_name"]
6959 def CheckArguments(self):
6960 if not hasattr(self.op, 'nics'):
self.op.nics = []
6962 if not hasattr(self.op, 'disks'):
self.op.disks = []
6964 if not hasattr(self.op, 'beparams'):
6965 self.op.beparams = {}
6966 if not hasattr(self.op, 'hvparams'):
6967 self.op.hvparams = {}
6968 self.op.force = getattr(self.op, "force", False)
6969 if not (self.op.nics or self.op.disks or
6970 self.op.hvparams or self.op.beparams):
6971 raise errors.OpPrereqError("No changes submitted")
disk_addremove = 0
6975 for disk_op, disk_dict in self.op.disks:
6976 if disk_op == constants.DDM_REMOVE:
disk_addremove += 1
continue
6979 elif disk_op == constants.DDM_ADD:
disk_addremove += 1
else:
6982 if not isinstance(disk_op, int):
6983 raise errors.OpPrereqError("Invalid disk index")
6984 if not isinstance(disk_dict, dict):
6985 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
6986 raise errors.OpPrereqError(msg)
6988 if disk_op == constants.DDM_ADD:
6989 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
6990 if mode not in constants.DISK_ACCESS_SET:
6991 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
6992 size = disk_dict.get('size', None)
if size is None:
6994 raise errors.OpPrereqError("Required disk parameter size missing")
try:
size = int(size)
6997 except ValueError, err:
6998 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
str(err))
7000 disk_dict['size'] = size
else:
7002 # modification of disk
7003 if 'size' in disk_dict:
7004 raise errors.OpPrereqError("Disk size change not possible, use"
7007 if disk_addremove > 1:
7008 raise errors.OpPrereqError("Only one disk add or remove operation"
7009 " supported at a time")
nic_addremove = 0
7013 for nic_op, nic_dict in self.op.nics:
7014 if nic_op == constants.DDM_REMOVE:
nic_addremove += 1
continue
7017 elif nic_op == constants.DDM_ADD:
nic_addremove += 1
else:
7020 if not isinstance(nic_op, int):
7021 raise errors.OpPrereqError("Invalid nic index")
7022 if not isinstance(nic_dict, dict):
7023 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7024 raise errors.OpPrereqError(msg)
7026 # nic_dict should be a dict
7027 nic_ip = nic_dict.get('ip', None)
7028 if nic_ip is not None:
7029 if nic_ip.lower() == constants.VALUE_NONE:
7030 nic_dict['ip'] = None
7032 if not utils.IsValidIP(nic_ip):
7033 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
7035 nic_bridge = nic_dict.get('bridge', None)
7036 nic_link = nic_dict.get('link', None)
7037 if nic_bridge and nic_link:
7038 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7039 " at the same time")
7040 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7041 nic_dict['bridge'] = None
7042 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7043 nic_dict['link'] = None
7045 if nic_op == constants.DDM_ADD:
7046 nic_mac = nic_dict.get('mac', None)
7048 nic_dict['mac'] = constants.VALUE_AUTO
7050 if 'mac' in nic_dict:
7051 nic_mac = nic_dict['mac']
7052 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7053 if not utils.IsValidMac(nic_mac):
7054 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
7055 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7056 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
7057 " modifying an existing nic")
7059 if nic_addremove > 1:
7060 raise errors.OpPrereqError("Only one NIC add or remove operation"
7061 " supported at a time")
7063 def ExpandNames(self):
7064 self._ExpandAndLockInstance()
7065 self.needed_locks[locking.LEVEL_NODE] = []
7066 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7068 def DeclareLocks(self, level):
7069 if level == locking.LEVEL_NODE:
7070 self._LockInstancesNodes()
7072 def BuildHooksEnv(self):
7075 This runs on the master, primary and secondaries.
args = dict()
7079 if constants.BE_MEMORY in self.be_new:
7080 args['memory'] = self.be_new[constants.BE_MEMORY]
7081 if constants.BE_VCPUS in self.be_new:
7082 args['vcpus'] = self.be_new[constants.BE_VCPUS]
7083 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
7084 # information at all.
7087 nic_override = dict(self.op.nics)
7088 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
7089 for idx, nic in enumerate(self.instance.nics):
7090 if idx in nic_override:
7091 this_nic_override = nic_override[idx]
7093 this_nic_override = {}
7094 if 'ip' in this_nic_override:
7095 ip = this_nic_override['ip']
else:
ip = nic.ip
7098 if 'mac' in this_nic_override:
7099 mac = this_nic_override['mac']
else:
mac = nic.mac
7102 if idx in self.nic_pnew:
7103 nicparams = self.nic_pnew[idx]
7105 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
7106 mode = nicparams[constants.NIC_MODE]
7107 link = nicparams[constants.NIC_LINK]
7108 args['nics'].append((ip, mac, mode, link))
7109 if constants.DDM_ADD in nic_override:
7110 ip = nic_override[constants.DDM_ADD].get('ip', None)
7111 mac = nic_override[constants.DDM_ADD]['mac']
7112 nicparams = self.nic_pnew[constants.DDM_ADD]
7113 mode = nicparams[constants.NIC_MODE]
7114 link = nicparams[constants.NIC_LINK]
7115 args['nics'].append((ip, mac, mode, link))
7116 elif constants.DDM_REMOVE in nic_override:
7117 del args['nics'][-1]
7119 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
7120 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
return env, nl, nl
7123 def _GetUpdatedParams(self, old_params, update_dict,
7124 default_values, parameter_types):
7125 """Return the new params dict for the given params.
7127 @type old_params: dict
7128 @param old_params: old parameters
7129 @type update_dict: dict
7130 @param update_dict: dict containing new parameter values,
7131 or constants.VALUE_DEFAULT to reset the
7132 parameter to its default value
7133 @type default_values: dict
7134 @param default_values: default values for the filled parameters
7135 @type parameter_types: dict
7136 @param parameter_types: dict mapping target dict keys to types
7137 in constants.ENFORCEABLE_TYPES
7138 @rtype: (dict, dict)
7139 @return: (new_parameters, filled_parameters)
7142 params_copy = copy.deepcopy(old_params)
7143 for key, val in update_dict.iteritems():
7144 if val == constants.VALUE_DEFAULT:
try:
7146 del params_copy[key]
except KeyError:
pass
else:
7150 params_copy[key] = val
7151 utils.ForceDictType(params_copy, parameter_types)
7152 params_filled = objects.FillDict(default_values, params_copy)
7153 return (params_copy, params_filled)
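# Worked example (hypothetical values, added for clarity): with
# old_params={"memory": 512}, update_dict={"memory": constants.VALUE_DEFAULT,
# "vcpus": 2} and default_values={"memory": 128, "vcpus": 1}, the method
# returns ({"vcpus": 2}, {"memory": 128, "vcpus": 2}); "memory" reverts to the
# default while "vcpus" is overridden in both the instance-level and the
# filled dict.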
7155 def CheckPrereq(self):
7156 """Check prerequisites.
7158 This only checks the instance list against the existing names.
7161 self.force = self.op.force
7163 # checking the new params on the primary/secondary nodes
7165 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7166 cluster = self.cluster = self.cfg.GetClusterInfo()
7167 assert self.instance is not None, \
7168 "Cannot retrieve locked instance %s" % self.op.instance_name
7169 pnode = instance.primary_node
7170 nodelist = list(instance.all_nodes)
7172 # hvparams processing
7173 if self.op.hvparams:
7174 i_hvdict, hv_new = self._GetUpdatedParams(
7175 instance.hvparams, self.op.hvparams,
7176 cluster.hvparams[instance.hypervisor],
7177 constants.HVS_PARAMETER_TYPES)
7179 hypervisor.GetHypervisor(
7180 instance.hypervisor).CheckParameterSyntax(hv_new)
7181 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
7182 self.hv_new = hv_new # the new actual values
7183 self.hv_inst = i_hvdict # the new dict (without defaults)
7185 self.hv_new = self.hv_inst = {}
7187 # beparams processing
7188 if self.op.beparams:
7189 i_bedict, be_new = self._GetUpdatedParams(
7190 instance.beparams, self.op.beparams,
7191 cluster.beparams[constants.PP_DEFAULT],
7192 constants.BES_PARAMETER_TYPES)
7193 self.be_new = be_new # the new actual values
7194 self.be_inst = i_bedict # the new dict (without defaults)
7196 self.be_new = self.be_inst = {}
self.warn = []
7200 if constants.BE_MEMORY in self.op.beparams and not self.force:
7201 mem_check_list = [pnode]
7202 if be_new[constants.BE_AUTO_BALANCE]:
7203 # either we changed auto_balance to yes or it was from before
7204 mem_check_list.extend(instance.secondary_nodes)
7205 instance_info = self.rpc.call_instance_info(pnode, instance.name,
7206 instance.hypervisor)
7207 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
7208 instance.hypervisor)
7209 pninfo = nodeinfo[pnode]
7210 msg = pninfo.fail_msg
if msg:
7212 # Assume the primary node is unreachable and go ahead
7213 self.warn.append("Can't get info from primary node %s: %s" %
(pnode, msg))
7215 elif not isinstance(pninfo.payload.get('memory_free', None), int):
7216 self.warn.append("Node data from primary node %s doesn't contain"
7217 " free memory information" % pnode)
7218 elif instance_info.fail_msg:
7219 self.warn.append("Can't get instance runtime information: %s" %
7220 instance_info.fail_msg)
else:
7222 if instance_info.payload:
7223 current_mem = int(instance_info.payload['memory'])
else:
7225 # Assume instance not running
7226 # (there is a slight race condition here, but it's not very probable,
7227 # and we have no other way to check)
current_mem = 0
7229 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
7230 pninfo.payload['memory_free'])
if miss_mem > 0:
7232 raise errors.OpPrereqError("This change will prevent the instance"
7233 " from starting, due to %d MB of memory"
7234 " missing on its primary node" % miss_mem)
7236 if be_new[constants.BE_AUTO_BALANCE]:
7237 for node, nres in nodeinfo.items():
7238 if node not in instance.secondary_nodes:
7242 self.warn.append("Can't get info from secondary node %s: %s" %
7244 elif not isinstance(nres.payload.get('memory_free', None), int):
7245 self.warn.append("Secondary node %s didn't return free"
7246 " memory information" % node)
7247 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
7248 self.warn.append("Not enough memory to failover instance to"
7249 " secondary node %s" % node)
self.nic_pnew = {}
self.nic_pinst = {}
7254 for nic_op, nic_dict in self.op.nics:
7255 if nic_op == constants.DDM_REMOVE:
7256 if not instance.nics:
7257 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
7259 if nic_op != constants.DDM_ADD:
7261 if nic_op < 0 or nic_op >= len(instance.nics):
7262 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
7264 (nic_op, len(instance.nics)))
7265 old_nic_params = instance.nics[nic_op].nicparams
7266 old_nic_ip = instance.nics[nic_op].ip
7271 update_params_dict = dict([(key, nic_dict[key])
7272 for key in constants.NICS_PARAMETERS
7273 if key in nic_dict])
7275 if 'bridge' in nic_dict:
7276 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
7278 new_nic_params, new_filled_nic_params = \
7279 self._GetUpdatedParams(old_nic_params, update_params_dict,
7280 cluster.nicparams[constants.PP_DEFAULT],
7281 constants.NICS_PARAMETER_TYPES)
7282 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
7283 self.nic_pinst[nic_op] = new_nic_params
7284 self.nic_pnew[nic_op] = new_filled_nic_params
7285 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
7287 if new_nic_mode == constants.NIC_MODE_BRIDGED:
7288 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
7289 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
7291 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
7293 self.warn.append(msg)
7295 raise errors.OpPrereqError(msg)
7296 if new_nic_mode == constants.NIC_MODE_ROUTED:
7297 if 'ip' in nic_dict:
7298 nic_ip = nic_dict['ip']
7302 raise errors.OpPrereqError('Cannot set the nic ip to None'
7304 if 'mac' in nic_dict:
7305 nic_mac = nic_dict['mac']
7307 raise errors.OpPrereqError('Cannot set the nic mac to None')
7308 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7309 # otherwise generate the mac
7310 nic_dict['mac'] = self.cfg.GenerateMAC()
7312 # or validate/reserve the current one
7313 if self.cfg.IsMacInUse(nic_mac):
7314 raise errors.OpPrereqError("MAC address %s already in use"
7315 " in cluster" % nic_mac)
7318 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
7319 raise errors.OpPrereqError("Disk operations not supported for"
7320 " diskless instances")
7321 for disk_op, disk_dict in self.op.disks:
7322 if disk_op == constants.DDM_REMOVE:
7323 if len(instance.disks) == 1:
7324 raise errors.OpPrereqError("Cannot remove the last disk of"
7326 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
7327 ins_l = ins_l[pnode]
7328 msg = ins_l.fail_msg
7330 raise errors.OpPrereqError("Can't contact node %s: %s" %
7332 if instance.name in ins_l.payload:
7333 raise errors.OpPrereqError("Instance is running, can't remove"
7336 if (disk_op == constants.DDM_ADD and
7337 len(instance.disks) >= constants.MAX_DISKS):
7338 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
7339 " add more" % constants.MAX_DISKS)
7340 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
7342 if disk_op < 0 or disk_op >= len(instance.disks):
7343 raise errors.OpPrereqError("Invalid disk index %s, valid values"
7345 (disk_op, len(instance.disks)))
7349 def Exec(self, feedback_fn):
7350 """Modifies an instance.
7352 All parameters take effect only at the next restart of the instance.
7355 # Process here the warnings from CheckPrereq, as we don't have a
7356 # feedback_fn there.
7357 for warn in self.warn:
7358 feedback_fn("WARNING: %s" % warn)
result = []
7361 instance = self.instance
7362 cluster = self.cluster
7364 for disk_op, disk_dict in self.op.disks:
7365 if disk_op == constants.DDM_REMOVE:
7366 # remove the last disk
7367 device = instance.disks.pop()
7368 device_idx = len(instance.disks)
7369 for node, disk in device.ComputeNodeTree(instance.primary_node):
7370 self.cfg.SetDiskID(disk, node)
7371 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
7373 self.LogWarning("Could not remove disk/%d on node %s: %s,"
7374 " continuing anyway", device_idx, node, msg)
7375 result.append(("disk/%d" % device_idx, "remove"))
7376 elif disk_op == constants.DDM_ADD:
7378 if instance.disk_template == constants.DT_FILE:
7379 file_driver, file_path = instance.disks[0].logical_id
7380 file_path = os.path.dirname(file_path)
7382 file_driver = file_path = None
7383 disk_idx_base = len(instance.disks)
7384 new_disk = _GenerateDiskTemplate(self,
7385 instance.disk_template,
7386 instance.name, instance.primary_node,
7387 instance.secondary_nodes,
[disk_dict],
file_path,
file_driver,
disk_idx_base)[0]
7392 instance.disks.append(new_disk)
7393 info = _GetInstanceInfoText(instance)
7395 logging.info("Creating volume %s for instance %s",
7396 new_disk.iv_name, instance.name)
7397 # Note: this needs to be kept in sync with _CreateDisks
7399 for node in instance.all_nodes:
7400 f_create = node == instance.primary_node
try:
7402 _CreateBlockDev(self, node, instance, new_disk,
7403 f_create, info, f_create)
7404 except errors.OpExecError, err:
7405 self.LogWarning("Failed to create volume %s (%s) on"
7407 new_disk.iv_name, new_disk, node, err)
7408 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
7409 (new_disk.size, new_disk.mode)))
7411 # change a given disk
7412 instance.disks[disk_op].mode = disk_dict['mode']
7413 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
7415 for nic_op, nic_dict in self.op.nics:
7416 if nic_op == constants.DDM_REMOVE:
7417 # remove the last nic
7418 del instance.nics[-1]
7419 result.append(("nic.%d" % len(instance.nics), "remove"))
7420 elif nic_op == constants.DDM_ADD:
7421 # mac and bridge should be set, by now
7422 mac = nic_dict['mac']
7423 ip = nic_dict.get('ip', None)
7424 nicparams = self.nic_pinst[constants.DDM_ADD]
7425 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
7426 instance.nics.append(new_nic)
7427 result.append(("nic.%d" % (len(instance.nics) - 1),
7428 "add:mac=%s,ip=%s,mode=%s,link=%s" %
7429 (new_nic.mac, new_nic.ip,
7430 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
7431 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
)))
else:
7434 for key in 'mac', 'ip':
if key in nic_dict:
7436 setattr(instance.nics[nic_op], key, nic_dict[key])
7437 if nic_op in self.nic_pnew:
7438 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
7439 for key, val in nic_dict.iteritems():
7440 result.append(("nic.%s/%d" % (key, nic_op), val))
7443 if self.op.hvparams:
7444 instance.hvparams = self.hv_inst
7445 for key, val in self.op.hvparams.iteritems():
7446 result.append(("hv/%s" % key, val))
7449 if self.op.beparams:
7450 instance.beparams = self.be_inst
7451 for key, val in self.op.beparams.iteritems():
7452 result.append(("be/%s" % key, val))
7454 self.cfg.Update(instance)
return result
7459 class LUQueryExports(NoHooksLU):
7460 """Query the exports list
7463 _OP_REQP = ['nodes']
7466 def ExpandNames(self):
7467 self.needed_locks = {}
7468 self.share_locks[locking.LEVEL_NODE] = 1
7469 if not self.op.nodes:
7470 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7472 self.needed_locks[locking.LEVEL_NODE] = \
7473 _GetWantedNodes(self, self.op.nodes)
7475 def CheckPrereq(self):
7476 """Check prerequisites.
7479 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
7481 def Exec(self, feedback_fn):
7482 """Compute the list of all the exported system images.
7485 @return: a dictionary with the structure node->(export-list)
7486 where export-list is a list of the instances exported on
7490 rpcresult = self.rpc.call_export_list(self.nodes)
result = {}
7492 for node in rpcresult:
7493 if rpcresult[node].fail_msg:
7494 result[node] = False
else:
7496 result[node] = rpcresult[node].payload
return result
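# Illustrative return value (hypothetical names):
#   {"node1.example.com": ["inst1.example.com", "inst2.example.com"],
#    "node3.example.com": False}
# where False marks a node whose export list could not be fetched, as handled
# above.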
7501 class LUExportInstance(LogicalUnit):
7502 """Export an instance to an image in the cluster.
7505 HPATH = "instance-export"
7506 HTYPE = constants.HTYPE_INSTANCE
7507 _OP_REQP = ["instance_name", "target_node", "shutdown"]
7510 def ExpandNames(self):
7511 self._ExpandAndLockInstance()
7512 # FIXME: lock only instance primary and destination node
7514 # Sad but true, for now we have do lock all nodes, as we don't know where
7515 # the previous export might be, and and in this LU we search for it and
7516 # remove it from its current node. In the future we could fix this by:
7517 # - making a tasklet to search (share-lock all), then create the new one,
7518 # then one to remove, after
7519 # - removing the removal operation altogether
7520 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7522 def DeclareLocks(self, level):
7523 """Last minute lock declaration."""
7524 # All nodes are locked anyway, so nothing to do here.
7526 def BuildHooksEnv(self):
7529 This will run on the master, primary node and target node.
7533 "EXPORT_NODE": self.op.target_node,
7534 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
7536 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7537 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
7538 self.op.target_node]
return env, nl, nl
7541 def CheckPrereq(self):
7542 """Check prerequisites.
7544 This checks that the instance and node names are valid.
7547 instance_name = self.op.instance_name
7548 self.instance = self.cfg.GetInstanceInfo(instance_name)
7549 assert self.instance is not None, \
7550 "Cannot retrieve locked instance %s" % self.op.instance_name
7551 _CheckNodeOnline(self, self.instance.primary_node)
7553 self.dst_node = self.cfg.GetNodeInfo(
7554 self.cfg.ExpandNodeName(self.op.target_node))
7556 if self.dst_node is None:
7557 # This is wrong node name, not a non-locked node
7558 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
7559 _CheckNodeOnline(self, self.dst_node.name)
7560 _CheckNodeNotDrained(self, self.dst_node.name)
7562 # instance disk type verification
7563 for disk in self.instance.disks:
7564 if disk.dev_type == constants.LD_FILE:
7565 raise errors.OpPrereqError("Export not supported for instances with"
7566 " file-based disks")
7568 def Exec(self, feedback_fn):
7569 """Export an instance to an image in the cluster.
7572 instance = self.instance
7573 dst_node = self.dst_node
7574 src_node = instance.primary_node
7575 if self.op.shutdown:
7576 # shutdown the instance, but not the disks
7577 result = self.rpc.call_instance_shutdown(src_node, instance)
7578 result.Raise("Could not shutdown instance %s on"
7579 " node %s" % (instance.name, src_node))
7581 vgname = self.cfg.GetVGName()
snap_disks = []
7585 # set the disks ID correctly since call_instance_start needs the
7586 # correct drbd minor to create the symlinks
7587 for disk in instance.disks:
7588 self.cfg.SetDiskID(disk, src_node)
try:
7593 for idx, disk in enumerate(instance.disks):
7594 # result.payload will be a snapshot of an lvm leaf of the one we passed
7595 result = self.rpc.call_blockdev_snapshot(src_node, disk)
7596 msg = result.fail_msg
7598 self.LogWarning("Could not snapshot disk/%s on node %s: %s",
7600 snap_disks.append(False)
7602 disk_id = (vgname, result.payload)
7603 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
7604 logical_id=disk_id, physical_id=disk_id,
7605 iv_name=disk.iv_name)
7606 snap_disks.append(new_dev)
finally:
7609 if self.op.shutdown and instance.admin_up:
7610 result = self.rpc.call_instance_start(src_node, instance, None, None)
7611 msg = result.fail_msg
7613 _ShutdownInstanceDisks(self, instance)
7614 raise errors.OpExecError("Could not start instance: %s" % msg)
7616 # TODO: check for size
7618 cluster_name = self.cfg.GetClusterName()
dresults = []
7619 for idx, dev in enumerate(snap_disks):
if dev:
7621 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
7622 instance, cluster_name, idx)
7623 msg = result.fail_msg
7625 self.LogWarning("Could not export disk/%s from node %s to"
7626 " node %s: %s", idx, src_node, dst_node.name, msg)
7627 dresults.append(False)
7629 dresults.append(True)
7630 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
7632 self.LogWarning("Could not remove snapshot for disk/%d from node"
7633 " %s: %s", idx, src_node, msg)
7635 dresults.append(False)
7637 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
fin_resu = True
7639 msg = result.fail_msg
if msg:
7641 self.LogWarning("Could not finalize export for instance %s"
7642 " on node %s: %s", instance.name, dst_node.name, msg)
fin_resu = False
7645 nodelist = self.cfg.GetNodeList()
7646 nodelist.remove(dst_node.name)
7648 # on one-node clusters nodelist will be empty after the removal
7649 # if we proceed the backup would be removed because OpQueryExports
7650 # substitutes an empty list with the full cluster node list.
7651 iname = instance.name
7653 exportlist = self.rpc.call_export_list(nodelist)
7654 for node in exportlist:
7655 if exportlist[node].fail_msg:
7657 if iname in exportlist[node].payload:
7658 msg = self.rpc.call_export_remove(node, iname).fail_msg
7660 self.LogWarning("Could not remove older export for instance %s"
7661 " on node %s: %s", iname, node, msg)
7662 return fin_resu, dresults
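# Note for callers (added for clarity): the tuple returned above is
# (fin_resu, dresults), where fin_resu reports whether finalize_export
# succeeded and dresults holds one boolean per instance disk, e.g.
# (True, [True, False]) for a two-disk instance whose second disk failed to
# snapshot or export.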
7665 class LURemoveExport(NoHooksLU):
7666 """Remove exports related to the named instance.
7669 _OP_REQP = ["instance_name"]
7672 def ExpandNames(self):
7673 self.needed_locks = {}
7674 # We need all nodes to be locked in order for RemoveExport to work, but we
7675 # don't need to lock the instance itself, as nothing will happen to it (and
7676 # we can remove exports also for a removed instance)
7677 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7679 def CheckPrereq(self):
7680 """Check prerequisites.
7682 """
7684 def Exec(self, feedback_fn):
7685 """Remove any export.
7687 """
7688 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
7689 # If the instance was not found we'll try with the name that was passed in.
7690 # This will only work if it was an FQDN, though.
7691 fqdn_warn = False
7692 if not instance_name:
7693 fqdn_warn = True
7694 instance_name = self.op.instance_name
7696 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
7697 exportlist = self.rpc.call_export_list(locked_nodes)
7698 found = False
7699 for node in exportlist:
7700 msg = exportlist[node].fail_msg
7701 if msg:
7702 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
7703 continue
7704 if instance_name in exportlist[node].payload:
7705 found = True
7706 result = self.rpc.call_export_remove(node, instance_name)
7707 msg = result.fail_msg
7708 if msg:
7709 logging.error("Could not remove export for instance %s"
7710 " on node %s: %s", instance_name, node, msg)
7712 if fqdn_warn and not found:
7713 feedback_fn("Export not found. If trying to remove an export belonging"
7714 " to a deleted instance please use its Fully Qualified"
7715 " Name.")
7718 class TagsLU(NoHooksLU):
7719 """Generic tags LU.
7721 This is an abstract class which is the parent of all the other tags LUs.
7723 """
7725 def ExpandNames(self):
7726 self.needed_locks = {}
7727 if self.op.kind == constants.TAG_NODE:
7728 name = self.cfg.ExpandNodeName(self.op.name)
7729 if name is None:
7730 raise errors.OpPrereqError("Invalid node name (%s)" %
7731 (self.op.name,))
7732 self.op.name = name
7733 self.needed_locks[locking.LEVEL_NODE] = name
7734 elif self.op.kind == constants.TAG_INSTANCE:
7735 name = self.cfg.ExpandInstanceName(self.op.name)
7736 if name is None:
7737 raise errors.OpPrereqError("Invalid instance name (%s)" %
7738 (self.op.name,))
7739 self.op.name = name
7740 self.needed_locks[locking.LEVEL_INSTANCE] = name
7742 def CheckPrereq(self):
7743 """Check prerequisites.
7745 """
7746 if self.op.kind == constants.TAG_CLUSTER:
7747 self.target = self.cfg.GetClusterInfo()
7748 elif self.op.kind == constants.TAG_NODE:
7749 self.target = self.cfg.GetNodeInfo(self.op.name)
7750 elif self.op.kind == constants.TAG_INSTANCE:
7751 self.target = self.cfg.GetInstanceInfo(self.op.name)
7752 else:
7753 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
7754 str(self.op.kind))
7757 class LUGetTags(TagsLU):
7758 """Returns the tags of a given object.
7760 """
7761 _OP_REQP = ["kind", "name"]
7762 REQ_BGL = False
7764 def Exec(self, feedback_fn):
7765 """Returns the tag list.
7767 """
7768 return list(self.target.GetTags())
7771 class LUSearchTags(NoHooksLU):
7772 """Searches the tags for a given pattern.
7774 """
7775 _OP_REQP = ["pattern"]
7776 REQ_BGL = False
7778 def ExpandNames(self):
7779 self.needed_locks = {}
7781 def CheckPrereq(self):
7782 """Check prerequisites.
7784 This checks the pattern passed for validity by compiling it.
7786 """
7787 try:
7788 self.re = re.compile(self.op.pattern)
7789 except re.error, err:
7790 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
7791 (self.op.pattern, err))
7793 def Exec(self, feedback_fn):
7794 """Returns the tag list.
7796 """
7797 cfg = self.cfg
7798 tgts = [("/cluster", cfg.GetClusterInfo())]
7799 ilist = cfg.GetAllInstancesInfo().values()
7800 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
7801 nlist = cfg.GetAllNodesInfo().values()
7802 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
7803 results = []
7804 for path, target in tgts:
7805 for tag in target.GetTags():
7806 if self.re.search(tag):
7807 results.append((path, tag))
7809 return results
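# Illustrative sketch (not part of the original source): the search above is a
# plain re.search() of the opcode pattern against every tag of the cluster,
# node and instance objects, collecting (path, tag) pairs. A standalone
# equivalent of that matching step, with made-up data:
#
#   import re
#   tags_by_path = {"/cluster": ["prod", "ha"],
#                   "/instances/web1": ["prod:web"]}
#   regex = re.compile("prod")
#   hits = [(path, tag)
#           for path, tags in tags_by_path.items()
#           for tag in tags if regex.search(tag)]
#   # hits contains ("/cluster", "prod") and ("/instances/web1", "prod:web")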
7811 class LUAddTags(TagsLU):
7812 """Sets a tag on a given object.
7814 """
7815 _OP_REQP = ["kind", "name", "tags"]
7816 REQ_BGL = False
7818 def CheckPrereq(self):
7819 """Check prerequisites.
7821 This checks the type and length of the tag name and value.
7823 """
7824 TagsLU.CheckPrereq(self)
7825 for tag in self.op.tags:
7826 objects.TaggableObject.ValidateTag(tag)
7828 def Exec(self, feedback_fn):
7829 """Sets the tag.
7831 """
7832 try:
7833 for tag in self.op.tags:
7834 self.target.AddTag(tag)
7835 except errors.TagError, err:
7836 raise errors.OpExecError("Error while setting tag: %s" % str(err))
7837 try:
7838 self.cfg.Update(self.target)
7839 except errors.ConfigurationError:
7840 raise errors.OpRetryError("There has been a modification to the"
7841 " config file and the operation has been"
7842 " aborted. Please retry.")
7845 class LUDelTags(TagsLU):
7846 """Delete a list of tags from a given object.
7848 """
7849 _OP_REQP = ["kind", "name", "tags"]
7850 REQ_BGL = False
7852 def CheckPrereq(self):
7853 """Check prerequisites.
7855 This checks that we have the given tag.
7857 """
7858 TagsLU.CheckPrereq(self)
7859 for tag in self.op.tags:
7860 objects.TaggableObject.ValidateTag(tag)
7861 del_tags = frozenset(self.op.tags)
7862 cur_tags = self.target.GetTags()
7863 if not del_tags <= cur_tags:
7864 diff_tags = del_tags - cur_tags
7865 diff_names = ["'%s'" % tag for tag in diff_tags]
7867 raise errors.OpPrereqError("Tag(s) %s not found" %
7868 (",".join(diff_names)))
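# Illustrative note (not part of the original source): the prerequisite check
# above relies on frozenset subset semantics, e.g.:
#
#   del_tags = frozenset(["web", "old"])
#   cur_tags = set(["web", "prod"])
#   del_tags <= cur_tags     # False, because "old" is not a current tag
#   del_tags - cur_tags      # frozenset(["old"]) -> reported as "not found"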
7870 def Exec(self, feedback_fn):
7871 """Remove the tag from the object.
7873 """
7874 for tag in self.op.tags:
7875 self.target.RemoveTag(tag)
7876 try:
7877 self.cfg.Update(self.target)
7878 except errors.ConfigurationError:
7879 raise errors.OpRetryError("There has been a modification to the"
7880 " config file and the operation has been"
7881 " aborted. Please retry.")
7884 class LUTestDelay(NoHooksLU):
7885 """Sleep for a specified amount of time.
7887 This LU sleeps on the master and/or nodes for a specified amount of
7888 time.
7890 """
7891 _OP_REQP = ["duration", "on_master", "on_nodes"]
7892 REQ_BGL = False
7894 def ExpandNames(self):
7895 """Expand names and set required locks.
7897 This expands the node list, if any.
7899 """
7900 self.needed_locks = {}
7901 if self.op.on_nodes:
7902 # _GetWantedNodes can be used here, but is not always appropriate to use
7903 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
7904 # more information.
7905 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
7906 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
7908 def CheckPrereq(self):
7909 """Check prerequisites.
7911 """
7913 def Exec(self, feedback_fn):
7914 """Do the actual sleep.
7916 """
7917 if self.op.on_master:
7918 if not utils.TestDelay(self.op.duration):
7919 raise errors.OpExecError("Error during master delay test")
7920 if self.op.on_nodes:
7921 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
7922 for node, node_result in result.items():
7923 node_result.Raise("Failure during rpc call to node %s" % node)
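# Illustrative sketch (not part of the original source): a test-delay opcode
# that sleeps 5 seconds on the master and on one node could look roughly like
# this (the OpTestDelay name is assumed from the opcodes module):
#
#   op = opcodes.OpTestDelay(duration=5.0, on_master=True,
#                            on_nodes=["node1.example.com"])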
7926 class IAllocator(object):
7927 """IAllocator framework.
7929 An IAllocator instance has three sets of attributes:
7930 - cfg that is needed to query the cluster
7931 - input data (all members of the _KEYS class attribute are required)
7932 - four buffer attributes (in|out_data|text), that represent the
7933 input (to the external script) in text and data structure format,
7934 and the output from it, again in two formats
7935 - the result variables from the script (success, info, nodes) for
7936 easy usage
7938 """
7939 _ALLO_KEYS = [
7940 "mem_size", "disks", "disk_template",
7941 "os", "tags", "nics", "vcpus", "hypervisor",
7942 ]
7943 _RELO_KEYS = [
7944 "relocate_from",
7945 ]
7947 def __init__(self, cfg, rpc, mode, name, **kwargs):
7948 self.cfg = cfg
7949 self.rpc = rpc
7950 # init buffer variables
7951 self.in_text = self.out_text = self.in_data = self.out_data = None
7952 # init all input fields so that pylint is happy
7953 self.mode = mode
7954 self.name = name
7955 self.mem_size = self.disks = self.disk_template = None
7956 self.os = self.tags = self.nics = self.vcpus = None
7957 self.hypervisor = None
7958 self.relocate_from = None
7960 self.required_nodes = None
7961 # init result fields
7962 self.success = self.info = self.nodes = None
7963 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7964 keyset = self._ALLO_KEYS
7965 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
7966 keyset = self._RELO_KEYS
7967 else:
7968 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
7969 " IAllocator" % self.mode)
7970 for key in kwargs:
7971 if key not in keyset:
7972 raise errors.ProgrammerError("Invalid input parameter '%s' to"
7973 " IAllocator" % key)
7974 setattr(self, key, kwargs[key])
7975 for key in keyset:
7976 if key not in kwargs:
7977 raise errors.ProgrammerError("Missing input parameter '%s' to"
7978 " IAllocator" % key)
7979 self._BuildInputData()
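# Illustrative sketch (not part of the original source): constructing and
# running an IAllocator from inside a LU, where self.cfg and self.rpc are
# available.  Keyword arguments must match _ALLO_KEYS (allocation) or
# _RELO_KEYS (relocation); all values below are invented:
#
#   ial = IAllocator(self.cfg, self.rpc,
#                    mode=constants.IALLOCATOR_MODE_ALLOC,
#                    name="inst1.example.com",
#                    mem_size=512, disks=[{"size": 1024, "mode": "w"}],
#                    disk_template=constants.DT_DRBD8,
#                    os="debootstrap", tags=[], nics=[], vcpus=1,
#                    hypervisor=None)
#   ial.Run("dumb")               # name of an installed allocator script
#   if ial.success:
#       target_nodes = ial.nodes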
7981 def _ComputeClusterData(self):
7982 """Compute the generic allocator input data.
7984 This is the data that is independent of the actual operation.
7986 """
7987 cfg = self.cfg
7988 cluster_info = cfg.GetClusterInfo()
7990 data = {
7991 "version": constants.IALLOCATOR_VERSION,
7992 "cluster_name": cfg.GetClusterName(),
7993 "cluster_tags": list(cluster_info.GetTags()),
7994 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
7995 # we don't have job IDs
7996 }
7997 iinfo = cfg.GetAllInstancesInfo().values()
7998 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
8000 # node data
8001 node_results = {}
8002 node_list = cfg.GetNodeList()
8004 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8005 hypervisor_name = self.hypervisor
8006 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
8007 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
8009 node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
8010 hypervisor_name)
8011 node_iinfo = \
8012 self.rpc.call_all_instances_info(node_list,
8013 cluster_info.enabled_hypervisors)
8014 for nname, nresult in node_data.items():
8015 # first fill in static (config-based) values
8016 ninfo = cfg.GetNodeInfo(nname)
8017 pnr = {
8018 "tags": list(ninfo.GetTags()),
8019 "primary_ip": ninfo.primary_ip,
8020 "secondary_ip": ninfo.secondary_ip,
8021 "offline": ninfo.offline,
8022 "drained": ninfo.drained,
8023 "master_candidate": ninfo.master_candidate,
8024 }
8026 if not (ninfo.offline or ninfo.drained):
8027 nresult.Raise("Can't get data for node %s" % nname)
8028 node_iinfo[nname].Raise("Can't get node instance info from node %s" %
8029 nname)
8030 remote_info = nresult.payload
8032 for attr in ['memory_total', 'memory_free', 'memory_dom0',
8033 'vg_size', 'vg_free', 'cpu_total']:
8034 if attr not in remote_info:
8035 raise errors.OpExecError("Node '%s' didn't return attribute"
8036 " '%s'" % (nname, attr))
8037 if not isinstance(remote_info[attr], int):
8038 raise errors.OpExecError("Node '%s' returned invalid value"
8039 " for '%s': %s" %
8040 (nname, attr, remote_info[attr]))
8041 # compute memory used by primary instances
8042 i_p_mem = i_p_up_mem = 0
8043 for iinfo, beinfo in i_list:
8044 if iinfo.primary_node == nname:
8045 i_p_mem += beinfo[constants.BE_MEMORY]
8046 if iinfo.name not in node_iinfo[nname].payload:
8047 i_used_mem = 0
8048 else:
8049 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
8050 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
8051 remote_info['memory_free'] -= max(0, i_mem_diff)
8053 if iinfo.admin_up:
8054 i_p_up_mem += beinfo[constants.BE_MEMORY]
8056 # compute memory used by instances
8057 pnr_dyn = {
8058 "total_memory": remote_info['memory_total'],
8059 "reserved_memory": remote_info['memory_dom0'],
8060 "free_memory": remote_info['memory_free'],
8061 "total_disk": remote_info['vg_size'],
8062 "free_disk": remote_info['vg_free'],
8063 "total_cpus": remote_info['cpu_total'],
8064 "i_pri_memory": i_p_mem,
8065 "i_pri_up_memory": i_p_up_mem,
8066 }
8067 pnr.update(pnr_dyn)
8069 node_results[nname] = pnr
8070 data["nodes"] = node_results
8072 # instance data
8073 instance_data = {}
8074 for iinfo, beinfo in i_list:
8075 nic_data = []
8076 for nic in iinfo.nics:
8077 filled_params = objects.FillDict(
8078 cluster_info.nicparams[constants.PP_DEFAULT],
8079 nic.nicparams)
8080 nic_dict = {"mac": nic.mac,
8081 "ip": nic.ip,
8082 "mode": filled_params[constants.NIC_MODE],
8083 "link": filled_params[constants.NIC_LINK],
8084 }
8085 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
8086 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
8087 nic_data.append(nic_dict)
8088 pir = {
8089 "tags": list(iinfo.GetTags()),
8090 "admin_up": iinfo.admin_up,
8091 "vcpus": beinfo[constants.BE_VCPUS],
8092 "memory": beinfo[constants.BE_MEMORY],
8093 "os": iinfo.os,
8094 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
8095 "nics": nic_data,
8096 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
8097 "disk_template": iinfo.disk_template,
8098 "hypervisor": iinfo.hypervisor,
8099 }
8100 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
8101 pir["disks"])
8102 instance_data[iinfo.name] = pir
8104 data["instances"] = instance_data
8106 self.in_data = data
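# Illustrative note (not part of the original source): after
# _ComputeClusterData, self.in_data has roughly this shape (all values
# invented):
#
#   {"version": constants.IALLOCATOR_VERSION,
#    "cluster_name": "cluster.example.com",
#    "cluster_tags": [],
#    "enabled_hypervisors": ["xen-pvm"],
#    "nodes": {"node1.example.com": {"total_memory": 4096,
#                                    "free_disk": 20480, ...}},
#    "instances": {"inst1.example.com": {"memory": 512, "vcpus": 1, ...}}}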
8108 def _AddNewInstance(self):
8109 """Add new instance data to allocator structure.
8111 This in combination with _AllocatorGetClusterData will create the
8112 correct structure needed as input for the allocator.
8114 The checks for the completeness of the opcode must have already been
8115 done.
8117 """
8118 data = self.in_data
8120 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
8122 if self.disk_template in constants.DTS_NET_MIRROR:
8123 self.required_nodes = 2
8124 else:
8125 self.required_nodes = 1
8126 request = {
8127 "type": "allocate",
8128 "name": self.name,
8129 "disk_template": self.disk_template,
8130 "tags": self.tags,
8131 "os": self.os,
8132 "vcpus": self.vcpus,
8133 "memory": self.mem_size,
8134 "disks": self.disks,
8135 "disk_space_total": disk_space,
8136 "nics": self.nics,
8137 "required_nodes": self.required_nodes,
8138 }
8139 data["request"] = request
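# Illustrative note (not part of the original source): for an allocation the
# "request" entry built above looks roughly like this (values invented):
#
#   {"type": "allocate", "name": "inst1.example.com",
#    "disk_template": "drbd", "tags": [], "os": "debootstrap",
#    "vcpus": 1, "memory": 512, "disks": [{"size": 1024, "mode": "w"}],
#    "disk_space_total": 1152, "nics": [], "required_nodes": 2}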
8141 def _AddRelocateInstance(self):
8142 """Add relocate instance data to allocator structure.
8144 This in combination with _IAllocatorGetClusterData will create the
8145 correct structure needed as input for the allocator.
8147 The checks for the completeness of the opcode must have already been
8148 done.
8150 """
8151 instance = self.cfg.GetInstanceInfo(self.name)
8152 if instance is None:
8153 raise errors.ProgrammerError("Unknown instance '%s' passed to"
8154 " IAllocator" % self.name)
8156 if instance.disk_template not in constants.DTS_NET_MIRROR:
8157 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
8159 if len(instance.secondary_nodes) != 1:
8160 raise errors.OpPrereqError("Instance has not exactly one secondary node")
8162 self.required_nodes = 1
8163 disk_sizes = [{'size': disk.size} for disk in instance.disks]
8164 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
8166 request = {
8167 "type": "relocate",
8168 "name": self.name,
8169 "disk_space_total": disk_space,
8170 "required_nodes": self.required_nodes,
8171 "relocate_from": self.relocate_from,
8172 }
8173 self.in_data["request"] = request
8175 def _BuildInputData(self):
8176 """Build input data structures.
8178 """
8179 self._ComputeClusterData()
8181 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
8182 self._AddNewInstance()
8183 else:
8184 self._AddRelocateInstance()
8186 self.in_text = serializer.Dump(self.in_data)
8188 def Run(self, name, validate=True, call_fn=None):
8189 """Run an instance allocator and return the results.
8191 """
8192 if call_fn is None:
8193 call_fn = self.rpc.call_iallocator_runner
8195 result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
8196 result.Raise("Failure while running the iallocator script")
8198 self.out_text = result.payload
8199 if validate:
8200 self._ValidateResult()
8202 def _ValidateResult(self):
8203 """Process the allocator results.
8205 This will process and if successful save the result in
8206 self.out_data and the other parameters.
8208 """
8209 try:
8210 rdict = serializer.Load(self.out_text)
8211 except Exception, err:
8212 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
8214 if not isinstance(rdict, dict):
8215 raise errors.OpExecError("Can't parse iallocator results: not a dict")
8217 for key in "success", "info", "nodes":
8218 if key not in rdict:
8219 raise errors.OpExecError("Can't parse iallocator results:"
8220 " missing key '%s'" % key)
8221 setattr(self, key, rdict[key])
8223 if not isinstance(rdict["nodes"], list):
8224 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
8225 " not a list")
8226 self.out_data = rdict
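# Illustrative note (not part of the original source): a successful reply from
# the external allocator script, once parsed by _ValidateResult, is a dict of
# the form (values invented):
#
#   {"success": True, "info": "allocation successful",
#    "nodes": ["node2.example.com", "node3.example.com"]}
#
# after which self.success, self.info and self.nodes hold these values and
# self.out_data keeps the complete dictionary.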
8229 class LUTestAllocator(NoHooksLU):
8230 """Run allocator tests.
8232 This LU runs the allocator tests
8234 """
8235 _OP_REQP = ["direction", "mode", "name"]
8237 def CheckPrereq(self):
8238 """Check prerequisites.
8240 This checks the opcode parameters depending on the direction and mode test.
8242 """
8243 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8244 for attr in ["name", "mem_size", "disks", "disk_template",
8245 "os", "tags", "nics", "vcpus"]:
8246 if not hasattr(self.op, attr):
8247 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
8248 attr)
8249 iname = self.cfg.ExpandInstanceName(self.op.name)
8250 if iname is not None:
8251 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
8252 iname)
8253 if not isinstance(self.op.nics, list):
8254 raise errors.OpPrereqError("Invalid parameter 'nics'")
8255 for row in self.op.nics:
8256 if (not isinstance(row, dict) or
8257 "mac" not in row or
8258 "ip" not in row or
8259 "bridge" not in row):
8260 raise errors.OpPrereqError("Invalid contents of the"
8261 " 'nics' parameter")
8262 if not isinstance(self.op.disks, list):
8263 raise errors.OpPrereqError("Invalid parameter 'disks'")
8264 for row in self.op.disks:
8265 if (not isinstance(row, dict) or
8266 "size" not in row or
8267 not isinstance(row["size"], int) or
8268 "mode" not in row or
8269 row["mode"] not in ['r', 'w']):
8270 raise errors.OpPrereqError("Invalid contents of the"
8271 " 'disks' parameter")
8272 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
8273 self.op.hypervisor = self.cfg.GetHypervisorType()
8274 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
8275 if not hasattr(self.op, "name"):
8276 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
8277 fname = self.cfg.ExpandInstanceName(self.op.name)
8278 if fname is None:
8279 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
8280 self.op.name)
8281 self.op.name = fname
8282 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
8283 else:
8284 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
8285 self.op.mode)
8287 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
8288 if not hasattr(self.op, "allocator") or self.op.allocator is None:
8289 raise errors.OpPrereqError("Missing allocator name")
8290 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
8291 raise errors.OpPrereqError("Wrong allocator test '%s'" %
8292 self.op.direction)
8294 def Exec(self, feedback_fn):
8295 """Run the allocator test.
8297 """
8298 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
8299 ial = IAllocator(self.cfg, self.rpc,
8300 mode=self.op.mode,
8301 name=self.op.name,
8302 mem_size=self.op.mem_size,
8303 disks=self.op.disks,
8304 disk_template=self.op.disk_template,
8305 os=self.op.os,
8306 tags=self.op.tags,
8307 nics=self.op.nics,
8308 vcpus=self.op.vcpus,
8309 hypervisor=self.op.hypervisor,
8310 )
8311 else:
8312 ial = IAllocator(self.cfg, self.rpc,
8313 mode=self.op.mode,
8314 name=self.op.name,
8315 relocate_from=list(self.relocate_from),
8316 )
8318 if self.op.direction == constants.IALLOCATOR_DIR_IN:
8319 result = ial.in_text
8320 else:
8321 ial.Run(self.op.allocator, validate=False)
8322 result = ial.out_text
8324 return result