4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
import os
import time
import re
import logging

34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
45 class LogicalUnit(object):
46 """Logical Unit base class.
48 Subclasses must follow these rules:
49 - implement ExpandNames
50 - implement CheckPrereq (except when tasklets are used)
51 - implement Exec (except when tasklets are used)
52 - implement BuildHooksEnv
53 - redefine HPATH and HTYPE
54 - optionally redefine their run requirements:
55 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
57 Note that all commands require root permissions.
59 @ivar dry_run_result: the value (if any) that will be returned to the caller
60 in dry-run mode (signalled by opcode dry_run parameter)
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overridden in derived classes in order to check op
validity.
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
92 self.LogStep = processor.LogStep
94 self.dry_run_result = None
99 for attr_name in self._OP_REQP:
100 attr_val = getattr(op, attr_name, None)
if attr_val is None:
102 raise errors.OpPrereqError("Required parameter '%s' missing" %
attr_name)
105 self.CheckArguments()
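# A hedged sketch (hypothetical LU, not taken from this module) of how a
# concrete subclass declares the opcode parameters that the check above
# validates:
#
#   class LUExampleOperation(LogicalUnit):   # hypothetical name
#     _OP_REQP = ["instance_name", "force"]
#
# Any parameter listed in _OP_REQP that is missing from the opcode makes the
# constructor raise errors.OpPrereqError before CheckArguments is called.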
108 """Returns the SshRunner object
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115 ssh = property(fget=__GetSSH)
117 def CheckArguments(self):
118 """Check syntactic validity for the opcode arguments.
120 This method is for doing a simple syntactic check and ensuring the
121 validity of opcode parameters, without any cluster-related
122 checks. While the same can be accomplished in ExpandNames and/or
123 CheckPrereq, doing these separately is better because:
125 - ExpandNames is left purely as a lock-related function
126 - CheckPrereq is run after we have acquired locks (and possible waits)
129 The function is allowed to change the self.op attribute so that
130 later methods no longer need to worry about missing parameters.
135 def ExpandNames(self):
136 """Expand names for this LU.
138 This method is called before starting to execute the opcode, and it should
139 update all the parameters of the opcode to their canonical form (e.g. a
140 short node name must be fully expanded after this method has successfully
141 completed). This way locking, hooks, logging, etc. can work correctly.
143 LUs which implement this method must also populate the self.needed_locks
144 member, as a dict with lock levels as keys, and a list of needed lock names
as values. Rules:
147 - use an empty dict if you don't need any lock
148 - if you don't need any lock at a particular level omit that level
149 - don't put anything for the BGL level
150 - if you want all locks at a level use locking.ALL_SET as a value
152 If you need to share locks (rather than acquire them exclusively) at one
153 level you can modify self.share_locks, setting a true value (usually 1) for
154 that level. By default locks are not shared.
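    As a minimal sketch (assuming a LU that only reads node data), requesting
    all node locks in shared mode could look like::

      self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
      self.share_locks[locking.LEVEL_NODE] = 1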
156 This function can also define a list of tasklets, which then will be
157 executed in order instead of the usual LU-level CheckPrereq and Exec
158 functions, if those are not defined by the LU.
162 # Acquire all nodes and one instance
163 self.needed_locks = {
164 locking.LEVEL_NODE: locking.ALL_SET,
165 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
}
167 # Acquire just two nodes
168 self.needed_locks = {
169 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
}
# Acquire no locks
172 self.needed_locks = {} # No, you can't leave it to the default value None
175 # The implementation of this method is mandatory only if the new LU is
176 # concurrent, so that old LUs don't need to be changed all at the same
# time.
if self.REQ_BGL:
179 self.needed_locks = {} # Exclusive LUs don't need locks.
else:
181 raise NotImplementedError
183 def DeclareLocks(self, level):
184 """Declare LU locking needs for a level
186 While most LUs can just declare their locking needs at ExpandNames time,
187 sometimes there's the need to calculate some locks after having acquired
188 the ones before. This function is called just before acquiring locks at a
189 particular level, but after acquiring the ones at lower levels, and permits
190 such calculations. It can be used to modify self.needed_locks, and by
191 default it does nothing.
193 This function is only called if you have something already set in
194 self.needed_locks for the level.
196 @param level: Locking level which is going to be locked
197 @type level: member of ganeti.locking.LEVELS
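    A hedged sketch of a typical override (mirroring the helper described in
    _LockInstancesNodes below)::

      def DeclareLocks(self, level):
        if level == locking.LEVEL_NODE:
          self._LockInstancesNodes()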
201 def CheckPrereq(self):
202 """Check prerequisites for this LU.
204 This method should check that the prerequisites for the execution
205 of this LU are fulfilled. It can do internode communication, but
206 it should be idempotent - no cluster or system changes are allowed.
209 The method should raise errors.OpPrereqError in case something is
210 not fulfilled. Its return value is ignored.
212 This method should also update all the parameters of the opcode to
213 their canonical form if it hasn't been done by ExpandNames before.
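    A minimal sketch (hypothetical LU operating on one instance)::

      instance = self.cfg.GetInstanceInfo(self.op.instance_name)
      if instance is None:
        raise errors.OpPrereqError("Instance '%s' not known" %
                                   self.op.instance_name)
      self.instance = instance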
216 if self.tasklets is not None:
217 for (idx, tl) in enumerate(self.tasklets):
218 logging.debug("Checking prerequisites for tasklet %s/%s",
219 idx + 1, len(self.tasklets))
tl.CheckPrereq()
else:
222 raise NotImplementedError
224 def Exec(self, feedback_fn):
"""Execute the LU.

227 This method should implement the actual work. It should raise
228 errors.OpExecError for failures that are somewhat dealt with in
code, or expected.
232 if self.tasklets is not None:
233 for (idx, tl) in enumerate(self.tasklets):
234 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
tl.Exec(feedback_fn)
else:
237 raise NotImplementedError
239 def BuildHooksEnv(self):
240 """Build hooks environment for this LU.
242 This method should return a three-element tuple consisting of: a dict
243 containing the environment that will be used for running the
244 specific hook for this LU, a list of node names on which the hook
245 should run before the execution, and a list of node names on which
246 the hook should run after the execution.
248 The keys of the dict must not be prefixed with 'GANETI_', as this will
249 be handled in the hooks runner. Also note additional keys will be
250 added by the hooks runner. If the LU doesn't define any
251 environment, an empty dict (and not None) should be returned.
253 If no nodes are needed, an empty list (and not None) should be returned.
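    A minimal sketch (hypothetical LU whose hooks only run on the master)::

      env = {"OP_TARGET": self.cfg.GetClusterName()}
      mn = self.cfg.GetMasterNode()
      return env, [mn], [mn]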
255 Note that if the HPATH for a LU class is None, this function will not be
called.
259 raise NotImplementedError
261 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
262 """Notify the LU about the results of its hooks.
264 This method is called every time a hooks phase is executed, and notifies
265 the Logical Unit about the hooks' result. The LU can then use it to alter
266 its result based on the hooks. By default the method does nothing and the
267 previous result is passed back unchanged but any LU can define it if it
268 wants to use the local cluster hook-scripts somehow.
270 @param phase: one of L{constants.HOOKS_PHASE_POST} or
271 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
272 @param hook_results: the results of the multi-node hooks rpc call
273 @param feedback_fn: function used to send feedback back to the caller
274 @param lu_result: the previous Exec result this LU had, or None
276 @return: the new Exec result, based on the previous result
282 def _ExpandAndLockInstance(self):
283 """Helper function to expand and lock an instance.
285 Many LUs that work on an instance take its name in self.op.instance_name
286 and need to expand it and then declare the expanded name for locking. This
287 function does it, and then updates self.op.instance_name to the expanded
288 name. It also initializes needed_locks as a dict, if this hasn't been done
before.
292 if self.needed_locks is None:
293 self.needed_locks = {}
295 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
296 "_ExpandAndLockInstance called with instance-level locks set"
297 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
298 if expanded_name is None:
299 raise errors.OpPrereqError("Instance '%s' not known" %
300 self.op.instance_name)
301 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
302 self.op.instance_name = expanded_name
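# Hedged usage sketch (hypothetical LU): calling the helper from ExpandNames
# declares the instance lock and canonicalizes the name; node locks are then
# usually recalculated in DeclareLocks:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE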
304 def _LockInstancesNodes(self, primary_only=False):
305 """Helper function to declare instances' nodes for locking.
307 This function should be called after locking one or more instances to lock
308 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
309 with all primary or secondary nodes for instances already locked and
310 present in self.needed_locks[locking.LEVEL_INSTANCE].
312 It should be called from DeclareLocks, and for safety only works if
313 self.recalculate_locks[locking.LEVEL_NODE] is set.
315 In the future it may grow parameters to just lock some instance's nodes, or
316 to just lock primaries or secondary nodes, if needed.
318 It should be called from DeclareLocks in a way similar to::
320 if level == locking.LEVEL_NODE:
321 self._LockInstancesNodes()
323 @type primary_only: boolean
324 @param primary_only: only lock primary nodes of locked instances
327 assert locking.LEVEL_NODE in self.recalculate_locks, \
328 "_LockInstancesNodes helper function called with no nodes to recalculate"
330 # TODO: check whether we have really been called with the instance locks held
332 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
333 # future we might want to have different behaviors depending on the value
334 # of self.recalculate_locks[locking.LEVEL_NODE]
wanted_nodes = []
336 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
337 instance = self.context.cfg.GetInstanceInfo(instance_name)
338 wanted_nodes.append(instance.primary_node)
if not primary_only:
340 wanted_nodes.extend(instance.secondary_nodes)
342 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
343 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
344 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
345 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
347 del self.recalculate_locks[locking.LEVEL_NODE]
350 class NoHooksLU(LogicalUnit):
351 """Simple LU which runs no hooks.
353 This LU is intended as a parent for other LogicalUnits which will
354 run no hooks, in order to reduce duplicate code.
362 """Tasklet base class.
364 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365 they can mix legacy code with tasklets. Locking needs to be done in the LU,
366 tasklets know nothing about locks.
368 Subclasses must follow these rules:
369 - Implement CheckPrereq
- Implement Exec
373 def __init__(self, lu):
self.lu = lu
self.cfg = lu.cfg
self.rpc = lu.rpc
380 def CheckPrereq(self):
381 """Check prerequisites for this tasklets.
383 This method should check whether the prerequisites for the execution of
384 this tasklet are fulfilled. It can do internode communication, but it
385 should be idempotent - no cluster or system changes are allowed.
387 The method should raise errors.OpPrereqError in case something is not
388 fulfilled. Its return value is ignored.
390 This method should also update all parameters to their canonical form if it
391 hasn't been done before.
394 raise NotImplementedError
396 def Exec(self, feedback_fn):
397 """Execute the tasklet.
399 This method should implement the actual work. It should raise
400 errors.OpExecError for failures that are somewhat dealt with in code, or
404 raise NotImplementedError
407 def _GetWantedNodes(lu, nodes):
408 """Returns list of checked and expanded node names.
410 @type lu: L{LogicalUnit}
411 @param lu: the logical unit on whose behalf we execute
413 @param nodes: list of node names or None for all nodes
415 @return: the list of nodes, sorted
416 @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
419 if not isinstance(nodes, list):
420 raise errors.OpPrereqError("Invalid argument type 'nodes'")
423 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
424 " non-empty list of nodes whose name is to be expanded.")
wanted = []
for name in nodes:
428 node = lu.cfg.ExpandNodeName(name)
if node is None:
430 raise errors.OpPrereqError("No such node name '%s'" % name)
wanted.append(node)
433 return utils.NiceSort(wanted)
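# Hedged usage sketch: a LU's CheckPrereq would typically call the helper as
#   self.nodes = _GetWantedNodes(self, self.op.nodes)
# and get back the expanded, sorted node names or an OpPrereqError.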
436 def _GetWantedInstances(lu, instances):
437 """Returns list of checked and expanded instance names.
439 @type lu: L{LogicalUnit}
440 @param lu: the logical unit on whose behalf we execute
441 @type instances: list
442 @param instances: list of instance names or None for all instances
444 @return: the list of instances, sorted
445 @raise errors.OpPrereqError: if the instances parameter is wrong type
446 @raise errors.OpPrereqError: if any of the passed instances is not found
449 if not isinstance(instances, list):
450 raise errors.OpPrereqError("Invalid argument type 'instances'")
if instances:
wanted = []
455 for name in instances:
456 instance = lu.cfg.ExpandInstanceName(name)
if instance is None:
458 raise errors.OpPrereqError("No such instance name '%s'" % name)
459 wanted.append(instance)
else:
462 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
return wanted
466 def _CheckOutputFields(static, dynamic, selected):
467 """Checks whether all selected fields are valid.
469 @type static: L{utils.FieldSet}
470 @param static: static fields set
471 @type dynamic: L{utils.FieldSet}
472 @param dynamic: dynamic fields set
479 delta = f.NonMatching(selected)
if delta:
481 raise errors.OpPrereqError("Unknown output fields selected: %s"
% ",".join(delta))
485 def _CheckBooleanOpField(op, name):
486 """Validates boolean opcode parameters.
488 This will ensure that an opcode parameter is either a boolean value,
489 or None (but that it always exists).
492 val = getattr(op, name, None)
493 if not (val is None or isinstance(val, bool)):
494 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
(name, str(val)))
496 setattr(op, name, val)
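# Hedged usage sketch (hypothetical opcode field "force"):
#   _CheckBooleanOpField(self.op, "force")
# After the call, self.op.force is guaranteed to exist and to be either None
# or a boolean.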
499 def _CheckNodeOnline(lu, node):
500 """Ensure that a given node is online.
502 @param lu: the LU on behalf of which we make the check
503 @param node: the node to check
504 @raise errors.OpPrereqError: if the node is offline
507 if lu.cfg.GetNodeInfo(node).offline:
508 raise errors.OpPrereqError("Can't use offline node %s" % node)
511 def _CheckNodeNotDrained(lu, node):
512 """Ensure that a given node is not drained.
514 @param lu: the LU on behalf of which we make the check
515 @param node: the node to check
516 @raise errors.OpPrereqError: if the node is drained
519 if lu.cfg.GetNodeInfo(node).drained:
520 raise errors.OpPrereqError("Can't use drained node %s" % node)
523 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
524 memory, vcpus, nics, disk_template, disks,
525 bep, hvp, hypervisor_name):
526 """Builds instance related env variables for hooks
528 This builds the hook environment from individual variables.
531 @param name: the name of the instance
532 @type primary_node: string
533 @param primary_node: the name of the instance's primary node
534 @type secondary_nodes: list
535 @param secondary_nodes: list of secondary nodes as strings
536 @type os_type: string
537 @param os_type: the name of the instance's OS
538 @type status: boolean
539 @param status: the should_run status of the instance
541 @param memory: the memory size of the instance
543 @param vcpus: the count of VCPUs the instance has
545 @param nics: list of tuples (ip, mac, mode, link) representing
546 the NICs the instance has
547 @type disk_template: string
548 @param disk_template: the disk template of the instance
550 @param disks: the list of (size, mode) pairs
552 @param bep: the backend parameters for the instance
554 @param hvp: the hypervisor parameters for the instance
555 @type hypervisor_name: string
556 @param hypervisor_name: the hypervisor for the instance
558 @return: the hook environment for this instance
567 "INSTANCE_NAME": name,
568 "INSTANCE_PRIMARY": primary_node,
569 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
570 "INSTANCE_OS_TYPE": os_type,
571 "INSTANCE_STATUS": str_status,
572 "INSTANCE_MEMORY": memory,
573 "INSTANCE_VCPUS": vcpus,
574 "INSTANCE_DISK_TEMPLATE": disk_template,
575 "INSTANCE_HYPERVISOR": hypervisor_name,
579 nic_count = len(nics)
580 for idx, (ip, mac, mode, link) in enumerate(nics):
583 env["INSTANCE_NIC%d_IP" % idx] = ip
584 env["INSTANCE_NIC%d_MAC" % idx] = mac
585 env["INSTANCE_NIC%d_MODE" % idx] = mode
586 env["INSTANCE_NIC%d_LINK" % idx] = link
587 if mode == constants.NIC_MODE_BRIDGED:
588 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
592 env["INSTANCE_NIC_COUNT"] = nic_count
595 disk_count = len(disks)
596 for idx, (size, mode) in enumerate(disks):
597 env["INSTANCE_DISK%d_SIZE" % idx] = size
598 env["INSTANCE_DISK%d_MODE" % idx] = mode
602 env["INSTANCE_DISK_COUNT"] = disk_count
604 for source, kind in [(bep, "BE"), (hvp, "HV")]:
605 for key, value in source.items():
606 env["INSTANCE_%s_%s" % (kind, key)] = value
611 def _NICListToTuple(lu, nics):
612 """Build a list of nic information tuples.
614 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
615 value in LUQueryInstanceData.
617 @type lu: L{LogicalUnit}
618 @param lu: the logical unit on whose behalf we execute
619 @type nics: list of L{objects.NIC}
620 @param nics: list of nics to convert to hooks tuples
hooks_nics = []
624 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
for nic in nics:
ip = nic.ip
mac = nic.mac
628 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
629 mode = filled_params[constants.NIC_MODE]
630 link = filled_params[constants.NIC_LINK]
631 hooks_nics.append((ip, mac, mode, link))
return hooks_nics
635 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
636 """Builds instance related env variables for hooks from an object.
638 @type lu: L{LogicalUnit}
639 @param lu: the logical unit on whose behalf we execute
640 @type instance: L{objects.Instance}
641 @param instance: the instance for which we should build the environment
644 @param override: dictionary with key/values that will override our values
647 @return: the hook environment dictionary
650 cluster = lu.cfg.GetClusterInfo()
651 bep = cluster.FillBE(instance)
652 hvp = cluster.FillHV(instance)
args = {
654 'name': instance.name,
655 'primary_node': instance.primary_node,
656 'secondary_nodes': instance.secondary_nodes,
657 'os_type': instance.os,
658 'status': instance.admin_up,
659 'memory': bep[constants.BE_MEMORY],
660 'vcpus': bep[constants.BE_VCPUS],
661 'nics': _NICListToTuple(lu, instance.nics),
662 'disk_template': instance.disk_template,
663 'disks': [(disk.size, disk.mode) for disk in instance.disks],
'bep': bep,
'hvp': hvp,
666 'hypervisor_name': instance.hypervisor,
}
if override:
669 args.update(override)
670 return _BuildInstanceHookEnv(**args)
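# Hedged usage sketch: a LU's BuildHooksEnv often wraps this helper, e.g.
# (assuming self.instance was looked up in CheckPrereq):
#   env = _BuildInstanceHookEnvByObject(self, self.instance)
#   nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
#   return env, nl, nl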
673 def _AdjustCandidatePool(lu):
674 """Adjust the candidate pool after node operations.
677 mod_list = lu.cfg.MaintainCandidatePool()
if mod_list:
679 lu.LogInfo("Promoted nodes to master candidate role: %s",
680 ", ".join(node.name for node in mod_list))
681 for name in mod_list:
682 lu.context.ReaddNode(name)
683 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
if mc_now > mc_max:
685 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
(mc_now, mc_max))
689 def _CheckNicsBridgesExist(lu, target_nics, target_node,
690 profile=constants.PP_DEFAULT):
691 """Check that the brigdes needed by a list of nics exist.
694 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
695 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
696 for nic in target_nics]
697 brlist = [params[constants.NIC_LINK] for params in paramslist
698 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
700 result = lu.rpc.call_bridges_exist(target_node, brlist)
701 result.Raise("Error checking bridges on destination node '%s'" %
702 target_node, prereq=True)
705 def _CheckInstanceBridgesExist(lu, instance, node=None):
706 """Check that the brigdes needed by an instance exist.
710 node = instance.primary_node
711 _CheckNicsBridgesExist(lu, instance.nics, node)
714 def _GetNodeInstancesInner(cfg, fn):
715 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
718 def _GetNodeInstances(cfg, node_name):
719 """Returns a list of all primary and secondary instances on a node.
723 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
726 def _GetNodePrimaryInstances(cfg, node_name):
727 """Returns primary instances on a node.
730 return _GetNodeInstancesInner(cfg,
731 lambda inst: node_name == inst.primary_node)
734 def _GetNodeSecondaryInstances(cfg, node_name):
735 """Returns secondary instances on a node.
738 return _GetNodeInstancesInner(cfg,
739 lambda inst: node_name in inst.secondary_nodes)
742 def _GetStorageTypeArgs(cfg, storage_type):
743 """Returns the arguments for a storage type.
746 # Special case for file storage
747 if storage_type == constants.ST_FILE:
748 # storage.FileStorage wants a list of storage directories
749 return [[cfg.GetFileStorageDir()]]
754 def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
faulty = []
757 for dev in instance.disks:
758 cfg.SetDiskID(dev, node_name)
760 result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
761 result.Raise("Failed to get disk status from node %s" % node_name,
prereq=prereq)
764 for idx, bdev_status in enumerate(result.payload):
765 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
faulty.append(idx)

return faulty
771 class LUPostInitCluster(LogicalUnit):
772 """Logical unit for running hooks after cluster initialization.
775 HPATH = "cluster-init"
776 HTYPE = constants.HTYPE_CLUSTER
779 def BuildHooksEnv(self):
783 env = {"OP_TARGET": self.cfg.GetClusterName()}
784 mn = self.cfg.GetMasterNode()
return env, [], [mn]
787 def CheckPrereq(self):
788 """No prerequisites to check.
793 def Exec(self, feedback_fn):
800 class LUDestroyCluster(NoHooksLU):
801 """Logical unit for destroying the cluster.
806 def CheckPrereq(self):
807 """Check prerequisites.
809 This checks whether the cluster is empty.
811 Any errors are signaled by raising errors.OpPrereqError.
814 master = self.cfg.GetMasterNode()
816 nodelist = self.cfg.GetNodeList()
817 if len(nodelist) != 1 or nodelist[0] != master:
818 raise errors.OpPrereqError("There are still %d node(s) in"
819 " this cluster." % (len(nodelist) - 1))
820 instancelist = self.cfg.GetInstanceList()
if instancelist:
822 raise errors.OpPrereqError("There are still %d instance(s) in"
823 " this cluster." % len(instancelist))
825 def Exec(self, feedback_fn):
826 """Destroys the cluster.
829 master = self.cfg.GetMasterNode()
830 result = self.rpc.call_node_stop_master(master, False)
831 result.Raise("Could not disable the master role")
832 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
833 utils.CreateBackup(priv_key)
834 utils.CreateBackup(pub_key)
838 class LUVerifyCluster(LogicalUnit):
839 """Verifies the cluster status.
842 HPATH = "cluster-verify"
843 HTYPE = constants.HTYPE_CLUSTER
844 _OP_REQP = ["skip_checks"]
847 def ExpandNames(self):
848 self.needed_locks = {
849 locking.LEVEL_NODE: locking.ALL_SET,
850 locking.LEVEL_INSTANCE: locking.ALL_SET,
}
852 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
854 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
855 node_result, feedback_fn, master_files,
857 """Run multiple tests against a node.
861 - compares ganeti version
862 - checks vg existence and size > 20G
863 - checks config file checksum
864 - checks ssh to other nodes
866 @type nodeinfo: L{objects.Node}
867 @param nodeinfo: the node to check
868 @param file_list: required list of files
869 @param local_cksum: dictionary of local files and their checksums
870 @param node_result: the results from the node
871 @param feedback_fn: function used to accumulate results
872 @param master_files: list of files that only masters should have
873 @param drbd_map: the used DRBD minors for this node, in the
874 form of minor: (instance, must_exist) which correspond to instances
875 and their running status
876 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
881 # main result, node_result should be a non-empty dict
882 if not node_result or not isinstance(node_result, dict):
883 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
886 # compares ganeti version
887 local_version = constants.PROTOCOL_VERSION
888 remote_version = node_result.get('version', None)
889 if not (remote_version and isinstance(remote_version, (list, tuple)) and
890 len(remote_version) == 2):
891 feedback_fn(" - ERROR: connection to %s failed" % (node))
894 if local_version != remote_version[0]:
895 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
896 " node %s %s" % (local_version, node, remote_version[0]))
899 # node seems compatible, we can actually try to look into its results
903 # full package version
904 if constants.RELEASE_VERSION != remote_version[1]:
905 feedback_fn(" - WARNING: software version mismatch: master %s,"
" node %s %s" %
907 (constants.RELEASE_VERSION, node, remote_version[1]))
909 # checks vg existence and size > 20G
910 if vg_name is not None:
911 vglist = node_result.get(constants.NV_VGLIST, None)
913 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
917 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
918 constants.MIN_VG_SIZE)
920 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
923 # checks config file checksum
925 remote_cksum = node_result.get(constants.NV_FILELIST, None)
926 if not isinstance(remote_cksum, dict):
928 feedback_fn(" - ERROR: node hasn't returned file checksum data")
930 for file_name in file_list:
931 node_is_mc = nodeinfo.master_candidate
932 must_have_file = file_name not in master_files
933 if file_name not in remote_cksum:
934 if node_is_mc or must_have_file:
936 feedback_fn(" - ERROR: file '%s' missing" % file_name)
937 elif remote_cksum[file_name] != local_cksum[file_name]:
938 if node_is_mc or must_have_file:
940 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
942 # not candidate and this is not a must-have file
944 feedback_fn(" - ERROR: file '%s' should not exist on non master"
945 " candidates (and the file is outdated)" % file_name)
947 # all good, except non-master/non-must have combination
948 if not node_is_mc and not must_have_file:
949 feedback_fn(" - ERROR: file '%s' should not exist on non master"
950 " candidates" % file_name)
954 if constants.NV_NODELIST not in node_result:
956 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
958 if node_result[constants.NV_NODELIST]:
960 for node in node_result[constants.NV_NODELIST]:
961 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
962 (node, node_result[constants.NV_NODELIST][node]))
964 if constants.NV_NODENETTEST not in node_result:
966 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
968 if node_result[constants.NV_NODENETTEST]:
970 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
for node in nlist:
972 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
973 (node, node_result[constants.NV_NODENETTEST][node]))
975 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
976 if isinstance(hyp_result, dict):
977 for hv_name, hv_result in hyp_result.iteritems():
978 if hv_result is not None:
979 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
980 (hv_name, hv_result))
982 # check used drbd list
983 if vg_name is not None:
984 used_minors = node_result.get(constants.NV_DRBDLIST, [])
985 if not isinstance(used_minors, (tuple, list)):
986 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
str(used_minors))
989 for minor, (iname, must_exist) in drbd_map.items():
990 if minor not in used_minors and must_exist:
991 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
992 " not active" % (minor, iname))
994 for minor in used_minors:
995 if minor not in drbd_map:
996 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
minor)
1002 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
1003 node_instance, feedback_fn, n_offline):
1004 """Verify an instance.
1006 This function checks to see if the required block devices are
1007 available on the instance's node.
1012 node_current = instanceconfig.primary_node
1014 node_vol_should = {}
1015 instanceconfig.MapLVsByNode(node_vol_should)
1017 for node in node_vol_should:
1018 if node in n_offline:
1019 # ignore missing volumes on offline nodes
1021 for volume in node_vol_should[node]:
1022 if node not in node_vol_is or volume not in node_vol_is[node]:
1023 feedback_fn(" - ERROR: volume %s missing on node %s" %
(volume, node))
1027 if instanceconfig.admin_up:
1028 if ((node_current not in node_instance or
1029 not instance in node_instance[node_current]) and
1030 node_current not in n_offline):
1031 feedback_fn(" - ERROR: instance %s not running on node %s" %
1032 (instance, node_current))
1035 for node in node_instance:
1036 if (not node == node_current):
1037 if instance in node_instance[node]:
1038 feedback_fn(" - ERROR: instance %s should not run on node %s" %
(instance, node))
1044 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
1045 """Verify if there are any unknown volumes in the cluster.
1047 The .os, .swap and backup volumes are ignored. All other volumes are
1048 reported as unknown.
1053 for node in node_vol_is:
1054 for volume in node_vol_is[node]:
1055 if node not in node_vol_should or volume not in node_vol_should[node]:
1056 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
(volume, node))
1061 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
1062 """Verify the list of running instances.
1064 This checks what instances are running but unknown to the cluster.
1068 for node in node_instance:
1069 for runninginstance in node_instance[node]:
1070 if runninginstance not in instancelist:
1071 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
1072 (runninginstance, node))
1076 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
1077 """Verify N+1 Memory Resilience.
1079 Check that if one single node dies we can still start all the instances it
1085 for node, nodeinfo in node_info.iteritems():
1086 # This code checks that every node which is now listed as a secondary has
1087 # enough memory to host all the instances it is secondary for, should a single
1088 # other node in the cluster fail.
1089 # FIXME: not ready for failover to an arbitrary node
1090 # FIXME: does not support file-backed instances
1091 # WARNING: we currently take into account down instances as well as up
1092 # ones, considering that even if they're down someone might want to start
1093 # them even in the event of a node failure.
1094 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
needed_mem = 0
1096 for instance in instances:
1097 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1098 if bep[constants.BE_AUTO_BALANCE]:
1099 needed_mem += bep[constants.BE_MEMORY]
1100 if nodeinfo['mfree'] < needed_mem:
1101 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
1102 " failovers should node %s fail" % (node, prinode))
1106 def CheckPrereq(self):
1107 """Check prerequisites.
1109 Transform the list of checks we're going to skip into a set and check that
1110 all its members are valid.
1113 self.skip_set = frozenset(self.op.skip_checks)
1114 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1115 raise errors.OpPrereqError("Invalid checks to be skipped specified")
1117 def BuildHooksEnv(self):
1120 Cluster-Verify hooks are run in the post phase; their failure is logged in
1121 the verify output and makes the verification fail.
1124 all_nodes = self.cfg.GetNodeList()
1126 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1128 for node in self.cfg.GetAllNodesInfo().values():
1129 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1131 return env, [], all_nodes
1133 def Exec(self, feedback_fn):
1134 """Verify integrity of cluster, performing various test on nodes.
1138 feedback_fn("* Verifying global settings")
1139 for msg in self.cfg.VerifyConfig():
1140 feedback_fn(" - ERROR: %s" % msg)
1142 vg_name = self.cfg.GetVGName()
1143 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1144 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1145 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1146 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1147 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1148 for iname in instancelist)
1149 i_non_redundant = [] # Non redundant instances
1150 i_non_a_balanced = [] # Non auto-balanced instances
1151 n_offline = [] # List of offline nodes
1152 n_drained = [] # List of nodes being drained
1158 # FIXME: verify OS list
1159 # do local checksums
1160 master_files = [constants.CLUSTER_CONF_FILE]
1162 file_names = ssconf.SimpleStore().GetFileList()
1163 file_names.append(constants.SSL_CERT_FILE)
1164 file_names.append(constants.RAPI_CERT_FILE)
1165 file_names.extend(master_files)
1167 local_checksums = utils.FingerprintFiles(file_names)
1169 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1170 node_verify_param = {
1171 constants.NV_FILELIST: file_names,
1172 constants.NV_NODELIST: [node.name for node in nodeinfo
1173 if not node.offline],
1174 constants.NV_HYPERVISOR: hypervisors,
1175 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1176 node.secondary_ip) for node in nodeinfo
1177 if not node.offline],
1178 constants.NV_INSTANCELIST: hypervisors,
1179 constants.NV_VERSION: None,
1180 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
}
1182 if vg_name is not None:
1183 node_verify_param[constants.NV_VGLIST] = None
1184 node_verify_param[constants.NV_LVLIST] = vg_name
1185 node_verify_param[constants.NV_DRBDLIST] = None
1186 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1187 self.cfg.GetClusterName())
1189 cluster = self.cfg.GetClusterInfo()
1190 master_node = self.cfg.GetMasterNode()
1191 all_drbd_map = self.cfg.ComputeDRBDMap()
1193 for node_i in nodeinfo:
node = node_i.name
if node_i.offline:
1197 feedback_fn("* Skipping offline node %s" % (node,))
1198 n_offline.append(node)
continue
1201 if node == master_node:
ntype = "master"
1203 elif node_i.master_candidate:
1204 ntype = "master candidate"
1205 elif node_i.drained:
ntype = "drained"
1207 n_drained.append(node)
else:
ntype = "regular"
1210 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1212 msg = all_nvinfo[node].fail_msg
if msg:
1214 feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg))
continue
1218 nresult = all_nvinfo[node].payload
node_drbd = {}
1220 for minor, instance in all_drbd_map[node].items():
1221 if instance not in instanceinfo:
1222 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
instance)
1224 # ghost instance should not be running, but otherwise we
1225 # don't give double warnings (both ghost instance and
1226 # unallocated minor in use)
1227 node_drbd[minor] = (instance, False)
else:
1229 instance = instanceinfo[instance]
1230 node_drbd[minor] = (instance.name, instance.admin_up)
1231 result = self._VerifyNode(node_i, file_names, local_checksums,
1232 nresult, feedback_fn, master_files,
1236 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
if vg_name is None:
1238 node_volume[node] = {}
1239 elif isinstance(lvdata, basestring):
1240 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1241 (node, utils.SafeEncode(lvdata)))
1243 node_volume[node] = {}
1244 elif not isinstance(lvdata, dict):
1245 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
continue
else:
1249 node_volume[node] = lvdata
1252 idata = nresult.get(constants.NV_INSTANCELIST, None)
1253 if not isinstance(idata, list):
1254 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
(node,))
continue
1259 node_instance[node] = idata
1262 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1263 if not isinstance(nodeinfo, dict):
1264 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1270 "mfree": int(nodeinfo['memory_free']),
1273 # dictionary holding all instances this node is secondary for,
1274 # grouped by their primary node. Each key is a cluster node, and each
1275 # value is a list of instances which have the key as primary and the
1276 # current node as secondary. this is handy to calculate N+1 memory
1277 # availability if you can only failover from a primary to its
1279 "sinst-by-pnode": {},
1281 # FIXME: devise a free space model for file based instances as well
1282 if vg_name is not None:
1283 if (constants.NV_VGLIST not in nresult or
1284 vg_name not in nresult[constants.NV_VGLIST]):
1285 feedback_fn(" - ERROR: node %s didn't return data for the"
1286 " volume group '%s' - it is either missing or broken" %
1290 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1291 except (ValueError, KeyError):
1292 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1293 " from node %s" % (node,))
1297 node_vol_should = {}
1299 for instance in instancelist:
1300 feedback_fn("* Verifying instance %s" % instance)
1301 inst_config = instanceinfo[instance]
1302 result = self._VerifyInstance(instance, inst_config, node_volume,
1303 node_instance, feedback_fn, n_offline)
1305 inst_nodes_offline = []
1307 inst_config.MapLVsByNode(node_vol_should)
1309 instance_cfg[instance] = inst_config
1311 pnode = inst_config.primary_node
1312 if pnode in node_info:
1313 node_info[pnode]['pinst'].append(instance)
1314 elif pnode not in n_offline:
1315 feedback_fn(" - ERROR: instance %s, connection to primary node"
1316 " %s failed" % (instance, pnode))
1319 if pnode in n_offline:
1320 inst_nodes_offline.append(pnode)
1322 # If the instance is non-redundant we cannot survive losing its primary
1323 # node, so we are not N+1 compliant. On the other hand we have no disk
1324 # templates with more than one secondary so that situation is not well
# supported either.
1326 # FIXME: does not support file-backed instances
1327 if len(inst_config.secondary_nodes) == 0:
1328 i_non_redundant.append(instance)
1329 elif len(inst_config.secondary_nodes) > 1:
1330 feedback_fn(" - WARNING: multiple secondaries for instance %s"
% instance)
1333 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1334 i_non_a_balanced.append(instance)
1336 for snode in inst_config.secondary_nodes:
1337 if snode in node_info:
1338 node_info[snode]['sinst'].append(instance)
1339 if pnode not in node_info[snode]['sinst-by-pnode']:
1340 node_info[snode]['sinst-by-pnode'][pnode] = []
1341 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1342 elif snode not in n_offline:
1343 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1344 " %s failed" % (instance, snode))
1346 if snode in n_offline:
1347 inst_nodes_offline.append(snode)
1349 if inst_nodes_offline:
1350 # warn that the instance lives on offline nodes, and set bad=True
1351 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1352 ", ".join(inst_nodes_offline))
1355 feedback_fn("* Verifying orphan volumes")
1356 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
feedback_fn)
1360 feedback_fn("* Verifying remaining instances")
1361 result = self._VerifyOrphanInstances(instancelist, node_instance,
feedback_fn)
1365 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1366 feedback_fn("* Verifying N+1 Memory redundancy")
1367 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1370 feedback_fn("* Other Notes")
if i_non_redundant:
1372 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1373 % len(i_non_redundant))
1375 if i_non_a_balanced:
1376 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1377 % len(i_non_a_balanced))
if n_offline:
1380 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
if n_drained:
1383 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1387 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1388 """Analyze the post-hooks' result
1390 This method analyses the hook result, handles it, and sends some
1391 nicely-formatted feedback back to the user.
1393 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1394 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1395 @param hooks_results: the results of the multi-node hooks rpc call
1396 @param feedback_fn: function used to send feedback back to the caller
1397 @param lu_result: previous Exec result
1398 @return: the new Exec result, based on the previous result
1402 # We only really run POST phase hooks, and are only interested in
1404 if phase == constants.HOOKS_PHASE_POST:
1405 # Used to change hooks' output to proper indentation
1406 indent_re = re.compile('^', re.M)
1407 feedback_fn("* Hooks Results")
1408 if not hooks_results:
1409 feedback_fn(" - ERROR: general communication failure")
1412 for node_name in hooks_results:
1413 show_node_header = True
1414 res = hooks_results[node_name]
1418 # no need to warn or set fail return value
1420 feedback_fn(" Communication failure in hooks execution: %s" %
1424 for script, hkr, output in res.payload:
1425 if hkr == constants.HKR_FAIL:
1426 # The node header is only shown once, if there are
1427 # failing hooks on that node
1428 if show_node_header:
1429 feedback_fn(" Node %s:" % node_name)
1430 show_node_header = False
1431 feedback_fn(" ERROR: Script %s failed, output:" % script)
1432 output = indent_re.sub(' ', output)
1433 feedback_fn("%s" % output)
1439 class LUVerifyDisks(NoHooksLU):
1440 """Verifies the cluster disks status.
1446 def ExpandNames(self):
1447 self.needed_locks = {
1448 locking.LEVEL_NODE: locking.ALL_SET,
1449 locking.LEVEL_INSTANCE: locking.ALL_SET,
}
1451 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1453 def CheckPrereq(self):
1454 """Check prerequisites.
1456 This has no prerequisites.
1461 def Exec(self, feedback_fn):
1462 """Verify integrity of cluster disks.
1464 @rtype: tuple of three items
1465 @return: a tuple of (dict of node-to-node_error, list of instances
1466 which need activate-disks, dict of instance: (node, volume) for
missing volumes)
1470 result = res_nodes, res_instances, res_missing = {}, [], {}
1472 vg_name = self.cfg.GetVGName()
1473 nodes = utils.NiceSort(self.cfg.GetNodeList())
1474 instances = [self.cfg.GetInstanceInfo(name)
1475 for name in self.cfg.GetInstanceList()]
1478 for inst in instances:
inst_lvs = {}
1480 if (not inst.admin_up or
1481 inst.disk_template not in constants.DTS_NET_MIRROR):
continue
1483 inst.MapLVsByNode(inst_lvs)
1484 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1485 for node, vol_list in inst_lvs.iteritems():
1486 for vol in vol_list:
1487 nv_dict[(node, vol)] = inst
1492 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
for node in nodes:
1496 node_res = node_lvs[node]
1497 if node_res.offline:
continue
1499 msg = node_res.fail_msg
if msg:
1501 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1502 res_nodes[node] = msg
continue
1505 lvs = node_res.payload
1506 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1507 inst = nv_dict.pop((node, lv_name), None)
1508 if (not lv_online and inst is not None
1509 and inst.name not in res_instances):
1510 res_instances.append(inst.name)
1512 # any leftover items in nv_dict are missing LVs, let's arrange the
1514 for key, inst in nv_dict.iteritems():
1515 if inst.name not in res_missing:
1516 res_missing[inst.name] = []
1517 res_missing[inst.name].append(key)

return result
1522 class LURepairDiskSizes(NoHooksLU):
1523 """Verifies the cluster disks sizes.
1526 _OP_REQP = ["instances"]
1529 def ExpandNames(self):
1531 if not isinstance(self.op.instances, list):
1532 raise errors.OpPrereqError("Invalid argument type 'instances'")
1534 if self.op.instances:
1535 self.wanted_names = []
1536 for name in self.op.instances:
1537 full_name = self.cfg.ExpandInstanceName(name)
1538 if full_name is None:
1539 raise errors.OpPrereqError("Instance '%s' not known" % name)
1540 self.wanted_names.append(full_name)
1541 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
1542 self.needed_locks = {
1543 locking.LEVEL_NODE: [],
1544 locking.LEVEL_INSTANCE: self.wanted_names,
}
1546 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
else:
1548 self.wanted_names = None
1549 self.needed_locks = {
1550 locking.LEVEL_NODE: locking.ALL_SET,
1551 locking.LEVEL_INSTANCE: locking.ALL_SET,
}
1553 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1555 def DeclareLocks(self, level):
1556 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1557 self._LockInstancesNodes(primary_only=True)
1559 def CheckPrereq(self):
1560 """Check prerequisites.
1562 This only checks the optional instance list against the existing names.
1565 if self.wanted_names is None:
1566 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1568 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1569 in self.wanted_names]
1571 def Exec(self, feedback_fn):
1572 """Verify the size of cluster disks.
1575 # TODO: check child disks too
1576 # TODO: check differences in size between primary/secondary nodes
per_node_disks = {}
1578 for instance in self.wanted_instances:
1579 pnode = instance.primary_node
1580 if pnode not in per_node_disks:
1581 per_node_disks[pnode] = []
1582 for idx, disk in enumerate(instance.disks):
1583 per_node_disks[pnode].append((instance, idx, disk))
changed = []
1586 for node, dskl in per_node_disks.items():
1587 result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
1589 self.LogWarning("Failure in blockdev_getsizes call to node"
1590 " %s, ignoring", node)
1592 if len(result.data) != len(dskl):
1593 self.LogWarning("Invalid result from node %s, ignoring node results",
1596 for ((instance, idx, disk), size) in zip(dskl, result.data):
if size is None:
1598 self.LogWarning("Disk %d of instance %s did not return size"
1599 " information, ignoring", idx, instance.name)
1601 if not isinstance(size, (int, long)):
1602 self.LogWarning("Disk %d of instance %s did not return valid"
1603 " size information, ignoring", idx, instance.name)
1606 if size != disk.size:
1607 self.LogInfo("Disk %d of instance %s has mismatched size,"
1608 " correcting: recorded %d, actual %d", idx,
1609 instance.name, disk.size, size)
disk.size = size
1611 self.cfg.Update(instance)
1612 changed.append((instance.name, idx, size))
return changed
1616 class LURenameCluster(LogicalUnit):
1617 """Rename the cluster.
1620 HPATH = "cluster-rename"
1621 HTYPE = constants.HTYPE_CLUSTER
1624 def BuildHooksEnv(self):
1629 "OP_TARGET": self.cfg.GetClusterName(),
1630 "NEW_NAME": self.op.name,
1632 mn = self.cfg.GetMasterNode()
1633 return env, [mn], [mn]
1635 def CheckPrereq(self):
1636 """Verify that the passed name is a valid one.
1639 hostname = utils.HostInfo(self.op.name)
1641 new_name = hostname.name
1642 self.ip = new_ip = hostname.ip
1643 old_name = self.cfg.GetClusterName()
1644 old_ip = self.cfg.GetMasterIP()
1645 if new_name == old_name and new_ip == old_ip:
1646 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1647 " cluster has changed")
1648 if new_ip != old_ip:
1649 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1650 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1651 " reachable on the network. Aborting." %
1654 self.op.name = new_name
1656 def Exec(self, feedback_fn):
1657 """Rename the cluster.
1660 clustername = self.op.name
ip = self.ip
1663 # shutdown the master IP
1664 master = self.cfg.GetMasterNode()
1665 result = self.rpc.call_node_stop_master(master, False)
1666 result.Raise("Could not disable the master role")
try:
1669 cluster = self.cfg.GetClusterInfo()
1670 cluster.cluster_name = clustername
1671 cluster.master_ip = ip
1672 self.cfg.Update(cluster)
1674 # update the known hosts file
1675 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1676 node_list = self.cfg.GetNodeList()
1678 node_list.remove(master)
1681 result = self.rpc.call_upload_file(node_list,
1682 constants.SSH_KNOWN_HOSTS_FILE)
1683 for to_node, to_result in result.iteritems():
1684 msg = to_result.fail_msg
if msg:
1686 msg = ("Copy of file %s to node %s failed: %s" %
1687 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1688 self.proc.LogWarning(msg)
finally:
1691 result = self.rpc.call_node_start_master(master, False, False)
1692 msg = result.fail_msg
if msg:
1694 self.LogWarning("Could not re-enable the master role on"
1695 " the master, please restart manually: %s", msg)
1698 def _RecursiveCheckIfLVMBased(disk):
1699 """Check if the given disk or its children are lvm-based.
1701 @type disk: L{objects.Disk}
1702 @param disk: the disk to check
1704 @return: boolean indicating whether a LD_LV dev_type was found or not
1708 for chdisk in disk.children:
1709 if _RecursiveCheckIfLVMBased(chdisk):
return True
1711 return disk.dev_type == constants.LD_LV
1714 class LUSetClusterParams(LogicalUnit):
1715 """Change the parameters of the cluster.
1718 HPATH = "cluster-modify"
1719 HTYPE = constants.HTYPE_CLUSTER
1723 def CheckArguments(self):
1727 if not hasattr(self.op, "candidate_pool_size"):
1728 self.op.candidate_pool_size = None
1729 if self.op.candidate_pool_size is not None:
try:
1731 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1732 except (ValueError, TypeError), err:
1733 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
str(err))
1735 if self.op.candidate_pool_size < 1:
1736 raise errors.OpPrereqError("At least one master candidate needed")
1738 def ExpandNames(self):
1739 # FIXME: in the future maybe other cluster params won't require checking on
1740 # all nodes to be modified.
1741 self.needed_locks = {
1742 locking.LEVEL_NODE: locking.ALL_SET,
}
1744 self.share_locks[locking.LEVEL_NODE] = 1
1746 def BuildHooksEnv(self):
1751 "OP_TARGET": self.cfg.GetClusterName(),
1752 "NEW_VG_NAME": self.op.vg_name,
1754 mn = self.cfg.GetMasterNode()
1755 return env, [mn], [mn]
1757 def CheckPrereq(self):
1758 """Check prerequisites.
1760 This checks whether the given params don't conflict and
1761 if the given volume group is valid.
1764 if self.op.vg_name is not None and not self.op.vg_name:
1765 instances = self.cfg.GetAllInstancesInfo().values()
1766 for inst in instances:
1767 for disk in inst.disks:
1768 if _RecursiveCheckIfLVMBased(disk):
1769 raise errors.OpPrereqError("Cannot disable lvm storage while"
1770 " lvm-based instances exist")
1772 node_list = self.acquired_locks[locking.LEVEL_NODE]
1774 # if vg_name not None, checks given volume group on all nodes
if self.op.vg_name:
1776 vglist = self.rpc.call_vg_list(node_list)
1777 for node in node_list:
1778 msg = vglist[node].fail_msg
if msg:
1780 # ignoring down node
1781 self.LogWarning("Error while gathering data on node %s"
1782 " (ignoring node): %s", node, msg)
continue
1784 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
self.op.vg_name,
1786 constants.MIN_VG_SIZE)
if vgstatus:
1788 raise errors.OpPrereqError("Error on node '%s': %s" %
(node, vgstatus))
1791 self.cluster = cluster = self.cfg.GetClusterInfo()
1792 # validate params changes
1793 if self.op.beparams:
1794 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1795 self.new_beparams = objects.FillDict(
1796 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1798 if self.op.nicparams:
1799 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1800 self.new_nicparams = objects.FillDict(
1801 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1802 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1804 # hypervisor list/parameters
1805 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1806 if self.op.hvparams:
1807 if not isinstance(self.op.hvparams, dict):
1808 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1809 for hv_name, hv_dict in self.op.hvparams.items():
1810 if hv_name not in self.new_hvparams:
1811 self.new_hvparams[hv_name] = hv_dict
else:
1813 self.new_hvparams[hv_name].update(hv_dict)
1815 if self.op.enabled_hypervisors is not None:
1816 self.hv_list = self.op.enabled_hypervisors
1817 if not self.hv_list:
1818 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1819 " least one member")
1820 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
if invalid_hvs:
1822 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1823 " entries: %s" % invalid_hvs)
else:
1825 self.hv_list = cluster.enabled_hypervisors
1827 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1828 # either the enabled list has changed, or the parameters have, validate
1829 for hv_name, hv_params in self.new_hvparams.items():
1830 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1831 (self.op.enabled_hypervisors and
1832 hv_name in self.op.enabled_hypervisors)):
1833 # either this is a new hypervisor, or its parameters have changed
1834 hv_class = hypervisor.GetHypervisor(hv_name)
1835 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1836 hv_class.CheckParameterSyntax(hv_params)
1837 _CheckHVParams(self, node_list, hv_name, hv_params)
1839 def Exec(self, feedback_fn):
1840 """Change the parameters of the cluster.
1843 if self.op.vg_name is not None:
1844 new_volume = self.op.vg_name
1847 if new_volume != self.cfg.GetVGName():
1848 self.cfg.SetVGName(new_volume)
else:
1850 feedback_fn("Cluster LVM configuration already in desired"
1851 " state, not changing")
1852 if self.op.hvparams:
1853 self.cluster.hvparams = self.new_hvparams
1854 if self.op.enabled_hypervisors is not None:
1855 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1856 if self.op.beparams:
1857 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1858 if self.op.nicparams:
1859 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1861 if self.op.candidate_pool_size is not None:
1862 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1863 # we need to update the pool size here, otherwise the save will fail
1864 _AdjustCandidatePool(self)
1866 self.cfg.Update(self.cluster)
1869 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1870 """Distribute additional files which are part of the cluster configuration.
1872 ConfigWriter takes care of distributing the config and ssconf files, but
1873 there are more files which should be distributed to all nodes. This function
1874 makes sure those are copied.
1876 @param lu: calling logical unit
1877 @param additional_nodes: list of nodes not in the config to distribute to
1880 # 1. Gather target nodes
1881 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1882 dist_nodes = lu.cfg.GetNodeList()
1883 if additional_nodes is not None:
1884 dist_nodes.extend(additional_nodes)
1885 if myself.name in dist_nodes:
1886 dist_nodes.remove(myself.name)
1887 # 2. Gather files to distribute
1888 dist_files = set([constants.ETC_HOSTS,
1889 constants.SSH_KNOWN_HOSTS_FILE,
1890 constants.RAPI_CERT_FILE,
1891 constants.RAPI_USERS_FILE,
1892 constants.HMAC_CLUSTER_KEY,
])
1895 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1896 for hv_name in enabled_hypervisors:
1897 hv_class = hypervisor.GetHypervisor(hv_name)
1898 dist_files.update(hv_class.GetAncillaryFiles())
1900 # 3. Perform the files upload
1901 for fname in dist_files:
1902 if os.path.exists(fname):
1903 result = lu.rpc.call_upload_file(dist_nodes, fname)
1904 for to_node, to_result in result.items():
1905 msg = to_result.fail_msg
if msg:
1907 msg = ("Copy of file %s to node %s failed: %s" %
1908 (fname, to_node, msg))
1909 lu.proc.LogWarning(msg)
1912 class LURedistributeConfig(NoHooksLU):
1913 """Force the redistribution of cluster configuration.
1915 This is a very simple LU.
1921 def ExpandNames(self):
1922 self.needed_locks = {
1923 locking.LEVEL_NODE: locking.ALL_SET,
1925 self.share_locks[locking.LEVEL_NODE] = 1
1927 def CheckPrereq(self):
1928 """Check prerequisites.
1932 def Exec(self, feedback_fn):
1933 """Redistribute the configuration.
1936 self.cfg.Update(self.cfg.GetClusterInfo())
1937 _RedistributeAncillaryFiles(self)
1940 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1941 """Sleep and poll for an instance's disk to sync.
1944 if not instance.disks:
return True

if not oneshot:
1948 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1950 node = instance.primary_node
1952 for dev in instance.disks:
1953 lu.cfg.SetDiskID(dev, node)
  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
1973 for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
1979 cumul_degraded = (cumul_degraded or
1980 (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, mstat.sync_percent,
                         rem_time))
1991 # if we're done but degraded, let's do a few small retries, to
1992 # make sure we see a stable and not transient situation; therefore
1993 # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
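
# Editor's note: the polling loop above mixes progress reporting, retry
# handling and the "done but still degraded" special case.  The compact
# standalone sketch below (illustrative only, not part of Ganeti;
# get_status_fn is a hypothetical callable returning a
# (done, degraded, eta_seconds) tuple) shows the same control flow.
def _ExampleWaitForSync(get_status_fn, log_fn, degr_retries=10):
  """Illustrative sketch of a sync-polling loop with degraded retries."""
  while True:
    done, degraded, eta = get_status_fn()
    if not done:
      log_fn("sync in progress, about %s seconds remaining" % (eta or "?"))
    if done and degraded and degr_retries > 0:
      # done but degraded: retry a few times so transient states settle
      degr_retries -= 1
      time.sleep(1)
      continue
    if done:
      return not degraded
    time.sleep(min(60, eta or 15))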
2010 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
2011 """Check that mirrors are not degraded.
2013 The ldisk parameter, if True, will change the test from the
2014 is_degraded attribute (which represents overall non-ok status for
2015 the device(s)) to the ldisk (representing the local storage status).
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
2023 rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
2044 class LUDiagnoseOS(NoHooksLU):
2045 """Logical unit for OS diagnose/query.
2048 _OP_REQP = ["output_fields", "names"]
2050 _FIELDS_STATIC = utils.FieldSet()
2051 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
2053 def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")
2057 _CheckOutputFields(static=self._FIELDS_STATIC,
2058 dynamic=self._FIELDS_DYNAMIC,
2059 selected=self.op.output_fields)
2061 # Lock all nodes, in shared mode
2062 # Temporary removal of locks, should be reverted later
2063 # TODO: reintroduce locks when they are lighter-weight
2064 self.needed_locks = {}
2065 #self.share_locks[locking.LEVEL_NODE] = 1
2066 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2068 def CheckPrereq(self):
2069 """Check prerequisites.
2074 def _DiagnoseByOS(node_list, rlist):
2075 """Remaps a per-node return list into an a per-os per-node dictionary
2077 @param node_list: a list with the names of all nodes
2078 @param rlist: a map with node names as keys and OS objects as values
2081 @return: a dictionary with osnames as keys and as value another map, with
2082 nodes as keys and tuples of (path, status, diagnose) as values, eg::
2084 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2085 (/srv/..., False, "invalid api")],
2086 "node2": [(/srv/..., True, "")]}
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
2092 # level), so that nodes with a non-responding node daemon don't
2093 # make all OSes invalid
2094 good_nodes = [node_name for node_name in rlist
2095 if not rlist[node_name].fail_msg]
2096 for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
2099 for name, path, status, diagnose in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        all_os[name][node_name].append((path, status, diagnose))

    return all_os
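
    # Editor's note: a worked example of the remapping above, illustrative
    # only.  The RPC results are stood in for by a hypothetical namedtuple
    # with the same fail_msg/payload attributes:
    #
    #   FakeResult = collections.namedtuple("FakeResult",
    #                                       ["fail_msg", "payload"])
    #   rlist = {
    #     "node1": FakeResult("", [("debian-etch", "/usr/lib/...", True, "")]),
    #     "node2": FakeResult("", [("debian-etch", "/srv/...", False,
    #                               "invalid api")]),
    #     "node3": FakeResult("timeout", []),   # failed RPC, skipped
    #   }
    #   self._DiagnoseByOS(rlist.keys(), rlist)
    #   # -> {"debian-etch": {"node1": [("/usr/lib/...", True, "")],
    #   #                     "node2": [("/srv/...", False, "invalid api")]}}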
2109 def Exec(self, feedback_fn):
2110 """Compute the list of OSes.
2113 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2114 node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(valid_nodes, node_data)

    output = []
    for os_name, os_data in pol.items():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
2123 val = utils.all([osl and osl[0][1] for osl in os_data.values()])
2124 elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
2137 class LURemoveNode(LogicalUnit):
2138 """Logical unit for removing a node.
2141 HPATH = "node-remove"
2142 HTYPE = constants.HTYPE_NODE
2143 _OP_REQP = ["node_name"]
2145 def BuildHooksEnv(self):
2148 This doesn't run on the target node in the pre phase as a failed
2149 node would then be impossible to remove.
2153 "OP_TARGET": self.op.node_name,
2154 "NODE_NAME": self.op.node_name,
2156 all_nodes = self.cfg.GetNodeList()
2157 all_nodes.remove(self.op.node_name)
2158 return env, all_nodes, all_nodes
2160 def CheckPrereq(self):
2161 """Check prerequisites.
2164 - the node exists in the configuration
2165 - it does not have primary or secondary instances
2166 - it's not the master
2168 Any errors are signaled by raising errors.OpPrereqError.
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2175 instance_list = self.cfg.GetInstanceList()
2177 masternode = self.cfg.GetMasterNode()
2178 if node.name == masternode:
2179 raise errors.OpPrereqError("Node is the master node,"
2180 " you need to failover first.")
2182 for instance_name in instance_list:
2183 instance = self.cfg.GetInstanceInfo(instance_name)
2184 if node.name in instance.all_nodes:
2185 raise errors.OpPrereqError("Instance %s is still running on the node,"
2186 " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node
2190 def Exec(self, feedback_fn):
2191 """Removes the node from the cluster.
2195 logging.info("Stopping the node daemon and removing configs from node %s",
2198 self.context.RemoveNode(node.name)
2200 result = self.rpc.call_node_leave_cluster(node.name)
2201 msg = result.fail_msg
2203 self.LogWarning("Errors encountered on the remote node while leaving"
2204 " the cluster: %s", msg)
2206 # Promote nodes to master candidate as needed
2207 _AdjustCandidatePool(self)
2210 class LUQueryNodes(NoHooksLU):
2211 """Logical unit for querying nodes.
2214 _OP_REQP = ["output_fields", "names", "use_locking"]
2216 _FIELDS_DYNAMIC = utils.FieldSet(
2218 "mtotal", "mnode", "mfree",
2220 "ctotal", "cnodes", "csockets",
2223 _FIELDS_STATIC = utils.FieldSet(
2224 "name", "pinst_cnt", "sinst_cnt",
2225 "pinst_list", "sinst_list",
2226 "pip", "sip", "tags",
2227 "serial_no", "ctime", "mtime",
2235 def ExpandNames(self):
2236 _CheckOutputFields(static=self._FIELDS_STATIC,
2237 dynamic=self._FIELDS_DYNAMIC,
2238 selected=self.op.output_fields)
2240 self.needed_locks = {}
2241 self.share_locks[locking.LEVEL_NODE] = 1
    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET
2248 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2249 self.do_locking = self.do_node_query and self.op.use_locking
2251 # if we don't request only static fields, we need to lock the nodes
    if self.do_locking:
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
2255 def CheckPrereq(self):
2256 """Check prerequisites.
2259 # The validation of the node list is done in the _GetWantedNodes,
2260 # if non empty, and if empty, there's no validation to do
2263 def Exec(self, feedback_fn):
2264 """Computes the list of nodes and their attributes.
2267 all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()
2279 nodenames = utils.NiceSort(nodenames)
2280 nodelist = [all_info[name] for name in nodenames]
2282 # begin data gathering
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
2288 for name in nodenames:
2289 nodeinfo = node_data[name]
2290 if not nodeinfo.fail_msg and nodeinfo.payload:
2291 nodeinfo = nodeinfo.payload
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})
2309 node_to_primary = dict([(name, set()) for name in nodenames])
2310 node_to_secondary = dict([(name, set()) for name in nodenames])
2312 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2313 "sinst_cnt", "sinst_list"))
2314 if inst_fields & frozenset(self.op.output_fields):
2315 instancelist = self.cfg.GetInstanceList()
2317 for instance_name in instancelist:
2318 inst = self.cfg.GetInstanceInfo(instance_name)
2319 if inst.primary_node in node_to_primary:
2320 node_to_primary[inst.primary_node].add(inst.name)
2321 for secnode in inst.secondary_nodes:
2322 if secnode in node_to_secondary:
2323 node_to_secondary[secnode].add(inst.name)
2325 master_node = self.cfg.GetMasterNode()
2327 # end data gathering
    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
2336 val = list(node_to_primary[node.name])
2337 elif field == "sinst_list":
2338 val = list(node_to_secondary[node.name])
2339 elif field == "pinst_cnt":
2340 val = len(node_to_primary[node.name])
2341 elif field == "sinst_cnt":
2342 val = len(node_to_secondary[node.name])
2343 elif field == "pip":
2344 val = node.primary_ip
2345 elif field == "sip":
2346 val = node.secondary_ip
2347 elif field == "tags":
2348 val = list(node.GetTags())
2349 elif field == "serial_no":
2350 val = node.serial_no
2351 elif field == "ctime":
2353 elif field == "mtime":
2355 elif field == "master_candidate":
2356 val = node.master_candidate
2357 elif field == "master":
2358 val = node.name == master_node
2359 elif field == "offline":
2361 elif field == "drained":
2363 elif self._FIELDS_DYNAMIC.Matches(field):
2364 val = live_data[node.name].get(field, None)
2365 elif field == "role":
2366 if node.name == master_node:
2368 elif node.master_candidate:
2377 raise errors.ParameterError(field)
2378 node_output.append(val)
      output.append(node_output)

    return output
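
# Editor's note: the node, volume and storage queries in this file all share
# the same dispatch idea: static fields come straight from configuration
# objects, dynamic fields from "live" RPC data fetched only when requested.
# The standalone sketch below is illustrative only (not part of Ganeti; the
# record layout and field names are hypothetical).
def _ExampleBuildQueryOutput(records, live_data, fields, dynamic_fields):
  """Illustrative sketch of static/dynamic field dispatch for queries."""
  output = []
  for name, static in records.items():
    row = []
    for field in fields:
      if field in static:
        # static fields are always available from the configuration
        val = static[field]
      elif field in dynamic_fields:
        # dynamic fields may be missing if the node did not answer
        val = live_data.get(name, {}).get(field, None)
      else:
        raise errors.ParameterError(field)
      row.append(val)
    output.append(row)
  return output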
2384 class LUQueryNodeVolumes(NoHooksLU):
2385 """Logical unit for getting volumes on node(s).
2388 _OP_REQP = ["nodes", "output_fields"]
2390 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2391 _FIELDS_STATIC = utils.FieldSet("node")
2393 def ExpandNames(self):
2394 _CheckOutputFields(static=self._FIELDS_STATIC,
2395 dynamic=self._FIELDS_DYNAMIC,
2396 selected=self.op.output_fields)
2398 self.needed_locks = {}
2399 self.share_locks[locking.LEVEL_NODE] = 1
2400 if not self.op.nodes:
2401 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2403 self.needed_locks[locking.LEVEL_NODE] = \
2404 _GetWantedNodes(self, self.op.nodes)
2406 def CheckPrereq(self):
2407 """Check prerequisites.
2409 This checks that the fields required are valid output fields.
2412 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2414 def Exec(self, feedback_fn):
2415 """Computes the list of nodes and their attributes.
2418 nodenames = self.nodes
2419 volumes = self.rpc.call_node_volumes(nodenames)
2421 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2422 in self.cfg.GetInstanceList()]
2424 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
    output = []
    for node in nodenames:
2428 nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue
2436 node_vols = nresult.payload[:]
2437 node_vols.sort(key=lambda vol: vol['dev'])
      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
2470 class LUQueryNodeStorage(NoHooksLU):
2471 """Logical unit for getting information on storage units on node(s).
2474 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2476 _FIELDS_STATIC = utils.FieldSet("node")
2478 def ExpandNames(self):
2479 storage_type = self.op.storage_type
2481 if storage_type not in constants.VALID_STORAGE_FIELDS:
2482 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2484 dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2486 _CheckOutputFields(static=self._FIELDS_STATIC,
2487 dynamic=utils.FieldSet(*dynamic_fields),
2488 selected=self.op.output_fields)
2490 self.needed_locks = {}
2491 self.share_locks[locking.LEVEL_NODE] = 1
    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2499 def CheckPrereq(self):
2500 """Check prerequisites.
2502 This checks that the fields required are valid output fields.
2505 self.op.name = getattr(self.op, "name", None)
2507 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2509 def Exec(self, feedback_fn):
2510 """Computes the list of nodes and their attributes.
2513 # Always get name to sort by
2514 if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields
2519 # Never ask for node as it's only known to the LU
2520 while "node" in fields:
2521 fields.remove("node")
2523 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2524 name_idx = field_idx[constants.SF_NAME]
2526 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2527 data = self.rpc.call_storage_list(self.nodes,
2528 self.op.storage_type, st_args,
2529 self.op.name, fields)
2533 for node in utils.NiceSort(self.nodes):
2534 nresult = data[node]
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue
2543 rows = dict([(row[name_idx], row) for row in nresult.payload])
2545 for name in utils.NiceSort(rows.keys()):
2550 for field in self.op.output_fields:
2553 elif field in field_idx:
2554 val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)
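
# Editor's note: the storage query above asks the backend for a fixed field
# list, then keys every returned row by its name column so output can be
# emitted in sorted order.  A minimal standalone sketch of that
# field_idx/rows bookkeeping (illustrative only, not part of Ganeti):
def _ExampleIndexStorageRows(fields, payload):
  """Return (field_idx, rows) with rows keyed by their name column."""
  field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
  rows = dict([(row[field_idx["name"]], row) for row in payload])
  return field_idx, rows

# For instance, with fields ["name", "size", "free"] and a single payload
# row ["xenvg", 400, 100], this yields field_idx {"name": 0, "size": 1,
# "free": 2} and rows {"xenvg": ["xenvg", 400, 100]}.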
2565 class LUModifyNodeStorage(NoHooksLU):
2566 """Logical unit for modifying a storage volume on a node.
2569 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2572 def CheckArguments(self):
2573 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2574 if node_name is None:
2575 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2577 self.op.node_name = node_name
2579 storage_type = self.op.storage_type
2580 if storage_type not in constants.VALID_STORAGE_FIELDS:
2581 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2583 def ExpandNames(self):
2584 self.needed_locks = {
2585 locking.LEVEL_NODE: self.op.node_name,
2588 def CheckPrereq(self):
2589 """Check prerequisites.
    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type)
    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)))
2606 def Exec(self, feedback_fn):
2607 """Computes the list of nodes and their attributes.
2610 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2611 result = self.rpc.call_storage_modify(self.op.node_name,
2612 self.op.storage_type, st_args,
2613 self.op.name, self.op.changes)
2614 result.Raise("Failed to modify storage unit '%s' on %s" %
2615 (self.op.name, self.op.node_name))
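
# Editor's note: CheckPrereq above validates the requested changes against a
# per-storage-type whitelist of modifiable fields.  The standalone sketch
# below shows the same set-difference check with plain data (illustrative
# only; the field names are hypothetical).
def _ExampleValidateChanges(changes, modifiable):
  """Raise OpPrereqError if 'changes' touches non-modifiable fields."""
  diff = set(changes.keys()) - set(modifiable)
  if diff:
    raise errors.OpPrereqError("The following fields can not be modified:"
                               " %r" % sorted(diff))

# e.g. _ExampleValidateChanges({"allocatable": False}, ["allocatable"])
# passes, while _ExampleValidateChanges({"size": 10}, ["allocatable"])
# raises OpPrereqError.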
2618 class LUAddNode(LogicalUnit):
2619 """Logical unit for adding node to the cluster.
2623 HTYPE = constants.HTYPE_NODE
2624 _OP_REQP = ["node_name"]
2626 def BuildHooksEnv(self):
2629 This will run on all nodes before, and on all nodes + the new node after.
2633 "OP_TARGET": self.op.node_name,
2634 "NODE_NAME": self.op.node_name,
2635 "NODE_PIP": self.op.primary_ip,
2636 "NODE_SIP": self.op.secondary_ip,
2638 nodes_0 = self.cfg.GetNodeList()
2639 nodes_1 = nodes_0 + [self.op.node_name, ]
2640 return env, nodes_0, nodes_1
2642 def CheckPrereq(self):
2643 """Check prerequisites.
2646 - the new node is not already in the config
    - its parameters (single/dual homed) match the cluster
2650 Any errors are signaled by raising errors.OpPrereqError.
    node_name = self.op.node_name
    cfg = self.cfg
2656 dns_data = utils.HostInfo(node_name)
2658 node = dns_data.name
2659 primary_ip = self.op.primary_ip = dns_data.ip
2660 secondary_ip = getattr(self.op, "secondary_ip", None)
2661 if secondary_ip is None:
2662 secondary_ip = primary_ip
2663 if not utils.IsValidIP(secondary_ip):
2664 raise errors.OpPrereqError("Invalid secondary IP given")
2665 self.op.secondary_ip = secondary_ip
2667 node_list = cfg.GetNodeList()
2668 if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
2671 elif self.op.readd and node not in node_list:
2672 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2674 for existing_node_name in node_list:
2675 existing_node = cfg.GetNodeInfo(existing_node_name)
2677 if self.op.readd and node == existing_node_name:
2678 if (existing_node.primary_ip != primary_ip or
2679 existing_node.secondary_ip != secondary_ip):
2680 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2681 " address configuration as before")
2684 if (existing_node.primary_ip == primary_ip or
2685 existing_node.secondary_ip == primary_ip or
2686 existing_node.primary_ip == secondary_ip or
2687 existing_node.secondary_ip == secondary_ip):
2688 raise errors.OpPrereqError("New node ip address(es) conflict with"
2689 " existing node %s" % existing_node.name)
2691 # check that the type of the node (single versus dual homed) is the
2692 # same as for the master
2693 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2694 master_singlehomed = myself.secondary_ip == myself.primary_ip
2695 newbie_singlehomed = secondary_ip == primary_ip
2696 if master_singlehomed != newbie_singlehomed:
2697 if master_singlehomed:
2698 raise errors.OpPrereqError("The master has no private ip but the"
2699 " new node has one")
2701 raise errors.OpPrereqError("The master has a private ip but the"
2702 " new node doesn't have one")
2704 # checks reachability
2705 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2706 raise errors.OpPrereqError("Node not reachable by ping")
2708 if not newbie_singlehomed:
2709 # check reachability from my secondary ip to newbie's secondary ip
2710 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2711 source=myself.secondary_ip):
2712 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2713 " based ping to noded port")
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []
    mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2721 # the new node will increase mc_max with one, so:
2722 mc_max = min(mc_max + 1, cp_size)
2723 self.master_candidate = mc_now < mc_max
    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False)
2735 def Exec(self, feedback_fn):
2736 """Adds the new node to the cluster.
2739 new_node = self.new_node
2740 node = new_node.name
2742 # for re-adds, reset the offline/drained/master-candidate flags;
2743 # we need to reset here, otherwise offline would prevent RPC calls
2744 # later in the procedure; this also means that if the re-add
2745 # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False
2748 self.LogInfo("Readding a node, the offline/drained flags were reset")
2749 # if we demote the node, we do cleanup later in the procedure
2750 new_node.master_candidate = self.master_candidate
2752 # notify the user about any possible mc promotion
2753 if new_node.master_candidate:
2754 self.LogInfo("Node will be a master candidate")
2756 # check connectivity
2757 result = self.rpc.call_version([node])[node]
2758 result.Raise("Can't get version information from node %s" % node)
2759 if constants.PROTOCOL_VERSION == result.payload:
2760 logging.info("Communication to node %s fine, sw version %s match",
2761 node, result.payload)
    else:
      raise errors.OpExecError("Version mismatch master version %s,"
2764 " node version %s" %
2765 (constants.PROTOCOL_VERSION, result.payload))
2768 logging.info("Copy ssh key to node %s", node)
2769 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2771 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2772 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2778 keyarray.append(f.read())
2782 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2784 keyarray[3], keyarray[4], keyarray[5])
2785 result.Raise("Cannot transfer ssh keys to the new node")
2787 # Add node to our /etc/hosts, and add key to known_hosts
2788 if self.cfg.GetClusterInfo().modify_etc_hosts:
2789 utils.AddHostToEtcHosts(new_node.name)
2791 if new_node.secondary_ip != new_node.primary_ip:
2792 result = self.rpc.call_node_has_ip_address(new_node.name,
2793 new_node.secondary_ip)
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
                   prereq=True)
2796 if not result.payload:
2797 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2798 " you gave (%s). Please fix and re-run this"
2799 " command." % new_node.secondary_ip)
2801 node_verify_list = [self.cfg.GetMasterNode()]
2802 node_verify_param = {
      constants.NV_NODELIST: [node],
      # TODO: do a node-net-test as well?
      }
2807 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2808 self.cfg.GetClusterName())
2809 for verifier in node_verify_list:
2810 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2811 nl_payload = result[verifier].payload['nodelist']
      if nl_payload:
        for failed in nl_payload:
2814 feedback_fn("ssh/hostname verification failed %s -> %s" %
2815 (verifier, nl_payload[failed]))
2816 raise errors.OpExecError("ssh/hostname verification failed.")
    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.RemoteFailMsg()
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node)
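
# Editor's note: one of the less obvious checks in CheckPrereq above is the
# "single-homed versus dual-homed" comparison between the master and the
# node being added.  The standalone sketch below isolates that rule
# (illustrative only, not part of Ganeti).
def _ExampleCheckHomedness(master_primary, master_secondary,
                           new_primary, new_secondary):
  """Raise OpPrereqError when master and new node disagree on dual-homing."""
  master_singlehomed = master_secondary == master_primary
  newbie_singlehomed = new_secondary == new_primary
  if master_singlehomed != newbie_singlehomed:
    if master_singlehomed:
      raise errors.OpPrereqError("The master has no private ip but the"
                                 " new node has one")
    else:
      raise errors.OpPrereqError("The master has a private ip but the"
                                 " new node doesn't have one")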
2835 class LUSetNodeParams(LogicalUnit):
2836 """Modifies the parameters of a node.
2839 HPATH = "node-modify"
2840 HTYPE = constants.HTYPE_NODE
2841 _OP_REQP = ["node_name"]
2844 def CheckArguments(self):
2845 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2846 if node_name is None:
2847 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2848 self.op.node_name = node_name
2849 _CheckBooleanOpField(self.op, 'master_candidate')
2850 _CheckBooleanOpField(self.op, 'offline')
2851 _CheckBooleanOpField(self.op, 'drained')
2852 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2853 if all_mods.count(None) == 3:
2854 raise errors.OpPrereqError("Please pass at least one modification")
2855 if all_mods.count(True) > 1:
2856 raise errors.OpPrereqError("Can't set the node into more than one"
2857 " state at the same time")
2859 def ExpandNames(self):
2860 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2862 def BuildHooksEnv(self):
2865 This runs on the master node.
2869 "OP_TARGET": self.op.node_name,
2870 "MASTER_CANDIDATE": str(self.op.master_candidate),
2871 "OFFLINE": str(self.op.offline),
2872 "DRAINED": str(self.op.drained),
2874 nl = [self.cfg.GetMasterNode(),
2878 def CheckPrereq(self):
2879 """Check prerequisites.
2881 This only checks the instance list against the existing names.
2884 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2886 if ((self.op.master_candidate == False or self.op.offline == True or
2887 self.op.drained == True) and node.master_candidate):
2888 # we will demote the node from master_candidate
2889 if self.op.node_name == self.cfg.GetMasterNode():
2890 raise errors.OpPrereqError("The master node has to be a"
2891 " master candidate, online and not drained")
2892 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2893 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2894 if num_candidates <= cp_size:
2895 msg = ("Not enough master candidates (desired"
2896 " %d, new value will be %d)" % (cp_size, num_candidates-1))
          if self.op.force:
            self.LogWarning(msg)
          else:
            raise errors.OpPrereqError(msg)
2902 if (self.op.master_candidate == True and
2903 ((node.offline and not self.op.offline == False) or
2904 (node.drained and not self.op.drained == False))):
2905 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2906 " to master_candidate" % node.name)
  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node
    result = []
2919 if self.op.offline is not None:
2920 node.offline = self.op.offline
2921 result.append(("offline", str(self.op.offline)))
2922 if self.op.offline == True:
2923 if node.master_candidate:
2924 node.master_candidate = False
2926 result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
2929 result.append(("drained", "clear drained status due to offline"))
2931 if self.op.master_candidate is not None:
2932 node.master_candidate = self.op.master_candidate
2934 result.append(("master_candidate", str(self.op.master_candidate)))
2935 if self.op.master_candidate == False:
2936 rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.RemoteFailMsg()
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)
2941 if self.op.drained is not None:
2942 node.drained = self.op.drained
2943 result.append(("drained", str(self.op.drained)))
2944 if self.op.drained == True:
2945 if node.master_candidate:
2946 node.master_candidate = False
2948 result.append(("master_candidate", "auto-demotion due to drain"))
2949 rrc = self.rpc.call_node_demote_from_mc(node.name)
2950 msg = rrc.RemoteFailMsg()
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
2955 result.append(("offline", "clear offline status due to drain"))
2957 # this will trigger configuration file update, if needed
2958 self.cfg.Update(node)
2959 # this will trigger job queue propagation or cleanup
2961 self.context.ReaddNode(node)
2966 class LUPowercycleNode(NoHooksLU):
2967 """Powercycles a node.
2970 _OP_REQP = ["node_name", "force"]
2973 def CheckArguments(self):
2974 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2975 if node_name is None:
2976 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2977 self.op.node_name = node_name
2978 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2979 raise errors.OpPrereqError("The node is the master and the force"
2980 " parameter was not set")
2982 def ExpandNames(self):
2983 """Locking for PowercycleNode.
2985 This is a last-resort option and shouldn't block on other
2986 jobs. Therefore, we grab no locks.
2989 self.needed_locks = {}
2991 def CheckPrereq(self):
2992 """Check prerequisites.
2994 This LU has no prereqs.
2999 def Exec(self, feedback_fn):
3003 result = self.rpc.call_node_powercycle(self.op.node_name,
3004 self.cfg.GetHypervisorType())
3005 result.Raise("Failed to schedule the reboot")
3006 return result.payload
3009 class LUQueryClusterInfo(NoHooksLU):
3010 """Query cluster configuration.
3016 def ExpandNames(self):
3017 self.needed_locks = {}
3019 def CheckPrereq(self):
3020 """No prerequsites needed for this LU.
3025 def Exec(self, feedback_fn):
3026 """Return cluster config.
3029 cluster = self.cfg.GetClusterInfo()
3031 "software_version": constants.RELEASE_VERSION,
3032 "protocol_version": constants.PROTOCOL_VERSION,
3033 "config_version": constants.CONFIG_VERSION,
3034 "os_api_version": max(constants.OS_API_VERSIONS),
3035 "export_version": constants.EXPORT_VERSION,
3036 "architecture": (platform.architecture()[0], platform.machine()),
3037 "name": cluster.cluster_name,
3038 "master": cluster.master_node,
3039 "default_hypervisor": cluster.enabled_hypervisors[0],
3040 "enabled_hypervisors": cluster.enabled_hypervisors,
3041 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3042 for hypervisor_name in cluster.enabled_hypervisors]),
3043 "beparams": cluster.beparams,
3044 "nicparams": cluster.nicparams,
3045 "candidate_pool_size": cluster.candidate_pool_size,
3046 "master_netdev": cluster.master_netdev,
3047 "volume_group_name": cluster.volume_group_name,
3048 "file_storage_dir": cluster.file_storage_dir,
3049 "ctime": cluster.ctime,
3050 "mtime": cluster.mtime,
3056 class LUQueryConfigValues(NoHooksLU):
3057 """Return configuration values.
3062 _FIELDS_DYNAMIC = utils.FieldSet()
3063 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
3065 def ExpandNames(self):
3066 self.needed_locks = {}
3068 _CheckOutputFields(static=self._FIELDS_STATIC,
3069 dynamic=self._FIELDS_DYNAMIC,
3070 selected=self.op.output_fields)
3072 def CheckPrereq(self):
3073 """No prerequisites.
3078 def Exec(self, feedback_fn):
3079 """Dump a representation of the cluster config to the standard output.
    values = []
    for field in self.op.output_fields:
3084 if field == "cluster_name":
3085 entry = self.cfg.GetClusterName()
3086 elif field == "master_node":
3087 entry = self.cfg.GetMasterNode()
3088 elif field == "drain_flag":
3089 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values
3096 class LUActivateInstanceDisks(NoHooksLU):
3097 """Bring up an instance's disks.
3100 _OP_REQP = ["instance_name"]
3103 def ExpandNames(self):
3104 self._ExpandAndLockInstance()
3105 self.needed_locks[locking.LEVEL_NODE] = []
3106 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3108 def DeclareLocks(self, level):
3109 if level == locking.LEVEL_NODE:
3110 self._LockInstancesNodes()
3112 def CheckPrereq(self):
3113 """Check prerequisites.
3115 This checks that the instance is in the cluster.
3118 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3119 assert self.instance is not None, \
3120 "Cannot retrieve locked instance %s" % self.op.instance_name
3121 _CheckNodeOnline(self, self.instance.primary_node)
3122 if not hasattr(self.op, "ignore_size"):
3123 self.op.ignore_size = False
3125 def Exec(self, feedback_fn):
3126 """Activate the disks.
3129 disks_ok, disks_info = \
3130 _AssembleInstanceDisks(self, self.instance,
3131 ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
3140 """Prepare the block devices for an instance.
3142 This sets up the block devices on all nodes.
3144 @type lu: L{LogicalUnit}
3145 @param lu: the logical unit on whose behalf we execute
3146 @type instance: L{objects.Instance}
3147 @param instance: the instance for whose disks we assemble
3148 @type ignore_secondaries: boolean
3149 @param ignore_secondaries: if true, errors on secondary nodes
3150 won't result in an error return from the function
3151 @type ignore_size: boolean
3152 @param ignore_size: if true, the current known size of the disk
3153 will not be used during the disk activation, useful for cases
3154 when the size is wrong
3155 @return: False if the operation failed, otherwise a list of
3156 (host, instance_visible_name, node_visible_name)
3157 with the mapping from node devices to instance devices
  device_info = []
  disks_ok = True
  iname = instance.name
3163 # With the two passes mechanism we try to reduce the window of
3164 # opportunity for the race condition of switching DRBD to primary
3165 # before handshaking occured, but we do not eliminate it
3167 # The proper fix would be to wait (with some limits) until the
3168 # connection has been made and drbd transitions from WFConnection
3169 # into any other network-connected state (Connected, SyncTarget,
3172 # 1st pass, assemble on all nodes in secondary mode
3173 for inst_disk in instance.disks:
3174 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
3178 lu.cfg.SetDiskID(node_disk, node)
3179 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False
3188 # FIXME: race condition on drbd migration to primary
3190 # 2nd pass, do only the primary node
3191 for inst_disk in instance.disks:
3192 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3193 if node != instance.primary_node:
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
3198 lu.cfg.SetDiskID(node_disk, node)
3199 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        result.payload))
3209 # leave the disks configured for the primary node
3210 # this is a workaround that would be fixed better by
3211 # improving the logical/physical id handling
3212 for disk in instance.disks:
3213 lu.cfg.SetDiskID(disk, instance.primary_node)
3215 return disks_ok, device_info
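
# Editor's note: the two passes above (assemble everywhere as secondary
# first, then once more on the primary as primary) narrow the window in
# which DRBD could be promoted before both sides have connected.  A compact
# standalone sketch of that ordering (illustrative only; assemble_fn is a
# hypothetical callable returning True on success):
def _ExampleTwoPassAssemble(nodes, primary_node, assemble_fn):
  """Assemble on all nodes as secondary, then on the primary as primary."""
  ok = True
  for node in nodes:
    # pass 1: every node, including the primary, in secondary mode
    ok = assemble_fn(node, as_primary=False) and ok
  # pass 2: only the primary node, now in primary mode
  ok = assemble_fn(primary_node, as_primary=True) and ok
  return ok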
3218 def _StartInstanceDisks(lu, instance, force):
3219 """Start the disks of an instance.
3222 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3223 ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
3226 if force is not None and not force:
3227 lu.proc.LogWarning("", hint="If the message above refers to a"
3229 " you can retry the operation using '--force'.")
3230 raise errors.OpExecError("Disk consistency error")
3233 class LUDeactivateInstanceDisks(NoHooksLU):
3234 """Shutdown an instance's disks.
3237 _OP_REQP = ["instance_name"]
3240 def ExpandNames(self):
3241 self._ExpandAndLockInstance()
3242 self.needed_locks[locking.LEVEL_NODE] = []
3243 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3245 def DeclareLocks(self, level):
3246 if level == locking.LEVEL_NODE:
3247 self._LockInstancesNodes()
3249 def CheckPrereq(self):
3250 """Check prerequisites.
3252 This checks that the instance is in the cluster.
3255 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3256 assert self.instance is not None, \
3257 "Cannot retrieve locked instance %s" % self.op.instance_name
3259 def Exec(self, feedback_fn):
3260 """Deactivate the disks
3263 instance = self.instance
3264 _SafeShutdownInstanceDisks(self, instance)
3267 def _SafeShutdownInstanceDisks(lu, instance):
3268 """Shutdown block devices of an instance.
3270 This function checks if an instance is running, before calling
3271 _ShutdownInstanceDisks.
3274 pnode = instance.primary_node
3275 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3276 ins_l.Raise("Can't contact node %s" % pnode)
3278 if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")
3282 _ShutdownInstanceDisks(lu, instance)
3285 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3286 """Shutdown block devices of an instance.
3288 This does the shutdown on all nodes of the instance.
  If the ignore_primary is false, errors on the primary node are
  ignored.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result
3308 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3309 """Checks if a node has enough free memory.
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
3316 @type lu: C{LogicalUnit}
3317 @param lu: a logical unit from which we get configuration data
3319 @param node: the node to check
3320 @type reason: C{str}
3321 @param reason: string to use in the error message
3322 @type requested: C{int}
3323 @param requested: the amount of memory in MiB to check for
3324 @type hypervisor_name: C{str}
3325 @param hypervisor_name: the hypervisor to ask for memory stats
3326 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3327 we cannot check the node
3330 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3331 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
3332 free_mem = nodeinfo[node].payload.get('memory_free', None)
3333 if not isinstance(free_mem, int):
3334 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3335 " was '%s'" % (node, free_mem))
3336 if requested > free_mem:
3337 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3338 " needed %s MiB, available %s MiB" %
3339 (node, reason, requested, free_mem))
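
# Editor's note: a minimal standalone version of the memory precheck above,
# using plain values instead of the RPC result object (illustrative only,
# not part of Ganeti).
def _ExampleCheckFreeMemory(node, reason, requested_mib, free_mib):
  """Raise OpPrereqError if free_mib cannot satisfy requested_mib."""
  if not isinstance(free_mib, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mib))
  if requested_mib > free_mib:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested_mib, free_mib))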
3342 class LUStartupInstance(LogicalUnit):
3343 """Starts an instance.
3346 HPATH = "instance-start"
3347 HTYPE = constants.HTYPE_INSTANCE
3348 _OP_REQP = ["instance_name", "force"]
3351 def ExpandNames(self):
3352 self._ExpandAndLockInstance()
3354 def BuildHooksEnv(self):
3357 This runs on master, primary and secondary nodes of the instance.
3361 "FORCE": self.op.force,
3363 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3364 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3367 def CheckPrereq(self):
3368 """Check prerequisites.
3370 This checks that the instance is in the cluster.
3373 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3374 assert self.instance is not None, \
3375 "Cannot retrieve locked instance %s" % self.op.instance_name
3378 self.beparams = getattr(self.op, "beparams", {})
3380 if not isinstance(self.beparams, dict):
3381 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3382 " dict" % (type(self.beparams), ))
3383 # fill the beparams dict
3384 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3385 self.op.beparams = self.beparams
3388 self.hvparams = getattr(self.op, "hvparams", {})
3390 if not isinstance(self.hvparams, dict):
3391 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3392 " dict" % (type(self.hvparams), ))
3394 # check hypervisor parameter syntax (locally)
3395 cluster = self.cfg.GetClusterInfo()
3396 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
    filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                  instance.hvparams)
3399 filled_hvp.update(self.hvparams)
3400 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3401 hv_type.CheckParameterSyntax(filled_hvp)
3402 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3403 self.op.hvparams = self.hvparams
3405 _CheckNodeOnline(self, instance.primary_node)
3407 bep = self.cfg.GetClusterInfo().FillBE(instance)
3408 # check bridges existence
3409 _CheckInstanceBridgesExist(self, instance)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
3416 if not remote_info.payload: # not running already
3417 _CheckNodeFreeMemory(self, instance.primary_node,
3418 "starting instance %s" % instance.name,
3419 bep[constants.BE_MEMORY], instance.hypervisor)
3421 def Exec(self, feedback_fn):
3422 """Start the instance.
3425 instance = self.instance
3426 force = self.op.force
3428 self.cfg.MarkInstanceUp(instance.name)
3430 node_current = instance.primary_node
3432 _StartInstanceDisks(self, instance, force)
3434 result = self.rpc.call_instance_start(node_current, instance,
3435 self.hvparams, self.beparams)
    msg = result.fail_msg
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)
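
# Editor's note: CheckPrereq above builds the effective hypervisor
# parameters by layering cluster defaults, the instance's own overrides and
# finally the per-operation overrides, in that order.  The standalone
# sketch below shows the same "fill" behaviour with plain dicts
# (illustrative only; the parameter names are hypothetical).
def _ExampleFillParams(cluster_defaults, instance_params, op_params):
  """Return cluster defaults overridden by instance and then op params."""
  filled = dict(cluster_defaults)
  filled.update(instance_params)
  filled.update(op_params)
  return filled

# e.g. _ExampleFillParams({"kernel_path": "/boot/vmlinuz"},
#                         {"kernel_path": "/boot/vmlinuz-custom"}, {})
# yields {"kernel_path": "/boot/vmlinuz-custom"}.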
3442 class LURebootInstance(LogicalUnit):
3443 """Reboot an instance.
3446 HPATH = "instance-reboot"
3447 HTYPE = constants.HTYPE_INSTANCE
3448 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3451 def ExpandNames(self):
3452 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3453 constants.INSTANCE_REBOOT_HARD,
3454 constants.INSTANCE_REBOOT_FULL]:
3455 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3456 (constants.INSTANCE_REBOOT_SOFT,
3457 constants.INSTANCE_REBOOT_HARD,
3458 constants.INSTANCE_REBOOT_FULL))
3459 self._ExpandAndLockInstance()
3461 def BuildHooksEnv(self):
3464 This runs on master, primary and secondary nodes of the instance.
3468 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3469 "REBOOT_TYPE": self.op.reboot_type,
3471 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3472 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3475 def CheckPrereq(self):
3476 """Check prerequisites.
3478 This checks that the instance is in the cluster.
3481 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3482 assert self.instance is not None, \
3483 "Cannot retrieve locked instance %s" % self.op.instance_name
3485 _CheckNodeOnline(self, instance.primary_node)
3487 # check bridges existence
3488 _CheckInstanceBridgesExist(self, instance)
3490 def Exec(self, feedback_fn):
3491 """Reboot the instance.
3494 instance = self.instance
3495 ignore_secondaries = self.op.ignore_secondaries
3496 reboot_type = self.op.reboot_type
3498 node_current = instance.primary_node
3500 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3501 constants.INSTANCE_REBOOT_HARD]:
3502 for disk in instance.disks:
3503 self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      result.Raise("Could not reboot instance")
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
3509 result.Raise("Could not shutdown instance for full reboot")
3510 _ShutdownInstanceDisks(self, instance)
3511 _StartInstanceDisks(self, instance, ignore_secondaries)
3512 result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)
3519 self.cfg.MarkInstanceUp(instance.name)
3522 class LUShutdownInstance(LogicalUnit):
3523 """Shutdown an instance.
3526 HPATH = "instance-stop"
3527 HTYPE = constants.HTYPE_INSTANCE
3528 _OP_REQP = ["instance_name"]
3531 def ExpandNames(self):
3532 self._ExpandAndLockInstance()
3534 def BuildHooksEnv(self):
3537 This runs on master, primary and secondary nodes of the instance.
3540 env = _BuildInstanceHookEnvByObject(self, self.instance)
3541 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3544 def CheckPrereq(self):
3545 """Check prerequisites.
3547 This checks that the instance is in the cluster.
3550 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3551 assert self.instance is not None, \
3552 "Cannot retrieve locked instance %s" % self.op.instance_name
3553 _CheckNodeOnline(self, self.instance.primary_node)
3555 def Exec(self, feedback_fn):
3556 """Shutdown the instance.
3559 instance = self.instance
3560 node_current = instance.primary_node
3561 self.cfg.MarkInstanceDown(instance.name)
3562 result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.fail_msg
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3567 _ShutdownInstanceDisks(self, instance)
3570 class LUReinstallInstance(LogicalUnit):
3571 """Reinstall an instance.
3574 HPATH = "instance-reinstall"
3575 HTYPE = constants.HTYPE_INSTANCE
3576 _OP_REQP = ["instance_name"]
3579 def ExpandNames(self):
3580 self._ExpandAndLockInstance()
3582 def BuildHooksEnv(self):
3585 This runs on master, primary and secondary nodes of the instance.
3588 env = _BuildInstanceHookEnvByObject(self, self.instance)
3589 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3592 def CheckPrereq(self):
3593 """Check prerequisites.
3595 This checks that the instance is in the cluster and is not running.
3598 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3599 assert instance is not None, \
3600 "Cannot retrieve locked instance %s" % self.op.instance_name
3601 _CheckNodeOnline(self, instance.primary_node)
3603 if instance.disk_template == constants.DT_DISKLESS:
3604 raise errors.OpPrereqError("Instance '%s' has no disks" %
3605 self.op.instance_name)
3606 if instance.admin_up:
3607 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3608 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
3614 if remote_info.payload:
3615 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3616 (self.op.instance_name,
3617 instance.primary_node))
3619 self.op.os_type = getattr(self.op, "os_type", None)
3620 if self.op.os_type is not None:
3622 pnode = self.cfg.GetNodeInfo(
3623 self.cfg.ExpandNodeName(instance.primary_node))
3625 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3627 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3628 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3629 (self.op.os_type, pnode.name), prereq=True)
3631 self.instance = instance
3633 def Exec(self, feedback_fn):
3634 """Reinstall the instance.
3637 inst = self.instance
3639 if self.op.os_type is not None:
3640 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3641 inst.os = self.op.os_type
3642 self.cfg.Update(inst)
    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
      result.Raise("Could not install OS for instance %s on node %s" %
                   (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)
3654 class LURecreateInstanceDisks(LogicalUnit):
3655 """Recreate an instance's missing disks.
3658 HPATH = "instance-recreate-disks"
3659 HTYPE = constants.HTYPE_INSTANCE
3660 _OP_REQP = ["instance_name", "disks"]
3663 def CheckArguments(self):
3664 """Check the arguments.
3667 if not isinstance(self.op.disks, list):
3668 raise errors.OpPrereqError("Invalid disks parameter")
3669 for item in self.op.disks:
      if (not isinstance(item, int) or
          item < 0):
        raise errors.OpPrereqError("Invalid disk specification '%s'" %
                                   str(item))
3675 def ExpandNames(self):
3676 self._ExpandAndLockInstance()
3678 def BuildHooksEnv(self):
3681 This runs on master, primary and secondary nodes of the instance.
3684 env = _BuildInstanceHookEnvByObject(self, self.instance)
3685 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3688 def CheckPrereq(self):
3689 """Check prerequisites.
3691 This checks that the instance is in the cluster and is not running.
3694 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3695 assert instance is not None, \
3696 "Cannot retrieve locked instance %s" % self.op.instance_name
3697 _CheckNodeOnline(self, instance.primary_node)
3699 if instance.disk_template == constants.DT_DISKLESS:
3700 raise errors.OpPrereqError("Instance '%s' has no disks" %
3701 self.op.instance_name)
3702 if instance.admin_up:
3703 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3704 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
3710 if remote_info.payload:
3711 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3712 (self.op.instance_name,
3713 instance.primary_node))
3715 if not self.op.disks:
3716 self.op.disks = range(len(instance.disks))
3718 for idx in self.op.disks:
3719 if idx >= len(instance.disks):
3720 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)
3722 self.instance = instance
3724 def Exec(self, feedback_fn):
3725 """Recreate the disks.
    to_skip = []
    for idx, disk in enumerate(self.instance.disks):
      if idx not in self.op.disks: # disk idx has not been passed in
        to_skip.append(idx)

    _CreateDisks(self, self.instance, to_skip=to_skip)
3737 class LURenameInstance(LogicalUnit):
3738 """Rename an instance.
3741 HPATH = "instance-rename"
3742 HTYPE = constants.HTYPE_INSTANCE
3743 _OP_REQP = ["instance_name", "new_name"]
3745 def BuildHooksEnv(self):
3748 This runs on master, primary and secondary nodes of the instance.
3751 env = _BuildInstanceHookEnvByObject(self, self.instance)
3752 env["INSTANCE_NEW_NAME"] = self.op.new_name
3753 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3756 def CheckPrereq(self):
3757 """Check prerequisites.
3759 This checks that the instance is in the cluster and is not running.
3762 instance = self.cfg.GetInstanceInfo(
3763 self.cfg.ExpandInstanceName(self.op.instance_name))
3764 if instance is None:
3765 raise errors.OpPrereqError("Instance '%s' not known" %
3766 self.op.instance_name)
3767 _CheckNodeOnline(self, instance.primary_node)
3769 if instance.admin_up:
3770 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3771 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True)
3777 if remote_info.payload:
3778 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3779 (self.op.instance_name,
3780 instance.primary_node))
3781 self.instance = instance
3783 # new name verification
3784 name_info = utils.HostInfo(self.op.new_name)
3786 self.op.new_name = new_name = name_info.name
3787 instance_list = self.cfg.GetInstanceList()
3788 if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)
3792 if not getattr(self.op, "ignore_ip", False):
3793 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3794 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3795 (name_info.ip, new_name))
3798 def Exec(self, feedback_fn):
3799 """Reinstall the instance.
3802 inst = self.instance
3803 old_name = inst.name
3805 if inst.disk_template == constants.DT_FILE:
3806 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3808 self.cfg.RenameInstance(inst.name, self.op.new_name)
3809 # Change the instance lock. This is definitely safe while we hold the BGL
3810 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3811 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3813 # re-read the instance from the configuration after rename
3814 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3816 if inst.disk_template == constants.DT_FILE:
3817 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3818 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3819 old_file_storage_dir,
3820 new_file_storage_dir)
3821 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3822 " (but the instance has been renamed in Ganeti)" %
3823 (inst.primary_node, old_file_storage_dir,
3824 new_file_storage_dir))
3826 _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.fail_msg
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)
3840 class LURemoveInstance(LogicalUnit):
3841 """Remove an instance.
3844 HPATH = "instance-remove"
3845 HTYPE = constants.HTYPE_INSTANCE
3846 _OP_REQP = ["instance_name", "ignore_failures"]
3849 def ExpandNames(self):
3850 self._ExpandAndLockInstance()
3851 self.needed_locks[locking.LEVEL_NODE] = []
3852 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3854 def DeclareLocks(self, level):
3855 if level == locking.LEVEL_NODE:
3856 self._LockInstancesNodes()
3858 def BuildHooksEnv(self):
3861 This runs on master, primary and secondary nodes of the instance.
3864 env = _BuildInstanceHookEnvByObject(self, self.instance)
3865 nl = [self.cfg.GetMasterNode()]
3868 def CheckPrereq(self):
3869 """Check prerequisites.
3871 This checks that the instance is in the cluster.
3874 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3875 assert self.instance is not None, \
3876 "Cannot retrieve locked instance %s" % self.op.instance_name
3878 def Exec(self, feedback_fn):
3879 """Remove the instance.
3882 instance = self.instance
3883 logging.info("Shutting down instance %s on node %s",
3884 instance.name, instance.primary_node)
3886 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.fail_msg
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))
3896 logging.info("Removing block devices for instance %s", instance.name)
3898 if not _RemoveDisks(self, instance):
3899 if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")
3904 logging.info("Removing instance %s out of cluster config", instance.name)
3906 self.cfg.RemoveInstance(instance.name)
3907 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3910 class LUQueryInstances(NoHooksLU):
3911 """Logical unit for querying instances.
3914 _OP_REQP = ["output_fields", "names", "use_locking"]
3916 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3918 "disk_template", "ip", "mac", "bridge",
3919 "nic_mode", "nic_link",
3920 "sda_size", "sdb_size", "vcpus", "tags",
3921 "network_port", "beparams",
3922 r"(disk)\.(size)/([0-9]+)",
3923 r"(disk)\.(sizes)", "disk_usage",
3924 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3925 r"(nic)\.(bridge)/([0-9]+)",
3926 r"(nic)\.(macs|ips|modes|links|bridges)",
3927 r"(disk|nic)\.(count)",
3928 "serial_no", "hypervisor", "hvparams",
3932 for name in constants.HVS_PARAMETERS] +
3934 for name in constants.BES_PARAMETERS])
3935 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3938 def ExpandNames(self):
3939 _CheckOutputFields(static=self._FIELDS_STATIC,
3940 dynamic=self._FIELDS_DYNAMIC,
3941 selected=self.op.output_fields)
3943 self.needed_locks = {}
3944 self.share_locks[locking.LEVEL_INSTANCE] = 1
3945 self.share_locks[locking.LEVEL_NODE] = 1
    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET
3952 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3953 self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3959 def DeclareLocks(self, level):
3960 if level == locking.LEVEL_NODE and self.do_locking:
3961 self._LockInstancesNodes()
3963 def CheckPrereq(self):
3964 """Check prerequisites.
3969 def Exec(self, feedback_fn):
3970 """Computes the list of nodes and their attributes.
3973 all_info = self.cfg.GetAllInstancesInfo()
3974 if self.wanted == locking.ALL_SET:
3975 # caller didn't specify instance names, so ordering is not important
3976 if self.do_locking:
3977 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3978 else:
3979 instance_names = all_info.keys()
3980 instance_names = utils.NiceSort(instance_names)
3981 else:
3982 # caller did specify names, so we must keep the ordering
3983 if self.do_locking:
3984 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3985 else:
3986 tgt_set = all_info.keys()
3987 missing = set(self.wanted).difference(tgt_set)
3988 if missing:
3989 raise errors.OpExecError("Some instances were removed before"
3990 " retrieving their data: %s" % missing)
3991 instance_names = self.wanted
3993 instance_list = [all_info[iname] for iname in instance_names]
3995 # begin data gathering
3997 nodes = frozenset([inst.primary_node for inst in instance_list])
3998 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4000 bad_nodes = []
4001 off_nodes = []
4002 if self.do_node_query:
4003 live_data = {}
4004 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4005 for name in nodes:
4006 result = node_data[name]
4007 if result.offline:
4008 # offline nodes will be in both lists
4009 off_nodes.append(name)
4010 if result.failed or result.fail_msg:
4011 bad_nodes.append(name)
4012 else:
4013 if result.payload:
4014 live_data.update(result.payload)
4015 # else no instance is alive
4016 else:
4017 live_data = dict([(name, {}) for name in instance_names])
4019 # end data gathering
4021 HVPREFIX = "hv/"
4022 BEPREFIX = "be/"
4023 output = []
4024 cluster = self.cfg.GetClusterInfo()
4025 for instance in instance_list:
4026 iout = []
4027 i_hv = cluster.FillHV(instance)
4028 i_be = cluster.FillBE(instance)
4029 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4030 nic.nicparams) for nic in instance.nics]
4031 for field in self.op.output_fields:
4032 st_match = self._FIELDS_STATIC.Matches(field)
4037 elif field == "pnode":
4038 val = instance.primary_node
4039 elif field == "snodes":
4040 val = list(instance.secondary_nodes)
4041 elif field == "admin_state":
4042 val = instance.admin_up
4043 elif field == "oper_state":
4044 if instance.primary_node in bad_nodes:
4047 val = bool(live_data.get(instance.name))
4048 elif field == "status":
4049 if instance.primary_node in off_nodes:
4050 val = "ERROR_nodeoffline"
4051 elif instance.primary_node in bad_nodes:
4052 val = "ERROR_nodedown"
4054 running = bool(live_data.get(instance.name))
4056 if instance.admin_up:
4061 if instance.admin_up:
4065 elif field == "oper_ram":
4066 if instance.primary_node in bad_nodes:
4068 elif instance.name in live_data:
4069 val = live_data[instance.name].get("memory", "?")
4072 elif field == "vcpus":
4073 val = i_be[constants.BE_VCPUS]
4074 elif field == "disk_template":
4075 val = instance.disk_template
4078 val = instance.nics[0].ip
4081 elif field == "nic_mode":
4083 val = i_nicp[0][constants.NIC_MODE]
4086 elif field == "nic_link":
4088 val = i_nicp[0][constants.NIC_LINK]
4091 elif field == "bridge":
4092 if (instance.nics and
4093 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4094 val = i_nicp[0][constants.NIC_LINK]
4097 elif field == "mac":
4099 val = instance.nics[0].mac
4102 elif field == "sda_size" or field == "sdb_size":
4103 idx = ord(field[2]) - ord('a')
4104 try:
4105 val = instance.FindDisk(idx).size
4106 except errors.OpPrereqError:
4107 val = None
4108 elif field == "disk_usage": # total disk usage per node
4109 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4110 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4111 elif field == "tags":
4112 val = list(instance.GetTags())
4113 elif field == "serial_no":
4114 val = instance.serial_no
4115 elif field == "ctime":
4116 val = instance.ctime
4117 elif field == "mtime":
4118 val = instance.mtime
4119 elif field == "network_port":
4120 val = instance.network_port
4121 elif field == "hypervisor":
4122 val = instance.hypervisor
4123 elif field == "hvparams":
4125 elif (field.startswith(HVPREFIX) and
4126 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
4127 val = i_hv.get(field[len(HVPREFIX):], None)
4128 elif field == "beparams":
4130 elif (field.startswith(BEPREFIX) and
4131 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4132 val = i_be.get(field[len(BEPREFIX):], None)
4133 elif st_match and st_match.groups():
4134 # matches a variable list
4135 st_groups = st_match.groups()
4136 if st_groups and st_groups[0] == "disk":
4137 if st_groups[1] == "count":
4138 val = len(instance.disks)
4139 elif st_groups[1] == "sizes":
4140 val = [disk.size for disk in instance.disks]
4141 elif st_groups[1] == "size":
4142 try:
4143 val = instance.FindDisk(st_groups[2]).size
4144 except errors.OpPrereqError:
4145 val = None
4146 else:
4147 assert False, "Unhandled disk parameter"
4148 elif st_groups[0] == "nic":
4149 if st_groups[1] == "count":
4150 val = len(instance.nics)
4151 elif st_groups[1] == "macs":
4152 val = [nic.mac for nic in instance.nics]
4153 elif st_groups[1] == "ips":
4154 val = [nic.ip for nic in instance.nics]
4155 elif st_groups[1] == "modes":
4156 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4157 elif st_groups[1] == "links":
4158 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4159 elif st_groups[1] == "bridges":
4162 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4163 val.append(nicp[constants.NIC_LINK])
4168 nic_idx = int(st_groups[2])
4169 if nic_idx >= len(instance.nics):
4172 if st_groups[1] == "mac":
4173 val = instance.nics[nic_idx].mac
4174 elif st_groups[1] == "ip":
4175 val = instance.nics[nic_idx].ip
4176 elif st_groups[1] == "mode":
4177 val = i_nicp[nic_idx][constants.NIC_MODE]
4178 elif st_groups[1] == "link":
4179 val = i_nicp[nic_idx][constants.NIC_LINK]
4180 elif st_groups[1] == "bridge":
4181 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4182 if nic_mode == constants.NIC_MODE_BRIDGED:
4183 val = i_nicp[nic_idx][constants.NIC_LINK]
4187 assert False, "Unhandled NIC parameter"
4189 assert False, ("Declared but unhandled variable parameter '%s'" %
4192 assert False, "Declared but unhandled parameter '%s'" % field
4199 class LUFailoverInstance(LogicalUnit):
4200 """Failover an instance.
4203 HPATH = "instance-failover"
4204 HTYPE = constants.HTYPE_INSTANCE
4205 _OP_REQP = ["instance_name", "ignore_consistency"]
4208 def ExpandNames(self):
4209 self._ExpandAndLockInstance()
4210 self.needed_locks[locking.LEVEL_NODE] = []
4211 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4213 def DeclareLocks(self, level):
4214 if level == locking.LEVEL_NODE:
4215 self._LockInstancesNodes()
4217 def BuildHooksEnv(self):
4218 """Build hooks env.
4220 This runs on master, primary and secondary nodes of the instance.
4222 """
4223 env = {
4224 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4225 }
4226 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4227 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4228 return env, nl, nl
4230 def CheckPrereq(self):
4231 """Check prerequisites.
4233 This checks that the instance is in the cluster.
4236 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4237 assert self.instance is not None, \
4238 "Cannot retrieve locked instance %s" % self.op.instance_name
4240 bep = self.cfg.GetClusterInfo().FillBE(instance)
4241 if instance.disk_template not in constants.DTS_NET_MIRROR:
4242 raise errors.OpPrereqError("Instance's disk layout is not"
4243 " network mirrored, cannot failover.")
4245 secondary_nodes = instance.secondary_nodes
4246 if not secondary_nodes:
4247 raise errors.ProgrammerError("no secondary node but using "
4248 "a mirrored disk template")
4250 target_node = secondary_nodes[0]
4251 _CheckNodeOnline(self, target_node)
4252 _CheckNodeNotDrained(self, target_node)
4253 if instance.admin_up:
4254 # check memory requirements on the secondary node
4255 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4256 instance.name, bep[constants.BE_MEMORY],
4257 instance.hypervisor)
4258 else:
4259 self.LogInfo("Not checking memory on the secondary node as"
4260 " instance will not be started")
4262 # check bridge existence
4263 _CheckInstanceBridgesExist(self, instance, node=target_node)
4265 def Exec(self, feedback_fn):
4266 """Failover an instance.
4268 The failover is done by shutting it down on its present node and
4269 starting it on the secondary.
4272 instance = self.instance
4274 source_node = instance.primary_node
4275 target_node = instance.secondary_nodes[0]
4277 feedback_fn("* checking disk consistency between source and target")
4278 for dev in instance.disks:
4279 # for drbd, these are drbd over lvm
4280 if not _CheckDiskConsistency(self, dev, target_node, False):
4281 if instance.admin_up and not self.op.ignore_consistency:
4282 raise errors.OpExecError("Disk %s is degraded on target node,"
4283 " aborting failover." % dev.iv_name)
4285 feedback_fn("* shutting down instance on source node")
4286 logging.info("Shutting down instance %s on node %s",
4287 instance.name, source_node)
4289 result = self.rpc.call_instance_shutdown(source_node, instance)
4290 msg = result.fail_msg
4291 if msg:
4292 if self.op.ignore_consistency:
4293 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4294 " Proceeding anyway. Please make sure node"
4295 " %s is down. Error details: %s",
4296 instance.name, source_node, source_node, msg)
4297 else:
4298 raise errors.OpExecError("Could not shutdown instance %s on"
4299 " node %s: %s" %
4300 (instance.name, source_node, msg))
4302 feedback_fn("* deactivating the instance's disks on source node")
4303 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4304 raise errors.OpExecError("Can't shut down the instance's disks.")
4306 instance.primary_node = target_node
4307 # distribute new instance config to the other nodes
4308 self.cfg.Update(instance)
4310 # Only start the instance if it's marked as up
4311 if instance.admin_up:
4312 feedback_fn("* activating the instance's disks on target node")
4313 logging.info("Starting instance %s on node %s",
4314 instance.name, target_node)
4316 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4317 ignore_secondaries=True)
4318 if not disks_ok:
4319 _ShutdownInstanceDisks(self, instance)
4320 raise errors.OpExecError("Can't activate the instance's disks")
4322 feedback_fn("* starting the instance on the target node")
4323 result = self.rpc.call_instance_start(target_node, instance, None, None)
4324 msg = result.fail_msg
4325 if msg:
4326 _ShutdownInstanceDisks(self, instance)
4327 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4328 (instance.name, target_node, msg))
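# A minimal usage sketch (assuming the usual opcodes module naming): the
# client side would submit something like
#   op = opcodes.OpFailoverInstance(instance_name="inst1.example.com",
#                                   ignore_consistency=False)
# and the processor dispatches it to this LU, which shuts the instance down
# on its primary node and starts it on the first secondary.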
4331 class LUMigrateInstance(LogicalUnit):
4332 """Migrate an instance.
4334 This is migration without shutting down, compared to the failover,
4335 which is done with shutdown.
4338 HPATH = "instance-migrate"
4339 HTYPE = constants.HTYPE_INSTANCE
4340 _OP_REQP = ["instance_name", "live", "cleanup"]
4344 def ExpandNames(self):
4345 self._ExpandAndLockInstance()
4347 self.needed_locks[locking.LEVEL_NODE] = []
4348 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4350 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4351 self.op.live, self.op.cleanup)
4352 self.tasklets = [self._migrater]
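# Setting self.tasklets delegates the remaining phases to the tasklet: the
# generic LogicalUnit machinery calls CheckPrereq() and Exec() on each
# tasklet in the list instead of expecting the LU to implement them itself,
# which is why this LU defines neither method.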
4354 def DeclareLocks(self, level):
4355 if level == locking.LEVEL_NODE:
4356 self._LockInstancesNodes()
4358 def BuildHooksEnv(self):
4361 This runs on master, primary and secondary nodes of the instance.
4364 instance = self._migrater.instance
4365 env = _BuildInstanceHookEnvByObject(self, instance)
4366 env["MIGRATE_LIVE"] = self.op.live
4367 env["MIGRATE_CLEANUP"] = self.op.cleanup
4368 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4372 class LUMoveInstance(LogicalUnit):
4373 """Move an instance by data-copying.
4376 HPATH = "instance-move"
4377 HTYPE = constants.HTYPE_INSTANCE
4378 _OP_REQP = ["instance_name", "target_node"]
4381 def ExpandNames(self):
4382 self._ExpandAndLockInstance()
4383 target_node = self.cfg.ExpandNodeName(self.op.target_node)
4384 if target_node is None:
4385 raise errors.OpPrereqError("Node '%s' not known" %
4386 self.op.target_node)
4387 self.op.target_node = target_node
4388 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4389 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4391 def DeclareLocks(self, level):
4392 if level == locking.LEVEL_NODE:
4393 self._LockInstancesNodes(primary_only=True)
4395 def BuildHooksEnv(self):
4398 This runs on master, primary and secondary nodes of the instance.
4400 """
4401 env = {
4402 "TARGET_NODE": self.op.target_node,
4403 }
4404 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4405 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4406 self.op.target_node]
4407 return env, nl, nl
4409 def CheckPrereq(self):
4410 """Check prerequisites.
4412 This checks that the instance is in the cluster.
4415 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4416 assert self.instance is not None, \
4417 "Cannot retrieve locked instance %s" % self.op.instance_name
4419 node = self.cfg.GetNodeInfo(self.op.target_node)
4420 assert node is not None, \
4421 "Cannot retrieve locked node %s" % self.op.target_node
4423 self.target_node = target_node = node.name
4425 if target_node == instance.primary_node:
4426 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4427 (instance.name, target_node))
4429 bep = self.cfg.GetClusterInfo().FillBE(instance)
4431 for idx, dsk in enumerate(instance.disks):
4432 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4433 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4436 _CheckNodeOnline(self, target_node)
4437 _CheckNodeNotDrained(self, target_node)
4439 if instance.admin_up:
4440 # check memory requirements on the secondary node
4441 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4442 instance.name, bep[constants.BE_MEMORY],
4443 instance.hypervisor)
4444 else:
4445 self.LogInfo("Not checking memory on the secondary node as"
4446 " instance will not be started")
4448 # check bridge existence
4449 _CheckInstanceBridgesExist(self, instance, node=target_node)
4451 def Exec(self, feedback_fn):
4452 """Move an instance.
4454 The move is done by shutting it down on its present node, copying
4455 the data over (slow) and starting it on the new node.
4458 instance = self.instance
4460 source_node = instance.primary_node
4461 target_node = self.target_node
4463 self.LogInfo("Shutting down instance %s on source node %s",
4464 instance.name, source_node)
4466 result = self.rpc.call_instance_shutdown(source_node, instance)
4467 msg = result.fail_msg
4468 if msg:
4469 if self.op.ignore_consistency:
4470 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4471 " Proceeding anyway. Please make sure node"
4472 " %s is down. Error details: %s",
4473 instance.name, source_node, source_node, msg)
4474 else:
4475 raise errors.OpExecError("Could not shutdown instance %s on"
4476 " node %s: %s" %
4477 (instance.name, source_node, msg))
4479 # create the target disks
4480 try:
4481 _CreateDisks(self, instance, target_node=target_node)
4482 except errors.OpExecError:
4483 self.LogWarning("Device creation failed, reverting...")
4484 try:
4485 _RemoveDisks(self, instance, target_node=target_node)
4486 finally:
4487 self.cfg.ReleaseDRBDMinors(instance.name)
4488 raise
4490 cluster_name = self.cfg.GetClusterInfo().cluster_name
4493 # activate, get path, copy the data over
4494 for idx, disk in enumerate(instance.disks):
4495 self.LogInfo("Copying data for disk %d", idx)
4496 result = self.rpc.call_blockdev_assemble(target_node, disk,
4497 instance.name, True)
4499 self.LogWarning("Can't assemble newly created disk %d: %s",
4500 idx, result.fail_msg)
4501 errs.append(result.fail_msg)
4503 dev_path = result.payload
4504 result = self.rpc.call_blockdev_export(source_node, disk,
4505 target_node, dev_path,
4508 self.LogWarning("Can't copy data over for disk %d: %s",
4509 idx, result.fail_msg)
4510 errs.append(result.fail_msg)
4514 self.LogWarning("Some disks failed to copy, aborting")
4516 _RemoveDisks(self, instance, target_node=target_node)
4518 self.cfg.ReleaseDRBDMinors(instance.name)
4519 raise errors.OpExecError("Errors during disk copy: %s" %
4522 instance.primary_node = target_node
4523 self.cfg.Update(instance)
4525 self.LogInfo("Removing the disks on the original node")
4526 _RemoveDisks(self, instance, target_node=source_node)
4528 # Only start the instance if it's marked as up
4529 if instance.admin_up:
4530 self.LogInfo("Starting instance %s on node %s",
4531 instance.name, target_node)
4533 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4534 ignore_secondaries=True)
4535 if not disks_ok:
4536 _ShutdownInstanceDisks(self, instance)
4537 raise errors.OpExecError("Can't activate the instance's disks")
4539 result = self.rpc.call_instance_start(target_node, instance, None, None)
4540 msg = result.fail_msg
4541 if msg:
4542 _ShutdownInstanceDisks(self, instance)
4543 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4544 (instance.name, target_node, msg))
4547 class LUMigrateNode(LogicalUnit):
4548 """Migrate all instances from a node.
4551 HPATH = "node-migrate"
4552 HTYPE = constants.HTYPE_NODE
4553 _OP_REQP = ["node_name", "live"]
4556 def ExpandNames(self):
4557 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4558 if self.op.node_name is None:
4559 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
4561 self.needed_locks = {
4562 locking.LEVEL_NODE: [self.op.node_name],
4565 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4567 # Create tasklets for migrating instances for all instances on this node
4568 names = []
4569 tasklets = []
4571 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4572 logging.debug("Migrating instance %s", inst.name)
4573 names.append(inst.name)
4575 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4577 self.tasklets = tasklets
4579 # Declare instance locks
4580 self.needed_locks[locking.LEVEL_INSTANCE] = names
4582 def DeclareLocks(self, level):
4583 if level == locking.LEVEL_NODE:
4584 self._LockInstancesNodes()
4586 def BuildHooksEnv(self):
4589 This runs on the master, the primary and all the secondaries.
4593 "NODE_NAME": self.op.node_name,
4596 nl = [self.cfg.GetMasterNode()]
4598 return (env, nl, nl)
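# For example, if the evacuated node is the primary of instances "a" and
# "b", ExpandNames above builds two TLMigrateInstance tasklets (cleanup
# disabled) and locks both instance names; they are then executed one after
# the other by the LU framework.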
4601 class TLMigrateInstance(Tasklet):
4602 def __init__(self, lu, instance_name, live, cleanup):
4603 """Initializes this class.
4606 Tasklet.__init__(self, lu)
4609 self.instance_name = instance_name
4610 self.live = live
4611 self.cleanup = cleanup
4613 def CheckPrereq(self):
4614 """Check prerequisites.
4616 This checks that the instance is in the cluster.
4619 instance = self.cfg.GetInstanceInfo(
4620 self.cfg.ExpandInstanceName(self.instance_name))
4621 if instance is None:
4622 raise errors.OpPrereqError("Instance '%s' not known" %
4623 self.instance_name)
4625 if instance.disk_template != constants.DT_DRBD8:
4626 raise errors.OpPrereqError("Instance's disk layout is not"
4627 " drbd8, cannot migrate.")
4629 secondary_nodes = instance.secondary_nodes
4630 if not secondary_nodes:
4631 raise errors.ConfigurationError("No secondary node but using"
4632 " drbd8 disk template")
4634 i_be = self.cfg.GetClusterInfo().FillBE(instance)
4636 target_node = secondary_nodes[0]
4637 # check memory requirements on the secondary node
4638 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4639 instance.name, i_be[constants.BE_MEMORY],
4640 instance.hypervisor)
4642 # check bridge existence
4643 _CheckInstanceBridgesExist(self, instance, node=target_node)
4645 if not self.cleanup:
4646 _CheckNodeNotDrained(self, target_node)
4647 result = self.rpc.call_instance_migratable(instance.primary_node,
4648 instance)
4649 result.Raise("Can't migrate, please use failover", prereq=True)
4651 self.instance = instance
4653 def _WaitUntilSync(self):
4654 """Poll with custom rpc for disk sync.
4656 This uses our own step-based rpc call.
4659 self.feedback_fn("* wait until resync is done")
4663 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4665 self.instance.disks)
4667 for node, nres in result.items():
4668 nres.Raise("Cannot resync disks on node %s" % node)
4669 node_done, node_percent = nres.payload
4670 all_done = all_done and node_done
4671 if node_percent is not None:
4672 min_percent = min(min_percent, node_percent)
4674 if min_percent < 100:
4675 self.feedback_fn(" - progress: %.1f%%" % min_percent)
4678 def _EnsureSecondary(self, node):
4679 """Demote a node to secondary.
4682 self.feedback_fn("* switching node %s to secondary mode" % node)
4684 for dev in self.instance.disks:
4685 self.cfg.SetDiskID(dev, node)
4687 result = self.rpc.call_blockdev_close(node, self.instance.name,
4688 self.instance.disks)
4689 result.Raise("Cannot change disk to secondary on node %s" % node)
4691 def _GoStandalone(self):
4692 """Disconnect from the network.
4695 self.feedback_fn("* changing into standalone mode")
4696 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4697 self.instance.disks)
4698 for node, nres in result.items():
4699 nres.Raise("Cannot disconnect disks node %s" % node)
4701 def _GoReconnect(self, multimaster):
4702 """Reconnect to the network.
4708 msg = "single-master"
4709 self.feedback_fn("* changing disks into %s mode" % msg)
4710 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4711 self.instance.disks,
4712 self.instance.name, multimaster)
4713 for node, nres in result.items():
4714 nres.Raise("Cannot change disks config on node %s" % node)
4716 def _ExecCleanup(self):
4717 """Try to cleanup after a failed migration.
4719 The cleanup is done by:
4720 - check that the instance is running only on one node
4721 (and update the config if needed)
4722 - change disks on its secondary node to secondary
4723 - wait until disks are fully synchronized
4724 - disconnect from the network
4725 - change disks into single-master mode
4726 - wait again until disks are fully synchronized
4729 instance = self.instance
4730 target_node = self.target_node
4731 source_node = self.source_node
4733 # check running on only one node
4734 self.feedback_fn("* checking where the instance actually runs"
4735 " (if this hangs, the hypervisor might be in"
4737 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4738 for node, result in ins_l.items():
4739 result.Raise("Can't contact node %s" % node)
4741 runningon_source = instance.name in ins_l[source_node].payload
4742 runningon_target = instance.name in ins_l[target_node].payload
4744 if runningon_source and runningon_target:
4745 raise errors.OpExecError("Instance seems to be running on two nodes,"
4746 " or the hypervisor is confused. You will have"
4747 " to ensure manually that it runs only on one"
4748 " and restart this operation.")
4750 if not (runningon_source or runningon_target):
4751 raise errors.OpExecError("Instance does not seem to be running at all."
4752 " In this case, it's safer to repair by"
4753 " running 'gnt-instance stop' to ensure disk"
4754 " shutdown, and then restarting it.")
4756 if runningon_target:
4757 # the migration has actually succeeded, we need to update the config
4758 self.feedback_fn("* instance running on secondary node (%s),"
4759 " updating config" % target_node)
4760 instance.primary_node = target_node
4761 self.cfg.Update(instance)
4762 demoted_node = source_node
4764 self.feedback_fn("* instance confirmed to be running on its"
4765 " primary node (%s)" % source_node)
4766 demoted_node = target_node
4768 self._EnsureSecondary(demoted_node)
4770 self._WaitUntilSync()
4771 except errors.OpExecError:
4772 # we ignore here errors, since if the device is standalone, it
4773 # won't be able to sync
4775 self._GoStandalone()
4776 self._GoReconnect(False)
4777 self._WaitUntilSync()
4779 self.feedback_fn("* done")
4781 def _RevertDiskStatus(self):
4782 """Try to revert the disk status after a failed migration.
4785 target_node = self.target_node
4787 self._EnsureSecondary(target_node)
4788 self._GoStandalone()
4789 self._GoReconnect(False)
4790 self._WaitUntilSync()
4791 except errors.OpExecError, err:
4792 self.lu.LogWarning("Migration failed and I can't reconnect the"
4793 " drives: error '%s'\n"
4794 "Please look and recover the instance status" %
4797 def _AbortMigration(self):
4798 """Call the hypervisor code to abort a started migration.
4801 instance = self.instance
4802 target_node = self.target_node
4803 migration_info = self.migration_info
4805 abort_result = self.rpc.call_finalize_migration(target_node,
4809 abort_msg = abort_result.fail_msg
4811 logging.error("Aborting migration failed on target node %s: %s" %
4812 (target_node, abort_msg))
4813 # Don't raise an exception here, as we still have to try to revert the
4814 # disk status, even if this step failed.
4816 def _ExecMigration(self):
4817 """Migrate an instance.
4819 The migrate is done by:
4820 - change the disks into dual-master mode
4821 - wait until disks are fully synchronized again
4822 - migrate the instance
4823 - change disks on the new secondary node (the old primary) to secondary
4824 - wait until disks are fully synchronized
4825 - change disks into single-master mode
4828 instance = self.instance
4829 target_node = self.target_node
4830 source_node = self.source_node
4832 self.feedback_fn("* checking disk consistency between source and target")
4833 for dev in instance.disks:
4834 if not _CheckDiskConsistency(self, dev, target_node, False):
4835 raise errors.OpExecError("Disk %s is degraded or not fully"
4836 " synchronized on target node,"
4837 " aborting migrate." % dev.iv_name)
4839 # First get the migration information from the remote node
4840 result = self.rpc.call_migration_info(source_node, instance)
4841 msg = result.fail_msg
4843 log_err = ("Failed fetching source migration information from %s: %s" %
4845 logging.error(log_err)
4846 raise errors.OpExecError(log_err)
4848 self.migration_info = migration_info = result.payload
4850 # Then switch the disks to master/master mode
4851 self._EnsureSecondary(target_node)
4852 self._GoStandalone()
4853 self._GoReconnect(True)
4854 self._WaitUntilSync()
4856 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4857 result = self.rpc.call_accept_instance(target_node,
4860 self.nodes_ip[target_node])
4862 msg = result.fail_msg
4864 logging.error("Instance pre-migration failed, trying to revert"
4865 " disk status: %s", msg)
4866 self._AbortMigration()
4867 self._RevertDiskStatus()
4868 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4869 (instance.name, msg))
4871 self.feedback_fn("* migrating instance to %s" % target_node)
4873 result = self.rpc.call_instance_migrate(source_node, instance,
4874 self.nodes_ip[target_node],
4876 msg = result.fail_msg
4878 logging.error("Instance migration failed, trying to revert"
4879 " disk status: %s", msg)
4880 self._AbortMigration()
4881 self._RevertDiskStatus()
4882 raise errors.OpExecError("Could not migrate instance %s: %s" %
4883 (instance.name, msg))
4886 instance.primary_node = target_node
4887 # distribute new instance config to the other nodes
4888 self.cfg.Update(instance)
4890 result = self.rpc.call_finalize_migration(target_node,
4894 msg = result.fail_msg
4896 logging.error("Instance migration succeeded, but finalization failed:"
4898 raise errors.OpExecError("Could not finalize instance migration: %s" %
4901 self._EnsureSecondary(source_node)
4902 self._WaitUntilSync()
4903 self._GoStandalone()
4904 self._GoReconnect(False)
4905 self._WaitUntilSync()
4907 self.feedback_fn("* done")
4909 def Exec(self, feedback_fn):
4910 """Perform the migration.
4913 feedback_fn("Migrating instance %s" % self.instance.name)
4915 self.feedback_fn = feedback_fn
4917 self.source_node = self.instance.primary_node
4918 self.target_node = self.instance.secondary_nodes[0]
4919 self.all_nodes = [self.source_node, self.target_node]
4920 self.nodes_ip = {
4921 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4922 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4923 }
4925 if self.cleanup:
4926 return self._ExecCleanup()
4927 else:
4928 return self._ExecMigration()
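# Rough call sequence for this tasklet (a sketch, not an exhaustive trace):
# CheckPrereq() resolves and validates the instance, then Exec() records the
# source/target nodes and their secondary IPs and either repairs a previously
# failed migration (_ExecCleanup) or performs a new one (_ExecMigration) over
# the DRBD link.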
4931 def _CreateBlockDev(lu, node, instance, device, force_create,
4933 """Create a tree of block devices on a given node.
4935 If this device type has to be created on secondaries, create it and
4938 If not, just recurse to children keeping the same 'force' value.
4940 @param lu: the lu on whose behalf we execute
4941 @param node: the node on which to create the device
4942 @type instance: L{objects.Instance}
4943 @param instance: the instance which owns the device
4944 @type device: L{objects.Disk}
4945 @param device: the device to create
4946 @type force_create: boolean
4947 @param force_create: whether to force creation of this device; this
4948 will be changed to True whenever we find a device which has
4949 CreateOnSecondary() attribute
4950 @param info: the extra 'metadata' we should attach to the device
4951 (this will be represented as a LVM tag)
4952 @type force_open: boolean
4953 @param force_open: this parameter will be passed to the
4954 L{backend.BlockdevCreate} function where it specifies
4955 whether we run on primary or not, and it affects both
4956 the child assembly and the device own Open() execution
4959 if device.CreateOnSecondary():
4960 force_create = True
4962 if device.children:
4963 for child in device.children:
4964 _CreateBlockDev(lu, node, instance, child, force_create,
4965 info, force_open)
4967 if not force_create:
4968 return
4970 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
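# Illustration of the recursion above for a DRBD8 disk (assuming, as in the
# objects module, that the DRBD8 device type advertises CreateOnSecondary()):
# force_create becomes True for the DRBD device, so its backing LV children
# are created on both the primary and the secondary node before the DRBD
# device on top of them is created.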
4973 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4974 """Create a single block device on a given node.
4976 This will not recurse over children of the device, so they must be
4979 @param lu: the lu on whose behalf we execute
4980 @param node: the node on which to create the device
4981 @type instance: L{objects.Instance}
4982 @param instance: the instance which owns the device
4983 @type device: L{objects.Disk}
4984 @param device: the device to create
4985 @param info: the extra 'metadata' we should attach to the device
4986 (this will be represented as a LVM tag)
4987 @type force_open: boolean
4988 @param force_open: this parameter will be passed to the
4989 L{backend.BlockdevCreate} function where it specifies
4990 whether we run on primary or not, and it affects both
4991 the child assembly and the device own Open() execution
4994 lu.cfg.SetDiskID(device, node)
4995 result = lu.rpc.call_blockdev_create(node, device, device.size,
4996 instance.name, force_open, info)
4997 result.Raise("Can't create block device %s on"
4998 " node %s for instance %s" % (device, node, instance.name))
4999 if device.physical_id is None:
5000 device.physical_id = result.payload
5003 def _GenerateUniqueNames(lu, exts):
5004 """Generate a suitable LV name.
5006 This will generate a logical volume name for the given instance.
5011 new_id = lu.cfg.GenerateUniqueID()
5012 results.append("%s%s" % (new_id, val))
5016 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5018 """Generate a drbd8 device complete with its children.
5021 port = lu.cfg.AllocatePort()
5022 vgname = lu.cfg.GetVGName()
5023 shared_secret = lu.cfg.GenerateDRBDSecret()
5024 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5025 logical_id=(vgname, names[0]))
5026 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5027 logical_id=(vgname, names[1]))
5028 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5029 logical_id=(primary, secondary, port,
5032 children=[dev_data, dev_meta],
5037 def _GenerateDiskTemplate(lu, template_name,
5038 instance_name, primary_node,
5039 secondary_nodes, disk_info,
5040 file_storage_dir, file_driver,
5042 """Generate the entire disk layout for a given template type.
5045 #TODO: compute space requirements
5047 vgname = lu.cfg.GetVGName()
5048 disk_count = len(disk_info)
5049 disks = []
5050 if template_name == constants.DT_DISKLESS:
5051 pass
5052 elif template_name == constants.DT_PLAIN:
5053 if len(secondary_nodes) != 0:
5054 raise errors.ProgrammerError("Wrong template configuration")
5056 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5057 for i in range(disk_count)])
5058 for idx, disk in enumerate(disk_info):
5059 disk_index = idx + base_index
5060 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5061 logical_id=(vgname, names[idx]),
5062 iv_name="disk/%d" % disk_index,
5064 disks.append(disk_dev)
5065 elif template_name == constants.DT_DRBD8:
5066 if len(secondary_nodes) != 1:
5067 raise errors.ProgrammerError("Wrong template configuration")
5068 remote_node = secondary_nodes[0]
5069 minors = lu.cfg.AllocateDRBDMinor(
5070 [primary_node, remote_node] * len(disk_info), instance_name)
5072 names = []
5073 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5074 for i in range(disk_count)]):
5075 names.append(lv_prefix + "_data")
5076 names.append(lv_prefix + "_meta")
5077 for idx, disk in enumerate(disk_info):
5078 disk_index = idx + base_index
5079 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5080 disk["size"], names[idx*2:idx*2+2],
5081 "disk/%d" % disk_index,
5082 minors[idx*2], minors[idx*2+1])
5083 disk_dev.mode = disk["mode"]
5084 disks.append(disk_dev)
5085 elif template_name == constants.DT_FILE:
5086 if len(secondary_nodes) != 0:
5087 raise errors.ProgrammerError("Wrong template configuration")
5089 for idx, disk in enumerate(disk_info):
5090 disk_index = idx + base_index
5091 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5092 iv_name="disk/%d" % disk_index,
5093 logical_id=(file_driver,
5094 "%s/disk%d" % (file_storage_dir,
5097 disks.append(disk_dev)
5099 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
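# Example of the generated layout (illustrative names): for a drbd8 template
# with one 10240 MB disk, _GenerateUniqueNames() yields a prefix such as
# "<uuid>.disk0", from which the two backing LVs "<uuid>.disk0_data" and
# "<uuid>.disk0_meta" are derived and wrapped in a single DRBD8 device
# exposed to the instance as "disk/0".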
5103 def _GetInstanceInfoText(instance):
5104 """Compute that text that should be added to the disk's metadata.
5107 return "originstname+%s" % instance.name
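# For example, an instance named "web1.example.com" gets the tag text
# "originstname+web1.example.com" attached to its block devices.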
5110 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5111 """Create all disks for an instance.
5113 This abstracts away some work from AddInstance.
5115 @type lu: L{LogicalUnit}
5116 @param lu: the logical unit on whose behalf we execute
5117 @type instance: L{objects.Instance}
5118 @param instance: the instance whose disks we should create
5120 @param to_skip: list of indices to skip
5121 @type target_node: string
5122 @param target_node: if passed, overrides the target node for creation
5124 @return: the success of the creation
5127 info = _GetInstanceInfoText(instance)
5128 if target_node is None:
5129 pnode = instance.primary_node
5130 all_nodes = instance.all_nodes
5131 else:
5132 pnode = target_node
5133 all_nodes = [pnode]
5135 if instance.disk_template == constants.DT_FILE:
5136 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5137 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5139 result.Raise("Failed to create directory '%s' on"
5140 " node %s" % (file_storage_dir, pnode))
5142 # Note: this needs to be kept in sync with adding of disks in
5143 # LUSetInstanceParams
5144 for idx, device in enumerate(instance.disks):
5145 if to_skip and idx in to_skip:
5146 continue
5147 logging.info("Creating volume %s for instance %s",
5148 device.iv_name, instance.name)
5150 for node in all_nodes:
5151 f_create = node == pnode
5152 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5155 def _RemoveDisks(lu, instance, target_node=None):
5156 """Remove all disks for an instance.
5158 This abstracts away some work from `AddInstance()` and
5159 `RemoveInstance()`. Note that in case some of the devices couldn't
5160 be removed, the removal will continue with the other ones (compare
5161 with `_CreateDisks()`).
5163 @type lu: L{LogicalUnit}
5164 @param lu: the logical unit on whose behalf we execute
5165 @type instance: L{objects.Instance}
5166 @param instance: the instance whose disks we should remove
5167 @type target_node: string
5168 @param target_node: used to override the node on which to remove the disks
5170 @return: the success of the removal
5173 logging.info("Removing block devices for instance %s", instance.name)
5176 for device in instance.disks:
5178 edata = [(target_node, device)]
5180 edata = device.ComputeNodeTree(instance.primary_node)
5181 for node, disk in edata:
5182 lu.cfg.SetDiskID(disk, node)
5183 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5185 lu.LogWarning("Could not remove block device %s on node %s,"
5186 " continuing anyway: %s", device.iv_name, node, msg)
5189 if instance.disk_template == constants.DT_FILE:
5190 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5191 if target_node is None:
5192 tgt = instance.primary_node
5193 else:
5194 tgt = target_node
5195 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5197 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5198 file_storage_dir, instance.primary_node, result.fail_msg)
5204 def _ComputeDiskSize(disk_template, disks):
5205 """Compute disk size requirements in the volume group
5208 # Required free disk space as a function of disk and swap space
5209 req_size_dict = {
5210 constants.DT_DISKLESS: None,
5211 constants.DT_PLAIN: sum(d["size"] for d in disks),
5212 # 128 MB are added for drbd metadata for each disk
5213 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5214 constants.DT_FILE: None,
5215 }
5217 if disk_template not in req_size_dict:
5218 raise errors.ProgrammerError("Disk template '%s' size requirement"
5219 " is unknown" % disk_template)
5221 return req_size_dict[disk_template]
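# Worked example: for a drbd8 instance with two disks of 10240 MB and 2048 MB
# the requirement is (10240 + 128) + (2048 + 128) = 12544 MB of free space in
# the volume group; a plain (lvm) instance with the same disks would need
# 10240 + 2048 = 12288 MB.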
5224 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5225 """Hypervisor parameter validation.
5227 This function abstract the hypervisor parameter validation to be
5228 used in both instance create and instance modify.
5230 @type lu: L{LogicalUnit}
5231 @param lu: the logical unit for which we check
5232 @type nodenames: list
5233 @param nodenames: the list of nodes on which we should check
5234 @type hvname: string
5235 @param hvname: the name of the hypervisor we should use
5236 @type hvparams: dict
5237 @param hvparams: the parameters which we need to check
5238 @raise errors.OpPrereqError: if the parameters are not valid
5241 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5244 for node in nodenames:
5248 info.Raise("Hypervisor parameter validation failed on node %s" % node)
5251 class LUCreateInstance(LogicalUnit):
5252 """Create an instance.
5255 HPATH = "instance-add"
5256 HTYPE = constants.HTYPE_INSTANCE
5257 _OP_REQP = ["instance_name", "disks", "disk_template",
5259 "wait_for_sync", "ip_check", "nics",
5260 "hvparams", "beparams"]
5263 def _ExpandNode(self, node):
5264 """Expands and checks one node name.
5267 node_full = self.cfg.ExpandNodeName(node)
5268 if node_full is None:
5269 raise errors.OpPrereqError("Unknown node %s" % node)
5270 return node_full
5272 def ExpandNames(self):
5273 """ExpandNames for CreateInstance.
5275 Figure out the right locks for instance creation.
5278 self.needed_locks = {}
5280 # set optional parameters to none if they don't exist
5281 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5282 if not hasattr(self.op, attr):
5283 setattr(self.op, attr, None)
5285 # cheap checks, mostly valid constants given
5287 # verify creation mode
5288 if self.op.mode not in (constants.INSTANCE_CREATE,
5289 constants.INSTANCE_IMPORT):
5290 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5293 # disk template and mirror node verification
5294 if self.op.disk_template not in constants.DISK_TEMPLATES:
5295 raise errors.OpPrereqError("Invalid disk template name")
5297 if self.op.hypervisor is None:
5298 self.op.hypervisor = self.cfg.GetHypervisorType()
5300 cluster = self.cfg.GetClusterInfo()
5301 enabled_hvs = cluster.enabled_hypervisors
5302 if self.op.hypervisor not in enabled_hvs:
5303 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5304 " cluster (%s)" % (self.op.hypervisor,
5305 ",".join(enabled_hvs)))
5307 # check hypervisor parameter syntax (locally)
5308 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5309 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5311 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5312 hv_type.CheckParameterSyntax(filled_hvp)
5313 self.hv_full = filled_hvp
5315 # fill and remember the beparams dict
5316 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5317 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5320 #### instance parameters check
5322 # instance name verification
5323 hostname1 = utils.HostInfo(self.op.instance_name)
5324 self.op.instance_name = instance_name = hostname1.name
5326 # this is just a preventive check, but someone might still add this
5327 # instance in the meantime, and creation will fail at lock-add time
5328 if instance_name in self.cfg.GetInstanceList():
5329 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5332 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5336 for idx, nic in enumerate(self.op.nics):
5337 nic_mode_req = nic.get("mode", None)
5338 nic_mode = nic_mode_req
5339 if nic_mode is None:
5340 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5342 # in routed mode, for the first nic, the default ip is 'auto'
5343 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5344 default_ip_mode = constants.VALUE_AUTO
5346 default_ip_mode = constants.VALUE_NONE
5348 # ip validity checks
5349 ip = nic.get("ip", default_ip_mode)
5350 if ip is None or ip.lower() == constants.VALUE_NONE:
5352 elif ip.lower() == constants.VALUE_AUTO:
5353 nic_ip = hostname1.ip
5355 if not utils.IsValidIP(ip):
5356 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5357 " like a valid IP" % ip)
5360 # TODO: check the ip for uniqueness !!
5361 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5362 raise errors.OpPrereqError("Routed nic mode requires an ip address")
5364 # MAC address verification
5365 mac = nic.get("mac", constants.VALUE_AUTO)
5366 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5367 if not utils.IsValidMac(mac.lower()):
5368 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5370 # bridge verification
5371 bridge = nic.get("bridge", None)
5372 link = nic.get("link", None)
5374 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5375 " at the same time")
5376 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5377 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
5383 nicparams[constants.NIC_MODE] = nic_mode_req
5385 nicparams[constants.NIC_LINK] = link
5387 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5389 objects.NIC.CheckParameterSyntax(check_params)
5390 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5392 # disk checks/pre-build
5394 for disk in self.op.disks:
5395 mode = disk.get("mode", constants.DISK_RDWR)
5396 if mode not in constants.DISK_ACCESS_SET:
5397 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5399 size = disk.get("size", None)
5401 raise errors.OpPrereqError("Missing disk size")
5405 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
5406 self.disks.append({"size": size, "mode": mode})
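# At this point the opcode parameters have been normalized: self.op.nics
# entries are dicts such as {"mode": "bridged", "link": "xen-br0"} (a sketch
# of typical input) turned into objects.NIC instances in self.nics, while
# self.disks holds dicts like {"size": 10240, "mode": "rw"}; the later checks
# and the disk generation code rely only on these normalized values.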
5408 # used in CheckPrereq for ip ping check
5409 self.check_ip = hostname1.ip
5411 # file storage checks
5412 if (self.op.file_driver and
5413 not self.op.file_driver in constants.FILE_DRIVER):
5414 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5415 self.op.file_driver)
5417 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5418 raise errors.OpPrereqError("File storage directory path not absolute")
5420 ### Node/iallocator related checks
5421 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5422 raise errors.OpPrereqError("One and only one of iallocator and primary"
5423 " node must be given")
5425 if self.op.iallocator:
5426 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5428 self.op.pnode = self._ExpandNode(self.op.pnode)
5429 nodelist = [self.op.pnode]
5430 if self.op.snode is not None:
5431 self.op.snode = self._ExpandNode(self.op.snode)
5432 nodelist.append(self.op.snode)
5433 self.needed_locks[locking.LEVEL_NODE] = nodelist
5435 # in case of import lock the source node too
5436 if self.op.mode == constants.INSTANCE_IMPORT:
5437 src_node = getattr(self.op, "src_node", None)
5438 src_path = getattr(self.op, "src_path", None)
5440 if src_path is None:
5441 self.op.src_path = src_path = self.op.instance_name
5443 if src_node is None:
5444 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5445 self.op.src_node = None
5446 if os.path.isabs(src_path):
5447 raise errors.OpPrereqError("Importing an instance from an absolute"
5448 " path requires a source node option.")
5450 self.op.src_node = src_node = self._ExpandNode(src_node)
5451 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5452 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5453 if not os.path.isabs(src_path):
5454 self.op.src_path = src_path = \
5455 os.path.join(constants.EXPORT_DIR, src_path)
5457 else: # INSTANCE_CREATE
5458 if getattr(self.op, "os_type", None) is None:
5459 raise errors.OpPrereqError("No guest OS specified")
5461 def _RunAllocator(self):
5462 """Run the allocator based on input opcode.
5465 nics = [n.ToDict() for n in self.nics]
5466 ial = IAllocator(self.cfg, self.rpc,
5467 mode=constants.IALLOCATOR_MODE_ALLOC,
5468 name=self.op.instance_name,
5469 disk_template=self.op.disk_template,
5472 vcpus=self.be_full[constants.BE_VCPUS],
5473 mem_size=self.be_full[constants.BE_MEMORY],
5476 hypervisor=self.op.hypervisor,
5479 ial.Run(self.op.iallocator)
5482 raise errors.OpPrereqError("Can't compute nodes using"
5483 " iallocator '%s': %s" % (self.op.iallocator,
5485 if len(ial.nodes) != ial.required_nodes:
5486 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5487 " of nodes (%s), required %s" %
5488 (self.op.iallocator, len(ial.nodes),
5489 ial.required_nodes))
5490 self.op.pnode = ial.nodes[0]
5491 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5492 self.op.instance_name, self.op.iallocator,
5493 ", ".join(ial.nodes))
5494 if ial.required_nodes == 2:
5495 self.op.snode = ial.nodes[1]
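# Example: for a drbd8 instance the allocator must return two node names; the
# first becomes the primary (pnode) and the second the secondary (snode),
# while a plain or file-based instance needs only one node.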
5497 def BuildHooksEnv(self):
5500 This runs on master, primary and secondary nodes of the instance.
5504 "ADD_MODE": self.op.mode,
5506 if self.op.mode == constants.INSTANCE_IMPORT:
5507 env["SRC_NODE"] = self.op.src_node
5508 env["SRC_PATH"] = self.op.src_path
5509 env["SRC_IMAGES"] = self.src_images
5511 env.update(_BuildInstanceHookEnv(
5512 name=self.op.instance_name,
5513 primary_node=self.op.pnode,
5514 secondary_nodes=self.secondaries,
5515 status=self.op.start,
5516 os_type=self.op.os_type,
5517 memory=self.be_full[constants.BE_MEMORY],
5518 vcpus=self.be_full[constants.BE_VCPUS],
5519 nics=_NICListToTuple(self, self.nics),
5520 disk_template=self.op.disk_template,
5521 disks=[(d["size"], d["mode"]) for d in self.disks],
5524 hypervisor_name=self.op.hypervisor,
5527 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5532 def CheckPrereq(self):
5533 """Check prerequisites.
5536 if (not self.cfg.GetVGName() and
5537 self.op.disk_template not in constants.DTS_NOT_LVM):
5538 raise errors.OpPrereqError("Cluster does not support lvm-based"
5541 if self.op.mode == constants.INSTANCE_IMPORT:
5542 src_node = self.op.src_node
5543 src_path = self.op.src_path
5545 if src_node is None:
5546 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5547 exp_list = self.rpc.call_export_list(locked_nodes)
5549 for node in exp_list:
5550 if exp_list[node].fail_msg:
5552 if src_path in exp_list[node].payload:
5554 self.op.src_node = src_node = node
5555 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5559 raise errors.OpPrereqError("No export found for relative path %s" %
5562 _CheckNodeOnline(self, src_node)
5563 result = self.rpc.call_export_info(src_node, src_path)
5564 result.Raise("No export or invalid export found in dir %s" % src_path)
5566 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5567 if not export_info.has_section(constants.INISECT_EXP):
5568 raise errors.ProgrammerError("Corrupted export config")
5570 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5571 if (int(ei_version) != constants.EXPORT_VERSION):
5572 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5573 (ei_version, constants.EXPORT_VERSION))
5575 # Check that the new instance doesn't have less disks than the export
5576 instance_disks = len(self.disks)
5577 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
5578 if instance_disks < export_disks:
5579 raise errors.OpPrereqError("Not enough disks to import."
5580 " (instance: %d, export: %d)" %
5581 (instance_disks, export_disks))
5583 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
5584 disk_images = []
5585 for idx in range(export_disks):
5586 option = 'disk%d_dump' % idx
5587 if export_info.has_option(constants.INISECT_INS, option):
5588 # FIXME: are the old os-es, disk sizes, etc. useful?
5589 export_name = export_info.get(constants.INISECT_INS, option)
5590 image = os.path.join(src_path, export_name)
5591 disk_images.append(image)
5592 else:
5593 disk_images.append(False)
5595 self.src_images = disk_images
5597 old_name = export_info.get(constants.INISECT_INS, 'name')
5598 # FIXME: int() here could throw a ValueError on broken exports
5599 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
5600 if self.op.instance_name == old_name:
5601 for idx, nic in enumerate(self.nics):
5602 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
5603 nic_mac_ini = 'nic%d_mac' % idx
5604 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
5606 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
5607 # ip ping checks (we use the same ip that was resolved in ExpandNames)
5608 if self.op.start and not self.op.ip_check:
5609 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
5610 " adding an instance in start mode")
5612 if self.op.ip_check:
5613 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
5614 raise errors.OpPrereqError("IP %s of instance %s already in use" %
5615 (self.check_ip, self.op.instance_name))
5617 #### mac address generation
5618 # By generating here the mac address both the allocator and the hooks get
5619 # the real final mac address rather than the 'auto' or 'generate' value.
5620 # There is a race condition between the generation and the instance object
5621 # creation, which means that we know the mac is valid now, but we're not
5622 # sure it will be when we actually add the instance. If things go bad
5623 # adding the instance will abort because of a duplicate mac, and the
5624 # creation job will fail.
5625 for nic in self.nics:
5626 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5627 nic.mac = self.cfg.GenerateMAC()
5631 if self.op.iallocator is not None:
5632 self._RunAllocator()
5634 #### node related checks
5636 # check primary node
5637 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
5638 assert self.pnode is not None, \
5639 "Cannot retrieve locked node %s" % self.op.pnode
5641 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
5644 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
5647 self.secondaries = []
5649 # mirror node verification
5650 if self.op.disk_template in constants.DTS_NET_MIRROR:
5651 if self.op.snode is None:
5652 raise errors.OpPrereqError("The networked disk templates need"
5654 if self.op.snode == pnode.name:
5655 raise errors.OpPrereqError("The secondary node cannot be"
5656 " the primary node.")
5657 _CheckNodeOnline(self, self.op.snode)
5658 _CheckNodeNotDrained(self, self.op.snode)
5659 self.secondaries.append(self.op.snode)
5661 nodenames = [pnode.name] + self.secondaries
5663 req_size = _ComputeDiskSize(self.op.disk_template,
5666 # Check lv size requirements
5667 if req_size is not None:
5668 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5670 for node in nodenames:
5671 info = nodeinfo[node]
5672 info.Raise("Cannot get current information from node %s" % node)
5674 vg_free = info.get('vg_free', None)
5675 if not isinstance(vg_free, int):
5676 raise errors.OpPrereqError("Can't compute free disk space on"
5678 if req_size > vg_free:
5679 raise errors.OpPrereqError("Not enough disk space on target node %s."
5680 " %d MB available, %d MB required" %
5681 (node, vg_free, req_size))
5683 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5686 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5687 result.Raise("OS '%s' not in supported os list for primary node %s" %
5688 (self.op.os_type, pnode.name), prereq=True)
5690 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
5692 # memory check on primary node
5694 _CheckNodeFreeMemory(self, self.pnode.name,
5695 "creating instance %s" % self.op.instance_name,
5696 self.be_full[constants.BE_MEMORY],
5699 self.dry_run_result = list(nodenames)
5701 def Exec(self, feedback_fn):
5702 """Create and add the instance to the cluster.
5705 instance = self.op.instance_name
5706 pnode_name = self.pnode.name
5708 ht_kind = self.op.hypervisor
5709 if ht_kind in constants.HTS_REQ_PORT:
5710 network_port = self.cfg.AllocatePort()
5711 else:
5712 network_port = None
5714 ##if self.op.vnc_bind_address is None:
5715 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
5717 # this is needed because os.path.join does not accept None arguments
5718 if self.op.file_storage_dir is None:
5719 string_file_storage_dir = ""
5721 string_file_storage_dir = self.op.file_storage_dir
5723 # build the full file storage dir path
5724 file_storage_dir = os.path.normpath(os.path.join(
5725 self.cfg.GetFileStorageDir(),
5726 string_file_storage_dir, instance))
5729 disks = _GenerateDiskTemplate(self,
5730 self.op.disk_template,
5731 instance, pnode_name,
5735 self.op.file_driver,
5738 iobj = objects.Instance(name=instance, os=self.op.os_type,
5739 primary_node=pnode_name,
5740 nics=self.nics, disks=disks,
5741 disk_template=self.op.disk_template,
5743 network_port=network_port,
5744 beparams=self.op.beparams,
5745 hvparams=self.op.hvparams,
5746 hypervisor=self.op.hypervisor,
5749 feedback_fn("* creating instance disks...")
5751 _CreateDisks(self, iobj)
5752 except errors.OpExecError:
5753 self.LogWarning("Device creation failed, reverting...")
5755 _RemoveDisks(self, iobj)
5757 self.cfg.ReleaseDRBDMinors(instance)
5760 feedback_fn("adding instance %s to cluster config" % instance)
5762 self.cfg.AddInstance(iobj)
5763 # Declare that we don't want to remove the instance lock anymore, as we've
5764 # added the instance to the config
5765 del self.remove_locks[locking.LEVEL_INSTANCE]
5766 # Unlock all the nodes
5767 if self.op.mode == constants.INSTANCE_IMPORT:
5768 nodes_keep = [self.op.src_node]
5769 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5770 if node != self.op.src_node]
5771 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5772 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5774 self.context.glm.release(locking.LEVEL_NODE)
5775 del self.acquired_locks[locking.LEVEL_NODE]
5777 if self.op.wait_for_sync:
5778 disk_abort = not _WaitForSync(self, iobj)
5779 elif iobj.disk_template in constants.DTS_NET_MIRROR:
5780 # make sure the disks are not degraded (still sync-ing is ok)
5782 feedback_fn("* checking mirrors status")
5783 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5788 _RemoveDisks(self, iobj)
5789 self.cfg.RemoveInstance(iobj.name)
5790 # Make sure the instance lock gets removed
5791 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5792 raise errors.OpExecError("There are some degraded disks for"
5795 feedback_fn("creating os for instance %s on node %s" %
5796 (instance, pnode_name))
5798 if iobj.disk_template != constants.DT_DISKLESS:
5799 if self.op.mode == constants.INSTANCE_CREATE:
5800 feedback_fn("* running the instance OS create scripts...")
5801 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5802 result.Raise("Could not add os for instance %s"
5803 " on node %s" % (instance, pnode_name))
5805 elif self.op.mode == constants.INSTANCE_IMPORT:
5806 feedback_fn("* running the instance OS import scripts...")
5807 src_node = self.op.src_node
5808 src_images = self.src_images
5809 cluster_name = self.cfg.GetClusterName()
5810 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5811 src_node, src_images,
5813 msg = import_result.fail_msg
5815 self.LogWarning("Error while importing the disk images for instance"
5816 " %s on node %s: %s" % (instance, pnode_name, msg))
5818 # also checked in the prereq part
5819 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5822 if self.op.start:
5823 iobj.admin_up = True
5824 self.cfg.Update(iobj)
5825 logging.info("Starting instance %s on node %s", instance, pnode_name)
5826 feedback_fn("* starting instance...")
5827 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5828 result.Raise("Could not start instance")
5830 return list(iobj.all_nodes)
5833 class LUConnectConsole(NoHooksLU):
5834 """Connect to an instance's console.
5836 This is somewhat special in that it returns the command line that
5837 you need to run on the master node in order to connect to the
5841 _OP_REQP = ["instance_name"]
5844 def ExpandNames(self):
5845 self._ExpandAndLockInstance()
5847 def CheckPrereq(self):
5848 """Check prerequisites.
5850 This checks that the instance is in the cluster.
5853 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5854 assert self.instance is not None, \
5855 "Cannot retrieve locked instance %s" % self.op.instance_name
5856 _CheckNodeOnline(self, self.instance.primary_node)
5858 def Exec(self, feedback_fn):
5859 """Connect to the console of an instance
5862 instance = self.instance
5863 node = instance.primary_node
5865 node_insts = self.rpc.call_instance_list([node],
5866 [instance.hypervisor])[node]
5867 node_insts.Raise("Can't get node information from %s" % node)
5869 if instance.name not in node_insts.payload:
5870 raise errors.OpExecError("Instance %s is not running." % instance.name)
5872 logging.debug("Connecting to console of %s on %s", instance.name, node)
5874 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5875 cluster = self.cfg.GetClusterInfo()
5876 # beparams and hvparams are passed separately, to avoid editing the
5877 # instance and then saving the defaults in the instance itself.
5878 hvparams = cluster.FillHV(instance)
5879 beparams = cluster.FillBE(instance)
5880 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5883 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5886 class LUReplaceDisks(LogicalUnit):
5887 """Replace the disks of an instance.
5890 HPATH = "mirrors-replace"
5891 HTYPE = constants.HTYPE_INSTANCE
5892 _OP_REQP = ["instance_name", "mode", "disks"]
5895 def CheckArguments(self):
5896 if not hasattr(self.op, "remote_node"):
5897 self.op.remote_node = None
5898 if not hasattr(self.op, "iallocator"):
5899 self.op.iallocator = None
5901 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
5904 def ExpandNames(self):
5905 self._ExpandAndLockInstance()
5907 if self.op.iallocator is not None:
5908 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5910 elif self.op.remote_node is not None:
5911 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5912 if remote_node is None:
5913 raise errors.OpPrereqError("Node '%s' not known" %
5914 self.op.remote_node)
5916 self.op.remote_node = remote_node
5918 # Warning: do not remove the locking of the new secondary here
5919 # unless DRBD8.AddChildren is changed to work in parallel;
5920 # currently it doesn't since parallel invocations of
5921 # FindUnusedMinor will conflict
5922 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5923 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5926 self.needed_locks[locking.LEVEL_NODE] = []
5927 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5929 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
5930 self.op.iallocator, self.op.remote_node,
5933 self.tasklets = [self.replacer]
5935 def DeclareLocks(self, level):
5936 # If we're not already locking all nodes in the set we have to declare the
5937 # instance's primary/secondary nodes.
5938 if (level == locking.LEVEL_NODE and
5939 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5940 self._LockInstancesNodes()
5942 def BuildHooksEnv(self):
5945 This runs on the master, the primary and all the secondaries.
5948 instance = self.replacer.instance
5950 "MODE": self.op.mode,
5951 "NEW_SECONDARY": self.op.remote_node,
5952 "OLD_SECONDARY": instance.secondary_nodes[0],
5954 env.update(_BuildInstanceHookEnvByObject(self, instance))
5956 self.cfg.GetMasterNode(),
5957 instance.primary_node,
5959 if self.op.remote_node is not None:
5960 nl.append(self.op.remote_node)
5964 class LUEvacuateNode(LogicalUnit):
5965 """Relocate the secondary instances from a node.
5968 HPATH = "node-evacuate"
5969 HTYPE = constants.HTYPE_NODE
5970 _OP_REQP = ["node_name"]
5973 def CheckArguments(self):
5974 if not hasattr(self.op, "remote_node"):
5975 self.op.remote_node = None
5976 if not hasattr(self.op, "iallocator"):
5977 self.op.iallocator = None
5979 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
5980 self.op.remote_node,
5983 def ExpandNames(self):
5984 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
5985 if self.op.node_name is None:
5986 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
5988 self.needed_locks = {}
5990 # Declare node locks
5991 if self.op.iallocator is not None:
5992 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5994 elif self.op.remote_node is not None:
5995 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5996 if remote_node is None:
5997 raise errors.OpPrereqError("Node '%s' not known" %
5998 self.op.remote_node)
6000 self.op.remote_node = remote_node
6002 # Warning: do not remove the locking of the new secondary here
6003 # unless DRBD8.AddChildren is changed to work in parallel;
6004 # currently it doesn't since parallel invocations of
6005 # FindUnusedMinor will conflict
6006 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6007 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6010 raise errors.OpPrereqError("Invalid parameters")
6012 # Create tasklets for replacing disks for all secondary instances on this
6017 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6018 logging.debug("Replacing disks for instance %s", inst.name)
6019 names.append(inst.name)
6021 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6022 self.op.iallocator, self.op.remote_node, [])
6023 tasklets.append(replacer)
6025 self.tasklets = tasklets
6026 self.instance_names = names
6028 # Declare instance locks
6029 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6031 def DeclareLocks(self, level):
6032 # If we're not already locking all nodes in the set we have to declare the
6033 # instance's primary/secondary nodes.
6034 if (level == locking.LEVEL_NODE and
6035 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6036 self._LockInstancesNodes()
6038 def BuildHooksEnv(self):
6041 This runs on the master, the primary and all the secondaries.
6045 "NODE_NAME": self.op.node_name,
6048 nl = [self.cfg.GetMasterNode()]
6050 if self.op.remote_node is not None:
6051 env["NEW_SECONDARY"] = self.op.remote_node
6052 nl.append(self.op.remote_node)
6054 return (env, nl, nl)
6057 class TLReplaceDisks(Tasklet):
6058 """Replaces disks for an instance.
6060 Note: Locking is not within the scope of this class.
6063 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
6065 """Initializes this class.
6068 Tasklet.__init__(self, lu)
6071 self.instance_name = instance_name
6073 self.iallocator_name = iallocator_name
6074 self.remote_node = remote_node
6078 self.instance = None
6079 self.new_node = None
6080 self.target_node = None
6081 self.other_node = None
6082 self.remote_node_info = None
6083 self.node_secondary_ip = None
6086 def CheckArguments(mode, remote_node, iallocator):
6087 """Helper function for users of this class.
6090 # check for valid parameter combination
6091 if mode == constants.REPLACE_DISK_CHG:
6092 if remote_node is None and iallocator is None:
6093 raise errors.OpPrereqError("When changing the secondary either an"
6094 " iallocator script must be used or the"
6097 if remote_node is not None and iallocator is not None:
6098 raise errors.OpPrereqError("Give either the iallocator or the new"
6099 " secondary, not both")
6101 elif remote_node is not None or iallocator is not None:
6102 # Not replacing the secondary
6103 raise errors.OpPrereqError("The iallocator and new node options can"
6104 " only be used when changing the"
6108 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6109 """Compute a new secondary node using an IAllocator.
6112 ial = IAllocator(lu.cfg, lu.rpc,
6113 mode=constants.IALLOCATOR_MODE_RELOC,
6115 relocate_from=relocate_from)
6117 ial.Run(iallocator_name)
6120 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6121 " %s" % (iallocator_name, ial.info))
6123 if len(ial.nodes) != ial.required_nodes:
6124 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6125 " of nodes (%s), required %s" %
6126 (iallocator_name, len(ial.nodes), ial.required_nodes))
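# For a relocation request of a single-secondary DRBD instance the
# allocator is expected to return exactly one node (ial.required_nodes),
# which becomes the new secondary.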
6128 remote_node_name = ial.nodes[0]
6130 lu.LogInfo("Selected new secondary for instance '%s': %s",
6131 instance_name, remote_node_name)
6133 return remote_node_name
6135 def _FindFaultyDisks(self, node_name):
6136 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
6139 def CheckPrereq(self):
6140 """Check prerequisites.
6142 This checks that the instance is in the cluster.
6145 self.instance = self.cfg.GetInstanceInfo(self.instance_name)
6146 assert self.instance is not None, \
6147 "Cannot retrieve locked instance %s" % self.instance_name
6149 if self.instance.disk_template != constants.DT_DRBD8:
6150 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6153 if len(self.instance.secondary_nodes) != 1:
6154 raise errors.OpPrereqError("The instance has a strange layout,"
6155 " expected one secondary but found %d" %
6156 len(self.instance.secondary_nodes))
6158 secondary_node = self.instance.secondary_nodes[0]
6160 if self.iallocator_name is None:
6161 remote_node = self.remote_node
6163 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6164 self.instance.name, secondary_node)
6166 if remote_node is not None:
6167 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6168 assert self.remote_node_info is not None, \
6169 "Cannot retrieve locked node %s" % remote_node
6171 self.remote_node_info = None
6173 if remote_node == self.instance.primary_node:
6174 raise errors.OpPrereqError("The specified node is the primary node of"
6177 if remote_node == secondary_node:
6178 raise errors.OpPrereqError("The specified node is already the"
6179 " secondary node of the instance.")
6181 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6182 constants.REPLACE_DISK_CHG):
6183 raise errors.OpPrereqError("Cannot specify disks to be replaced")
6185 if self.mode == constants.REPLACE_DISK_AUTO:
6186 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
6187 faulty_secondary = self._FindFaultyDisks(secondary_node)
6189 if faulty_primary and faulty_secondary:
6190 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6191 " one node and can not be repaired"
6192 " automatically" % self.instance_name)
6195 self.disks = faulty_primary
6196 self.target_node = self.instance.primary_node
6197 self.other_node = secondary_node
6198 check_nodes = [self.target_node, self.other_node]
6199 elif faulty_secondary:
6200 self.disks = faulty_secondary
6201 self.target_node = secondary_node
6202 self.other_node = self.instance.primary_node
6203 check_nodes = [self.target_node, self.other_node]
6209 # Non-automatic modes
6210 if self.mode == constants.REPLACE_DISK_PRI:
6211 self.target_node = self.instance.primary_node
6212 self.other_node = secondary_node
6213 check_nodes = [self.target_node, self.other_node]
6215 elif self.mode == constants.REPLACE_DISK_SEC:
6216 self.target_node = secondary_node
6217 self.other_node = self.instance.primary_node
6218 check_nodes = [self.target_node, self.other_node]
6220 elif self.mode == constants.REPLACE_DISK_CHG:
6221 self.new_node = remote_node
6222 self.other_node = self.instance.primary_node
6223 self.target_node = secondary_node
6224 check_nodes = [self.new_node, self.other_node]
6226 _CheckNodeNotDrained(self.lu, remote_node)
6229 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
6232 # If not specified all disks should be replaced
6234 self.disks = range(len(self.instance.disks))
6236 for node in check_nodes:
6237 _CheckNodeOnline(self.lu, node)
6239 # Check whether disks are valid
6240 for disk_idx in self.disks:
6241 self.instance.FindDisk(disk_idx)
6243 # Get secondary node IP addresses
6246 for node_name in [self.target_node, self.other_node, self.new_node]:
6247 if node_name is not None:
6248 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6250 self.node_secondary_ip = node_2nd_ip
6252 def Exec(self, feedback_fn):
6253 """Execute disk replacement.
6255 This dispatches the disk replacement to the appropriate handler.
6259 feedback_fn("No disks need replacement")
6262 feedback_fn("Replacing disk(s) %s for %s" %
6263 (", ".join([str(i) for i in self.disks]), self.instance.name))
6265 activate_disks = (not self.instance.admin_up)
6267 # Activate the instance disks if we're replacing them on a down instance
6269 _StartInstanceDisks(self.lu, self.instance, True)
6272 # Should we replace the secondary node?
6273 if self.new_node is not None:
6274 return self._ExecDrbd8Secondary()
6276 return self._ExecDrbd8DiskOnly()
6279 # Deactivate the instance disks if we're replacing them on a down instance
6281 _SafeShutdownInstanceDisks(self.lu, self.instance)
6283 def _CheckVolumeGroup(self, nodes):
6284 self.lu.LogInfo("Checking volume groups")
6286 vgname = self.cfg.GetVGName()
6288 # Make sure volume group exists on all involved nodes
6289 results = self.rpc.call_vg_list(nodes)
6291 raise errors.OpExecError("Can't list volume groups on the nodes")
6295 res.Raise("Error checking node %s" % node)
6296 if vgname not in res.payload:
6297 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6300 def _CheckDisksExistence(self, nodes):
6301 # Check disk existence
6302 for idx, dev in enumerate(self.instance.disks):
6303 if idx not in self.disks:
6307 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6308 self.cfg.SetDiskID(dev, node)
6310 result = self.rpc.call_blockdev_find(node, dev)
6312 msg = result.fail_msg
6313 if msg or not result.payload:
6315 msg = "disk not found"
6316 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6319 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6320 for idx, dev in enumerate(self.instance.disks):
6321 if idx not in self.disks:
6324 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6327 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6329 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6330 " replace disks for instance %s" %
6331 (node_name, self.instance.name))
6333 def _CreateNewStorage(self, node_name):
6334 vgname = self.cfg.GetVGName()
6337 for idx, dev in enumerate(self.instance.disks):
6338 if idx not in self.disks:
6341 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6343 self.cfg.SetDiskID(dev, node_name)
6345 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6346 names = _GenerateUniqueNames(self.lu, lv_names)
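# For illustration: with disk index 0 the requested suffixes are
# ".disk0_data" and ".disk0_meta"; _GenerateUniqueNames is assumed to
# prepend a cluster-unique identifier to each of them, which is why the
# suffixes start with a dot.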
6348 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6349 logical_id=(vgname, names[0]))
6350 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6351 logical_id=(vgname, names[1]))
6353 new_lvs = [lv_data, lv_meta]
6354 old_lvs = dev.children
6355 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6357 # we pass force_create=True to force the LVM creation
6358 for new_lv in new_lvs:
6359 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6360 _GetInstanceInfoText(self.instance), False)
6364 def _CheckDevices(self, node_name, iv_names):
6365 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
6366 self.cfg.SetDiskID(dev, node_name)
6368 result = self.rpc.call_blockdev_find(node_name, dev)
6370 msg = result.fail_msg
6371 if msg or not result.payload:
6373 msg = "disk not found"
6374 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6377 if result.payload.is_degraded:
6378 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6380 def _RemoveOldStorage(self, node_name, iv_names):
6381 for name, (dev, old_lvs, _) in iv_names.iteritems():
6382 self.lu.LogInfo("Remove logical volumes for %s" % name)
6385 self.cfg.SetDiskID(lv, node_name)
6387 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6389 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6390 hint="remove unused LVs manually")
6392 def _ExecDrbd8DiskOnly(self):
6393 """Replace a disk on the primary or secondary for DRBD 8.
6395 The algorithm for replace is quite complicated:
6397 1. for each disk to be replaced:
6399 1. create new LVs on the target node with unique names
6400 1. detach old LVs from the drbd device
6401 1. rename old LVs to name_replaced.<time_t>
6402 1. rename new LVs to old LVs
6403 1. attach the new LVs (with the old names now) to the drbd device
6405 1. wait for sync across all devices
6407 1. for each modified disk:
6409 1. remove old LVs (which have the name name_replaced.<time_t>)
6411 Failures are not very well handled.
6416 # Step: check device activation
6417 self.lu.LogStep(1, steps_total, "Check device existence")
6418 self._CheckDisksExistence([self.other_node, self.target_node])
6419 self._CheckVolumeGroup([self.target_node, self.other_node])
6421 # Step: check other node consistency
6422 self.lu.LogStep(2, steps_total, "Check peer consistency")
6423 self._CheckDisksConsistency(self.other_node,
6424 self.other_node == self.instance.primary_node,
6427 # Step: create new storage
6428 self.lu.LogStep(3, steps_total, "Allocate new storage")
6429 iv_names = self._CreateNewStorage(self.target_node)
6431 # Step: for each lv, detach+rename*2+attach
6432 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6433 for dev, old_lvs, new_lvs in iv_names.itervalues():
6434 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6436 result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs)
6437 result.Raise("Can't detach drbd from local storage on node"
6438 " %s for device %s" % (self.target_node, dev.iv_name))
6440 #cfg.Update(instance)
6442 # ok, we created the new LVs, so now we know we have the needed
6443 # storage; as such, we proceed on the target node to rename
6444 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6445 # using the assumption that logical_id == physical_id (which in
6446 # turn is the unique_id on that node)
6448 # FIXME(iustin): use a better name for the replaced LVs
6449 temp_suffix = int(time.time())
6450 ren_fn = lambda d, suff: (d.physical_id[0],
6451 d.physical_id[1] + "_replaced-%s" % suff)
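# Sketch of the renaming with a made-up physical_id:
#   ("xenvg", "abcd.disk0_data") -> ("xenvg", "abcd.disk0_data_replaced-1234567890")
# i.e. only the LV name gets the "_replaced-<time_t>" suffix, the volume
# group is left unchanged.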
6453 # Build the rename list based on what LVs exist on the node
6454 rename_old_to_new = []
6455 for to_ren in old_lvs:
6456 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6457 if not result.fail_msg and result.payload:
6459 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6461 self.lu.LogInfo("Renaming the old LVs on the target node")
6462 result = self.rpc.call_blockdev_rename(self.target_node, rename_old_to_new)
6463 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6465 # Now we rename the new LVs to the old LVs
6466 self.lu.LogInfo("Renaming the new LVs on the target node")
6467 rename_new_to_old = [(new, old.physical_id)
6468 for old, new in zip(old_lvs, new_lvs)]
6469 result = self.rpc.call_blockdev_rename(self.target_node, rename_new_to_old)
6470 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6472 for old, new in zip(old_lvs, new_lvs):
6473 new.logical_id = old.logical_id
6474 self.cfg.SetDiskID(new, self.target_node)
6476 for disk in old_lvs:
6477 disk.logical_id = ren_fn(disk, temp_suffix)
6478 self.cfg.SetDiskID(disk, self.target_node)
6480 # Now that the new lvs have the old name, we can add them to the device
6481 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6482 result = self.rpc.call_blockdev_addchildren(self.target_node, dev, new_lvs)
6483 msg = result.fail_msg
6485 for new_lv in new_lvs:
6486 msg2 = self.rpc.call_blockdev_remove(self.target_node, new_lv).fail_msg
6488 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
6489 hint=("cleanup manually the unused logical"
6491 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6493 dev.children = new_lvs
6495 self.cfg.Update(self.instance)
6498 # This can fail as the old devices are degraded and _WaitForSync
6499 # does a combined result over all disks, so we don't check its return value
6500 self.lu.LogStep(5, steps_total, "Sync devices")
6501 _WaitForSync(self.lu, self.instance, unlock=True)
6503 # Check all devices manually
6504 self._CheckDevices(self.instance.primary_node, iv_names)
6506 # Step: remove old storage
6507 self.lu.LogStep(6, steps_total, "Removing old storage")
6508 self._RemoveOldStorage(self.target_node, iv_names)
6510 def _ExecDrbd8Secondary(self):
6511 """Replace the secondary node for DRBD 8.
6513 The algorithm for replace is quite complicated:
6514 - for all disks of the instance:
6515 - create new LVs on the new node with same names
6516 - shutdown the drbd device on the old secondary
6517 - disconnect the drbd network on the primary
6518 - create the drbd device on the new secondary
6519 - network attach the drbd on the primary, using an artifice:
6520 the drbd code for Attach() will connect to the network if it
6521 finds a device which is connected to the good local disks but
6523 - wait for sync across all devices
6524 - remove all disks from the old secondary
6526 Failures are not very well handled.
6531 # Step: check device activation
6532 self.lu.LogStep(1, steps_total, "Check device existence")
6533 self._CheckDisksExistence([self.instance.primary_node])
6534 self._CheckVolumeGroup([self.instance.primary_node])
6536 # Step: check other node consistency
6537 self.lu.LogStep(2, steps_total, "Check peer consistency")
6538 self._CheckDisksConsistency(self.instance.primary_node, True, True)
6540 # Step: create new storage
6541 self.lu.LogStep(3, steps_total, "Allocate new storage")
6542 for idx, dev in enumerate(self.instance.disks):
6543 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
6544 (self.new_node, idx))
6545 # we pass force_create=True to force LVM creation
6546 for new_lv in dev.children:
6547 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
6548 _GetInstanceInfoText(self.instance), False)
6550 # Step 4: drbd minors and drbd setup changes
6551 # after this, we must manually remove the drbd minors on both the
6552 # error and the success paths
6553 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6554 minors = self.cfg.AllocateDRBDMinor([self.new_node for dev in self.instance.disks],
6556 logging.debug("Allocated minors %r", minors)
6559 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
6560 self.lu.LogInfo("activating a new drbd on %s for disk/%d" % (self.new_node, idx))
6561 # create new devices on new_node; note that we create two IDs:
6562 # one without port, so the drbd will be activated without
6563 # networking information on the new node at this stage, and one
6564 with network, for the later activation further down in this step
6565 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
6566 if self.instance.primary_node == o_node1:
6571 new_alone_id = (self.instance.primary_node, self.new_node, None, p_minor, new_minor, o_secret)
6572 new_net_id = (self.instance.primary_node, self.new_node, o_port, p_minor, new_minor, o_secret)
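# Sketch of the two IDs with made-up values, for primary "node1" and new
# secondary "node4":
#   new_alone_id = ("node1", "node4", None,  p_minor, new_minor, o_secret)
#   new_net_id   = ("node1", "node4", 11000, p_minor, new_minor, o_secret)
# The only difference is the port: without it the device is brought up
# standalone, with it the later attach can establish the network link.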
6574 iv_names[idx] = (dev, dev.children, new_net_id)
6575 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
6577 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
6578 logical_id=new_alone_id,
6579 children=dev.children,
6582 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
6583 _GetInstanceInfoText(self.instance), False)
6584 except errors.GenericError:
6585 self.cfg.ReleaseDRBDMinors(self.instance.name)
6588 # We have new devices, shutdown the drbd on the old secondary
6589 for idx, dev in enumerate(self.instance.disks):
6590 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
6591 self.cfg.SetDiskID(dev, self.target_node)
6592 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
6594 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
6595 "node: %s" % (idx, msg),
6596 hint=("Please cleanup this device manually as"
6597 " soon as possible"))
6599 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
6600 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], self.node_secondary_ip,
6601 self.instance.disks)[self.instance.primary_node]
6603 msg = result.fail_msg
6605 # detaches didn't succeed (unlikely)
6606 self.cfg.ReleaseDRBDMinors(self.instance.name)
6607 raise errors.OpExecError("Can't detach the disks from the network on"
6608 " old node: %s" % (msg,))
6610 # if we managed to detach at least one, we update all the disks of
6611 # the instance to point to the new secondary
6612 self.lu.LogInfo("Updating instance configuration")
6613 for dev, _, new_logical_id in iv_names.itervalues():
6614 dev.logical_id = new_logical_id
6615 self.cfg.SetDiskID(dev, self.instance.primary_node)
6617 self.cfg.Update(self.instance)
6619 # and now perform the drbd attach
6620 self.lu.LogInfo("Attaching primary drbds to new secondary"
6621 " (standalone => connected)")
6622 result = self.rpc.call_drbd_attach_net([self.instance.primary_node, self.new_node], self.node_secondary_ip,
6623 self.instance.disks, self.instance.name,
6625 for to_node, to_result in result.items():
6626 msg = to_result.fail_msg
6628 self.lu.LogWarning("Can't attach drbd disks on node %s: %s", to_node, msg,
6629 hint=("please do a gnt-instance info to see the"
6630 " status of disks"))
6633 # This can fail as the old devices are degraded and _WaitForSync
6634 # does a combined result over all disks, so we don't check its return value
6635 self.lu.LogStep(5, steps_total, "Sync devices")
6636 _WaitForSync(self.lu, self.instance, unlock=True)
6638 # Check all devices manually
6639 self._CheckDevices(self.instance.primary_node, iv_names)
6641 # Step: remove old storage
6642 self.lu.LogStep(6, steps_total, "Removing old storage")
6643 self._RemoveOldStorage(self.target_node, iv_names)
6646 class LURepairNodeStorage(NoHooksLU):
6647 """Repairs the volume group on a node.
6650 _OP_REQP = ["node_name"]
6653 def CheckArguments(self):
6654 node_name = self.cfg.ExpandNodeName(self.op.node_name)
6655 if node_name is None:
6656 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
6658 self.op.node_name = node_name
6660 def ExpandNames(self):
6661 self.needed_locks = {
6662 locking.LEVEL_NODE: [self.op.node_name],
6665 def _CheckFaultyDisks(self, instance, node_name):
6666 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
6668 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
6669 " node '%s'" % (inst.name, node_name))
6671 def CheckPrereq(self):
6672 """Check prerequisites.
6675 storage_type = self.op.storage_type
6677 if (constants.SO_FIX_CONSISTENCY not in
6678 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
6679 raise errors.OpPrereqError("Storage units of type '%s' can not be"
6680 " repaired" % storage_type)
6682 # Check whether any instance on this node has faulty disks
6683 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
6684 check_nodes = set(inst.all_nodes)
6685 check_nodes.discard(self.op.node_name)
6686 for inst_node_name in check_nodes:
6687 self._CheckFaultyDisks(inst, inst_node_name)
6689 def Exec(self, feedback_fn):
6690 feedback_fn("Repairing storage unit '%s' on %s ..." %
6691 (self.op.name, self.op.node_name))
6693 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
6694 result = self.rpc.call_storage_execute(self.op.node_name,
6695 self.op.storage_type, st_args,
6697 constants.SO_FIX_CONSISTENCY)
6698 result.Raise("Failed to repair storage unit '%s' on %s" %
6699 (self.op.name, self.op.node_name))
6702 class LUGrowDisk(LogicalUnit):
6703 """Grow a disk of an instance.
6707 HTYPE = constants.HTYPE_INSTANCE
6708 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
6711 def ExpandNames(self):
6712 self._ExpandAndLockInstance()
6713 self.needed_locks[locking.LEVEL_NODE] = []
6714 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6716 def DeclareLocks(self, level):
6717 if level == locking.LEVEL_NODE:
6718 self._LockInstancesNodes()
6720 def BuildHooksEnv(self):
6723 This runs on the master, the primary and all the secondaries.
6727 "DISK": self.op.disk,
6728 "AMOUNT": self.op.amount,
6730 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6732 self.cfg.GetMasterNode(),
6733 self.instance.primary_node,
6737 def CheckPrereq(self):
6738 """Check prerequisites.
6740 This checks that the instance is in the cluster.
6743 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6744 assert instance is not None, \
6745 "Cannot retrieve locked instance %s" % self.op.instance_name
6746 nodenames = list(instance.all_nodes)
6747 for node in nodenames:
6748 _CheckNodeOnline(self, node)
6751 self.instance = instance
6753 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
6754 raise errors.OpPrereqError("Instance's disk layout does not support"
6757 self.disk = instance.FindDisk(self.op.disk)
6759 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6760 instance.hypervisor)
6761 for node in nodenames:
6762 info = nodeinfo[node]
6763 info.Raise("Cannot get current information from node %s" % node)
6764 vg_free = info.payload.get('vg_free', None)
6765 if not isinstance(vg_free, int):
6766 raise errors.OpPrereqError("Can't compute free disk space on"
6768 if self.op.amount > vg_free:
6769 raise errors.OpPrereqError("Not enough disk space on target node %s:"
6770 " %d MiB available, %d MiB required" %
6771 (node, vg_free, self.op.amount))
6773 def Exec(self, feedback_fn):
6774 """Execute disk grow.
6777 instance = self.instance
6779 for node in instance.all_nodes:
6780 self.cfg.SetDiskID(disk, node)
6781 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
6782 result.Raise("Grow request failed to node %s" % node)
6783 disk.RecordGrow(self.op.amount)
6784 self.cfg.Update(instance)
6785 if self.op.wait_for_sync:
6786 disk_abort = not _WaitForSync(self, instance)
6788 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
6789 " status.\nPlease check the instance.")
6792 class LUQueryInstanceData(NoHooksLU):
6793 """Query runtime instance data.
6796 _OP_REQP = ["instances", "static"]
6799 def ExpandNames(self):
6800 self.needed_locks = {}
6801 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
6803 if not isinstance(self.op.instances, list):
6804 raise errors.OpPrereqError("Invalid argument type 'instances'")
6806 if self.op.instances:
6807 self.wanted_names = []
6808 for name in self.op.instances:
6809 full_name = self.cfg.ExpandInstanceName(name)
6810 if full_name is None:
6811 raise errors.OpPrereqError("Instance '%s' not known" % name)
6812 self.wanted_names.append(full_name)
6813 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
6815 self.wanted_names = None
6816 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
6818 self.needed_locks[locking.LEVEL_NODE] = []
6819 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6821 def DeclareLocks(self, level):
6822 if level == locking.LEVEL_NODE:
6823 self._LockInstancesNodes()
6825 def CheckPrereq(self):
6826 """Check prerequisites.
6828 This only checks the optional instance list against the existing names.
6831 if self.wanted_names is None:
6832 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
6834 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
6835 in self.wanted_names]
6838 def _ComputeBlockdevStatus(self, node, instance_name, dev):
6839 """Returns the status of a block device
6842 if self.op.static or not node:
6845 self.cfg.SetDiskID(dev, node)
6847 result = self.rpc.call_blockdev_find(node, dev)
6851 result.Raise("Can't compute disk status for %s" % instance_name)
6853 status = result.payload
6857 return (status.dev_path, status.major, status.minor,
6858 status.sync_percent, status.estimated_time,
6859 status.is_degraded, status.ldisk_status)
6861 def _ComputeDiskStatus(self, instance, snode, dev):
6862 """Compute block device status.
6865 if dev.dev_type in constants.LDS_DRBD:
6866 # we change the snode then (otherwise we use the one passed in)
6867 if dev.logical_id[0] == instance.primary_node:
6868 snode = dev.logical_id[1]
6870 snode = dev.logical_id[0]
6872 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
6874 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
6877 dev_children = [self._ComputeDiskStatus(instance, snode, child)
6878 for child in dev.children]
6883 "iv_name": dev.iv_name,
6884 "dev_type": dev.dev_type,
6885 "logical_id": dev.logical_id,
6886 "physical_id": dev.physical_id,
6887 "pstatus": dev_pstatus,
6888 "sstatus": dev_sstatus,
6889 "children": dev_children,
6896 def Exec(self, feedback_fn):
6897 """Gather and return data"""
6900 cluster = self.cfg.GetClusterInfo()
6902 for instance in self.wanted_instances:
6903 if not self.op.static:
6904 remote_info = self.rpc.call_instance_info(instance.primary_node,
6906 instance.hypervisor)
6907 remote_info.Raise("Error checking node %s" % instance.primary_node)
6908 remote_info = remote_info.payload
6909 if remote_info and "state" in remote_info:
6912 remote_state = "down"
6915 if instance.admin_up:
6918 config_state = "down"
6920 disks = [self._ComputeDiskStatus(instance, None, device)
6921 for device in instance.disks]
6924 "name": instance.name,
6925 "config_state": config_state,
6926 "run_state": remote_state,
6927 "pnode": instance.primary_node,
6928 "snodes": instance.secondary_nodes,
6930 # this happens to be the same format used for hooks
6931 "nics": _NICListToTuple(self, instance.nics),
6933 "hypervisor": instance.hypervisor,
6934 "network_port": instance.network_port,
6935 "hv_instance": instance.hvparams,
6936 "hv_actual": cluster.FillHV(instance),
6937 "be_instance": instance.beparams,
6938 "be_actual": cluster.FillBE(instance),
6939 "serial_no": instance.serial_no,
6940 "mtime": instance.mtime,
6941 "ctime": instance.ctime,
6944 result[instance.name] = idict
6949 class LUSetInstanceParams(LogicalUnit):
6950 """Modifies an instances's parameters.
6953 HPATH = "instance-modify"
6954 HTYPE = constants.HTYPE_INSTANCE
6955 _OP_REQP = ["instance_name"]
6958 def CheckArguments(self):
6959 if not hasattr(self.op, 'nics'):
6961 if not hasattr(self.op, 'disks'):
6963 if not hasattr(self.op, 'beparams'):
6964 self.op.beparams = {}
6965 if not hasattr(self.op, 'hvparams'):
6966 self.op.hvparams = {}
6967 self.op.force = getattr(self.op, "force", False)
6968 if not (self.op.nics or self.op.disks or
6969 self.op.hvparams or self.op.beparams):
6970 raise errors.OpPrereqError("No changes submitted")
6974 for disk_op, disk_dict in self.op.disks:
6975 if disk_op == constants.DDM_REMOVE:
6978 elif disk_op == constants.DDM_ADD:
6981 if not isinstance(disk_op, int):
6982 raise errors.OpPrereqError("Invalid disk index")
6983 if not isinstance(disk_dict, dict):
6984 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
6985 raise errors.OpPrereqError(msg)
6987 if disk_op == constants.DDM_ADD:
6988 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
6989 if mode not in constants.DISK_ACCESS_SET:
6990 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
6991 size = disk_dict.get('size', None)
6993 raise errors.OpPrereqError("Required disk parameter size missing")
6996 except ValueError, err:
6997 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
6999 disk_dict['size'] = size
7001 # modification of disk
7002 if 'size' in disk_dict:
7003 raise errors.OpPrereqError("Disk size change not possible, use"
7006 if disk_addremove > 1:
7007 raise errors.OpPrereqError("Only one disk add or remove operation"
7008 " supported at a time")
7012 for nic_op, nic_dict in self.op.nics:
7013 if nic_op == constants.DDM_REMOVE:
7016 elif nic_op == constants.DDM_ADD:
7019 if not isinstance(nic_op, int):
7020 raise errors.OpPrereqError("Invalid nic index")
7021 if not isinstance(nic_dict, dict):
7022 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7023 raise errors.OpPrereqError(msg)
7025 # nic_dict should be a dict
7026 nic_ip = nic_dict.get('ip', None)
7027 if nic_ip is not None:
7028 if nic_ip.lower() == constants.VALUE_NONE:
7029 nic_dict['ip'] = None
7031 if not utils.IsValidIP(nic_ip):
7032 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
7034 nic_bridge = nic_dict.get('bridge', None)
7035 nic_link = nic_dict.get('link', None)
7036 if nic_bridge and nic_link:
7037 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7038 " at the same time")
7039 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7040 nic_dict['bridge'] = None
7041 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7042 nic_dict['link'] = None
7044 if nic_op == constants.DDM_ADD:
7045 nic_mac = nic_dict.get('mac', None)
7047 nic_dict['mac'] = constants.VALUE_AUTO
7049 if 'mac' in nic_dict:
7050 nic_mac = nic_dict['mac']
7051 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7052 if not utils.IsValidMac(nic_mac):
7053 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
7054 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7055 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
7056 " modifying an existing nic")
7058 if nic_addremove > 1:
7059 raise errors.OpPrereqError("Only one NIC add or remove operation"
7060 " supported at a time")
7062 def ExpandNames(self):
7063 self._ExpandAndLockInstance()
7064 self.needed_locks[locking.LEVEL_NODE] = []
7065 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7067 def DeclareLocks(self, level):
7068 if level == locking.LEVEL_NODE:
7069 self._LockInstancesNodes()
7071 def BuildHooksEnv(self):
7074 This runs on the master, primary and secondaries.
7078 if constants.BE_MEMORY in self.be_new:
7079 args['memory'] = self.be_new[constants.BE_MEMORY]
7080 if constants.BE_VCPUS in self.be_new:
7081 args['vcpus'] = self.be_new[constants.BE_VCPUS]
7082 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
7083 # information at all.
7086 nic_override = dict(self.op.nics)
7087 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
7088 for idx, nic in enumerate(self.instance.nics):
7089 if idx in nic_override:
7090 this_nic_override = nic_override[idx]
7092 this_nic_override = {}
7093 if 'ip' in this_nic_override:
7094 ip = this_nic_override['ip']
7097 if 'mac' in this_nic_override:
7098 mac = this_nic_override['mac']
7101 if idx in self.nic_pnew:
7102 nicparams = self.nic_pnew[idx]
7104 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
7105 mode = nicparams[constants.NIC_MODE]
7106 link = nicparams[constants.NIC_LINK]
7107 args['nics'].append((ip, mac, mode, link))
7108 if constants.DDM_ADD in nic_override:
7109 ip = nic_override[constants.DDM_ADD].get('ip', None)
7110 mac = nic_override[constants.DDM_ADD]['mac']
7111 nicparams = self.nic_pnew[constants.DDM_ADD]
7112 mode = nicparams[constants.NIC_MODE]
7113 link = nicparams[constants.NIC_LINK]
7114 args['nics'].append((ip, mac, mode, link))
7115 elif constants.DDM_REMOVE in nic_override:
7116 del args['nics'][-1]
7118 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
7119 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
7122 def _GetUpdatedParams(self, old_params, update_dict,
7123 default_values, parameter_types):
7124 """Return the new params dict for the given params.
7126 @type old_params: dict
7127 @param old_params: old parameters
7128 @type update_dict: dict
7129 @param update_dict: dict containing new parameter values,
7130 or constants.VALUE_DEFAULT to reset the
7131 parameter to its default value
7132 @type default_values: dict
7133 @param default_values: default values for the filled parameters
7134 @type parameter_types: dict
7135 @param parameter_types: dict mapping target dict keys to types
7136 in constants.ENFORCEABLE_TYPES
7137 @rtype: (dict, dict)
7138 @return: (new_parameters, filled_parameters)
7141 params_copy = copy.deepcopy(old_params)
7142 for key, val in update_dict.iteritems():
7143 if val == constants.VALUE_DEFAULT:
7145 del params_copy[key]
7149 params_copy[key] = val
7150 utils.ForceDictType(params_copy, parameter_types)
7151 params_filled = objects.FillDict(default_values, params_copy)
7152 return (params_copy, params_filled)
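# A minimal sketch of the semantics, with hypothetical values
# (parameter_types omitted for brevity):
#   old_params     = {"memory": 512}
#   update_dict    = {"memory": constants.VALUE_DEFAULT, "vcpus": 2}
#   default_values = {"memory": 128, "vcpus": 1}
# returns ({"vcpus": 2}, {"memory": 128, "vcpus": 2}): VALUE_DEFAULT drops
# the key from the per-instance dict, so the filled dict falls back to the
# cluster-level default for it.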
7154 def CheckPrereq(self):
7155 """Check prerequisites.
7157 This only checks the instance list against the existing names.
7160 self.force = self.op.force
7162 # checking the new params on the primary/secondary nodes
7164 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7165 cluster = self.cluster = self.cfg.GetClusterInfo()
7166 assert self.instance is not None, \
7167 "Cannot retrieve locked instance %s" % self.op.instance_name
7168 pnode = instance.primary_node
7169 nodelist = list(instance.all_nodes)
7171 # hvparams processing
7172 if self.op.hvparams:
7173 i_hvdict, hv_new = self._GetUpdatedParams(
7174 instance.hvparams, self.op.hvparams,
7175 cluster.hvparams[instance.hypervisor],
7176 constants.HVS_PARAMETER_TYPES)
7178 hypervisor.GetHypervisor(
7179 instance.hypervisor).CheckParameterSyntax(hv_new)
7180 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
7181 self.hv_new = hv_new # the new actual values
7182 self.hv_inst = i_hvdict # the new dict (without defaults)
7184 self.hv_new = self.hv_inst = {}
7186 # beparams processing
7187 if self.op.beparams:
7188 i_bedict, be_new = self._GetUpdatedParams(
7189 instance.beparams, self.op.beparams,
7190 cluster.beparams[constants.PP_DEFAULT],
7191 constants.BES_PARAMETER_TYPES)
7192 self.be_new = be_new # the new actual values
7193 self.be_inst = i_bedict # the new dict (without defaults)
7195 self.be_new = self.be_inst = {}
7199 if constants.BE_MEMORY in self.op.beparams and not self.force:
7200 mem_check_list = [pnode]
7201 if be_new[constants.BE_AUTO_BALANCE]:
7202 # either we changed auto_balance to yes or it was from before
7203 mem_check_list.extend(instance.secondary_nodes)
7204 instance_info = self.rpc.call_instance_info(pnode, instance.name,
7205 instance.hypervisor)
7206 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
7207 instance.hypervisor)
7208 pninfo = nodeinfo[pnode]
7209 msg = pninfo.fail_msg
7211 # Assume the primary node is unreachable and go ahead
7212 self.warn.append("Can't get info from primary node %s: %s" %
7214 elif not isinstance(pninfo.payload.get('memory_free', None), int):
7215 self.warn.append("Node data from primary node %s doesn't contain"
7216 " free memory information" % pnode)
7217 elif instance_info.fail_msg:
7218 self.warn.append("Can't get instance runtime information: %s" %
7219 instance_info.fail_msg)
7221 if instance_info.payload:
7222 current_mem = int(instance_info.payload['memory'])
7224 # Assume instance not running
7225 # (there is a slight race condition here, but it's not very probable,
7226 # and we have no other way to check)
7228 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
7229 pninfo.payload['memory_free'])
7231 raise errors.OpPrereqError("This change will prevent the instance"
7232 " from starting, due to %d MB of memory"
7233 " missing on its primary node" % miss_mem)
7235 if be_new[constants.BE_AUTO_BALANCE]:
7236 for node, nres in nodeinfo.items():
7237 if node not in instance.secondary_nodes:
7241 self.warn.append("Can't get info from secondary node %s: %s" %
7243 elif not isinstance(nres.payload.get('memory_free', None), int):
7244 self.warn.append("Secondary node %s didn't return free"
7245 " memory information" % node)
7246 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
7247 self.warn.append("Not enough memory to failover instance to"
7248 " secondary node %s" % node)
7253 for nic_op, nic_dict in self.op.nics:
7254 if nic_op == constants.DDM_REMOVE:
7255 if not instance.nics:
7256 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
7258 if nic_op != constants.DDM_ADD:
7260 if nic_op < 0 or nic_op >= len(instance.nics):
7261 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
7263 (nic_op, len(instance.nics)))
7264 old_nic_params = instance.nics[nic_op].nicparams
7265 old_nic_ip = instance.nics[nic_op].ip
7270 update_params_dict = dict([(key, nic_dict[key])
7271 for key in constants.NICS_PARAMETERS
7272 if key in nic_dict])
7274 if 'bridge' in nic_dict:
7275 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
7277 new_nic_params, new_filled_nic_params = \
7278 self._GetUpdatedParams(old_nic_params, update_params_dict,
7279 cluster.nicparams[constants.PP_DEFAULT],
7280 constants.NICS_PARAMETER_TYPES)
7281 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
7282 self.nic_pinst[nic_op] = new_nic_params
7283 self.nic_pnew[nic_op] = new_filled_nic_params
7284 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
7286 if new_nic_mode == constants.NIC_MODE_BRIDGED:
7287 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
7288 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
7290 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
7292 self.warn.append(msg)
7294 raise errors.OpPrereqError(msg)
7295 if new_nic_mode == constants.NIC_MODE_ROUTED:
7296 if 'ip' in nic_dict:
7297 nic_ip = nic_dict['ip']
7301 raise errors.OpPrereqError('Cannot set the nic ip to None'
7303 if 'mac' in nic_dict:
7304 nic_mac = nic_dict['mac']
7306 raise errors.OpPrereqError('Cannot set the nic mac to None')
7307 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7308 # otherwise generate the mac
7309 nic_dict['mac'] = self.cfg.GenerateMAC()
7311 # or validate/reserve the current one
7312 if self.cfg.IsMacInUse(nic_mac):
7313 raise errors.OpPrereqError("MAC address %s already in use"
7314 " in cluster" % nic_mac)
7317 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
7318 raise errors.OpPrereqError("Disk operations not supported for"
7319 " diskless instances")
7320 for disk_op, disk_dict in self.op.disks:
7321 if disk_op == constants.DDM_REMOVE:
7322 if len(instance.disks) == 1:
7323 raise errors.OpPrereqError("Cannot remove the last disk of"
7325 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
7326 ins_l = ins_l[pnode]
7327 msg = ins_l.fail_msg
7329 raise errors.OpPrereqError("Can't contact node %s: %s" %
7331 if instance.name in ins_l.payload:
7332 raise errors.OpPrereqError("Instance is running, can't remove"
7335 if (disk_op == constants.DDM_ADD and
7336 len(instance.disks) >= constants.MAX_DISKS):
7337 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
7338 " add more" % constants.MAX_DISKS)
7339 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
7341 if disk_op < 0 or disk_op >= len(instance.disks):
7342 raise errors.OpPrereqError("Invalid disk index %s, valid values"
7344 (disk_op, len(instance.disks)))
7348 def Exec(self, feedback_fn):
7349 """Modifies an instance.
7351 All parameters take effect only at the next restart of the instance.
7354 # Process here the warnings from CheckPrereq, as we don't have a
7355 # feedback_fn there.
7356 for warn in self.warn:
7357 feedback_fn("WARNING: %s" % warn)
7360 instance = self.instance
7361 cluster = self.cluster
7363 for disk_op, disk_dict in self.op.disks:
7364 if disk_op == constants.DDM_REMOVE:
7365 # remove the last disk
7366 device = instance.disks.pop()
7367 device_idx = len(instance.disks)
7368 for node, disk in device.ComputeNodeTree(instance.primary_node):
7369 self.cfg.SetDiskID(disk, node)
7370 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
7372 self.LogWarning("Could not remove disk/%d on node %s: %s,"
7373 " continuing anyway", device_idx, node, msg)
7374 result.append(("disk/%d" % device_idx, "remove"))
7375 elif disk_op == constants.DDM_ADD:
7377 if instance.disk_template == constants.DT_FILE:
7378 file_driver, file_path = instance.disks[0].logical_id
7379 file_path = os.path.dirname(file_path)
7381 file_driver = file_path = None
7382 disk_idx_base = len(instance.disks)
7383 new_disk = _GenerateDiskTemplate(self,
7384 instance.disk_template,
7385 instance.name, instance.primary_node,
7386 instance.secondary_nodes,
7391 instance.disks.append(new_disk)
7392 info = _GetInstanceInfoText(instance)
7394 logging.info("Creating volume %s for instance %s",
7395 new_disk.iv_name, instance.name)
7396 # Note: this needs to be kept in sync with _CreateDisks
7398 for node in instance.all_nodes:
7399 f_create = node == instance.primary_node
7401 _CreateBlockDev(self, node, instance, new_disk,
7402 f_create, info, f_create)
7403 except errors.OpExecError, err:
7404 self.LogWarning("Failed to create volume %s (%s) on"
7406 new_disk.iv_name, new_disk, node, err)
7407 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
7408 (new_disk.size, new_disk.mode)))
7410 # change a given disk
7411 instance.disks[disk_op].mode = disk_dict['mode']
7412 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
7414 for nic_op, nic_dict in self.op.nics:
7415 if nic_op == constants.DDM_REMOVE:
7416 # remove the last nic
7417 del instance.nics[-1]
7418 result.append(("nic.%d" % len(instance.nics), "remove"))
7419 elif nic_op == constants.DDM_ADD:
7420 # mac and bridge should be set, by now
7421 mac = nic_dict['mac']
7422 ip = nic_dict.get('ip', None)
7423 nicparams = self.nic_pinst[constants.DDM_ADD]
7424 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
7425 instance.nics.append(new_nic)
7426 result.append(("nic.%d" % (len(instance.nics) - 1),
7427 "add:mac=%s,ip=%s,mode=%s,link=%s" %
7428 (new_nic.mac, new_nic.ip,
7429 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
7430 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
7433 for key in 'mac', 'ip':
7435 setattr(instance.nics[nic_op], key, nic_dict[key])
7436 if nic_op in self.nic_pnew:
7437 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
7438 for key, val in nic_dict.iteritems():
7439 result.append(("nic.%s/%d" % (key, nic_op), val))
7442 if self.op.hvparams:
7443 instance.hvparams = self.hv_inst
7444 for key, val in self.op.hvparams.iteritems():
7445 result.append(("hv/%s" % key, val))
7448 if self.op.beparams:
7449 instance.beparams = self.be_inst
7450 for key, val in self.op.beparams.iteritems():
7451 result.append(("be/%s" % key, val))
7453 self.cfg.Update(instance)
7458 class LUQueryExports(NoHooksLU):
7459 """Query the exports list
7462 _OP_REQP = ['nodes']
7465 def ExpandNames(self):
7466 self.needed_locks = {}
7467 self.share_locks[locking.LEVEL_NODE] = 1
7468 if not self.op.nodes:
7469 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7471 self.needed_locks[locking.LEVEL_NODE] = \
7472 _GetWantedNodes(self, self.op.nodes)
7474 def CheckPrereq(self):
7475 """Check prerequisites.
7478 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
7480 def Exec(self, feedback_fn):
7481 """Compute the list of all the exported system images.
7484 @return: a dictionary with the structure node->(export-list)
7485 where export-list is a list of the instances exported on
7489 rpcresult = self.rpc.call_export_list(self.nodes)
7491 for node in rpcresult:
7492 if rpcresult[node].fail_msg:
7493 result[node] = False
7495 result[node] = rpcresult[node].payload
7500 class LUExportInstance(LogicalUnit):
7501 """Export an instance to an image in the cluster.
7504 HPATH = "instance-export"
7505 HTYPE = constants.HTYPE_INSTANCE
7506 _OP_REQP = ["instance_name", "target_node", "shutdown"]
7509 def ExpandNames(self):
7510 self._ExpandAndLockInstance()
7511 # FIXME: lock only instance primary and destination node
7513 # Sad but true, for now we have to lock all nodes, as we don't know where
7514 # the previous export might be, and in this LU we search for it and
7515 # remove it from its current node. In the future we could fix this by:
7516 # - making a tasklet to search (share-lock all), then create the new one,
7517 # then one to remove, after
7518 # - removing the removal operation altogether
7519 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7521 def DeclareLocks(self, level):
7522 """Last minute lock declaration."""
7523 # All nodes are locked anyway, so nothing to do here.
7525 def BuildHooksEnv(self):
7528 This will run on the master, primary node and target node.
7532 "EXPORT_NODE": self.op.target_node,
7533 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
7535 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7536 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
7537 self.op.target_node]
7540 def CheckPrereq(self):
7541 """Check prerequisites.
7543 This checks that the instance and node names are valid.
7546 instance_name = self.op.instance_name
7547 self.instance = self.cfg.GetInstanceInfo(instance_name)
7548 assert self.instance is not None, \
7549 "Cannot retrieve locked instance %s" % self.op.instance_name
7550 _CheckNodeOnline(self, self.instance.primary_node)
7552 self.dst_node = self.cfg.GetNodeInfo(
7553 self.cfg.ExpandNodeName(self.op.target_node))
7555 if self.dst_node is None:
7556 # This is a wrong node name, not a non-locked node
7557 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
7558 _CheckNodeOnline(self, self.dst_node.name)
7559 _CheckNodeNotDrained(self, self.dst_node.name)
7561 # instance disk type verification
7562 for disk in self.instance.disks:
7563 if disk.dev_type == constants.LD_FILE:
7564 raise errors.OpPrereqError("Export not supported for instances with"
7565 " file-based disks")
7567 def Exec(self, feedback_fn):
7568 """Export an instance to an image in the cluster.
7571 instance = self.instance
7572 dst_node = self.dst_node
7573 src_node = instance.primary_node
7574 if self.op.shutdown:
7575 # shutdown the instance, but not the disks
7576 result = self.rpc.call_instance_shutdown(src_node, instance)
7577 result.Raise("Could not shutdown instance %s on"
7578 " node %s" % (instance.name, src_node))
7580 vgname = self.cfg.GetVGName()
7584 # set the disks ID correctly since call_instance_start needs the
7585 # correct drbd minor to create the symlinks
7586 for disk in instance.disks:
7587 self.cfg.SetDiskID(disk, src_node)
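# What follows is, roughly: snapshot every disk on the source node (LVM
# snapshots), copy each snapshot to the export directory on the target
# node, remove the snapshots again and finalize the export; per-disk
# failures only produce warnings and are reported in the dresults list.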
7592 for idx, disk in enumerate(instance.disks):
7593 # result.payload will be a snapshot of an lvm leaf of the one we passed
7594 result = self.rpc.call_blockdev_snapshot(src_node, disk)
7595 msg = result.fail_msg
7597 self.LogWarning("Could not snapshot disk/%s on node %s: %s",
7599 snap_disks.append(False)
7601 disk_id = (vgname, result.payload)
7602 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
7603 logical_id=disk_id, physical_id=disk_id,
7604 iv_name=disk.iv_name)
7605 snap_disks.append(new_dev)
7608 if self.op.shutdown and instance.admin_up:
7609 result = self.rpc.call_instance_start(src_node, instance, None, None)
7610 msg = result.fail_msg
7612 _ShutdownInstanceDisks(self, instance)
7613 raise errors.OpExecError("Could not start instance: %s" % msg)
7615 # TODO: check for size
7617 cluster_name = self.cfg.GetClusterName()
7618 for idx, dev in enumerate(snap_disks):
7620 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
7621 instance, cluster_name, idx)
7622 msg = result.fail_msg
7624 self.LogWarning("Could not export disk/%s from node %s to"
7625 " node %s: %s", idx, src_node, dst_node.name, msg)
7626 dresults.append(False)
7628 dresults.append(True)
7629 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
7631 self.LogWarning("Could not remove snapshot for disk/%d from node"
7632 " %s: %s", idx, src_node, msg)
7634 dresults.append(False)
7636 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
7638 msg = result.fail_msg
7640 self.LogWarning("Could not finalize export for instance %s"
7641 " on node %s: %s", instance.name, dst_node.name, msg)
7644 nodelist = self.cfg.GetNodeList()
7645 nodelist.remove(dst_node.name)
7647 # on one-node clusters nodelist will be empty after the removal;
7648 # if we proceed, the backup would be removed because OpQueryExports
7649 # substitutes an empty list with the full cluster node list.
7650 iname = instance.name
7652 exportlist = self.rpc.call_export_list(nodelist)
7653 for node in exportlist:
7654 if exportlist[node].fail_msg:
7656 if iname in exportlist[node].payload:
7657 msg = self.rpc.call_export_remove(node, iname).fail_msg
7659 self.LogWarning("Could not remove older export for instance %s"
7660 " on node %s: %s", iname, node, msg)
7661 return fin_resu, dresults


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
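
  # Note on usage: CheckPrereq stores the tag-carrying object in self.target;
  # the concrete LUs below read it (LUGetTags), add to it (LUAddTags) or
  # remove from it (LUDelTags) and then write the configuration back.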

class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
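
# Illustrative example of the LUSearchTags result format (names hypothetical):
# searching for the pattern "^db" could return
#   [("/cluster", "dbfarm"), ("/instances/db1.example.com", "dbserver")]
# i.e. a list of (path, tag) tuples for every matching tag in the cluster.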


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      for node, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" % node)
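
# This LU only exercises the job and RPC machinery; in a standard Ganeti
# installation it is typically driven by the debugging CLI (for example a
# "gnt-debug delay"-style command), never by normal operations.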


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has three sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, rpc, mode, name, **kwargs):
    self.cfg = cfg
    self.rpc = rpc
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
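
  # Illustrative construction sketch (all values are hypothetical, not taken
  # from this module): in allocation mode every key in _ALLO_KEYS must be
  # supplied, e.g.
  #   ial = IAllocator(cfg, rpc, constants.IALLOCATOR_MODE_ALLOC,
  #                    "inst1.example.com", mem_size=512,
  #                    disks=[{"size": 1024, "mode": "w"}],
  #                    disk_template=constants.DT_DRBD8, os="debian-image",
  #                    tags=[], nics=[], vcpus=1,
  #                    hypervisor=constants.HT_XEN_PVM)
  # while relocation mode only needs relocate_from (the node(s) the instance
  # should be moved away from).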

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                        hypervisor_name)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload

        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
            cluster_info.nicparams[constants.PP_DEFAULT],
            nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                    }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
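
  # Rough sketch of the resulting self.in_data layout (keys as built above,
  # values illustrative only):
  #   {"version": ..., "cluster_name": ..., "cluster_tags": [...],
  #    "enabled_hypervisors": [...],
  #    "nodes": {"node1.example.com": {"total_memory": ..., "free_disk": ...}},
  #    "instances": {"inst1.example.com": {"memory": ..., "disks": [...]}},
  #    "request": ...}   # added later by _AddNewInstance/_AddRelocateInstance
  # The whole structure is serialized by serializer.Dump in _BuildInputData
  # and fed as text to the external allocator script.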

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
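
  # relocate_from lists the node(s) the instance should be moved away from
  # (normally its current secondary node), so the external allocator is
  # expected to propose a different node as the new secondary.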

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner
    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()
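
  # Illustrative usage sketch (hypothetical allocator name, not defined here):
  #   ial.Run("my-allocator")
  #   if not ial.success:
  #     raise errors.OpPrereqError("Can't compute nodes: %s" % ial.info)
  #   target_nodes = ial.nodes
  # After a validated run, success/info/nodes mirror the script's output.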

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " not a list")
    self.out_data = rdict
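
  # Example of a reply that passes validation (values illustrative only):
  #   {"success": true, "info": "allocation successful",
  #    "nodes": ["node2.example.com", "node3.example.com"]}
  # Anything without the success/info/nodes keys, or with a non-list "nodes"
  # value, is rejected above.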


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode of
    the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result
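
    # Direction IN only returns the generated allocator request text;
    # direction OUT additionally runs the named allocator script (without
    # validating its reply) and returns its raw output text.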