4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
45 class LogicalUnit(object):
46 """Logical Unit base class.
48 Subclasses must follow these rules:
49 - implement ExpandNames
50 - implement CheckPrereq (except when tasklets are used)
51 - implement Exec (except when tasklets are used)
52 - implement BuildHooksEnv
53 - redefine HPATH and HTYPE
54 - optionally redefine their run requirements:
55 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
57 Note that all commands require root permissions.
59 @ivar dry_run_result: the value (if any) that will be returned to the caller
60 in dry-run mode (signalled by opcode dry_run parameter)
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71     This needs to be overridden in derived classes in order to check op validity.
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
92 self.LogStep = processor.LogStep
94 self.dry_run_result = None
99 for attr_name in self._OP_REQP:
100 attr_val = getattr(op, attr_name, None)
102 raise errors.OpPrereqError("Required parameter '%s' missing" %
105 self.CheckArguments()
108 """Returns the SshRunner object
112 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115 ssh = property(fget=__GetSSH)
117 def CheckArguments(self):
118 """Check syntactic validity for the opcode arguments.
120     This method is for doing a simple syntactic check and ensuring the
121     validity of opcode parameters, without any cluster-related
122     checks. While the same can be accomplished in ExpandNames and/or
123     CheckPrereq, doing it separately is better because:
125       - ExpandNames is left as purely a lock-related function
126 - CheckPrereq is run after we have acquired locks (and possible
129 The function is allowed to change the self.op attribute so that
130 later methods can no longer worry about missing parameters.
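    A minimal sketch of an override (modelled on LUSetClusterParams.CheckArguments
    further down) that defaults an optional parameter so later code can rely
    on it::

      if not hasattr(self.op, "candidate_pool_size"):
        self.op.candidate_pool_size = None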
135 def ExpandNames(self):
136 """Expand names for this LU.
138 This method is called before starting to execute the opcode, and it should
139 update all the parameters of the opcode to their canonical form (e.g. a
140 short node name must be fully expanded after this method has successfully
141     completed). This way locking, hooks, logging, etc. can work correctly.
143 LUs which implement this method must also populate the self.needed_locks
144 member, as a dict with lock levels as keys, and a list of needed lock names
147 - use an empty dict if you don't need any lock
148 - if you don't need any lock at a particular level omit that level
149 - don't put anything for the BGL level
150 - if you want all locks at a level use locking.ALL_SET as a value
152 If you need to share locks (rather than acquire them exclusively) at one
153 level you can modify self.share_locks, setting a true value (usually 1) for
154 that level. By default locks are not shared.
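    For example, a sketch of declaring all node locks in shared mode (the
    same pattern LUSetClusterParams.ExpandNames uses below)::

      self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
      self.share_locks[locking.LEVEL_NODE] = 1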
156 This function can also define a list of tasklets, which then will be
157 executed in order instead of the usual LU-level CheckPrereq and Exec
158 functions, if those are not defined by the LU.
162 # Acquire all nodes and one instance
163 self.needed_locks = {
164 locking.LEVEL_NODE: locking.ALL_SET,
165 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
167 # Acquire just two nodes
168 self.needed_locks = {
169 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
172 self.needed_locks = {} # No, you can't leave it to the default value None
175 # The implementation of this method is mandatory only if the new LU is
176 # concurrent, so that old LUs don't need to be changed all at the same
179 self.needed_locks = {} # Exclusive LUs don't need locks.
181 raise NotImplementedError
183 def DeclareLocks(self, level):
184 """Declare LU locking needs for a level
186 While most LUs can just declare their locking needs at ExpandNames time,
187 sometimes there's the need to calculate some locks after having acquired
188 the ones before. This function is called just before acquiring locks at a
189 particular level, but after acquiring the ones at lower levels, and permits
190 such calculations. It can be used to modify self.needed_locks, and by
191 default it does nothing.
193 This function is only called if you have something already set in
194 self.needed_locks for the level.
196 @param level: Locking level which is going to be locked
197 @type level: member of ganeti.locking.LEVELS
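    A typical override (a sketch matching the usage example given for
    _LockInstancesNodes below) recalculates node locks once the instance
    locks have been acquired::

      def DeclareLocks(self, level):
        if level == locking.LEVEL_NODE:
          self._LockInstancesNodes()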
201 def CheckPrereq(self):
202 """Check prerequisites for this LU.
204 This method should check that the prerequisites for the execution
205 of this LU are fulfilled. It can do internode communication, but
206 it should be idempotent - no cluster or system changes are
209 The method should raise errors.OpPrereqError in case something is
210 not fulfilled. Its return value is ignored.
212 This method should also update all the parameters of the opcode to
213 their canonical form if it hasn't been done by ExpandNames before.
216 if self.tasklets is not None:
217 for (idx, tl) in enumerate(self.tasklets):
218 logging.debug("Checking prerequisites for tasklet %s/%s",
219 idx + 1, len(self.tasklets))
222 raise NotImplementedError
224 def Exec(self, feedback_fn):
227 This method should implement the actual work. It should raise
228 errors.OpExecError for failures that are somewhat dealt with in
232 if self.tasklets is not None:
233 for (idx, tl) in enumerate(self.tasklets):
234 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
237 raise NotImplementedError
239 def BuildHooksEnv(self):
240 """Build hooks environment for this LU.
242     This method should return a three-element tuple consisting of: a dict
243 containing the environment that will be used for running the
244 specific hook for this LU, a list of node names on which the hook
245 should run before the execution, and a list of node names on which
246 the hook should run after the execution.
248     The keys of the dict must not be prefixed with 'GANETI_', as this
249     prefix is added by the hooks runner. Also note that additional keys
250     will be added by the hooks runner. If the LU doesn't define any
251 environment, an empty dict (and not None) should be returned.
253     If the hook should run on no nodes, return an empty list (and not None).
255     Note that if the HPATH for a LU class is None, this function will not be called.
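    A minimal sketch (in the spirit of LURenameCluster.BuildHooksEnv below),
    building a small environment and running the hooks only on the master
    node::

      env = {"OP_TARGET": self.cfg.GetClusterName()}
      mn = self.cfg.GetMasterNode()
      return env, [mn], [mn]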
259 raise NotImplementedError
261 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
262 """Notify the LU about the results of its hooks.
264 This method is called every time a hooks phase is executed, and notifies
265 the Logical Unit about the hooks' result. The LU can then use it to alter
266 its result based on the hooks. By default the method does nothing and the
267 previous result is passed back unchanged but any LU can define it if it
268 wants to use the local cluster hook-scripts somehow.
270 @param phase: one of L{constants.HOOKS_PHASE_POST} or
271 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
272 @param hook_results: the results of the multi-node hooks rpc call
273     @param feedback_fn: function used to send feedback back to the caller
274 @param lu_result: the previous Exec result this LU had, or None
276 @return: the new Exec result, based on the previous result
282 def _ExpandAndLockInstance(self):
283 """Helper function to expand and lock an instance.
285 Many LUs that work on an instance take its name in self.op.instance_name
286 and need to expand it and then declare the expanded name for locking. This
287 function does it, and then updates self.op.instance_name to the expanded
288 name. It also initializes needed_locks as a dict, if this hasn't been done
292 if self.needed_locks is None:
293 self.needed_locks = {}
295 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
296 "_ExpandAndLockInstance called with instance-level locks set"
297 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
298 if expanded_name is None:
299 raise errors.OpPrereqError("Instance '%s' not known" %
300 self.op.instance_name)
301 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
302 self.op.instance_name = expanded_name
304 def _LockInstancesNodes(self, primary_only=False):
305 """Helper function to declare instances' nodes for locking.
307 This function should be called after locking one or more instances to lock
308 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
309 with all primary or secondary nodes for instances already locked and
310 present in self.needed_locks[locking.LEVEL_INSTANCE].
312 It should be called from DeclareLocks, and for safety only works if
313 self.recalculate_locks[locking.LEVEL_NODE] is set.
315 In the future it may grow parameters to just lock some instance's nodes, or
316 to just lock primaries or secondary nodes, if needed.
318     It should be called in DeclareLocks in a way similar to::
320 if level == locking.LEVEL_NODE:
321 self._LockInstancesNodes()
323 @type primary_only: boolean
324 @param primary_only: only lock primary nodes of locked instances
327 assert locking.LEVEL_NODE in self.recalculate_locks, \
328 "_LockInstancesNodes helper function called with no nodes to recalculate"
330     # TODO: check if we've really been called with the instance locks held
332 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
333 # future we might want to have different behaviors depending on the value
334 # of self.recalculate_locks[locking.LEVEL_NODE]
336 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
337 instance = self.context.cfg.GetInstanceInfo(instance_name)
338 wanted_nodes.append(instance.primary_node)
340 wanted_nodes.extend(instance.secondary_nodes)
342 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
343 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
344 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
345 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
347 del self.recalculate_locks[locking.LEVEL_NODE]
350 class NoHooksLU(LogicalUnit):
351 """Simple LU which runs no hooks.
353 This LU is intended as a parent for other LogicalUnits which will
354 run no hooks, in order to reduce duplicate code.
362 """Tasklet base class.
364 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365   they can mix legacy code with tasklets. Locking needs to be done in the LU;
366   tasklets know nothing about locks.
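  A hypothetical tasklet-based LU (MyTasklet is a made-up subclass name)
  would typically build its tasklet list in ExpandNames; the base LogicalUnit
  then runs each tasklet's CheckPrereq and Exec in order::

    # in the owning LU's ExpandNames (sketch)
    self.needed_locks = {}
    self.tasklets = [MyTasklet(self)]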
368 Subclasses must follow these rules:
369 - Implement CheckPrereq
373 def __init__(self, lu):
380 def CheckPrereq(self):
381     """Check prerequisites for this tasklet.
383 This method should check whether the prerequisites for the execution of
384 this tasklet are fulfilled. It can do internode communication, but it
385 should be idempotent - no cluster or system changes are allowed.
387 The method should raise errors.OpPrereqError in case something is not
388 fulfilled. Its return value is ignored.
390 This method should also update all parameters to their canonical form if it
391 hasn't been done before.
394 raise NotImplementedError
396 def Exec(self, feedback_fn):
397 """Execute the tasklet.
399 This method should implement the actual work. It should raise
400 errors.OpExecError for failures that are somewhat dealt with in code, or
404 raise NotImplementedError
407 def _GetWantedNodes(lu, nodes):
408 """Returns list of checked and expanded node names.
410 @type lu: L{LogicalUnit}
411 @param lu: the logical unit on whose behalf we execute
413 @param nodes: list of node names or None for all nodes
415 @return: the list of nodes, sorted
416   @raise errors.OpPrereqError: if the nodes parameter is wrong type
419 if not isinstance(nodes, list):
420 raise errors.OpPrereqError("Invalid argument type 'nodes'")
423 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
424 " non-empty list of nodes whose name is to be expanded.")
428 node = lu.cfg.ExpandNodeName(name)
430 raise errors.OpPrereqError("No such node name '%s'" % name)
433 return utils.NiceSort(wanted)
436 def _GetWantedInstances(lu, instances):
437 """Returns list of checked and expanded instance names.
439 @type lu: L{LogicalUnit}
440 @param lu: the logical unit on whose behalf we execute
441 @type instances: list
442 @param instances: list of instance names or None for all instances
444 @return: the list of instances, sorted
445 @raise errors.OpPrereqError: if the instances parameter is wrong type
446 @raise errors.OpPrereqError: if any of the passed instances is not found
449 if not isinstance(instances, list):
450 raise errors.OpPrereqError("Invalid argument type 'instances'")
455 for name in instances:
456 instance = lu.cfg.ExpandInstanceName(name)
458 raise errors.OpPrereqError("No such instance name '%s'" % name)
459 wanted.append(instance)
462 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
466 def _CheckOutputFields(static, dynamic, selected):
467 """Checks whether all selected fields are valid.
469 @type static: L{utils.FieldSet}
470 @param static: static fields set
471 @type dynamic: L{utils.FieldSet}
472 @param dynamic: dynamic fields set
479 delta = f.NonMatching(selected)
481 raise errors.OpPrereqError("Unknown output fields selected: %s"
485 def _CheckBooleanOpField(op, name):
486 """Validates boolean opcode parameters.
488 This will ensure that an opcode parameter is either a boolean value,
489 or None (but that it always exists).
492 val = getattr(op, name, None)
493 if not (val is None or isinstance(val, bool)):
494 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
496 setattr(op, name, val)
499 def _CheckNodeOnline(lu, node):
500 """Ensure that a given node is online.
502 @param lu: the LU on behalf of which we make the check
503 @param node: the node to check
504 @raise errors.OpPrereqError: if the node is offline
507 if lu.cfg.GetNodeInfo(node).offline:
508 raise errors.OpPrereqError("Can't use offline node %s" % node)
511 def _CheckNodeNotDrained(lu, node):
512 """Ensure that a given node is not drained.
514 @param lu: the LU on behalf of which we make the check
515 @param node: the node to check
516 @raise errors.OpPrereqError: if the node is drained
519 if lu.cfg.GetNodeInfo(node).drained:
520 raise errors.OpPrereqError("Can't use drained node %s" % node)
523 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
524 memory, vcpus, nics, disk_template, disks,
525 bep, hvp, hypervisor_name):
526 """Builds instance related env variables for hooks
528 This builds the hook environment from individual variables.
531 @param name: the name of the instance
532 @type primary_node: string
533 @param primary_node: the name of the instance's primary node
534 @type secondary_nodes: list
535 @param secondary_nodes: list of secondary nodes as strings
536 @type os_type: string
537 @param os_type: the name of the instance's OS
538 @type status: boolean
539 @param status: the should_run status of the instance
541 @param memory: the memory size of the instance
543 @param vcpus: the count of VCPUs the instance has
545 @param nics: list of tuples (ip, mac, mode, link) representing
546 the NICs the instance has
547 @type disk_template: string
548 @param disk_template: the disk template of the instance
550 @param disks: the list of (size, mode) pairs
552 @param bep: the backend parameters for the instance
554 @param hvp: the hypervisor parameters for the instance
555 @type hypervisor_name: string
556 @param hypervisor_name: the hypervisor for the instance
558 @return: the hook environment for this instance
567 "INSTANCE_NAME": name,
568 "INSTANCE_PRIMARY": primary_node,
569 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
570 "INSTANCE_OS_TYPE": os_type,
571 "INSTANCE_STATUS": str_status,
572 "INSTANCE_MEMORY": memory,
573 "INSTANCE_VCPUS": vcpus,
574 "INSTANCE_DISK_TEMPLATE": disk_template,
575 "INSTANCE_HYPERVISOR": hypervisor_name,
579 nic_count = len(nics)
580 for idx, (ip, mac, mode, link) in enumerate(nics):
583 env["INSTANCE_NIC%d_IP" % idx] = ip
584 env["INSTANCE_NIC%d_MAC" % idx] = mac
585 env["INSTANCE_NIC%d_MODE" % idx] = mode
586 env["INSTANCE_NIC%d_LINK" % idx] = link
587 if mode == constants.NIC_MODE_BRIDGED:
588 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
592 env["INSTANCE_NIC_COUNT"] = nic_count
595 disk_count = len(disks)
596 for idx, (size, mode) in enumerate(disks):
597 env["INSTANCE_DISK%d_SIZE" % idx] = size
598 env["INSTANCE_DISK%d_MODE" % idx] = mode
602 env["INSTANCE_DISK_COUNT"] = disk_count
604 for source, kind in [(bep, "BE"), (hvp, "HV")]:
605 for key, value in source.items():
606 env["INSTANCE_%s_%s" % (kind, key)] = value
611 def _NICListToTuple(lu, nics):
612 """Build a list of nic information tuples.
614 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
615 value in LUQueryInstanceData.
617 @type lu: L{LogicalUnit}
618 @param lu: the logical unit on whose behalf we execute
619 @type nics: list of L{objects.NIC}
620 @param nics: list of nics to convert to hooks tuples
624 c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
628 filled_params = objects.FillDict(c_nicparams, nic.nicparams)
629 mode = filled_params[constants.NIC_MODE]
630 link = filled_params[constants.NIC_LINK]
631 hooks_nics.append((ip, mac, mode, link))
635 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
636 """Builds instance related env variables for hooks from an object.
638 @type lu: L{LogicalUnit}
639 @param lu: the logical unit on whose behalf we execute
640 @type instance: L{objects.Instance}
641 @param instance: the instance for which we should build the
644 @param override: dictionary with key/values that will override
647 @return: the hook environment dictionary
650 cluster = lu.cfg.GetClusterInfo()
651 bep = cluster.FillBE(instance)
652 hvp = cluster.FillHV(instance)
654 'name': instance.name,
655 'primary_node': instance.primary_node,
656 'secondary_nodes': instance.secondary_nodes,
657 'os_type': instance.os,
658 'status': instance.admin_up,
659 'memory': bep[constants.BE_MEMORY],
660 'vcpus': bep[constants.BE_VCPUS],
661 'nics': _NICListToTuple(lu, instance.nics),
662 'disk_template': instance.disk_template,
663 'disks': [(disk.size, disk.mode) for disk in instance.disks],
666 'hypervisor_name': instance.hypervisor,
669 args.update(override)
670 return _BuildInstanceHookEnv(**args)
673 def _AdjustCandidatePool(lu):
674 """Adjust the candidate pool after node operations.
677 mod_list = lu.cfg.MaintainCandidatePool()
679 lu.LogInfo("Promoted nodes to master candidate role: %s",
680 ", ".join(node.name for node in mod_list))
681 for name in mod_list:
682 lu.context.ReaddNode(name)
683 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
685 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
689 def _CheckNicsBridgesExist(lu, target_nics, target_node,
690 profile=constants.PP_DEFAULT):
691   """Check that the bridges needed by a list of nics exist.
694 c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
695 paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
696 for nic in target_nics]
697 brlist = [params[constants.NIC_LINK] for params in paramslist
698 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
700 result = lu.rpc.call_bridges_exist(target_node, brlist)
701 result.Raise("Error checking bridges on destination node '%s'" %
702 target_node, prereq=True)
705 def _CheckInstanceBridgesExist(lu, instance, node=None):
706   """Check that the bridges needed by an instance exist.
710 node = instance.primary_node
711 _CheckNicsBridgesExist(lu, instance.nics, node)
714 def _GetNodePrimaryInstances(cfg, node_name):
715 """Returns primary instances on a node.
720 for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
721 if node_name == inst.primary_node:
722 instances.append(inst)
727 def _GetNodeSecondaryInstances(cfg, node_name):
728 """Returns secondary instances on a node.
733 for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
734 if node_name in inst.secondary_nodes:
735 instances.append(inst)
740 def _GetStorageTypeArgs(cfg, storage_type):
741 """Returns the arguments for a storage type.
744 # Special case for file storage
745 if storage_type == constants.ST_FILE:
746 # storage.FileStorage wants a list of storage directories
747 return [[cfg.GetFileStorageDir()]]
752 class LUPostInitCluster(LogicalUnit):
753 """Logical unit for running hooks after cluster initialization.
756 HPATH = "cluster-init"
757 HTYPE = constants.HTYPE_CLUSTER
760 def BuildHooksEnv(self):
764 env = {"OP_TARGET": self.cfg.GetClusterName()}
765 mn = self.cfg.GetMasterNode()
768 def CheckPrereq(self):
769 """No prerequisites to check.
774 def Exec(self, feedback_fn):
781 class LUDestroyCluster(NoHooksLU):
782 """Logical unit for destroying the cluster.
787 def CheckPrereq(self):
788 """Check prerequisites.
790 This checks whether the cluster is empty.
792 Any errors are signaled by raising errors.OpPrereqError.
795 master = self.cfg.GetMasterNode()
797 nodelist = self.cfg.GetNodeList()
798 if len(nodelist) != 1 or nodelist[0] != master:
799 raise errors.OpPrereqError("There are still %d node(s) in"
800 " this cluster." % (len(nodelist) - 1))
801 instancelist = self.cfg.GetInstanceList()
803 raise errors.OpPrereqError("There are still %d instance(s) in"
804 " this cluster." % len(instancelist))
806 def Exec(self, feedback_fn):
807 """Destroys the cluster.
810 master = self.cfg.GetMasterNode()
811 result = self.rpc.call_node_stop_master(master, False)
812 result.Raise("Could not disable the master role")
813 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
814 utils.CreateBackup(priv_key)
815 utils.CreateBackup(pub_key)
819 class LUVerifyCluster(LogicalUnit):
820 """Verifies the cluster status.
823 HPATH = "cluster-verify"
824 HTYPE = constants.HTYPE_CLUSTER
825 _OP_REQP = ["skip_checks"]
828 def ExpandNames(self):
829 self.needed_locks = {
830 locking.LEVEL_NODE: locking.ALL_SET,
831 locking.LEVEL_INSTANCE: locking.ALL_SET,
833 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
835 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
836 node_result, feedback_fn, master_files,
838 """Run multiple tests against a node.
842 - compares ganeti version
843 - checks vg existence and size > 20G
844 - checks config file checksum
845 - checks ssh to other nodes
847 @type nodeinfo: L{objects.Node}
848 @param nodeinfo: the node to check
849 @param file_list: required list of files
850 @param local_cksum: dictionary of local files and their checksums
851 @param node_result: the results from the node
852 @param feedback_fn: function used to accumulate results
853 @param master_files: list of files that only masters should have
854     @param drbd_map: the used DRBD minors for this node, in the
855         form of minor: (instance, must_exist), which correspond to instances
856         and their running status
857 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
862 # main result, node_result should be a non-empty dict
863 if not node_result or not isinstance(node_result, dict):
864 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
867 # compares ganeti version
868 local_version = constants.PROTOCOL_VERSION
869 remote_version = node_result.get('version', None)
870 if not (remote_version and isinstance(remote_version, (list, tuple)) and
871 len(remote_version) == 2):
872 feedback_fn(" - ERROR: connection to %s failed" % (node))
875 if local_version != remote_version[0]:
876 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
877 " node %s %s" % (local_version, node, remote_version[0]))
880 # node seems compatible, we can actually try to look into its results
884 # full package version
885 if constants.RELEASE_VERSION != remote_version[1]:
886 feedback_fn(" - WARNING: software version mismatch: master %s,"
888 (constants.RELEASE_VERSION, node, remote_version[1]))
890 # checks vg existence and size > 20G
891 if vg_name is not None:
892 vglist = node_result.get(constants.NV_VGLIST, None)
894 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
898 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
899 constants.MIN_VG_SIZE)
901 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
904 # checks config file checksum
906 remote_cksum = node_result.get(constants.NV_FILELIST, None)
907 if not isinstance(remote_cksum, dict):
909 feedback_fn(" - ERROR: node hasn't returned file checksum data")
911 for file_name in file_list:
912 node_is_mc = nodeinfo.master_candidate
913 must_have_file = file_name not in master_files
914 if file_name not in remote_cksum:
915 if node_is_mc or must_have_file:
917 feedback_fn(" - ERROR: file '%s' missing" % file_name)
918 elif remote_cksum[file_name] != local_cksum[file_name]:
919 if node_is_mc or must_have_file:
921 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
923 # not candidate and this is not a must-have file
925 feedback_fn(" - ERROR: file '%s' should not exist on non master"
926 " candidates (and the file is outdated)" % file_name)
928 # all good, except non-master/non-must have combination
929 if not node_is_mc and not must_have_file:
930 feedback_fn(" - ERROR: file '%s' should not exist on non master"
931 " candidates" % file_name)
935 if constants.NV_NODELIST not in node_result:
937 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
939 if node_result[constants.NV_NODELIST]:
941 for node in node_result[constants.NV_NODELIST]:
942 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
943 (node, node_result[constants.NV_NODELIST][node]))
945 if constants.NV_NODENETTEST not in node_result:
947 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
949 if node_result[constants.NV_NODENETTEST]:
951 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
953 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
954 (node, node_result[constants.NV_NODENETTEST][node]))
956 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
957 if isinstance(hyp_result, dict):
958 for hv_name, hv_result in hyp_result.iteritems():
959 if hv_result is not None:
960 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
961 (hv_name, hv_result))
963 # check used drbd list
964 if vg_name is not None:
965 used_minors = node_result.get(constants.NV_DRBDLIST, [])
966 if not isinstance(used_minors, (tuple, list)):
967 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
970 for minor, (iname, must_exist) in drbd_map.items():
971 if minor not in used_minors and must_exist:
972 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
973 " not active" % (minor, iname))
975 for minor in used_minors:
976 if minor not in drbd_map:
977 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
983 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
984 node_instance, feedback_fn, n_offline):
985 """Verify an instance.
987 This function checks to see if the required block devices are
988 available on the instance's node.
993 node_current = instanceconfig.primary_node
996 instanceconfig.MapLVsByNode(node_vol_should)
998 for node in node_vol_should:
999 if node in n_offline:
1000 # ignore missing volumes on offline nodes
1002 for volume in node_vol_should[node]:
1003 if node not in node_vol_is or volume not in node_vol_is[node]:
1004 feedback_fn(" - ERROR: volume %s missing on node %s" %
1008 if instanceconfig.admin_up:
1009 if ((node_current not in node_instance or
1010 not instance in node_instance[node_current]) and
1011 node_current not in n_offline):
1012 feedback_fn(" - ERROR: instance %s not running on node %s" %
1013 (instance, node_current))
1016 for node in node_instance:
1017 if (not node == node_current):
1018 if instance in node_instance[node]:
1019 feedback_fn(" - ERROR: instance %s should not run on node %s" %
1025 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
1026 """Verify if there are any unknown volumes in the cluster.
1028 The .os, .swap and backup volumes are ignored. All other volumes are
1029 reported as unknown.
1034 for node in node_vol_is:
1035 for volume in node_vol_is[node]:
1036 if node not in node_vol_should or volume not in node_vol_should[node]:
1037 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
1042 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
1043 """Verify the list of running instances.
1045 This checks what instances are running but unknown to the cluster.
1049 for node in node_instance:
1050 for runninginstance in node_instance[node]:
1051 if runninginstance not in instancelist:
1052 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
1053 (runninginstance, node))
1057 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
1058 """Verify N+1 Memory Resilience.
1060 Check that if one single node dies we can still start all the instances it
1066 for node, nodeinfo in node_info.iteritems():
1067 # This code checks that every node which is now listed as secondary has
1068 # enough memory to host all instances it is supposed to should a single
1069 # other node in the cluster fail.
1070 # FIXME: not ready for failover to an arbitrary node
1071 # FIXME: does not support file-backed instances
1072 # WARNING: we currently take into account down instances as well as up
1073 # ones, considering that even if they're down someone might want to start
1074 # them even in the event of a node failure.
1075 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1077 for instance in instances:
1078 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1079 if bep[constants.BE_AUTO_BALANCE]:
1080 needed_mem += bep[constants.BE_MEMORY]
1081 if nodeinfo['mfree'] < needed_mem:
1082 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
1083 " failovers should node %s fail" % (node, prinode))
1087 def CheckPrereq(self):
1088 """Check prerequisites.
1090 Transform the list of checks we're going to skip into a set and check that
1091 all its members are valid.
1094 self.skip_set = frozenset(self.op.skip_checks)
1095 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1096 raise errors.OpPrereqError("Invalid checks to be skipped specified")
1098 def BuildHooksEnv(self):
1101     Cluster-Verify hooks are run only in the post phase; if they fail, their
1102     output is logged in the verify output and the verification fails.
1105 all_nodes = self.cfg.GetNodeList()
1107 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1109 for node in self.cfg.GetAllNodesInfo().values():
1110 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1112 return env, [], all_nodes
1114 def Exec(self, feedback_fn):
1115     """Verify integrity of cluster, performing various tests on nodes.
1119 feedback_fn("* Verifying global settings")
1120 for msg in self.cfg.VerifyConfig():
1121 feedback_fn(" - ERROR: %s" % msg)
1123 vg_name = self.cfg.GetVGName()
1124 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1125 nodelist = utils.NiceSort(self.cfg.GetNodeList())
1126 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1127 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1128 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1129 for iname in instancelist)
1130 i_non_redundant = [] # Non redundant instances
1131 i_non_a_balanced = [] # Non auto-balanced instances
1132 n_offline = [] # List of offline nodes
1133 n_drained = [] # List of nodes being drained
1139 # FIXME: verify OS list
1140 # do local checksums
1141 master_files = [constants.CLUSTER_CONF_FILE]
1143 file_names = ssconf.SimpleStore().GetFileList()
1144 file_names.append(constants.SSL_CERT_FILE)
1145 file_names.append(constants.RAPI_CERT_FILE)
1146 file_names.extend(master_files)
1148 local_checksums = utils.FingerprintFiles(file_names)
1150 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1151 node_verify_param = {
1152 constants.NV_FILELIST: file_names,
1153 constants.NV_NODELIST: [node.name for node in nodeinfo
1154 if not node.offline],
1155 constants.NV_HYPERVISOR: hypervisors,
1156 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1157 node.secondary_ip) for node in nodeinfo
1158 if not node.offline],
1159 constants.NV_INSTANCELIST: hypervisors,
1160 constants.NV_VERSION: None,
1161 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1163 if vg_name is not None:
1164 node_verify_param[constants.NV_VGLIST] = None
1165 node_verify_param[constants.NV_LVLIST] = vg_name
1166 node_verify_param[constants.NV_DRBDLIST] = None
1167 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1168 self.cfg.GetClusterName())
1170 cluster = self.cfg.GetClusterInfo()
1171 master_node = self.cfg.GetMasterNode()
1172 all_drbd_map = self.cfg.ComputeDRBDMap()
1174 for node_i in nodeinfo:
1178 feedback_fn("* Skipping offline node %s" % (node,))
1179 n_offline.append(node)
1182 if node == master_node:
1184 elif node_i.master_candidate:
1185 ntype = "master candidate"
1186 elif node_i.drained:
1188 n_drained.append(node)
1191 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1193 msg = all_nvinfo[node].fail_msg
1195 feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg))
1199 nresult = all_nvinfo[node].payload
1201 for minor, instance in all_drbd_map[node].items():
1202 if instance not in instanceinfo:
1203 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1205 # ghost instance should not be running, but otherwise we
1206 # don't give double warnings (both ghost instance and
1207 # unallocated minor in use)
1208 node_drbd[minor] = (instance, False)
1210 instance = instanceinfo[instance]
1211 node_drbd[minor] = (instance.name, instance.admin_up)
1212 result = self._VerifyNode(node_i, file_names, local_checksums,
1213 nresult, feedback_fn, master_files,
1217 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1219 node_volume[node] = {}
1220 elif isinstance(lvdata, basestring):
1221 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1222 (node, utils.SafeEncode(lvdata)))
1224 node_volume[node] = {}
1225 elif not isinstance(lvdata, dict):
1226 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1230 node_volume[node] = lvdata
1233 idata = nresult.get(constants.NV_INSTANCELIST, None)
1234 if not isinstance(idata, list):
1235 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1240 node_instance[node] = idata
1243 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1244 if not isinstance(nodeinfo, dict):
1245 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1251 "mfree": int(nodeinfo['memory_free']),
1254 # dictionary holding all instances this node is secondary for,
1255 # grouped by their primary node. Each key is a cluster node, and each
1256 # value is a list of instances which have the key as primary and the
1257         # current node as secondary. This is handy to calculate N+1 memory
1258 # availability if you can only failover from a primary to its
1260 "sinst-by-pnode": {},
1262 # FIXME: devise a free space model for file based instances as well
1263 if vg_name is not None:
1264 if (constants.NV_VGLIST not in nresult or
1265 vg_name not in nresult[constants.NV_VGLIST]):
1266 feedback_fn(" - ERROR: node %s didn't return data for the"
1267 " volume group '%s' - it is either missing or broken" %
1271 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1272 except (ValueError, KeyError):
1273 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1274 " from node %s" % (node,))
1278 node_vol_should = {}
1280 for instance in instancelist:
1281 feedback_fn("* Verifying instance %s" % instance)
1282 inst_config = instanceinfo[instance]
1283 result = self._VerifyInstance(instance, inst_config, node_volume,
1284 node_instance, feedback_fn, n_offline)
1286 inst_nodes_offline = []
1288 inst_config.MapLVsByNode(node_vol_should)
1290 instance_cfg[instance] = inst_config
1292 pnode = inst_config.primary_node
1293 if pnode in node_info:
1294 node_info[pnode]['pinst'].append(instance)
1295 elif pnode not in n_offline:
1296 feedback_fn(" - ERROR: instance %s, connection to primary node"
1297 " %s failed" % (instance, pnode))
1300 if pnode in n_offline:
1301 inst_nodes_offline.append(pnode)
1303 # If the instance is non-redundant we cannot survive losing its primary
1304 # node, so we are not N+1 compliant. On the other hand we have no disk
1305 # templates with more than one secondary so that situation is not well
1307 # FIXME: does not support file-backed instances
1308 if len(inst_config.secondary_nodes) == 0:
1309 i_non_redundant.append(instance)
1310 elif len(inst_config.secondary_nodes) > 1:
1311 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1314 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1315 i_non_a_balanced.append(instance)
1317 for snode in inst_config.secondary_nodes:
1318 if snode in node_info:
1319 node_info[snode]['sinst'].append(instance)
1320 if pnode not in node_info[snode]['sinst-by-pnode']:
1321 node_info[snode]['sinst-by-pnode'][pnode] = []
1322 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1323 elif snode not in n_offline:
1324 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1325 " %s failed" % (instance, snode))
1327 if snode in n_offline:
1328 inst_nodes_offline.append(snode)
1330 if inst_nodes_offline:
1331 # warn that the instance lives on offline nodes, and set bad=True
1332 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1333 ", ".join(inst_nodes_offline))
1336 feedback_fn("* Verifying orphan volumes")
1337 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1341 feedback_fn("* Verifying remaining instances")
1342 result = self._VerifyOrphanInstances(instancelist, node_instance,
1346 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1347 feedback_fn("* Verifying N+1 Memory redundancy")
1348 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1351 feedback_fn("* Other Notes")
1353 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1354 % len(i_non_redundant))
1356 if i_non_a_balanced:
1357 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1358 % len(i_non_a_balanced))
1361 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1364 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1368 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1369 """Analyze the post-hooks' result
1371 This method analyses the hook result, handles it, and sends some
1372 nicely-formatted feedback back to the user.
1374 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1375 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1376 @param hooks_results: the results of the multi-node hooks rpc call
1377     @param feedback_fn: function used to send feedback back to the caller
1378 @param lu_result: previous Exec result
1379 @return: the new Exec result, based on the previous result
1383 # We only really run POST phase hooks, and are only interested in
1385 if phase == constants.HOOKS_PHASE_POST:
1386 # Used to change hooks' output to proper indentation
1387 indent_re = re.compile('^', re.M)
1388 feedback_fn("* Hooks Results")
1389 if not hooks_results:
1390 feedback_fn(" - ERROR: general communication failure")
1393 for node_name in hooks_results:
1394 show_node_header = True
1395 res = hooks_results[node_name]
1399 # no need to warn or set fail return value
1401 feedback_fn(" Communication failure in hooks execution: %s" %
1405 for script, hkr, output in res.payload:
1406 if hkr == constants.HKR_FAIL:
1407 # The node header is only shown once, if there are
1408 # failing hooks on that node
1409 if show_node_header:
1410 feedback_fn(" Node %s:" % node_name)
1411 show_node_header = False
1412 feedback_fn(" ERROR: Script %s failed, output:" % script)
1413 output = indent_re.sub(' ', output)
1414 feedback_fn("%s" % output)
1420 class LUVerifyDisks(NoHooksLU):
1421 """Verifies the cluster disks status.
1427 def ExpandNames(self):
1428 self.needed_locks = {
1429 locking.LEVEL_NODE: locking.ALL_SET,
1430 locking.LEVEL_INSTANCE: locking.ALL_SET,
1432 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1434 def CheckPrereq(self):
1435 """Check prerequisites.
1437 This has no prerequisites.
1442 def Exec(self, feedback_fn):
1443 """Verify integrity of cluster disks.
1445 @rtype: tuple of three items
1446 @return: a tuple of (dict of node-to-node_error, list of instances
1447 which need activate-disks, dict of instance: (node, volume) for
1451 result = res_nodes, res_instances, res_missing = {}, [], {}
1453 vg_name = self.cfg.GetVGName()
1454 nodes = utils.NiceSort(self.cfg.GetNodeList())
1455 instances = [self.cfg.GetInstanceInfo(name)
1456 for name in self.cfg.GetInstanceList()]
1459 for inst in instances:
1461 if (not inst.admin_up or
1462 inst.disk_template not in constants.DTS_NET_MIRROR):
1464 inst.MapLVsByNode(inst_lvs)
1465 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1466 for node, vol_list in inst_lvs.iteritems():
1467 for vol in vol_list:
1468 nv_dict[(node, vol)] = inst
1473 node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1477 node_res = node_lvs[node]
1478 if node_res.offline:
1480 msg = node_res.fail_msg
1482 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1483 res_nodes[node] = msg
1486 lvs = node_res.payload
1487 for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1488 inst = nv_dict.pop((node, lv_name), None)
1489 if (not lv_online and inst is not None
1490 and inst.name not in res_instances):
1491 res_instances.append(inst.name)
1493 # any leftover items in nv_dict are missing LVs, let's arrange the
1495 for key, inst in nv_dict.iteritems():
1496 if inst.name not in res_missing:
1497 res_missing[inst.name] = []
1498 res_missing[inst.name].append(key)
1503 class LURepairDiskSizes(NoHooksLU):
1504 """Verifies the cluster disks sizes.
1507 _OP_REQP = ["instances"]
1510 def ExpandNames(self):
1512 if not isinstance(self.op.instances, list):
1513 raise errors.OpPrereqError("Invalid argument type 'instances'")
1515 if self.op.instances:
1516 self.wanted_names = []
1517 for name in self.op.instances:
1518 full_name = self.cfg.ExpandInstanceName(name)
1519 if full_name is None:
1520 raise errors.OpPrereqError("Instance '%s' not known" % name)
1521 self.wanted_names.append(full_name)
1522 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
1523 self.needed_locks = {
1524 locking.LEVEL_NODE: [],
1525 locking.LEVEL_INSTANCE: self.wanted_names,
1527 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1529 self.wanted_names = None
1530 self.needed_locks = {
1531 locking.LEVEL_NODE: locking.ALL_SET,
1532 locking.LEVEL_INSTANCE: locking.ALL_SET,
1534 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1536 def DeclareLocks(self, level):
1537 if level == locking.LEVEL_NODE and self.wanted_names is not None:
1538 self._LockInstancesNodes(primary_only=True)
1540 def CheckPrereq(self):
1541 """Check prerequisites.
1543 This only checks the optional instance list against the existing names.
1546 if self.wanted_names is None:
1547 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1549 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1550 in self.wanted_names]
1552 def Exec(self, feedback_fn):
1553 """Verify the size of cluster disks.
1556 # TODO: check child disks too
1557 # TODO: check differences in size between primary/secondary nodes
1559 for instance in self.wanted_instances:
1560 pnode = instance.primary_node
1561 if pnode not in per_node_disks:
1562 per_node_disks[pnode] = []
1563 for idx, disk in enumerate(instance.disks):
1564 per_node_disks[pnode].append((instance, idx, disk))
1567 for node, dskl in per_node_disks.items():
1568 result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
1570 self.LogWarning("Failure in blockdev_getsizes call to node"
1571 " %s, ignoring", node)
1573 if len(result.data) != len(dskl):
1574 self.LogWarning("Invalid result from node %s, ignoring node results",
1577 for ((instance, idx, disk), size) in zip(dskl, result.data):
1579 self.LogWarning("Disk %d of instance %s did not return size"
1580 " information, ignoring", idx, instance.name)
1582 if not isinstance(size, (int, long)):
1583 self.LogWarning("Disk %d of instance %s did not return valid"
1584 " size information, ignoring", idx, instance.name)
1587 if size != disk.size:
1588 self.LogInfo("Disk %d of instance %s has mismatched size,"
1589 " correcting: recorded %d, actual %d", idx,
1590 instance.name, disk.size, size)
1592 self.cfg.Update(instance)
1593 changed.append((instance.name, idx, size))
1597 class LURenameCluster(LogicalUnit):
1598 """Rename the cluster.
1601 HPATH = "cluster-rename"
1602 HTYPE = constants.HTYPE_CLUSTER
1605 def BuildHooksEnv(self):
1610 "OP_TARGET": self.cfg.GetClusterName(),
1611 "NEW_NAME": self.op.name,
1613 mn = self.cfg.GetMasterNode()
1614 return env, [mn], [mn]
1616 def CheckPrereq(self):
1617 """Verify that the passed name is a valid one.
1620 hostname = utils.HostInfo(self.op.name)
1622 new_name = hostname.name
1623 self.ip = new_ip = hostname.ip
1624 old_name = self.cfg.GetClusterName()
1625 old_ip = self.cfg.GetMasterIP()
1626 if new_name == old_name and new_ip == old_ip:
1627 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1628 " cluster has changed")
1629 if new_ip != old_ip:
1630 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1631 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1632 " reachable on the network. Aborting." %
1635 self.op.name = new_name
1637 def Exec(self, feedback_fn):
1638 """Rename the cluster.
1641 clustername = self.op.name
1644 # shutdown the master IP
1645 master = self.cfg.GetMasterNode()
1646 result = self.rpc.call_node_stop_master(master, False)
1647 result.Raise("Could not disable the master role")
1650 cluster = self.cfg.GetClusterInfo()
1651 cluster.cluster_name = clustername
1652 cluster.master_ip = ip
1653 self.cfg.Update(cluster)
1655 # update the known hosts file
1656 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1657 node_list = self.cfg.GetNodeList()
1659 node_list.remove(master)
1662 result = self.rpc.call_upload_file(node_list,
1663 constants.SSH_KNOWN_HOSTS_FILE)
1664 for to_node, to_result in result.iteritems():
1665 msg = to_result.fail_msg
1667 msg = ("Copy of file %s to node %s failed: %s" %
1668 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1669 self.proc.LogWarning(msg)
1672 result = self.rpc.call_node_start_master(master, False, False)
1673 msg = result.fail_msg
1675 self.LogWarning("Could not re-enable the master role on"
1676 " the master, please restart manually: %s", msg)
1679 def _RecursiveCheckIfLVMBased(disk):
1680 """Check if the given disk or its children are lvm-based.
1682 @type disk: L{objects.Disk}
1683 @param disk: the disk to check
1685 @return: boolean indicating whether a LD_LV dev_type was found or not
1689 for chdisk in disk.children:
1690 if _RecursiveCheckIfLVMBased(chdisk):
1692 return disk.dev_type == constants.LD_LV
1695 class LUSetClusterParams(LogicalUnit):
1696 """Change the parameters of the cluster.
1699 HPATH = "cluster-modify"
1700 HTYPE = constants.HTYPE_CLUSTER
1704 def CheckArguments(self):
1708 if not hasattr(self.op, "candidate_pool_size"):
1709 self.op.candidate_pool_size = None
1710 if self.op.candidate_pool_size is not None:
1712 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1713 except (ValueError, TypeError), err:
1714 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1716 if self.op.candidate_pool_size < 1:
1717 raise errors.OpPrereqError("At least one master candidate needed")
1719 def ExpandNames(self):
1720 # FIXME: in the future maybe other cluster params won't require checking on
1721 # all nodes to be modified.
1722 self.needed_locks = {
1723 locking.LEVEL_NODE: locking.ALL_SET,
1725 self.share_locks[locking.LEVEL_NODE] = 1
1727 def BuildHooksEnv(self):
1732 "OP_TARGET": self.cfg.GetClusterName(),
1733 "NEW_VG_NAME": self.op.vg_name,
1735 mn = self.cfg.GetMasterNode()
1736 return env, [mn], [mn]
1738 def CheckPrereq(self):
1739 """Check prerequisites.
1741     This checks whether the given params don't conflict and
1742     whether the given volume group is valid.
1745 if self.op.vg_name is not None and not self.op.vg_name:
1746 instances = self.cfg.GetAllInstancesInfo().values()
1747 for inst in instances:
1748 for disk in inst.disks:
1749 if _RecursiveCheckIfLVMBased(disk):
1750 raise errors.OpPrereqError("Cannot disable lvm storage while"
1751 " lvm-based instances exist")
1753 node_list = self.acquired_locks[locking.LEVEL_NODE]
1755     # if vg_name is not None, check the given volume group on all nodes
1757 vglist = self.rpc.call_vg_list(node_list)
1758 for node in node_list:
1759 msg = vglist[node].fail_msg
1761 # ignoring down node
1762 self.LogWarning("Error while gathering data on node %s"
1763 " (ignoring node): %s", node, msg)
1765 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1767 constants.MIN_VG_SIZE)
1769 raise errors.OpPrereqError("Error on node '%s': %s" %
1772 self.cluster = cluster = self.cfg.GetClusterInfo()
1773 # validate params changes
1774 if self.op.beparams:
1775 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1776 self.new_beparams = objects.FillDict(
1777 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1779 if self.op.nicparams:
1780 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1781 self.new_nicparams = objects.FillDict(
1782 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1783 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1785 # hypervisor list/parameters
1786 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1787 if self.op.hvparams:
1788 if not isinstance(self.op.hvparams, dict):
1789 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1790 for hv_name, hv_dict in self.op.hvparams.items():
1791 if hv_name not in self.new_hvparams:
1792 self.new_hvparams[hv_name] = hv_dict
1794 self.new_hvparams[hv_name].update(hv_dict)
1796 if self.op.enabled_hypervisors is not None:
1797 self.hv_list = self.op.enabled_hypervisors
1798 if not self.hv_list:
1799 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1800 " least one member")
1801 invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1803 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1804 " entries: %s" % invalid_hvs)
1806 self.hv_list = cluster.enabled_hypervisors
1808 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1809 # either the enabled list has changed, or the parameters have, validate
1810 for hv_name, hv_params in self.new_hvparams.items():
1811 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1812 (self.op.enabled_hypervisors and
1813 hv_name in self.op.enabled_hypervisors)):
1814 # either this is a new hypervisor, or its parameters have changed
1815 hv_class = hypervisor.GetHypervisor(hv_name)
1816 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1817 hv_class.CheckParameterSyntax(hv_params)
1818 _CheckHVParams(self, node_list, hv_name, hv_params)
1820 def Exec(self, feedback_fn):
1821 """Change the parameters of the cluster.
1824 if self.op.vg_name is not None:
1825 new_volume = self.op.vg_name
1828 if new_volume != self.cfg.GetVGName():
1829 self.cfg.SetVGName(new_volume)
1831 feedback_fn("Cluster LVM configuration already in desired"
1832 " state, not changing")
1833 if self.op.hvparams:
1834 self.cluster.hvparams = self.new_hvparams
1835 if self.op.enabled_hypervisors is not None:
1836 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1837 if self.op.beparams:
1838 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1839 if self.op.nicparams:
1840 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1842 if self.op.candidate_pool_size is not None:
1843 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1844 # we need to update the pool size here, otherwise the save will fail
1845 _AdjustCandidatePool(self)
1847 self.cfg.Update(self.cluster)
1850 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1851 """Distribute additional files which are part of the cluster configuration.
1853 ConfigWriter takes care of distributing the config and ssconf files, but
1854 there are more files which should be distributed to all nodes. This function
1855 makes sure those are copied.
1857 @param lu: calling logical unit
1858 @param additional_nodes: list of nodes not in the config to distribute to
1861 # 1. Gather target nodes
1862 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1863 dist_nodes = lu.cfg.GetNodeList()
1864 if additional_nodes is not None:
1865 dist_nodes.extend(additional_nodes)
1866 if myself.name in dist_nodes:
1867 dist_nodes.remove(myself.name)
1868 # 2. Gather files to distribute
1869 dist_files = set([constants.ETC_HOSTS,
1870 constants.SSH_KNOWN_HOSTS_FILE,
1871 constants.RAPI_CERT_FILE,
1872 constants.RAPI_USERS_FILE,
1873 constants.HMAC_CLUSTER_KEY,
1876 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1877 for hv_name in enabled_hypervisors:
1878 hv_class = hypervisor.GetHypervisor(hv_name)
1879 dist_files.update(hv_class.GetAncillaryFiles())
1881 # 3. Perform the files upload
1882 for fname in dist_files:
1883 if os.path.exists(fname):
1884 result = lu.rpc.call_upload_file(dist_nodes, fname)
1885 for to_node, to_result in result.items():
1886 msg = to_result.fail_msg
1888 msg = ("Copy of file %s to node %s failed: %s" %
1889 (fname, to_node, msg))
1890 lu.proc.LogWarning(msg)
1893 class LURedistributeConfig(NoHooksLU):
1894 """Force the redistribution of cluster configuration.
1896 This is a very simple LU.
1902 def ExpandNames(self):
1903 self.needed_locks = {
1904 locking.LEVEL_NODE: locking.ALL_SET,
1906 self.share_locks[locking.LEVEL_NODE] = 1
1908 def CheckPrereq(self):
1909 """Check prerequisites.
1913 def Exec(self, feedback_fn):
1914 """Redistribute the configuration.
1917 self.cfg.Update(self.cfg.GetClusterInfo())
1918 _RedistributeAncillaryFiles(self)
1921 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1922 """Sleep and poll for an instance's disk to sync.
1925 if not instance.disks:
1929 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1931 node = instance.primary_node
1933 for dev in instance.disks:
1934 lu.cfg.SetDiskID(dev, node)
1937 degr_retries = 10 # in seconds, as we sleep 1 second each time
1941 cumul_degraded = False
1942 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1943 msg = rstats.fail_msg
1945 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1948 raise errors.RemoteError("Can't contact node %s for mirror data,"
1949 " aborting." % node)
1952 rstats = rstats.payload
1954 for i, mstat in enumerate(rstats):
1956 lu.LogWarning("Can't compute data for node %s/%s",
1957 node, instance.disks[i].iv_name)
1960 cumul_degraded = (cumul_degraded or
1961 (mstat.is_degraded and mstat.sync_percent is None))
1962 if mstat.sync_percent is not None:
1964 if mstat.estimated_time is not None:
1965 rem_time = "%d estimated seconds remaining" % mstat.estimated_time
1966 max_time = mstat.estimated_time
1968 rem_time = "no time estimate"
1969 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1970 (instance.disks[i].iv_name, mstat.sync_percent, rem_time))
1972 # if we're done but degraded, do a few small retries to make sure
1973 # we are seeing a stable situation and not a transient one; therefore
1974 # we force a restart of the loop
1975 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1976 logging.info("Degraded disks found, %d retries left", degr_retries)
1984 time.sleep(min(60, max_time))
1987 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1988 return not cumul_degraded
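# Usage sketch (illustrative): callers that have just created or re-attached
# mirrored disks would typically do something like
#   disk_abort = not _WaitForSync(self, instance)
#   if disk_abort:
#     raise errors.OpExecError("Disk sync-ing has not returned a good status")
# i.e. treat a False return value (still degraded after the retries above)
# as a fatal condition for the operation.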
1991 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1992 """Check that mirrors are not degraded.
1994 The ldisk parameter, if True, will change the test from the
1995 is_degraded attribute (which represents overall non-ok status for
1996 the device(s)) to the ldisk (representing the local storage status).
1999 lu.cfg.SetDiskID(dev, node)
2003 if on_primary or dev.AssembleOnSecondary():
2004 rstats = lu.rpc.call_blockdev_find(node, dev)
2005 msg = rstats.fail_msg
2007 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
2009 elif not rstats.payload:
2010 lu.LogWarning("Can't find disk on node %s", node)
2014 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
2016 result = result and not rstats.payload.is_degraded
2019 for child in dev.children:
2020 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
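# Usage sketch (illustrative): migration/failover code paths would typically
# verify each disk on the target node before switching roles, e.g.
#   for dev in instance.disks:
#     if not _CheckDiskConsistency(self, dev, target_node, False, ldisk=True):
#       raise errors.OpExecError("Disk %s is degraded on target node %s" %
#                                (dev.iv_name, target_node))
# ldisk=True restricts the check to the local storage status, which is what
# matters when the node is about to become primary.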
2025 class LUDiagnoseOS(NoHooksLU):
2026 """Logical unit for OS diagnose/query.
2029 _OP_REQP = ["output_fields", "names"]
2031 _FIELDS_STATIC = utils.FieldSet()
2032 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
2034 def ExpandNames(self):
2036 raise errors.OpPrereqError("Selective OS query not supported")
2038 _CheckOutputFields(static=self._FIELDS_STATIC,
2039 dynamic=self._FIELDS_DYNAMIC,
2040 selected=self.op.output_fields)
2042 # Lock all nodes, in shared mode
2043 # Temporary removal of locks, should be reverted later
2044 # TODO: reintroduce locks when they are lighter-weight
2045 self.needed_locks = {}
2046 #self.share_locks[locking.LEVEL_NODE] = 1
2047 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2049 def CheckPrereq(self):
2050 """Check prerequisites.
2055 def _DiagnoseByOS(node_list, rlist):
2056 """Remaps a per-node return list into an a per-os per-node dictionary
2058 @param node_list: a list with the names of all nodes
2059 @param rlist: a map with node names as keys and OS objects as values
2062 @return: a dictionary with osnames as keys and as value another map, with
2063 nodes as keys and tuples of (path, status, diagnose) as values, eg::
2065 {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
2066 (/srv/..., False, "invalid api")],
2067 "node2": [(/srv/..., True, "")]}
2072 # we build here the list of nodes that didn't fail the RPC (at RPC
2073 # level), so that nodes with a non-responding node daemon don't
2074 # make all OSes invalid
2075 good_nodes = [node_name for node_name in rlist
2076 if not rlist[node_name].fail_msg]
2077 for node_name, nr in rlist.items():
2078 if nr.fail_msg or not nr.payload:
2080 for name, path, status, diagnose in nr.payload:
2081 if name not in all_os:
2082 # build a list of nodes for this os containing empty lists
2083 # for each node in node_list
2085 for nname in good_nodes:
2086 all_os[name][nname] = []
2087 all_os[name][node_name].append((path, status, diagnose))
2090 def Exec(self, feedback_fn):
2091 """Compute the list of OSes.
2094 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
2095 node_data = self.rpc.call_os_diagnose(valid_nodes)
2096 pol = self._DiagnoseByOS(valid_nodes, node_data)
2098 for os_name, os_data in pol.items():
2100 for field in self.op.output_fields:
2103 elif field == "valid":
2104 val = utils.all([osl and osl[0][1] for osl in os_data.values()])
2105 elif field == "node_status":
2106 # this is just a copy of the dict
2108 for node_name, nos_list in os_data.items():
2109 val[node_name] = nos_list
2111 raise errors.ParameterError(field)
2118 class LURemoveNode(LogicalUnit):
2119 """Logical unit for removing a node.
2122 HPATH = "node-remove"
2123 HTYPE = constants.HTYPE_NODE
2124 _OP_REQP = ["node_name"]
2126 def BuildHooksEnv(self):
2129 This doesn't run on the target node in the pre phase as a failed
2130 node would then be impossible to remove.
2134 "OP_TARGET": self.op.node_name,
2135 "NODE_NAME": self.op.node_name,
2137 all_nodes = self.cfg.GetNodeList()
2138 all_nodes.remove(self.op.node_name)
2139 return env, all_nodes, all_nodes
2141 def CheckPrereq(self):
2142 """Check prerequisites.
2145 - the node exists in the configuration
2146 - it does not have primary or secondary instances
2147 - it's not the master
2149 Any errors are signaled by raising errors.OpPrereqError.
2152 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2154 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
2156 instance_list = self.cfg.GetInstanceList()
2158 masternode = self.cfg.GetMasterNode()
2159 if node.name == masternode:
2160 raise errors.OpPrereqError("Node is the master node,"
2161 " you need to failover first.")
2163 for instance_name in instance_list:
2164 instance = self.cfg.GetInstanceInfo(instance_name)
2165 if node.name in instance.all_nodes:
2166 raise errors.OpPrereqError("Instance %s is still running on the node,"
2167 " please remove first." % instance_name)
2168 self.op.node_name = node.name
2171 def Exec(self, feedback_fn):
2172 """Removes the node from the cluster.
2176 logging.info("Stopping the node daemon and removing configs from node %s",
2179 self.context.RemoveNode(node.name)
2181 result = self.rpc.call_node_leave_cluster(node.name)
2182 msg = result.fail_msg
2184 self.LogWarning("Errors encountered on the remote node while leaving"
2185 " the cluster: %s", msg)
2187 # Promote nodes to master candidate as needed
2188 _AdjustCandidatePool(self)
2191 class LUQueryNodes(NoHooksLU):
2192 """Logical unit for querying nodes.
2195 _OP_REQP = ["output_fields", "names", "use_locking"]
2197 _FIELDS_DYNAMIC = utils.FieldSet(
2199 "mtotal", "mnode", "mfree",
2201 "ctotal", "cnodes", "csockets",
2204 _FIELDS_STATIC = utils.FieldSet(
2205 "name", "pinst_cnt", "sinst_cnt",
2206 "pinst_list", "sinst_list",
2207 "pip", "sip", "tags",
2216 def ExpandNames(self):
2217 _CheckOutputFields(static=self._FIELDS_STATIC,
2218 dynamic=self._FIELDS_DYNAMIC,
2219 selected=self.op.output_fields)
2221 self.needed_locks = {}
2222 self.share_locks[locking.LEVEL_NODE] = 1
2225 self.wanted = _GetWantedNodes(self, self.op.names)
2227 self.wanted = locking.ALL_SET
2229 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2230 self.do_locking = self.do_node_query and self.op.use_locking
2232 # if we don't request only static fields, we need to lock the nodes
2233 self.needed_locks[locking.LEVEL_NODE] = self.wanted
2236 def CheckPrereq(self):
2237 """Check prerequisites.
2240 # The validation of the node list is done in _GetWantedNodes, if the
2241 # list is not empty; if it is empty, there is no validation to do
2244 def Exec(self, feedback_fn):
2245 """Computes the list of nodes and their attributes.
2248 all_info = self.cfg.GetAllNodesInfo()
2250 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2251 elif self.wanted != locking.ALL_SET:
2252 nodenames = self.wanted
2253 missing = set(nodenames).difference(all_info.keys())
2255 raise errors.OpExecError(
2256 "Some nodes were removed before retrieving their data: %s" % missing)
2258 nodenames = all_info.keys()
2260 nodenames = utils.NiceSort(nodenames)
2261 nodelist = [all_info[name] for name in nodenames]
2263 # begin data gathering
2265 if self.do_node_query:
2267 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2268 self.cfg.GetHypervisorType())
2269 for name in nodenames:
2270 nodeinfo = node_data[name]
2271 if not nodeinfo.fail_msg and nodeinfo.payload:
2272 nodeinfo = nodeinfo.payload
2273 fn = utils.TryConvert
2275 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2276 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2277 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2278 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2279 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2280 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2281 "bootid": nodeinfo.get('bootid', None),
2282 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2283 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2286 live_data[name] = {}
2288 live_data = dict.fromkeys(nodenames, {})
2290 node_to_primary = dict([(name, set()) for name in nodenames])
2291 node_to_secondary = dict([(name, set()) for name in nodenames])
2293 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2294 "sinst_cnt", "sinst_list"))
2295 if inst_fields & frozenset(self.op.output_fields):
2296 instancelist = self.cfg.GetInstanceList()
2298 for instance_name in instancelist:
2299 inst = self.cfg.GetInstanceInfo(instance_name)
2300 if inst.primary_node in node_to_primary:
2301 node_to_primary[inst.primary_node].add(inst.name)
2302 for secnode in inst.secondary_nodes:
2303 if secnode in node_to_secondary:
2304 node_to_secondary[secnode].add(inst.name)
2306 master_node = self.cfg.GetMasterNode()
2308 # end data gathering
2311 for node in nodelist:
2313 for field in self.op.output_fields:
2316 elif field == "pinst_list":
2317 val = list(node_to_primary[node.name])
2318 elif field == "sinst_list":
2319 val = list(node_to_secondary[node.name])
2320 elif field == "pinst_cnt":
2321 val = len(node_to_primary[node.name])
2322 elif field == "sinst_cnt":
2323 val = len(node_to_secondary[node.name])
2324 elif field == "pip":
2325 val = node.primary_ip
2326 elif field == "sip":
2327 val = node.secondary_ip
2328 elif field == "tags":
2329 val = list(node.GetTags())
2330 elif field == "serial_no":
2331 val = node.serial_no
2332 elif field == "master_candidate":
2333 val = node.master_candidate
2334 elif field == "master":
2335 val = node.name == master_node
2336 elif field == "offline":
2338 elif field == "drained":
2340 elif self._FIELDS_DYNAMIC.Matches(field):
2341 val = live_data[node.name].get(field, None)
2342 elif field == "role":
2343 if node.name == master_node:
2345 elif node.master_candidate:
2354 raise errors.ParameterError(field)
2355 node_output.append(val)
2356 output.append(node_output)
2361 class LUQueryNodeVolumes(NoHooksLU):
2362 """Logical unit for getting volumes on node(s).
2365 _OP_REQP = ["nodes", "output_fields"]
2367 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2368 _FIELDS_STATIC = utils.FieldSet("node")
2370 def ExpandNames(self):
2371 _CheckOutputFields(static=self._FIELDS_STATIC,
2372 dynamic=self._FIELDS_DYNAMIC,
2373 selected=self.op.output_fields)
2375 self.needed_locks = {}
2376 self.share_locks[locking.LEVEL_NODE] = 1
2377 if not self.op.nodes:
2378 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2380 self.needed_locks[locking.LEVEL_NODE] = \
2381 _GetWantedNodes(self, self.op.nodes)
2383 def CheckPrereq(self):
2384 """Check prerequisites.
2386 This checks that the fields required are valid output fields.
2389 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2391 def Exec(self, feedback_fn):
2392 """Computes the list of nodes and their attributes.
2395 nodenames = self.nodes
2396 volumes = self.rpc.call_node_volumes(nodenames)
2398 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2399 in self.cfg.GetInstanceList()]
2401 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2404 for node in nodenames:
2405 nresult = volumes[node]
2408 msg = nresult.fail_msg
2410 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2413 node_vols = nresult.payload[:]
2414 node_vols.sort(key=lambda vol: vol['dev'])
2416 for vol in node_vols:
2418 for field in self.op.output_fields:
2421 elif field == "phys":
2425 elif field == "name":
2427 elif field == "size":
2428 val = int(float(vol['size']))
2429 elif field == "instance":
2431 if node not in lv_by_node[inst]:
2433 if vol['name'] in lv_by_node[inst][node]:
2439 raise errors.ParameterError(field)
2440 node_output.append(str(val))
2442 output.append(node_output)
2447 class LUQueryNodeStorage(NoHooksLU):
2448 """Logical unit for getting information on storage units on node(s).
2451 _OP_REQP = ["nodes", "storage_type", "output_fields"]
2453 _FIELDS_STATIC = utils.FieldSet("node")
2455 def ExpandNames(self):
2456 storage_type = self.op.storage_type
2458 if storage_type not in constants.VALID_STORAGE_FIELDS:
2459 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2461 dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2463 _CheckOutputFields(static=self._FIELDS_STATIC,
2464 dynamic=utils.FieldSet(*dynamic_fields),
2465 selected=self.op.output_fields)
2467 self.needed_locks = {}
2468 self.share_locks[locking.LEVEL_NODE] = 1
2471 self.needed_locks[locking.LEVEL_NODE] = \
2472 _GetWantedNodes(self, self.op.nodes)
2474 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2476 def CheckPrereq(self):
2477 """Check prerequisites.
2479 This checks that the fields required are valid output fields.
2482 self.op.name = getattr(self.op, "name", None)
2484 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2486 def Exec(self, feedback_fn):
2487 """Computes the list of nodes and their attributes.
2490 # Always get name to sort by
2491 if constants.SF_NAME in self.op.output_fields:
2492 fields = self.op.output_fields[:]
2494 fields = [constants.SF_NAME] + self.op.output_fields
2496 # Never ask for node as it's only known to the LU
2497 while "node" in fields:
2498 fields.remove("node")
2500 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2501 name_idx = field_idx[constants.SF_NAME]
2503 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2504 data = self.rpc.call_storage_list(self.nodes,
2505 self.op.storage_type, st_args,
2506 self.op.name, fields)
2510 for node in utils.NiceSort(self.nodes):
2511 nresult = data[node]
2515 msg = nresult.fail_msg
2517 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2520 rows = dict([(row[name_idx], row) for row in nresult.payload])
2522 for name in utils.NiceSort(rows.keys()):
2527 for field in self.op.output_fields:
2530 elif field in field_idx:
2531 val = row[field_idx[field]]
2533 raise errors.ParameterError(field)
2542 class LUModifyNodeStorage(NoHooksLU):
2543 """Logical unit for modifying a storage volume on a node.
2546 _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2549 def CheckArguments(self):
2550 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2551 if node_name is None:
2552 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2554 self.op.node_name = node_name
2556 storage_type = self.op.storage_type
2557 if storage_type not in constants.VALID_STORAGE_FIELDS:
2558 raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2560 def ExpandNames(self):
2561 self.needed_locks = {
2562 locking.LEVEL_NODE: self.op.node_name,
2565 def CheckPrereq(self):
2566 """Check prerequisites.
2569 storage_type = self.op.storage_type
2572 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2574 raise errors.OpPrereqError("Storage units of type '%s' can not be"
2575 " modified" % storage_type)
2577 diff = set(self.op.changes.keys()) - modifiable
2579 raise errors.OpPrereqError("The following fields can not be modified for"
2580 " storage units of type '%s': %r" %
2581 (storage_type, list(diff)))
2583 def Exec(self, feedback_fn):
2584 """Computes the list of nodes and their attributes.
2587 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2588 result = self.rpc.call_storage_modify(self.op.node_name,
2589 self.op.storage_type, st_args,
2590 self.op.name, self.op.changes)
2591 result.Raise("Failed to modify storage unit '%s' on %s" %
2592 (self.op.name, self.op.node_name))
2595 class LUAddNode(LogicalUnit):
2596 """Logical unit for adding node to the cluster.
2600 HTYPE = constants.HTYPE_NODE
2601 _OP_REQP = ["node_name"]
2603 def BuildHooksEnv(self):
2606 This will run on all nodes before, and on all nodes + the new node after.
2610 "OP_TARGET": self.op.node_name,
2611 "NODE_NAME": self.op.node_name,
2612 "NODE_PIP": self.op.primary_ip,
2613 "NODE_SIP": self.op.secondary_ip,
2615 nodes_0 = self.cfg.GetNodeList()
2616 nodes_1 = nodes_0 + [self.op.node_name, ]
2617 return env, nodes_0, nodes_1
2619 def CheckPrereq(self):
2620 """Check prerequisites.
2623 - the new node is not already in the config
2625 - its parameters (single/dual homed) match the cluster
2627 Any errors are signaled by raising errors.OpPrereqError.
2630 node_name = self.op.node_name
2633 dns_data = utils.HostInfo(node_name)
2635 node = dns_data.name
2636 primary_ip = self.op.primary_ip = dns_data.ip
2637 secondary_ip = getattr(self.op, "secondary_ip", None)
2638 if secondary_ip is None:
2639 secondary_ip = primary_ip
2640 if not utils.IsValidIP(secondary_ip):
2641 raise errors.OpPrereqError("Invalid secondary IP given")
2642 self.op.secondary_ip = secondary_ip
2644 node_list = cfg.GetNodeList()
2645 if not self.op.readd and node in node_list:
2646 raise errors.OpPrereqError("Node %s is already in the configuration" %
2648 elif self.op.readd and node not in node_list:
2649 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2651 for existing_node_name in node_list:
2652 existing_node = cfg.GetNodeInfo(existing_node_name)
2654 if self.op.readd and node == existing_node_name:
2655 if (existing_node.primary_ip != primary_ip or
2656 existing_node.secondary_ip != secondary_ip):
2657 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2658 " address configuration as before")
2661 if (existing_node.primary_ip == primary_ip or
2662 existing_node.secondary_ip == primary_ip or
2663 existing_node.primary_ip == secondary_ip or
2664 existing_node.secondary_ip == secondary_ip):
2665 raise errors.OpPrereqError("New node ip address(es) conflict with"
2666 " existing node %s" % existing_node.name)
2668 # check that the type of the node (single versus dual homed) is the
2669 # same as for the master
2670 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2671 master_singlehomed = myself.secondary_ip == myself.primary_ip
2672 newbie_singlehomed = secondary_ip == primary_ip
2673 if master_singlehomed != newbie_singlehomed:
2674 if master_singlehomed:
2675 raise errors.OpPrereqError("The master has no private ip but the"
2676 " new node has one")
2678 raise errors.OpPrereqError("The master has a private ip but the"
2679 " new node doesn't have one")
2681 # checks reachability
2682 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2683 raise errors.OpPrereqError("Node not reachable by ping")
2685 if not newbie_singlehomed:
2686 # check reachability from my secondary ip to newbie's secondary ip
2687 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2688 source=myself.secondary_ip):
2689 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2690 " based ping to noded port")
2692 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2697 mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2698 # the new node will increase mc_max by one, so:
2699 mc_max = min(mc_max + 1, cp_size)
2700 self.master_candidate = mc_now < mc_max
2703 self.new_node = self.cfg.GetNodeInfo(node)
2704 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2706 self.new_node = objects.Node(name=node,
2707 primary_ip=primary_ip,
2708 secondary_ip=secondary_ip,
2709 master_candidate=self.master_candidate,
2710 offline=False, drained=False)
2712 def Exec(self, feedback_fn):
2713 """Adds the new node to the cluster.
2716 new_node = self.new_node
2717 node = new_node.name
2719 # for re-adds, reset the offline/drained/master-candidate flags;
2720 # we need to reset here, otherwise offline would prevent RPC calls
2721 # later in the procedure; this also means that if the re-add
2722 # fails, we are left with a non-offlined, broken node
2724 new_node.drained = new_node.offline = False
2725 self.LogInfo("Readding a node, the offline/drained flags were reset")
2726 # if we demote the node, we do cleanup later in the procedure
2727 new_node.master_candidate = self.master_candidate
2729 # notify the user about any possible mc promotion
2730 if new_node.master_candidate:
2731 self.LogInfo("Node will be a master candidate")
2733 # check connectivity
2734 result = self.rpc.call_version([node])[node]
2735 result.Raise("Can't get version information from node %s" % node)
2736 if constants.PROTOCOL_VERSION == result.payload:
2737 logging.info("Communication to node %s fine, sw version %s match",
2738 node, result.payload)
2740 raise errors.OpExecError("Version mismatch master version %s,"
2741 " node version %s" %
2742 (constants.PROTOCOL_VERSION, result.payload))
2745 logging.info("Copy ssh key to node %s", node)
2746 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2748 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2749 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2755 keyarray.append(f.read())
2759 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2761 keyarray[3], keyarray[4], keyarray[5])
2762 result.Raise("Cannot transfer ssh keys to the new node")
2764 # Add node to our /etc/hosts, and add key to known_hosts
2765 if self.cfg.GetClusterInfo().modify_etc_hosts:
2766 utils.AddHostToEtcHosts(new_node.name)
2768 if new_node.secondary_ip != new_node.primary_ip:
2769 result = self.rpc.call_node_has_ip_address(new_node.name,
2770 new_node.secondary_ip)
2771 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2773 if not result.payload:
2774 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2775 " you gave (%s). Please fix and re-run this"
2776 " command." % new_node.secondary_ip)
2778 node_verify_list = [self.cfg.GetMasterNode()]
2779 node_verify_param = {
2781 # TODO: do a node-net-test as well?
2784 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2785 self.cfg.GetClusterName())
2786 for verifier in node_verify_list:
2787 result[verifier].Raise("Cannot communicate with node %s" % verifier)
2788 nl_payload = result[verifier].payload['nodelist']
2790 for failed in nl_payload:
2791 feedback_fn("ssh/hostname verification failed %s -> %s" %
2792 (verifier, nl_payload[failed]))
2793 raise errors.OpExecError("ssh/hostname verification failed.")
2796 _RedistributeAncillaryFiles(self)
2797 self.context.ReaddNode(new_node)
2798 # make sure we redistribute the config
2799 self.cfg.Update(new_node)
2800 # and make sure the new node will not have old files around
2801 if not new_node.master_candidate:
2802 result = self.rpc.call_node_demote_from_mc(new_node.name)
2803 msg = result.RemoteFailMsg()
2805 self.LogWarning("Node failed to demote itself from master"
2806 " candidate status: %s" % msg)
2808 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2809 self.context.AddNode(new_node)
2812 class LUSetNodeParams(LogicalUnit):
2813 """Modifies the parameters of a node.
2816 HPATH = "node-modify"
2817 HTYPE = constants.HTYPE_NODE
2818 _OP_REQP = ["node_name"]
2821 def CheckArguments(self):
2822 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2823 if node_name is None:
2824 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2825 self.op.node_name = node_name
2826 _CheckBooleanOpField(self.op, 'master_candidate')
2827 _CheckBooleanOpField(self.op, 'offline')
2828 _CheckBooleanOpField(self.op, 'drained')
2829 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2830 if all_mods.count(None) == 3:
2831 raise errors.OpPrereqError("Please pass at least one modification")
2832 if all_mods.count(True) > 1:
2833 raise errors.OpPrereqError("Can't set the node into more than one"
2834 " state at the same time")
2836 def ExpandNames(self):
2837 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2839 def BuildHooksEnv(self):
2842 This runs on the master node.
2846 "OP_TARGET": self.op.node_name,
2847 "MASTER_CANDIDATE": str(self.op.master_candidate),
2848 "OFFLINE": str(self.op.offline),
2849 "DRAINED": str(self.op.drained),
2851 nl = [self.cfg.GetMasterNode(),
2855 def CheckPrereq(self):
2856 """Check prerequisites.
2858 This checks the requested flag changes against the node's current state
and the master candidate pool constraints.
2861 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2863 if ((self.op.master_candidate == False or self.op.offline == True or
2864 self.op.drained == True) and node.master_candidate):
2865 # we will demote the node from master_candidate
2866 if self.op.node_name == self.cfg.GetMasterNode():
2867 raise errors.OpPrereqError("The master node has to be a"
2868 " master candidate, online and not drained")
2869 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2870 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2871 if num_candidates <= cp_size:
2872 msg = ("Not enough master candidates (desired"
2873 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2875 self.LogWarning(msg)
2877 raise errors.OpPrereqError(msg)
2879 if (self.op.master_candidate == True and
2880 ((node.offline and not self.op.offline == False) or
2881 (node.drained and not self.op.drained == False))):
2882 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2883 " to master_candidate" % node.name)
2887 def Exec(self, feedback_fn):
2896 if self.op.offline is not None:
2897 node.offline = self.op.offline
2898 result.append(("offline", str(self.op.offline)))
2899 if self.op.offline == True:
2900 if node.master_candidate:
2901 node.master_candidate = False
2903 result.append(("master_candidate", "auto-demotion due to offline"))
2905 node.drained = False
2906 result.append(("drained", "clear drained status due to offline"))
2908 if self.op.master_candidate is not None:
2909 node.master_candidate = self.op.master_candidate
2911 result.append(("master_candidate", str(self.op.master_candidate)))
2912 if self.op.master_candidate == False:
2913 rrc = self.rpc.call_node_demote_from_mc(node.name)
2916 self.LogWarning("Node failed to demote itself: %s" % msg)
2918 if self.op.drained is not None:
2919 node.drained = self.op.drained
2920 result.append(("drained", str(self.op.drained)))
2921 if self.op.drained == True:
2922 if node.master_candidate:
2923 node.master_candidate = False
2925 result.append(("master_candidate", "auto-demotion due to drain"))
2926 rrc = self.rpc.call_node_demote_from_mc(node.name)
2927 msg = rrc.RemoteFailMsg()
2929 self.LogWarning("Node failed to demote itself: %s" % msg)
2931 node.offline = False
2932 result.append(("offline", "clear offline status due to drain"))
2934 # this will trigger configuration file update, if needed
2935 self.cfg.Update(node)
2936 # this will trigger job queue propagation or cleanup
2938 self.context.ReaddNode(node)
2943 class LUPowercycleNode(NoHooksLU):
2944 """Powercycles a node.
2947 _OP_REQP = ["node_name", "force"]
2950 def CheckArguments(self):
2951 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2952 if node_name is None:
2953 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2954 self.op.node_name = node_name
2955 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2956 raise errors.OpPrereqError("The node is the master and the force"
2957 " parameter was not set")
2959 def ExpandNames(self):
2960 """Locking for PowercycleNode.
2962 This is a last-resort option and shouldn't block on other
2963 jobs. Therefore, we grab no locks.
2966 self.needed_locks = {}
2968 def CheckPrereq(self):
2969 """Check prerequisites.
2971 This LU has no prereqs.
2976 def Exec(self, feedback_fn):
2980 result = self.rpc.call_node_powercycle(self.op.node_name,
2981 self.cfg.GetHypervisorType())
2982 result.Raise("Failed to schedule the reboot")
2983 return result.payload
2986 class LUQueryClusterInfo(NoHooksLU):
2987 """Query cluster configuration.
2993 def ExpandNames(self):
2994 self.needed_locks = {}
2996 def CheckPrereq(self):
2997 """No prerequsites needed for this LU.
3002 def Exec(self, feedback_fn):
3003 """Return cluster config.
3006 cluster = self.cfg.GetClusterInfo()
3008 "software_version": constants.RELEASE_VERSION,
3009 "protocol_version": constants.PROTOCOL_VERSION,
3010 "config_version": constants.CONFIG_VERSION,
3011 "os_api_version": max(constants.OS_API_VERSIONS),
3012 "export_version": constants.EXPORT_VERSION,
3013 "architecture": (platform.architecture()[0], platform.machine()),
3014 "name": cluster.cluster_name,
3015 "master": cluster.master_node,
3016 "default_hypervisor": cluster.enabled_hypervisors[0],
3017 "enabled_hypervisors": cluster.enabled_hypervisors,
3018 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
3019 for hypervisor_name in cluster.enabled_hypervisors]),
3020 "beparams": cluster.beparams,
3021 "nicparams": cluster.nicparams,
3022 "candidate_pool_size": cluster.candidate_pool_size,
3023 "master_netdev": cluster.master_netdev,
3024 "volume_group_name": cluster.volume_group_name,
3025 "file_storage_dir": cluster.file_storage_dir,
3031 class LUQueryConfigValues(NoHooksLU):
3032 """Return configuration values.
3037 _FIELDS_DYNAMIC = utils.FieldSet()
3038 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
3040 def ExpandNames(self):
3041 self.needed_locks = {}
3043 _CheckOutputFields(static=self._FIELDS_STATIC,
3044 dynamic=self._FIELDS_DYNAMIC,
3045 selected=self.op.output_fields)
3047 def CheckPrereq(self):
3048 """No prerequisites.
3053 def Exec(self, feedback_fn):
3054 """Dump a representation of the cluster config to the standard output.
3058 for field in self.op.output_fields:
3059 if field == "cluster_name":
3060 entry = self.cfg.GetClusterName()
3061 elif field == "master_node":
3062 entry = self.cfg.GetMasterNode()
3063 elif field == "drain_flag":
3064 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
3066 raise errors.ParameterError(field)
3067 values.append(entry)
3071 class LUActivateInstanceDisks(NoHooksLU):
3072 """Bring up an instance's disks.
3075 _OP_REQP = ["instance_name"]
3078 def ExpandNames(self):
3079 self._ExpandAndLockInstance()
3080 self.needed_locks[locking.LEVEL_NODE] = []
3081 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3083 def DeclareLocks(self, level):
3084 if level == locking.LEVEL_NODE:
3085 self._LockInstancesNodes()
3087 def CheckPrereq(self):
3088 """Check prerequisites.
3090 This checks that the instance is in the cluster.
3093 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3094 assert self.instance is not None, \
3095 "Cannot retrieve locked instance %s" % self.op.instance_name
3096 _CheckNodeOnline(self, self.instance.primary_node)
3097 if not hasattr(self.op, "ignore_size"):
3098 self.op.ignore_size = False
3100 def Exec(self, feedback_fn):
3101 """Activate the disks.
3104 disks_ok, disks_info = \
3105 _AssembleInstanceDisks(self, self.instance,
3106 ignore_size=self.op.ignore_size)
3108 raise errors.OpExecError("Cannot activate block devices")
3113 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
3115 """Prepare the block devices for an instance.
3117 This sets up the block devices on all nodes.
3119 @type lu: L{LogicalUnit}
3120 @param lu: the logical unit on whose behalf we execute
3121 @type instance: L{objects.Instance}
3122 @param instance: the instance for whose disks we assemble
3123 @type ignore_secondaries: boolean
3124 @param ignore_secondaries: if true, errors on secondary nodes
3125 won't result in an error return from the function
3126 @type ignore_size: boolean
3127 @param ignore_size: if true, the current known size of the disk
3128 will not be used during the disk activation, useful for cases
3129 when the size is wrong
3130 @return: a tuple of (disks_ok, device_info), where device_info is a
3131 list of (host, instance_visible_name, node_visible_name) tuples
3132 with the mapping from node devices to instance devices
3137 iname = instance.name
3138 # With the two-pass mechanism we try to reduce the window of
3139 # opportunity for the race condition of switching DRBD to primary
3140 # before handshaking occurred, but we do not eliminate it
3142 # The proper fix would be to wait (with some limits) until the
3143 # connection has been made and drbd transitions from WFConnection
3144 # into any other network-connected state (Connected, SyncTarget,
3147 # 1st pass, assemble on all nodes in secondary mode
3148 for inst_disk in instance.disks:
3149 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3151 node_disk = node_disk.Copy()
3152 node_disk.UnsetSize()
3153 lu.cfg.SetDiskID(node_disk, node)
3154 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3155 msg = result.fail_msg
3157 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3158 " (is_primary=False, pass=1): %s",
3159 inst_disk.iv_name, node, msg)
3160 if not ignore_secondaries:
3163 # FIXME: race condition on drbd migration to primary
3165 # 2nd pass, do only the primary node
3166 for inst_disk in instance.disks:
3167 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3168 if node != instance.primary_node:
3171 node_disk = node_disk.Copy()
3172 node_disk.UnsetSize()
3173 lu.cfg.SetDiskID(node_disk, node)
3174 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3175 msg = result.fail_msg
3177 lu.proc.LogWarning("Could not prepare block device %s on node %s"
3178 " (is_primary=True, pass=2): %s",
3179 inst_disk.iv_name, node, msg)
3181 device_info.append((instance.primary_node, inst_disk.iv_name,
3184 # leave the disks configured for the primary node
3185 # this is a workaround that would be fixed better by
3186 # improving the logical/physical id handling
3187 for disk in instance.disks:
3188 lu.cfg.SetDiskID(disk, instance.primary_node)
3190 return disks_ok, device_info
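# Usage sketch (illustrative): callers unpack the (disks_ok, device_info)
# return value, e.g.
#   disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
# which mirrors what LUActivateInstanceDisks.Exec above does; device_info can
# then be handed back to the caller as the node/device mapping.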
3193 def _StartInstanceDisks(lu, instance, force):
3194 """Start the disks of an instance.
3197 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3198 ignore_secondaries=force)
3200 _ShutdownInstanceDisks(lu, instance)
3201 if force is not None and not force:
3202 lu.proc.LogWarning("", hint="If the message above refers to a"
3204 " you can retry the operation using '--force'.")
3205 raise errors.OpExecError("Disk consistency error")
3208 class LUDeactivateInstanceDisks(NoHooksLU):
3209 """Shutdown an instance's disks.
3212 _OP_REQP = ["instance_name"]
3215 def ExpandNames(self):
3216 self._ExpandAndLockInstance()
3217 self.needed_locks[locking.LEVEL_NODE] = []
3218 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3220 def DeclareLocks(self, level):
3221 if level == locking.LEVEL_NODE:
3222 self._LockInstancesNodes()
3224 def CheckPrereq(self):
3225 """Check prerequisites.
3227 This checks that the instance is in the cluster.
3230 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3231 assert self.instance is not None, \
3232 "Cannot retrieve locked instance %s" % self.op.instance_name
3234 def Exec(self, feedback_fn):
3235 """Deactivate the disks
3238 instance = self.instance
3239 _SafeShutdownInstanceDisks(self, instance)
3242 def _SafeShutdownInstanceDisks(lu, instance):
3243 """Shutdown block devices of an instance.
3245 This function checks if an instance is running, before calling
3246 _ShutdownInstanceDisks.
3249 pnode = instance.primary_node
3250 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3251 ins_l.Raise("Can't contact node %s" % pnode)
3253 if instance.name in ins_l.payload:
3254 raise errors.OpExecError("Instance is running, can't shutdown"
3257 _ShutdownInstanceDisks(lu, instance)
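# Note: _SafeShutdownInstanceDisks refuses to act on a running instance,
# while the lower-level _ShutdownInstanceDisks below performs no such check;
# LUDeactivateInstanceDisks therefore uses the safe variant, e.g.
#   _SafeShutdownInstanceDisks(self, instance)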
3260 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3261 """Shutdown block devices of an instance.
3263 This does the shutdown on all nodes of the instance.
3265 If ignore_primary is false, errors on the primary node are
3270 for disk in instance.disks:
3271 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3272 lu.cfg.SetDiskID(top_disk, node)
3273 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3274 msg = result.fail_msg
3276 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3277 disk.iv_name, node, msg)
3278 if not ignore_primary or node != instance.primary_node:
3283 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3284 """Checks if a node has enough free memory.
3286 This function checks if a given node has the needed amount of free
3287 memory. In case the node has less memory or we cannot get the
3288 information from the node, this function raises an OpPrereqError
3291 @type lu: C{LogicalUnit}
3292 @param lu: a logical unit from which we get configuration data
3294 @param node: the node to check
3295 @type reason: C{str}
3296 @param reason: string to use in the error message
3297 @type requested: C{int}
3298 @param requested: the amount of memory in MiB to check for
3299 @type hypervisor_name: C{str}
3300 @param hypervisor_name: the hypervisor to ask for memory stats
3301 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3302 we cannot check the node
3305 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3306 nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
3307 free_mem = nodeinfo[node].payload.get('memory_free', None)
3308 if not isinstance(free_mem, int):
3309 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3310 " was '%s'" % (node, free_mem))
3311 if requested > free_mem:
3312 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3313 " needed %s MiB, available %s MiB" %
3314 (node, reason, requested, free_mem))
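# Usage sketch (illustrative): start/failover paths call this with the
# instance's backend memory parameter, e.g.
#   bep = self.cfg.GetClusterInfo().FillBE(instance)
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)
# as done in LUStartupInstance.CheckPrereq below; the helper raises
# OpPrereqError itself, so no return value needs to be checked.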
3317 class LUStartupInstance(LogicalUnit):
3318 """Starts an instance.
3321 HPATH = "instance-start"
3322 HTYPE = constants.HTYPE_INSTANCE
3323 _OP_REQP = ["instance_name", "force"]
3326 def ExpandNames(self):
3327 self._ExpandAndLockInstance()
3329 def BuildHooksEnv(self):
3332 This runs on master, primary and secondary nodes of the instance.
3336 "FORCE": self.op.force,
3338 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3339 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3342 def CheckPrereq(self):
3343 """Check prerequisites.
3345 This checks that the instance is in the cluster.
3348 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3349 assert self.instance is not None, \
3350 "Cannot retrieve locked instance %s" % self.op.instance_name
3353 self.beparams = getattr(self.op, "beparams", {})
3355 if not isinstance(self.beparams, dict):
3356 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3357 " dict" % (type(self.beparams), ))
3358 # fill the beparams dict
3359 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3360 self.op.beparams = self.beparams
3363 self.hvparams = getattr(self.op, "hvparams", {})
3365 if not isinstance(self.hvparams, dict):
3366 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3367 " dict" % (type(self.hvparams), ))
3369 # check hypervisor parameter syntax (locally)
3370 cluster = self.cfg.GetClusterInfo()
3371 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3372 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3374 filled_hvp.update(self.hvparams)
3375 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3376 hv_type.CheckParameterSyntax(filled_hvp)
3377 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3378 self.op.hvparams = self.hvparams
3380 _CheckNodeOnline(self, instance.primary_node)
3382 bep = self.cfg.GetClusterInfo().FillBE(instance)
3383 # check bridges existence
3384 _CheckInstanceBridgesExist(self, instance)
3386 remote_info = self.rpc.call_instance_info(instance.primary_node,
3388 instance.hypervisor)
3389 remote_info.Raise("Error checking node %s" % instance.primary_node,
3391 if not remote_info.payload: # not running already
3392 _CheckNodeFreeMemory(self, instance.primary_node,
3393 "starting instance %s" % instance.name,
3394 bep[constants.BE_MEMORY], instance.hypervisor)
3396 def Exec(self, feedback_fn):
3397 """Start the instance.
3400 instance = self.instance
3401 force = self.op.force
3403 self.cfg.MarkInstanceUp(instance.name)
3405 node_current = instance.primary_node
3407 _StartInstanceDisks(self, instance, force)
3409 result = self.rpc.call_instance_start(node_current, instance,
3410 self.hvparams, self.beparams)
3411 msg = result.fail_msg
3413 _ShutdownInstanceDisks(self, instance)
3414 raise errors.OpExecError("Could not start instance: %s" % msg)
3417 class LURebootInstance(LogicalUnit):
3418 """Reboot an instance.
3421 HPATH = "instance-reboot"
3422 HTYPE = constants.HTYPE_INSTANCE
3423 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3426 def ExpandNames(self):
3427 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3428 constants.INSTANCE_REBOOT_HARD,
3429 constants.INSTANCE_REBOOT_FULL]:
3430 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3431 (constants.INSTANCE_REBOOT_SOFT,
3432 constants.INSTANCE_REBOOT_HARD,
3433 constants.INSTANCE_REBOOT_FULL))
3434 self._ExpandAndLockInstance()
3436 def BuildHooksEnv(self):
3439 This runs on master, primary and secondary nodes of the instance.
3443 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3444 "REBOOT_TYPE": self.op.reboot_type,
3446 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3447 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3450 def CheckPrereq(self):
3451 """Check prerequisites.
3453 This checks that the instance is in the cluster.
3456 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3457 assert self.instance is not None, \
3458 "Cannot retrieve locked instance %s" % self.op.instance_name
3460 _CheckNodeOnline(self, instance.primary_node)
3462 # check bridges existence
3463 _CheckInstanceBridgesExist(self, instance)
3465 def Exec(self, feedback_fn):
3466 """Reboot the instance.
3469 instance = self.instance
3470 ignore_secondaries = self.op.ignore_secondaries
3471 reboot_type = self.op.reboot_type
3473 node_current = instance.primary_node
3475 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3476 constants.INSTANCE_REBOOT_HARD]:
3477 for disk in instance.disks:
3478 self.cfg.SetDiskID(disk, node_current)
3479 result = self.rpc.call_instance_reboot(node_current, instance,
3481 result.Raise("Could not reboot instance")
3483 result = self.rpc.call_instance_shutdown(node_current, instance)
3484 result.Raise("Could not shutdown instance for full reboot")
3485 _ShutdownInstanceDisks(self, instance)
3486 _StartInstanceDisks(self, instance, ignore_secondaries)
3487 result = self.rpc.call_instance_start(node_current, instance, None, None)
3488 msg = result.fail_msg
3490 _ShutdownInstanceDisks(self, instance)
3491 raise errors.OpExecError("Could not start instance for"
3492 " full reboot: %s" % msg)
3494 self.cfg.MarkInstanceUp(instance.name)
3497 class LUShutdownInstance(LogicalUnit):
3498 """Shutdown an instance.
3501 HPATH = "instance-stop"
3502 HTYPE = constants.HTYPE_INSTANCE
3503 _OP_REQP = ["instance_name"]
3506 def ExpandNames(self):
3507 self._ExpandAndLockInstance()
3509 def BuildHooksEnv(self):
3512 This runs on master, primary and secondary nodes of the instance.
3515 env = _BuildInstanceHookEnvByObject(self, self.instance)
3516 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3519 def CheckPrereq(self):
3520 """Check prerequisites.
3522 This checks that the instance is in the cluster.
3525 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3526 assert self.instance is not None, \
3527 "Cannot retrieve locked instance %s" % self.op.instance_name
3528 _CheckNodeOnline(self, self.instance.primary_node)
3530 def Exec(self, feedback_fn):
3531 """Shutdown the instance.
3534 instance = self.instance
3535 node_current = instance.primary_node
3536 self.cfg.MarkInstanceDown(instance.name)
3537 result = self.rpc.call_instance_shutdown(node_current, instance)
3538 msg = result.fail_msg
3540 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3542 _ShutdownInstanceDisks(self, instance)
3545 class LUReinstallInstance(LogicalUnit):
3546 """Reinstall an instance.
3549 HPATH = "instance-reinstall"
3550 HTYPE = constants.HTYPE_INSTANCE
3551 _OP_REQP = ["instance_name"]
3554 def ExpandNames(self):
3555 self._ExpandAndLockInstance()
3557 def BuildHooksEnv(self):
3560 This runs on master, primary and secondary nodes of the instance.
3563 env = _BuildInstanceHookEnvByObject(self, self.instance)
3564 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3567 def CheckPrereq(self):
3568 """Check prerequisites.
3570 This checks that the instance is in the cluster and is not running.
3573 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3574 assert instance is not None, \
3575 "Cannot retrieve locked instance %s" % self.op.instance_name
3576 _CheckNodeOnline(self, instance.primary_node)
3578 if instance.disk_template == constants.DT_DISKLESS:
3579 raise errors.OpPrereqError("Instance '%s' has no disks" %
3580 self.op.instance_name)
3581 if instance.admin_up:
3582 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3583 self.op.instance_name)
3584 remote_info = self.rpc.call_instance_info(instance.primary_node,
3586 instance.hypervisor)
3587 remote_info.Raise("Error checking node %s" % instance.primary_node,
3589 if remote_info.payload:
3590 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3591 (self.op.instance_name,
3592 instance.primary_node))
3594 self.op.os_type = getattr(self.op, "os_type", None)
3595 if self.op.os_type is not None:
3597 pnode = self.cfg.GetNodeInfo(
3598 self.cfg.ExpandNodeName(instance.primary_node))
3600 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3602 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3603 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3604 (self.op.os_type, pnode.name), prereq=True)
3606 self.instance = instance
3608 def Exec(self, feedback_fn):
3609 """Reinstall the instance.
3612 inst = self.instance
3614 if self.op.os_type is not None:
3615 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3616 inst.os = self.op.os_type
3617 self.cfg.Update(inst)
3619 _StartInstanceDisks(self, inst, None)
3621 feedback_fn("Running the instance OS create scripts...")
3622 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3623 result.Raise("Could not install OS for instance %s on node %s" %
3624 (inst.name, inst.primary_node))
3626 _ShutdownInstanceDisks(self, inst)
3629 class LURenameInstance(LogicalUnit):
3630 """Rename an instance.
3633 HPATH = "instance-rename"
3634 HTYPE = constants.HTYPE_INSTANCE
3635 _OP_REQP = ["instance_name", "new_name"]
3637 def BuildHooksEnv(self):
3640 This runs on master, primary and secondary nodes of the instance.
3643 env = _BuildInstanceHookEnvByObject(self, self.instance)
3644 env["INSTANCE_NEW_NAME"] = self.op.new_name
3645 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3648 def CheckPrereq(self):
3649 """Check prerequisites.
3651 This checks that the instance is in the cluster and is not running.
3654 instance = self.cfg.GetInstanceInfo(
3655 self.cfg.ExpandInstanceName(self.op.instance_name))
3656 if instance is None:
3657 raise errors.OpPrereqError("Instance '%s' not known" %
3658 self.op.instance_name)
3659 _CheckNodeOnline(self, instance.primary_node)
3661 if instance.admin_up:
3662 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3663 self.op.instance_name)
3664 remote_info = self.rpc.call_instance_info(instance.primary_node,
3666 instance.hypervisor)
3667 remote_info.Raise("Error checking node %s" % instance.primary_node,
3669 if remote_info.payload:
3670 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3671 (self.op.instance_name,
3672 instance.primary_node))
3673 self.instance = instance
3675 # new name verification
3676 name_info = utils.HostInfo(self.op.new_name)
3678 self.op.new_name = new_name = name_info.name
3679 instance_list = self.cfg.GetInstanceList()
3680 if new_name in instance_list:
3681 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3684 if not getattr(self.op, "ignore_ip", False):
3685 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3686 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3687 (name_info.ip, new_name))
3690 def Exec(self, feedback_fn):
3691 """Reinstall the instance.
3694 inst = self.instance
3695 old_name = inst.name
3697 if inst.disk_template == constants.DT_FILE:
3698 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3700 self.cfg.RenameInstance(inst.name, self.op.new_name)
3701 # Change the instance lock. This is definitely safe while we hold the BGL
3702 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3703 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3705 # re-read the instance from the configuration after rename
3706 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3708 if inst.disk_template == constants.DT_FILE:
3709 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3710 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3711 old_file_storage_dir,
3712 new_file_storage_dir)
3713 result.Raise("Could not rename on node %s directory '%s' to '%s'"
3714 " (but the instance has been renamed in Ganeti)" %
3715 (inst.primary_node, old_file_storage_dir,
3716 new_file_storage_dir))
3718 _StartInstanceDisks(self, inst, None)
3720 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3722 msg = result.fail_msg
3724 msg = ("Could not run OS rename script for instance %s on node %s"
3725 " (but the instance has been renamed in Ganeti): %s" %
3726 (inst.name, inst.primary_node, msg))
3727 self.proc.LogWarning(msg)
3729 _ShutdownInstanceDisks(self, inst)
3732 class LURemoveInstance(LogicalUnit):
3733 """Remove an instance.
3736 HPATH = "instance-remove"
3737 HTYPE = constants.HTYPE_INSTANCE
3738 _OP_REQP = ["instance_name", "ignore_failures"]
3741 def ExpandNames(self):
3742 self._ExpandAndLockInstance()
3743 self.needed_locks[locking.LEVEL_NODE] = []
3744 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3746 def DeclareLocks(self, level):
3747 if level == locking.LEVEL_NODE:
3748 self._LockInstancesNodes()
3750 def BuildHooksEnv(self):
3753 This runs on master, primary and secondary nodes of the instance.
3756 env = _BuildInstanceHookEnvByObject(self, self.instance)
3757 nl = [self.cfg.GetMasterNode()]
3760 def CheckPrereq(self):
3761 """Check prerequisites.
3763 This checks that the instance is in the cluster.
3766 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3767 assert self.instance is not None, \
3768 "Cannot retrieve locked instance %s" % self.op.instance_name
3770 def Exec(self, feedback_fn):
3771 """Remove the instance.
3774 instance = self.instance
3775 logging.info("Shutting down instance %s on node %s",
3776 instance.name, instance.primary_node)
3778 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3779 msg = result.fail_msg
3781 if self.op.ignore_failures:
3782 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3784 raise errors.OpExecError("Could not shutdown instance %s on"
3786 (instance.name, instance.primary_node, msg))
3788 logging.info("Removing block devices for instance %s", instance.name)
3790 if not _RemoveDisks(self, instance):
3791 if self.op.ignore_failures:
3792 feedback_fn("Warning: can't remove instance's disks")
3794 raise errors.OpExecError("Can't remove instance's disks")
3796 logging.info("Removing instance %s out of cluster config", instance.name)
3798 self.cfg.RemoveInstance(instance.name)
3799 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3802 class LUQueryInstances(NoHooksLU):
3803 """Logical unit for querying instances.
3806 _OP_REQP = ["output_fields", "names", "use_locking"]
3808 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3810 "disk_template", "ip", "mac", "bridge",
3811 "nic_mode", "nic_link",
3812 "sda_size", "sdb_size", "vcpus", "tags",
3813 "network_port", "beparams",
3814 r"(disk)\.(size)/([0-9]+)",
3815 r"(disk)\.(sizes)", "disk_usage",
3816 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3817 r"(nic)\.(bridge)/([0-9]+)",
3818 r"(nic)\.(macs|ips|modes|links|bridges)",
3819 r"(disk|nic)\.(count)",
3820 "serial_no", "hypervisor", "hvparams",] +
3822 for name in constants.HVS_PARAMETERS] +
3824 for name in constants.BES_PARAMETERS])
3825 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3828 def ExpandNames(self):
3829 _CheckOutputFields(static=self._FIELDS_STATIC,
3830 dynamic=self._FIELDS_DYNAMIC,
3831 selected=self.op.output_fields)
3833 self.needed_locks = {}
3834 self.share_locks[locking.LEVEL_INSTANCE] = 1
3835 self.share_locks[locking.LEVEL_NODE] = 1
3838 self.wanted = _GetWantedInstances(self, self.op.names)
3840 self.wanted = locking.ALL_SET
3842 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3843 self.do_locking = self.do_node_query and self.op.use_locking
3845 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3846 self.needed_locks[locking.LEVEL_NODE] = []
3847 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3849 def DeclareLocks(self, level):
3850 if level == locking.LEVEL_NODE and self.do_locking:
3851 self._LockInstancesNodes()
3853 def CheckPrereq(self):
3854 """Check prerequisites.
3859 def Exec(self, feedback_fn):
3860 """Computes the list of nodes and their attributes.
3863 all_info = self.cfg.GetAllInstancesInfo()
3864 if self.wanted == locking.ALL_SET:
3865 # caller didn't specify instance names, so ordering is not important
3867 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3869 instance_names = all_info.keys()
3870 instance_names = utils.NiceSort(instance_names)
3872 # caller did specify names, so we must keep the ordering
3874 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3876 tgt_set = all_info.keys()
3877 missing = set(self.wanted).difference(tgt_set)
3879 raise errors.OpExecError("Some instances were removed before"
3880 " retrieving their data: %s" % missing)
3881 instance_names = self.wanted
3883 instance_list = [all_info[iname] for iname in instance_names]
3885 # begin data gathering
3887 nodes = frozenset([inst.primary_node for inst in instance_list])
3888 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3892 if self.do_node_query:
3894 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3896 result = node_data[name]
3898 # offline nodes will be in both lists
3899 off_nodes.append(name)
3900 if result.failed or result.fail_msg:
3901 bad_nodes.append(name)
3904 live_data.update(result.payload)
3905 # else no instance is alive
3907 live_data = dict([(name, {}) for name in instance_names])
3909 # end data gathering
3914 cluster = self.cfg.GetClusterInfo()
3915 for instance in instance_list:
3917 i_hv = cluster.FillHV(instance)
3918 i_be = cluster.FillBE(instance)
3919 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
3920 nic.nicparams) for nic in instance.nics]
3921 for field in self.op.output_fields:
3922 st_match = self._FIELDS_STATIC.Matches(field)
3927 elif field == "pnode":
3928 val = instance.primary_node
3929 elif field == "snodes":
3930 val = list(instance.secondary_nodes)
3931 elif field == "admin_state":
3932 val = instance.admin_up
3933 elif field == "oper_state":
3934 if instance.primary_node in bad_nodes:
3937 val = bool(live_data.get(instance.name))
3938 elif field == "status":
3939 if instance.primary_node in off_nodes:
3940 val = "ERROR_nodeoffline"
3941 elif instance.primary_node in bad_nodes:
3942 val = "ERROR_nodedown"
3944 running = bool(live_data.get(instance.name))
3946 if instance.admin_up:
3951 if instance.admin_up:
3955 elif field == "oper_ram":
3956 if instance.primary_node in bad_nodes:
3958 elif instance.name in live_data:
3959 val = live_data[instance.name].get("memory", "?")
3962 elif field == "vcpus":
3963 val = i_be[constants.BE_VCPUS]
3964 elif field == "disk_template":
3965 val = instance.disk_template
3968 val = instance.nics[0].ip
3971 elif field == "nic_mode":
3973 val = i_nicp[0][constants.NIC_MODE]
3976 elif field == "nic_link":
3978 val = i_nicp[0][constants.NIC_LINK]
3981 elif field == "bridge":
3982 if (instance.nics and
3983 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
3984 val = i_nicp[0][constants.NIC_LINK]
3987 elif field == "mac":
3989 val = instance.nics[0].mac
3992 elif field == "sda_size" or field == "sdb_size":
3993 idx = ord(field[2]) - ord('a')
3995 val = instance.FindDisk(idx).size
3996 except errors.OpPrereqError:
3998 elif field == "disk_usage": # total disk usage per node
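# this reuses _ComputeDiskSize, so template overhead (e.g. DRBD metadata)
# is included in the reported usage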
3999 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4000 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4001 elif field == "tags":
4002 val = list(instance.GetTags())
4003 elif field == "serial_no":
4004 val = instance.serial_no
4005 elif field == "network_port":
4006 val = instance.network_port
4007 elif field == "hypervisor":
4008 val = instance.hypervisor
4009 elif field == "hvparams":
4011 elif (field.startswith(HVPREFIX) and
4012 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
4013 val = i_hv.get(field[len(HVPREFIX):], None)
4014 elif field == "beparams":
4016 elif (field.startswith(BEPREFIX) and
4017 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4018 val = i_be.get(field[len(BEPREFIX):], None)
4019 elif st_match and st_match.groups():
4020 # matches a variable list
4021 st_groups = st_match.groups()
4022 if st_groups and st_groups[0] == "disk":
4023 if st_groups[1] == "count":
4024 val = len(instance.disks)
4025 elif st_groups[1] == "sizes":
4026 val = [disk.size for disk in instance.disks]
4027 elif st_groups[1] == "size":
4029 val = instance.FindDisk(st_groups[2]).size
4030 except errors.OpPrereqError:
4033 assert False, "Unhandled disk parameter"
4034 elif st_groups[0] == "nic":
4035 if st_groups[1] == "count":
4036 val = len(instance.nics)
4037 elif st_groups[1] == "macs":
4038 val = [nic.mac for nic in instance.nics]
4039 elif st_groups[1] == "ips":
4040 val = [nic.ip for nic in instance.nics]
4041 elif st_groups[1] == "modes":
4042 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4043 elif st_groups[1] == "links":
4044 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4045 elif st_groups[1] == "bridges":
4048 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4049 val.append(nicp[constants.NIC_LINK])
4054 nic_idx = int(st_groups[2])
4055 if nic_idx >= len(instance.nics):
4058 if st_groups[1] == "mac":
4059 val = instance.nics[nic_idx].mac
4060 elif st_groups[1] == "ip":
4061 val = instance.nics[nic_idx].ip
4062 elif st_groups[1] == "mode":
4063 val = i_nicp[nic_idx][constants.NIC_MODE]
4064 elif st_groups[1] == "link":
4065 val = i_nicp[nic_idx][constants.NIC_LINK]
4066 elif st_groups[1] == "bridge":
4067 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4068 if nic_mode == constants.NIC_MODE_BRIDGED:
4069 val = i_nicp[nic_idx][constants.NIC_LINK]
4073 assert False, "Unhandled NIC parameter"
4075 assert False, ("Declared but unhandled variable parameter '%s'" %
4078 assert False, "Declared but unhandled parameter '%s'" % field
4085 class LUFailoverInstance(LogicalUnit):
4086 """Failover an instance.
4089 HPATH = "instance-failover"
4090 HTYPE = constants.HTYPE_INSTANCE
4091 _OP_REQP = ["instance_name", "ignore_consistency"]
4094 def ExpandNames(self):
4095 self._ExpandAndLockInstance()
4096 self.needed_locks[locking.LEVEL_NODE] = []
4097 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4099 def DeclareLocks(self, level):
4100 if level == locking.LEVEL_NODE:
4101 self._LockInstancesNodes()
4103 def BuildHooksEnv(self):
4106 This runs on master, primary and secondary nodes of the instance.
4110 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4112 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4113 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
4116 def CheckPrereq(self):
4117 """Check prerequisites.
4119 This checks that the instance is in the cluster.
4122 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4123 assert self.instance is not None, \
4124 "Cannot retrieve locked instance %s" % self.op.instance_name
4126 bep = self.cfg.GetClusterInfo().FillBE(instance)
4127 if instance.disk_template not in constants.DTS_NET_MIRROR:
4128 raise errors.OpPrereqError("Instance's disk layout is not"
4129 " network mirrored, cannot failover.")
4131 secondary_nodes = instance.secondary_nodes
4132 if not secondary_nodes:
4133 raise errors.ProgrammerError("no secondary node but using "
4134 "a mirrored disk template")
4136 target_node = secondary_nodes[0]
4137 _CheckNodeOnline(self, target_node)
4138 _CheckNodeNotDrained(self, target_node)
4139 if instance.admin_up:
4140 # check memory requirements on the secondary node
4141 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4142 instance.name, bep[constants.BE_MEMORY],
4143 instance.hypervisor)
4145 self.LogInfo("Not checking memory on the secondary node as"
4146 " instance will not be started")
4148 # check bridge existence
4149 _CheckInstanceBridgesExist(self, instance, node=target_node)
4151 def Exec(self, feedback_fn):
4152 """Failover an instance.
4154 The failover is done by shutting it down on its present node and
4155 starting it on the secondary.
4158 instance = self.instance
4160 source_node = instance.primary_node
4161 target_node = instance.secondary_nodes[0]
4163 feedback_fn("* checking disk consistency between source and target")
4164 for dev in instance.disks:
4165 # for drbd, these are drbd over lvm
4166 if not _CheckDiskConsistency(self, dev, target_node, False):
4167 if instance.admin_up and not self.op.ignore_consistency:
4168 raise errors.OpExecError("Disk %s is degraded on target node,"
4169 " aborting failover." % dev.iv_name)
4171 feedback_fn("* shutting down instance on source node")
4172 logging.info("Shutting down instance %s on node %s",
4173 instance.name, source_node)
4175 result = self.rpc.call_instance_shutdown(source_node, instance)
4176 msg = result.fail_msg
4178 if self.op.ignore_consistency:
4179 self.proc.LogWarning("Could not shut down instance %s on node %s."
4180 " Proceeding anyway. Please make sure node"
4181 " %s is down. Error details: %s",
4182 instance.name, source_node, source_node, msg)
4184 raise errors.OpExecError("Could not shut down instance %s on"
4186 (instance.name, source_node, msg))
4188 feedback_fn("* deactivating the instance's disks on source node")
4189 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4190 raise errors.OpExecError("Can't shut down the instance's disks.")
4192 instance.primary_node = target_node
4193 # distribute new instance config to the other nodes
4194 self.cfg.Update(instance)
4196 # Only start the instance if it's marked as up
4197 if instance.admin_up:
4198 feedback_fn("* activating the instance's disks on target node")
4199 logging.info("Starting instance %s on node %s",
4200 instance.name, target_node)
4202 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4203 ignore_secondaries=True)
4205 _ShutdownInstanceDisks(self, instance)
4206 raise errors.OpExecError("Can't activate the instance's disks")
4208 feedback_fn("* starting the instance on the target node")
4209 result = self.rpc.call_instance_start(target_node, instance, None, None)
4210 msg = result.fail_msg
4212 _ShutdownInstanceDisks(self, instance)
4213 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4214 (instance.name, target_node, msg))
4217 class LUMigrateInstance(LogicalUnit):
4218 """Migrate an instance.
4220 This is migration without shutting down, compared to the failover,
4221 which is done with shutdown.
4224 HPATH = "instance-migrate"
4225 HTYPE = constants.HTYPE_INSTANCE
4226 _OP_REQP = ["instance_name", "live", "cleanup"]
4230 def ExpandNames(self):
4231 self._ExpandAndLockInstance()
4233 self.needed_locks[locking.LEVEL_NODE] = []
4234 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
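# The actual work is delegated to a TLMigrateInstance tasklet below; this LU
# only handles locking and the hook environment, while CheckPrereq and Exec
# run on the tasklet.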
4236 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4237 self.op.live, self.op.cleanup)
4238 self.tasklets = [self._migrater]
4240 def DeclareLocks(self, level):
4241 if level == locking.LEVEL_NODE:
4242 self._LockInstancesNodes()
4244 def BuildHooksEnv(self):
4247 This runs on master, primary and secondary nodes of the instance.
4250 instance = self._migrater.instance
4251 env = _BuildInstanceHookEnvByObject(self, instance)
4252 env["MIGRATE_LIVE"] = self.op.live
4253 env["MIGRATE_CLEANUP"] = self.op.cleanup
4254 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4258 class LUMigrateNode(LogicalUnit):
4259 """Migrate all instances from a node.
4262 HPATH = "node-migrate"
4263 HTYPE = constants.HTYPE_NODE
4264 _OP_REQP = ["node_name", "live"]
4267 def ExpandNames(self):
4268 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4269 if self.op.node_name is None:
4270 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
4272 self.needed_locks = {
4273 locking.LEVEL_NODE: [self.op.node_name],
4276 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4278 # Create tasklets for migrating instances for all instances on this node
4282 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4283 logging.debug("Migrating instance %s", inst.name)
4284 names.append(inst.name)
4286 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4288 self.tasklets = tasklets
4290 # Declare instance locks
4291 self.needed_locks[locking.LEVEL_INSTANCE] = names
4293 def DeclareLocks(self, level):
4294 if level == locking.LEVEL_NODE:
4295 self._LockInstancesNodes()
4297 def BuildHooksEnv(self):
4300 This runs on the master, the primary and all the secondaries.
4304 "NODE_NAME": self.op.node_name,
4307 nl = [self.cfg.GetMasterNode()]
4309 return (env, nl, nl)
4312 class TLMigrateInstance(Tasklet):
4313 def __init__(self, lu, instance_name, live, cleanup):
4314 """Initializes this class.
4317 Tasklet.__init__(self, lu)
4320 self.instance_name = instance_name
4322 self.cleanup = cleanup
4324 def CheckPrereq(self):
4325 """Check prerequisites.
4327 This checks that the instance is in the cluster.
4330 instance = self.cfg.GetInstanceInfo(
4331 self.cfg.ExpandInstanceName(self.instance_name))
4332 if instance is None:
4333 raise errors.OpPrereqError("Instance '%s' not known" %
4336 if instance.disk_template != constants.DT_DRBD8:
4337 raise errors.OpPrereqError("Instance's disk layout is not"
4338 " drbd8, cannot migrate.")
4340 secondary_nodes = instance.secondary_nodes
4341 if not secondary_nodes:
4342 raise errors.ConfigurationError("No secondary node but using"
4343 " drbd8 disk template")
4345 i_be = self.cfg.GetClusterInfo().FillBE(instance)
4347 target_node = secondary_nodes[0]
4348 # check memory requirements on the secondary node
4349 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4350 instance.name, i_be[constants.BE_MEMORY],
4351 instance.hypervisor)
4354 # check bridge existence
4354 _CheckInstanceBridgesExist(self, instance, node=target_node)
4356 if not self.cleanup:
4357 _CheckNodeNotDrained(self, target_node)
4358 result = self.rpc.call_instance_migratable(instance.primary_node,
4360 result.Raise("Can't migrate, please use failover", prereq=True)
4362 self.instance = instance
4364 def _WaitUntilSync(self):
4365 """Poll with custom rpc for disk sync.
4367 This uses our own step-based rpc call.
4370 self.feedback_fn("* wait until resync is done")
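# Poll the nodes repeatedly: each call returns (done, sync_percent) per
# node; we report the minimum percentage across nodes and stop once every
# node reports the resync as finished.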
4374 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4376 self.instance.disks)
4378 for node, nres in result.items():
4379 nres.Raise("Cannot resync disks on node %s" % node)
4380 node_done, node_percent = nres.payload
4381 all_done = all_done and node_done
4382 if node_percent is not None:
4383 min_percent = min(min_percent, node_percent)
4385 if min_percent < 100:
4386 self.feedback_fn(" - progress: %.1f%%" % min_percent)
4389 def _EnsureSecondary(self, node):
4390 """Demote a node to secondary.
4393 self.feedback_fn("* switching node %s to secondary mode" % node)
4395 for dev in self.instance.disks:
4396 self.cfg.SetDiskID(dev, node)
4398 result = self.rpc.call_blockdev_close(node, self.instance.name,
4399 self.instance.disks)
4400 result.Raise("Cannot change disk to secondary on node %s" % node)
4402 def _GoStandalone(self):
4403 """Disconnect from the network.
4406 self.feedback_fn("* changing into standalone mode")
4407 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4408 self.instance.disks)
4409 for node, nres in result.items():
4410 nres.Raise("Cannot disconnect disks on node %s" % node)
4412 def _GoReconnect(self, multimaster):
4413 """Reconnect to the network.
4419 msg = "single-master"
4420 self.feedback_fn("* changing disks into %s mode" % msg)
4421 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4422 self.instance.disks,
4423 self.instance.name, multimaster)
4424 for node, nres in result.items():
4425 nres.Raise("Cannot change disks config on node %s" % node)
4427 def _ExecCleanup(self):
4428 """Try to cleanup after a failed migration.
4430 The cleanup is done by:
4431 - check that the instance is running only on one node
4432 (and update the config if needed)
4433 - change disks on its secondary node to secondary
4434 - wait until disks are fully synchronized
4435 - disconnect from the network
4436 - change disks into single-master mode
4437 - wait again until disks are fully synchronized
4440 instance = self.instance
4441 target_node = self.target_node
4442 source_node = self.source_node
4444 # check running on only one node
4445 self.feedback_fn("* checking where the instance actually runs"
4446 " (if this hangs, the hypervisor might be in"
4448 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4449 for node, result in ins_l.items():
4450 result.Raise("Can't contact node %s" % node)
4452 runningon_source = instance.name in ins_l[source_node].payload
4453 runningon_target = instance.name in ins_l[target_node].payload
4455 if runningon_source and runningon_target:
4456 raise errors.OpExecError("Instance seems to be running on two nodes,"
4457 " or the hypervisor is confused. You will have"
4458 " to ensure manually that it runs only on one"
4459 " and restart this operation.")
4461 if not (runningon_source or runningon_target):
4462 raise errors.OpExecError("Instance does not seem to be running at all."
4463 " In this case, it's safer to repair by"
4464 " running 'gnt-instance stop' to ensure disk"
4465 " shutdown, and then restarting it.")
4467 if runningon_target:
4468 # the migration has actually succeeded, we need to update the config
4469 self.feedback_fn("* instance running on secondary node (%s),"
4470 " updating config" % target_node)
4471 instance.primary_node = target_node
4472 self.cfg.Update(instance)
4473 demoted_node = source_node
4475 self.feedback_fn("* instance confirmed to be running on its"
4476 " primary node (%s)" % source_node)
4477 demoted_node = target_node
4479 self._EnsureSecondary(demoted_node)
4481 self._WaitUntilSync()
4482 except errors.OpExecError:
4483 # we ignore here errors, since if the device is standalone, it
4484 # won't be able to sync
4486 self._GoStandalone()
4487 self._GoReconnect(False)
4488 self._WaitUntilSync()
4490 self.feedback_fn("* done")
4492 def _RevertDiskStatus(self):
4493 """Try to revert the disk status after a failed migration.
4496 target_node = self.target_node
4498 self._EnsureSecondary(target_node)
4499 self._GoStandalone()
4500 self._GoReconnect(False)
4501 self._WaitUntilSync()
4502 except errors.OpExecError, err:
4503 self.lu.LogWarning("Migration failed and I can't reconnect the"
4504 " drives: error '%s'\n"
4505 "Please look and recover the instance status" %
4508 def _AbortMigration(self):
4509 """Call the hypervisor code to abort a started migration.
4512 instance = self.instance
4513 target_node = self.target_node
4514 migration_info = self.migration_info
4516 abort_result = self.rpc.call_finalize_migration(target_node,
4520 abort_msg = abort_result.fail_msg
4522 logging.error("Aborting migration failed on target node %s: %s",
4523 target_node, abort_msg)
4524 # Don't raise an exception here, as we still have to try to revert the
4525 # disk status, even if this step failed.
4527 def _ExecMigration(self):
4528 """Migrate an instance.
4530 The migrate is done by:
4531 - change the disks into dual-master mode
4532 - wait until disks are fully synchronized again
4533 - migrate the instance
4534 - change disks on the new secondary node (the old primary) to secondary
4535 - wait until disks are fully synchronized
4536 - change disks into single-master mode
4539 instance = self.instance
4540 target_node = self.target_node
4541 source_node = self.source_node
4543 self.feedback_fn("* checking disk consistency between source and target")
4544 for dev in instance.disks:
4545 if not _CheckDiskConsistency(self, dev, target_node, False):
4546 raise errors.OpExecError("Disk %s is degraded or not fully"
4547 " synchronized on target node,"
4548 " aborting migrate." % dev.iv_name)
4550 # First get the migration information from the remote node
4551 result = self.rpc.call_migration_info(source_node, instance)
4552 msg = result.fail_msg
4554 log_err = ("Failed fetching source migration information from %s: %s" %
4556 logging.error(log_err)
4557 raise errors.OpExecError(log_err)
4559 self.migration_info = migration_info = result.payload
4561 # Then switch the disks to master/master mode
4562 self._EnsureSecondary(target_node)
4563 self._GoStandalone()
4564 self._GoReconnect(True)
4565 self._WaitUntilSync()
4567 self.feedback_fn("* preparing %s to accept the instance" % target_node)
4568 result = self.rpc.call_accept_instance(target_node,
4571 self.nodes_ip[target_node])
4573 msg = result.fail_msg
4575 logging.error("Instance pre-migration failed, trying to revert"
4576 " disk status: %s", msg)
4577 self._AbortMigration()
4578 self._RevertDiskStatus()
4579 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4580 (instance.name, msg))
4582 self.feedback_fn("* migrating instance to %s" % target_node)
4584 result = self.rpc.call_instance_migrate(source_node, instance,
4585 self.nodes_ip[target_node],
4587 msg = result.fail_msg
4589 logging.error("Instance migration failed, trying to revert"
4590 " disk status: %s", msg)
4591 self._AbortMigration()
4592 self._RevertDiskStatus()
4593 raise errors.OpExecError("Could not migrate instance %s: %s" %
4594 (instance.name, msg))
4597 instance.primary_node = target_node
4598 # distribute new instance config to the other nodes
4599 self.cfg.Update(instance)
4601 result = self.rpc.call_finalize_migration(target_node,
4605 msg = result.fail_msg
4607 logging.error("Instance migration succeeded, but finalization failed:"
4609 raise errors.OpExecError("Could not finalize instance migration: %s" %
4612 self._EnsureSecondary(source_node)
4613 self._WaitUntilSync()
4614 self._GoStandalone()
4615 self._GoReconnect(False)
4616 self._WaitUntilSync()
4618 self.feedback_fn("* done")
4620 def Exec(self, feedback_fn):
4621 """Perform the migration.
4624 feedback_fn("Migrating instance %s" % self.instance.name)
4626 self.feedback_fn = feedback_fn
4628 self.source_node = self.instance.primary_node
4629 self.target_node = self.instance.secondary_nodes[0]
4630 self.all_nodes = [self.source_node, self.target_node]
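# The secondary IPs are needed because the DRBD disconnect/attach RPCs
# reconfigure the disks' network endpoints between these two nodes.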
4632 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4633 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4637 return self._ExecCleanup()
4639 return self._ExecMigration()
4642 def _CreateBlockDev(lu, node, instance, device, force_create,
4644 """Create a tree of block devices on a given node.
4646 If this device type has to be created on secondaries, create it and all its children.
4649 If not, just recurse to children keeping the same 'force' value.
4651 @param lu: the lu on whose behalf we execute
4652 @param node: the node on which to create the device
4653 @type instance: L{objects.Instance}
4654 @param instance: the instance which owns the device
4655 @type device: L{objects.Disk}
4656 @param device: the device to create
4657 @type force_create: boolean
4658 @param force_create: whether to force creation of this device; this
4659 will be changed to True whenever we find a device which has the
4660 CreateOnSecondary() attribute
4661 @param info: the extra 'metadata' we should attach to the device
4662 (this will be represented as a LVM tag)
4663 @type force_open: boolean
4664 @param force_open: this parameter will be passed to the
4665 L{backend.BlockdevCreate} function where it specifies
4666 whether we run on primary or not, and it affects both
4667 the child assembly and the device's own Open() execution
4670 if device.CreateOnSecondary():
4674 for child in device.children:
4675 _CreateBlockDev(lu, node, instance, child, force_create,
4678 if not force_create:
4681 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4684 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4685 """Create a single block device on a given node.
4687 This will not recurse over children of the device, so they must be created in advance.
4690 @param lu: the lu on whose behalf we execute
4691 @param node: the node on which to create the device
4692 @type instance: L{objects.Instance}
4693 @param instance: the instance which owns the device
4694 @type device: L{objects.Disk}
4695 @param device: the device to create
4696 @param info: the extra 'metadata' we should attach to the device
4697 (this will be represented as a LVM tag)
4698 @type force_open: boolean
4699 @param force_open: this parameter will be passed to the
4700 L{backend.BlockdevCreate} function where it specifies
4701 whether we run on primary or not, and it affects both
4702 the child assembly and the device's own Open() execution
4705 lu.cfg.SetDiskID(device, node)
4706 result = lu.rpc.call_blockdev_create(node, device, device.size,
4707 instance.name, force_open, info)
4708 result.Raise("Can't create block device %s on"
4709 " node %s for instance %s" % (device, node, instance.name))
4710 if device.physical_id is None:
4711 device.physical_id = result.payload
4714 def _GenerateUniqueNames(lu, exts):
4715 """Generate a suitable LV name.
4717 This will generate a logical volume name for the given instance.
4722 new_id = lu.cfg.GenerateUniqueID()
4723 results.append("%s%s" % (new_id, val))
4727 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4729 """Generate a drbd8 device complete with its children.
4732 port = lu.cfg.AllocatePort()
4733 vgname = lu.cfg.GetVGName()
4734 shared_secret = lu.cfg.GenerateDRBDSecret()
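# A DRBD8 disk is built from a pair of LVs (the data volume plus a 128 MB
# metadata volume) on each node, tied together by a logical_id containing
# the two nodes, the allocated network port, the DRBD minors and a shared
# secret.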
4735 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4736 logical_id=(vgname, names[0]))
4737 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4738 logical_id=(vgname, names[1]))
4739 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4740 logical_id=(primary, secondary, port,
4743 children=[dev_data, dev_meta],
4748 def _GenerateDiskTemplate(lu, template_name,
4749 instance_name, primary_node,
4750 secondary_nodes, disk_info,
4751 file_storage_dir, file_driver,
4753 """Generate the entire disk layout for a given template type.
4756 #TODO: compute space requirements
4758 vgname = lu.cfg.GetVGName()
4759 disk_count = len(disk_info)
4761 if template_name == constants.DT_DISKLESS:
4763 elif template_name == constants.DT_PLAIN:
4764 if len(secondary_nodes) != 0:
4765 raise errors.ProgrammerError("Wrong template configuration")
4767 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4768 for i in range(disk_count)])
4769 for idx, disk in enumerate(disk_info):
4770 disk_index = idx + base_index
4771 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4772 logical_id=(vgname, names[idx]),
4773 iv_name="disk/%d" % disk_index,
4775 disks.append(disk_dev)
4776 elif template_name == constants.DT_DRBD8:
4777 if len(secondary_nodes) != 1:
4778 raise errors.ProgrammerError("Wrong template configuration")
4779 remote_node = secondary_nodes[0]
4780 minors = lu.cfg.AllocateDRBDMinor(
4781 [primary_node, remote_node] * len(disk_info), instance_name)
4784 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4785 for i in range(disk_count)]):
4786 names.append(lv_prefix + "_data")
4787 names.append(lv_prefix + "_meta")
4788 for idx, disk in enumerate(disk_info):
4789 disk_index = idx + base_index
4790 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4791 disk["size"], names[idx*2:idx*2+2],
4792 "disk/%d" % disk_index,
4793 minors[idx*2], minors[idx*2+1])
4794 disk_dev.mode = disk["mode"]
4795 disks.append(disk_dev)
4796 elif template_name == constants.DT_FILE:
4797 if len(secondary_nodes) != 0:
4798 raise errors.ProgrammerError("Wrong template configuration")
4800 for idx, disk in enumerate(disk_info):
4801 disk_index = idx + base_index
4802 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4803 iv_name="disk/%d" % disk_index,
4804 logical_id=(file_driver,
4805 "%s/disk%d" % (file_storage_dir,
4808 disks.append(disk_dev)
4810 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4814 def _GetInstanceInfoText(instance):
4815 """Compute that text that should be added to the disk's metadata.
4818 return "originstname+%s" % instance.name
4821 def _CreateDisks(lu, instance):
4822 """Create all disks for an instance.
4824 This abstracts away some work from AddInstance.
4826 @type lu: L{LogicalUnit}
4827 @param lu: the logical unit on whose behalf we execute
4828 @type instance: L{objects.Instance}
4829 @param instance: the instance whose disks we should create
4831 @return: the success of the creation
4834 info = _GetInstanceInfoText(instance)
4835 pnode = instance.primary_node
4837 if instance.disk_template == constants.DT_FILE:
4838 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4839 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4841 result.Raise("Failed to create directory '%s' on"
4842 " node %s: %s" % (file_storage_dir, pnode))
4844 # Note: this needs to be kept in sync with adding of disks in
4845 # LUSetInstanceParams
4846 for device in instance.disks:
4847 logging.info("Creating volume %s for instance %s",
4848 device.iv_name, instance.name)
4850 for node in instance.all_nodes:
4851 f_create = node == pnode
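# Creation (and opening) is only forced on the primary node; on the other
# nodes _CreateBlockDev only creates devices which report
# CreateOnSecondary() (see its docstring above).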
4852 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4855 def _RemoveDisks(lu, instance):
4856 """Remove all disks for an instance.
4858 This abstracts away some work from `AddInstance()` and
4859 `RemoveInstance()`. Note that in case some of the devices couldn't
4860 be removed, the removal will continue with the other ones (compare
4861 with `_CreateDisks()`).
4863 @type lu: L{LogicalUnit}
4864 @param lu: the logical unit on whose behalf we execute
4865 @type instance: L{objects.Instance}
4866 @param instance: the instance whose disks we should remove
4868 @return: the success of the removal
4871 logging.info("Removing block devices for instance %s", instance.name)
4874 for device in instance.disks:
4875 for node, disk in device.ComputeNodeTree(instance.primary_node):
4876 lu.cfg.SetDiskID(disk, node)
4877 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4879 lu.LogWarning("Could not remove block device %s on node %s,"
4880 " continuing anyway: %s", device.iv_name, node, msg)
4883 if instance.disk_template == constants.DT_FILE:
4884 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4885 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4887 msg = result.fail_msg
4889 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4890 file_storage_dir, instance.primary_node, msg)
4896 def _ComputeDiskSize(disk_template, disks):
4897 """Compute disk size requirements in the volume group
4900 # Required free disk space as a function of disk and swap space
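# Illustrative example: a DRBD8 instance with two disks of 10240 MB and
# 5120 MB needs (10240 + 128) + (5120 + 128) = 15616 MB of free space in
# the volume group, while the same disks as 'plain' LVs need 15360 MB.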
4902 constants.DT_DISKLESS: None,
4903 constants.DT_PLAIN: sum(d["size"] for d in disks),
4904 # 128 MB are added for drbd metadata for each disk
4905 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4906 constants.DT_FILE: None,
4909 if disk_template not in req_size_dict:
4910 raise errors.ProgrammerError("Disk template '%s' size requirement"
4911 " is unknown" % disk_template)
4913 return req_size_dict[disk_template]
4916 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4917 """Hypervisor parameter validation.
4919 This function abstracts the hypervisor parameter validation to be
4920 used in both instance create and instance modify.
4922 @type lu: L{LogicalUnit}
4923 @param lu: the logical unit for which we check
4924 @type nodenames: list
4925 @param nodenames: the list of nodes on which we should check
4926 @type hvname: string
4927 @param hvname: the name of the hypervisor we should use
4928 @type hvparams: dict
4929 @param hvparams: the parameters which we need to check
4930 @raise errors.OpPrereqError: if the parameters are not valid
4933 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4936 for node in nodenames:
4940 info.Raise("Hypervisor parameter validation failed on node %s" % node)
4943 class LUCreateInstance(LogicalUnit):
4944 """Create an instance.
4947 HPATH = "instance-add"
4948 HTYPE = constants.HTYPE_INSTANCE
4949 _OP_REQP = ["instance_name", "disks", "disk_template",
4951 "wait_for_sync", "ip_check", "nics",
4952 "hvparams", "beparams"]
4955 def _ExpandNode(self, node):
4956 """Expands and checks one node name.
4959 node_full = self.cfg.ExpandNodeName(node)
4960 if node_full is None:
4961 raise errors.OpPrereqError("Unknown node %s" % node)
4964 def ExpandNames(self):
4965 """ExpandNames for CreateInstance.
4967 Figure out the right locks for instance creation.
4970 self.needed_locks = {}
4972 # set optional parameters to none if they don't exist
4973 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4974 if not hasattr(self.op, attr):
4975 setattr(self.op, attr, None)
4977 # cheap checks, mostly valid constants given
4979 # verify creation mode
4980 if self.op.mode not in (constants.INSTANCE_CREATE,
4981 constants.INSTANCE_IMPORT):
4982 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4985 # disk template and mirror node verification
4986 if self.op.disk_template not in constants.DISK_TEMPLATES:
4987 raise errors.OpPrereqError("Invalid disk template name")
4989 if self.op.hypervisor is None:
4990 self.op.hypervisor = self.cfg.GetHypervisorType()
4992 cluster = self.cfg.GetClusterInfo()
4993 enabled_hvs = cluster.enabled_hypervisors
4994 if self.op.hypervisor not in enabled_hvs:
4995 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4996 " cluster (%s)" % (self.op.hypervisor,
4997 ",".join(enabled_hvs)))
4999 # check hypervisor parameter syntax (locally)
5000 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5001 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5003 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5004 hv_type.CheckParameterSyntax(filled_hvp)
5005 self.hv_full = filled_hvp
5007 # fill and remember the beparams dict
5008 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5009 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5012 #### instance parameters check
5014 # instance name verification
5015 hostname1 = utils.HostInfo(self.op.instance_name)
5016 self.op.instance_name = instance_name = hostname1.name
5018 # this is just a preventive check, but someone might still add this
5019 # instance in the meantime, and creation will fail at lock-add time
5020 if instance_name in self.cfg.GetInstanceList():
5021 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5024 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
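# NIC normalization: each requested NIC gets its IP resolved ('auto' uses
# the instance's resolved address, 'none' means no IP), its MAC validated
# (unless 'auto'/'generate'), and the legacy 'bridge' option checked against
# 'link'; the result is stored as objects.NIC entries in self.nics.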
5028 for idx, nic in enumerate(self.op.nics):
5029 nic_mode_req = nic.get("mode", None)
5030 nic_mode = nic_mode_req
5031 if nic_mode is None:
5032 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5034 # in routed mode, for the first nic, the default ip is 'auto'
5035 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5036 default_ip_mode = constants.VALUE_AUTO
5038 default_ip_mode = constants.VALUE_NONE
5040 # ip validity checks
5041 ip = nic.get("ip", default_ip_mode)
5042 if ip is None or ip.lower() == constants.VALUE_NONE:
5044 elif ip.lower() == constants.VALUE_AUTO:
5045 nic_ip = hostname1.ip
5047 if not utils.IsValidIP(ip):
5048 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5049 " like a valid IP" % ip)
5052 # TODO: check the ip for uniqueness !!
5053 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5054 raise errors.OpPrereqError("Routed nic mode requires an ip address")
5056 # MAC address verification
5057 mac = nic.get("mac", constants.VALUE_AUTO)
5058 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5059 if not utils.IsValidMac(mac.lower()):
5060 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
5062 # bridge verification
5063 bridge = nic.get("bridge", None)
5064 link = nic.get("link", None)
5066 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5067 " at the same time")
5068 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5069 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
5075 nicparams[constants.NIC_MODE] = nic_mode_req
5077 nicparams[constants.NIC_LINK] = link
5079 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5081 objects.NIC.CheckParameterSyntax(check_params)
5082 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5084 # disk checks/pre-build
5086 for disk in self.op.disks:
5087 mode = disk.get("mode", constants.DISK_RDWR)
5088 if mode not in constants.DISK_ACCESS_SET:
5089 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5091 size = disk.get("size", None)
5093 raise errors.OpPrereqError("Missing disk size")
5097 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
5098 self.disks.append({"size": size, "mode": mode})
5100 # used in CheckPrereq for ip ping check
5101 self.check_ip = hostname1.ip
5103 # file storage checks
5104 if (self.op.file_driver and
5105 not self.op.file_driver in constants.FILE_DRIVER):
5106 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5107 self.op.file_driver)
5109 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5110 raise errors.OpPrereqError("File storage directory path must not be absolute")
5112 ### Node/iallocator related checks
5113 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5114 raise errors.OpPrereqError("One and only one of iallocator and primary"
5115 " node must be given")
5117 if self.op.iallocator:
5118 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5120 self.op.pnode = self._ExpandNode(self.op.pnode)
5121 nodelist = [self.op.pnode]
5122 if self.op.snode is not None:
5123 self.op.snode = self._ExpandNode(self.op.snode)
5124 nodelist.append(self.op.snode)
5125 self.needed_locks[locking.LEVEL_NODE] = nodelist
5127 # in case of import lock the source node too
5128 if self.op.mode == constants.INSTANCE_IMPORT:
5129 src_node = getattr(self.op, "src_node", None)
5130 src_path = getattr(self.op, "src_path", None)
5132 if src_path is None:
5133 self.op.src_path = src_path = self.op.instance_name
5135 if src_node is None:
5136 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5137 self.op.src_node = None
5138 if os.path.isabs(src_path):
5139 raise errors.OpPrereqError("Importing an instance from an absolute"
5140 " path requires a source node option.")
5142 self.op.src_node = src_node = self._ExpandNode(src_node)
5143 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5144 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5145 if not os.path.isabs(src_path):
5146 self.op.src_path = src_path = \
5147 os.path.join(constants.EXPORT_DIR, src_path)
5149 else: # INSTANCE_CREATE
5150 if getattr(self.op, "os_type", None) is None:
5151 raise errors.OpPrereqError("No guest OS specified")
5153 def _RunAllocator(self):
5154 """Run the allocator based on input opcode.
5157 nics = [n.ToDict() for n in self.nics]
5158 ial = IAllocator(self.cfg, self.rpc,
5159 mode=constants.IALLOCATOR_MODE_ALLOC,
5160 name=self.op.instance_name,
5161 disk_template=self.op.disk_template,
5164 vcpus=self.be_full[constants.BE_VCPUS],
5165 mem_size=self.be_full[constants.BE_MEMORY],
5168 hypervisor=self.op.hypervisor,
5171 ial.Run(self.op.iallocator)
5174 raise errors.OpPrereqError("Can't compute nodes using"
5175 " iallocator '%s': %s" % (self.op.iallocator,
5177 if len(ial.nodes) != ial.required_nodes:
5178 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5179 " of nodes (%s), required %s" %
5180 (self.op.iallocator, len(ial.nodes),
5181 ial.required_nodes))
5182 self.op.pnode = ial.nodes[0]
5183 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5184 self.op.instance_name, self.op.iallocator,
5185 ", ".join(ial.nodes))
5186 if ial.required_nodes == 2:
5187 self.op.snode = ial.nodes[1]
5189 def BuildHooksEnv(self):
5192 This runs on master, primary and secondary nodes of the instance.
5196 "ADD_MODE": self.op.mode,
5198 if self.op.mode == constants.INSTANCE_IMPORT:
5199 env["SRC_NODE"] = self.op.src_node
5200 env["SRC_PATH"] = self.op.src_path
5201 env["SRC_IMAGES"] = self.src_images
5203 env.update(_BuildInstanceHookEnv(
5204 name=self.op.instance_name,
5205 primary_node=self.op.pnode,
5206 secondary_nodes=self.secondaries,
5207 status=self.op.start,
5208 os_type=self.op.os_type,
5209 memory=self.be_full[constants.BE_MEMORY],
5210 vcpus=self.be_full[constants.BE_VCPUS],
5211 nics=_NICListToTuple(self, self.nics),
5212 disk_template=self.op.disk_template,
5213 disks=[(d["size"], d["mode"]) for d in self.disks],
5216 hypervisor_name=self.op.hypervisor,
5219 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5224 def CheckPrereq(self):
5225 """Check prerequisites.
5228 if (not self.cfg.GetVGName() and
5229 self.op.disk_template not in constants.DTS_NOT_LVM):
5230 raise errors.OpPrereqError("Cluster does not support lvm-based"
5233 if self.op.mode == constants.INSTANCE_IMPORT:
5234 src_node = self.op.src_node
5235 src_path = self.op.src_path
5237 if src_node is None:
5238 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5239 exp_list = self.rpc.call_export_list(locked_nodes)
5241 for node in exp_list:
5242 if exp_list[node].fail_msg:
5244 if src_path in exp_list[node].payload:
5246 self.op.src_node = src_node = node
5247 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5251 raise errors.OpPrereqError("No export found for relative path %s" %
5254 _CheckNodeOnline(self, src_node)
5255 result = self.rpc.call_export_info(src_node, src_path)
5256 result.Raise("No export or invalid export found in dir %s" % src_path)
5258 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5259 if not export_info.has_section(constants.INISECT_EXP):
5260 raise errors.ProgrammerError("Corrupted export config")
5262 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5263 if (int(ei_version) != constants.EXPORT_VERSION):
5264 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5265 (ei_version, constants.EXPORT_VERSION))
5267 # Check that the new instance doesn't have less disks than the export
5268 instance_disks = len(self.disks)
5269 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
5270 if instance_disks < export_disks:
5271 raise errors.OpPrereqError("Not enough disks to import."
5272 " (instance: %d, export: %d)" %
5273 (instance_disks, export_disks))
5275 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
5277 for idx in range(export_disks):
5278 option = 'disk%d_dump' % idx
5279 if export_info.has_option(constants.INISECT_INS, option):
5280 # FIXME: are the old os-es, disk sizes, etc. useful?
5281 export_name = export_info.get(constants.INISECT_INS, option)
5282 image = os.path.join(src_path, export_name)
5283 disk_images.append(image)
5285 disk_images.append(False)
5287 self.src_images = disk_images
5289 old_name = export_info.get(constants.INISECT_INS, 'name')
5290 # FIXME: int() here could throw a ValueError on broken exports
5291 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
5292 if self.op.instance_name == old_name:
5293 for idx, nic in enumerate(self.nics):
5294 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
5295 nic_mac_ini = 'nic%d_mac' % idx
5296 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
5298 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
5299 # ip ping checks (we use the same ip that was resolved in ExpandNames)
5300 if self.op.start and not self.op.ip_check:
5301 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
5302 " adding an instance in start mode")
5304 if self.op.ip_check:
5305 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
5306 raise errors.OpPrereqError("IP %s of instance %s already in use" %
5307 (self.check_ip, self.op.instance_name))
5309 #### mac address generation
5310 # By generating here the mac address both the allocator and the hooks get
5311 # the real final mac address rather than the 'auto' or 'generate' value.
5312 # There is a race condition between the generation and the instance object
5313 # creation, which means that we know the mac is valid now, but we're not
5314 # sure it will be when we actually add the instance. If things go bad
5315 # adding the instance will abort because of a duplicate mac, and the
5316 # creation job will fail.
5317 for nic in self.nics:
5318 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5319 nic.mac = self.cfg.GenerateMAC()
5323 if self.op.iallocator is not None:
5324 self._RunAllocator()
5326 #### node related checks
5328 # check primary node
5329 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
5330 assert self.pnode is not None, \
5331 "Cannot retrieve locked node %s" % self.op.pnode
5333 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
5336 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
5339 self.secondaries = []
5341 # mirror node verification
5342 if self.op.disk_template in constants.DTS_NET_MIRROR:
5343 if self.op.snode is None:
5344 raise errors.OpPrereqError("The networked disk templates need"
5346 if self.op.snode == pnode.name:
5347 raise errors.OpPrereqError("The secondary node cannot be"
5348 " the primary node.")
5349 _CheckNodeOnline(self, self.op.snode)
5350 _CheckNodeNotDrained(self, self.op.snode)
5351 self.secondaries.append(self.op.snode)
5353 nodenames = [pnode.name] + self.secondaries
5355 req_size = _ComputeDiskSize(self.op.disk_template,
5358 # Check lv size requirements
5359 if req_size is not None:
5360 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5362 for node in nodenames:
5363 info = nodeinfo[node]
5364 info.Raise("Cannot get current information from node %s" % node)
5366 vg_free = info.get('vg_free', None)
5367 if not isinstance(vg_free, int):
5368 raise errors.OpPrereqError("Can't compute free disk space on"
5370 if req_size > vg_free:
5371 raise errors.OpPrereqError("Not enough disk space on target node %s."
5372 " %d MB available, %d MB required" %
5373 (node, vg_free, req_size))
5375 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5378 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5379 result.Raise("OS '%s' not in supported os list for primary node %s" %
5380 (self.op.os_type, pnode.name), prereq=True)
5382 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
5384 # memory check on primary node
5386 _CheckNodeFreeMemory(self, self.pnode.name,
5387 "creating instance %s" % self.op.instance_name,
5388 self.be_full[constants.BE_MEMORY],
5391 self.dry_run_result = list(nodenames)
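# In dry-run mode this node list is what gets returned to the caller instead
# of actually creating the instance (see dry_run_result in LogicalUnit).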
5393 def Exec(self, feedback_fn):
5394 """Create and add the instance to the cluster.
5397 instance = self.op.instance_name
5398 pnode_name = self.pnode.name
5400 ht_kind = self.op.hypervisor
5401 if ht_kind in constants.HTS_REQ_PORT:
5402 network_port = self.cfg.AllocatePort()
5406 ##if self.op.vnc_bind_address is None:
5407 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
5409 # this is needed because os.path.join does not accept None arguments
5410 if self.op.file_storage_dir is None:
5411 string_file_storage_dir = ""
5413 string_file_storage_dir = self.op.file_storage_dir
5415 # build the full file storage dir path
5416 file_storage_dir = os.path.normpath(os.path.join(
5417 self.cfg.GetFileStorageDir(),
5418 string_file_storage_dir, instance))
5421 disks = _GenerateDiskTemplate(self,
5422 self.op.disk_template,
5423 instance, pnode_name,
5427 self.op.file_driver,
5430 iobj = objects.Instance(name=instance, os=self.op.os_type,
5431 primary_node=pnode_name,
5432 nics=self.nics, disks=disks,
5433 disk_template=self.op.disk_template,
5435 network_port=network_port,
5436 beparams=self.op.beparams,
5437 hvparams=self.op.hvparams,
5438 hypervisor=self.op.hypervisor,
5441 feedback_fn("* creating instance disks...")
5443 _CreateDisks(self, iobj)
5444 except errors.OpExecError:
5445 self.LogWarning("Device creation failed, reverting...")
5447 _RemoveDisks(self, iobj)
5449 self.cfg.ReleaseDRBDMinors(instance)
5452 feedback_fn("adding instance %s to cluster config" % instance)
5454 self.cfg.AddInstance(iobj)
5455 # Declare that we don't want to remove the instance lock anymore, as we've
5456 # added the instance to the config
5457 del self.remove_locks[locking.LEVEL_INSTANCE]
5458 # Unlock all the nodes
5459 if self.op.mode == constants.INSTANCE_IMPORT:
5460 nodes_keep = [self.op.src_node]
5461 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5462 if node != self.op.src_node]
5463 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5464 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5466 self.context.glm.release(locking.LEVEL_NODE)
5467 del self.acquired_locks[locking.LEVEL_NODE]
5469 if self.op.wait_for_sync:
5470 disk_abort = not _WaitForSync(self, iobj)
5471 elif iobj.disk_template in constants.DTS_NET_MIRROR:
5472 # make sure the disks are not degraded (still sync-ing is ok)
5474 feedback_fn("* checking mirrors status")
5475 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5480 _RemoveDisks(self, iobj)
5481 self.cfg.RemoveInstance(iobj.name)
5482 # Make sure the instance lock gets removed
5483 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5484 raise errors.OpExecError("There are some degraded disks for"
5487 feedback_fn("creating os for instance %s on node %s" %
5488 (instance, pnode_name))
5490 if iobj.disk_template != constants.DT_DISKLESS:
5491 if self.op.mode == constants.INSTANCE_CREATE:
5492 feedback_fn("* running the instance OS create scripts...")
5493 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5494 result.Raise("Could not add os for instance %s"
5495 " on node %s" % (instance, pnode_name))
5497 elif self.op.mode == constants.INSTANCE_IMPORT:
5498 feedback_fn("* running the instance OS import scripts...")
5499 src_node = self.op.src_node
5500 src_images = self.src_images
5501 cluster_name = self.cfg.GetClusterName()
5502 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5503 src_node, src_images,
5505 msg = import_result.fail_msg
5507 self.LogWarning("Error while importing the disk images for instance"
5508 " %s on node %s: %s" % (instance, pnode_name, msg))
5510 # also checked in the prereq part
5511 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5515 iobj.admin_up = True
5516 self.cfg.Update(iobj)
5517 logging.info("Starting instance %s on node %s", instance, pnode_name)
5518 feedback_fn("* starting instance...")
5519 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5520 result.Raise("Could not start instance")
5522 return list(iobj.all_nodes)
5525 class LUConnectConsole(NoHooksLU):
5526 """Connect to an instance's console.
5528 This is somewhat special in that it returns the command line that
5529 you need to run on the master node in order to connect to the console.
5533 _OP_REQP = ["instance_name"]
5536 def ExpandNames(self):
5537 self._ExpandAndLockInstance()
5539 def CheckPrereq(self):
5540 """Check prerequisites.
5542 This checks that the instance is in the cluster.
5545 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5546 assert self.instance is not None, \
5547 "Cannot retrieve locked instance %s" % self.op.instance_name
5548 _CheckNodeOnline(self, self.instance.primary_node)
5550 def Exec(self, feedback_fn):
5551 """Connect to the console of an instance
5554 instance = self.instance
5555 node = instance.primary_node
5557 node_insts = self.rpc.call_instance_list([node],
5558 [instance.hypervisor])[node]
5559 node_insts.Raise("Can't get node information from %s" % node)
5561 if instance.name not in node_insts.payload:
5562 raise errors.OpExecError("Instance %s is not running." % instance.name)
5564 logging.debug("Connecting to console of %s on %s", instance.name, node)
5566 hyper = hypervisor.GetHypervisor(instance.hypervisor)
5567 cluster = self.cfg.GetClusterInfo()
5568 # beparams and hvparams are passed separately, to avoid editing the
5569 # instance and then saving the defaults in the instance itself.
5570 hvparams = cluster.FillHV(instance)
5571 beparams = cluster.FillBE(instance)
5572 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5575 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5578 class LUReplaceDisks(LogicalUnit):
5579 """Replace the disks of an instance.
5582 HPATH = "mirrors-replace"
5583 HTYPE = constants.HTYPE_INSTANCE
5584 _OP_REQP = ["instance_name", "mode", "disks"]
5587 def CheckArguments(self):
5588 if not hasattr(self.op, "remote_node"):
5589 self.op.remote_node = None
5590 if not hasattr(self.op, "iallocator"):
5591 self.op.iallocator = None
5593 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
5596 def ExpandNames(self):
5597 self._ExpandAndLockInstance()
5599 if self.op.iallocator is not None:
5600 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5602 elif self.op.remote_node is not None:
5603 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5604 if remote_node is None:
5605 raise errors.OpPrereqError("Node '%s' not known" %
5606 self.op.remote_node)
5608 self.op.remote_node = remote_node
5610 # Warning: do not remove the locking of the new secondary here
5611 # unless DRBD8.AddChildren is changed to work in parallel;
5612 # currently it doesn't since parallel invocations of
5613 # FindUnusedMinor will conflict
5614 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5615 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5618 self.needed_locks[locking.LEVEL_NODE] = []
5619 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5621 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
5622 self.op.iallocator, self.op.remote_node,
5625 self.tasklets = [self.replacer]
5627 def DeclareLocks(self, level):
5628 # If we're not already locking all nodes in the set we have to declare the
5629 # instance's primary/secondary nodes.
5630 if (level == locking.LEVEL_NODE and
5631 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5632 self._LockInstancesNodes()
5634 def BuildHooksEnv(self):
5637 This runs on the master, the primary and all the secondaries.
5640 instance = self.replacer.instance
5642 "MODE": self.op.mode,
5643 "NEW_SECONDARY": self.op.remote_node,
5644 "OLD_SECONDARY": instance.secondary_nodes[0],
5646 env.update(_BuildInstanceHookEnvByObject(self, instance))
5648 self.cfg.GetMasterNode(),
5649 instance.primary_node,
5651 if self.op.remote_node is not None:
5652 nl.append(self.op.remote_node)
5656 class LUEvacuateNode(LogicalUnit):
5657 """Relocate the secondary instances from a node.
5660 HPATH = "node-evacuate"
5661 HTYPE = constants.HTYPE_NODE
5662 _OP_REQP = ["node_name"]
5665 def CheckArguments(self):
5666 if not hasattr(self.op, "remote_node"):
5667 self.op.remote_node = None
5668 if not hasattr(self.op, "iallocator"):
5669 self.op.iallocator = None
5671 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
5672 self.op.remote_node,
5675 def ExpandNames(self):
5676 self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
5677 if self.op.node_name is None:
5678 raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
5680 self.needed_locks = {}
5682 # Declare node locks
5683 if self.op.iallocator is not None:
5684 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5686 elif self.op.remote_node is not None:
5687 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5688 if remote_node is None:
5689 raise errors.OpPrereqError("Node '%s' not known" %
5690 self.op.remote_node)
5692 self.op.remote_node = remote_node
5694 # Warning: do not remove the locking of the new secondary here
5695 # unless DRBD8.AddChildren is changed to work in parallel;
5696 # currently it doesn't since parallel invocations of
5697 # FindUnusedMinor will conflict
5698 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5699 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5702 raise errors.OpPrereqError("Invalid parameters")
5704 # Create tasklets for replacing disks for all secondary instances on this
5709 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
5710 logging.debug("Replacing disks for instance %s", inst.name)
5711 names.append(inst.name)
5713 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
5714 self.op.iallocator, self.op.remote_node, [])
5715 tasklets.append(replacer)
5717 self.tasklets = tasklets
5718 self.instance_names = names
5720 # Declare instance locks
5721 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
5723 def DeclareLocks(self, level):
5724 # If we're not already locking all nodes in the set we have to declare the
5725 # instance's primary/secondary nodes.
5726 if (level == locking.LEVEL_NODE and
5727 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5728 self._LockInstancesNodes()
5730 def BuildHooksEnv(self):
5733 This runs on the master, the primary and all the secondaries.
5737 "NODE_NAME": self.op.node_name,
5740 nl = [self.cfg.GetMasterNode()]
5742 if self.op.remote_node is not None:
5743 env["NEW_SECONDARY"] = self.op.remote_node
5744 nl.append(self.op.remote_node)
5746 return (env, nl, nl)
5749 class TLReplaceDisks(Tasklet):
5750 """Replaces disks for an instance.
5752 Note: Locking is not within the scope of this class.
5755 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
5757 """Initializes this class.
5760 Tasklet.__init__(self, lu)
5763 self.instance_name = instance_name
5765 self.iallocator_name = iallocator_name
5766 self.remote_node = remote_node
5770 self.instance = None
5771 self.new_node = None
5772 self.target_node = None
5773 self.other_node = None
5774 self.remote_node_info = None
5775 self.node_secondary_ip = None
5778 def CheckArguments(mode, remote_node, iallocator):
5779 """Helper function for users of this class.
5782 # check for valid parameter combination
5783 if mode == constants.REPLACE_DISK_CHG:
5784 if remote_node is None and iallocator is None:
5785 raise errors.OpPrereqError("When changing the secondary either an"
5786 " iallocator script must be used or the"
5789 if remote_node is not None and iallocator is not None:
5790 raise errors.OpPrereqError("Give either the iallocator or the new"
5791 " secondary, not both")
5793 elif remote_node is not None or iallocator is not None:
5794 # Not replacing the secondary
5795 raise errors.OpPrereqError("The iallocator and new node options can"
5796 " only be used when changing the"
5800 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
5801 """Compute a new secondary node using an IAllocator.
5804 ial = IAllocator(lu.cfg, lu.rpc,
5805 mode=constants.IALLOCATOR_MODE_RELOC,
5807 relocate_from=relocate_from)
5809 ial.Run(iallocator_name)
5812 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
5813 " %s" % (iallocator_name, ial.info))
5815 if len(ial.nodes) != ial.required_nodes:
5816 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5817 " of nodes (%s), required %s" %
5818 (iallocator_name, len(ial.nodes), ial.required_nodes))
5820 remote_node_name = ial.nodes[0]
5822 lu.LogInfo("Selected new secondary for instance '%s': %s",
5823 instance_name, remote_node_name)
5825 return remote_node_name
5827 def _FindFaultyDisks(self, node_name):
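# Ask the given node for the mirror status of all instance disks and return
# the indices of those whose local disk is reported as faulty; used by the
# automatic replace-disks mode to decide what needs rebuilding.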
5830 for dev in self.instance.disks:
5831 self.cfg.SetDiskID(dev, node_name)
5833 result = self.rpc.call_blockdev_getmirrorstatus(node_name,
5834 self.instance.disks)
5835 result.Raise("Failed to get disk status from node %s" % node_name,
5838 for idx, bdev_status in enumerate(result.payload):
5839 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
5844 def CheckPrereq(self):
5845 """Check prerequisites.
5847 This checks that the instance is in the cluster.
5850 self.instance = self.cfg.GetInstanceInfo(self.instance_name)
5851 assert self.instance is not None, \
5852 "Cannot retrieve locked instance %s" % self.instance_name
5854 if self.instance.disk_template != constants.DT_DRBD8:
5855 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5858 if len(self.instance.secondary_nodes) != 1:
5859 raise errors.OpPrereqError("The instance has a strange layout,"
5860 " expected one secondary but found %d" %
5861 len(self.instance.secondary_nodes))
5863 secondary_node = self.instance.secondary_nodes[0]
5865 if self.iallocator_name is None:
5866 remote_node = self.remote_node
5868 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
5869 self.instance.name, secondary_node)
5871 if remote_node is not None:
5872 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5873 assert self.remote_node_info is not None, \
5874 "Cannot retrieve locked node %s" % remote_node
5876 self.remote_node_info = None
5878 if remote_node == self.instance.primary_node:
5879 raise errors.OpPrereqError("The specified node is the primary node of"
5882 if remote_node == secondary_node:
5883 raise errors.OpPrereqError("The specified node is already the"
5884 " secondary node of the instance.")
5886 if self.mode == constants.REPLACE_DISK_AUTO:
5888 raise errors.OpPrereqError("Cannot specify disks to be replaced")
5890 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
5891 faulty_secondary = self._FindFaultyDisks(secondary_node)
5893 if faulty_primary and faulty_secondary:
5894 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
5895 " one node and can not be repaired"
5896 " automatically" % self.instance_name)
5899 self.disks = faulty_primary
5900 self.target_node = self.instance.primary_node
5901 self.other_node = secondary_node
5902 check_nodes = [self.target_node, self.other_node]
5903 elif faulty_secondary:
5904 self.disks = faulty_secondary
5905 self.target_node = secondary_node
5906 self.other_node = self.instance.primary_node
5907 check_nodes = [self.target_node, self.other_node]
5913 # Non-automatic modes
5914 if self.mode == constants.REPLACE_DISK_PRI:
5915 self.target_node = self.instance.primary_node
5916 self.other_node = secondary_node
5917 check_nodes = [self.target_node, self.other_node]
5919 elif self.mode == constants.REPLACE_DISK_SEC:
5920 self.target_node = secondary_node
5921 self.other_node = self.instance.primary_node
5922 check_nodes = [self.target_node, self.other_node]
5924 elif self.mode == constants.REPLACE_DISK_CHG:
5925 self.new_node = remote_node
5926 self.other_node = self.instance.primary_node
5927 self.target_node = secondary_node
5928 check_nodes = [self.new_node, self.other_node]
5930 _CheckNodeNotDrained(self.lu, remote_node)
5933 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
5936 # If not specified all disks should be replaced
5938 self.disks = range(len(self.instance.disks))
5940 for node in check_nodes:
5941 _CheckNodeOnline(self.lu, node)
5943 # Check whether disks are valid
5944 for disk_idx in self.disks:
5945 self.instance.FindDisk(disk_idx)
5947 # Get secondary node IP addresses
5950 for node_name in [self.target_node, self.other_node, self.new_node]:
5951 if node_name is not None:
5952 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
5954 self.node_secondary_ip = node_2nd_ip
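    # The resulting map is consumed by Exec() when reconfiguring the DRBD
    # network; with hypothetical nodes/addresses it looks like:
    #   self.node_secondary_ip = {
    #     "node1.example.com": "192.0.2.1",   # target node
    #     "node2.example.com": "192.0.2.2",   # other (and/or new) node
    #   }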
5956 def Exec(self, feedback_fn):
5957 """Execute disk replacement.
5959 This dispatches the disk replacement to the appropriate handler.
5963 feedback_fn("No disks need replacement")
5966 feedback_fn("Replacing disk(s) %s for %s" %
5967 (", ".join([str(i) for i in self.disks]), self.instance.name))
5969 activate_disks = (not self.instance.admin_up)
5971 # Activate the instance disks if we're replacing them on a down instance
5973 _StartInstanceDisks(self.lu, self.instance, True)
5976 # Should we replace the secondary node?
5977 if self.new_node is not None:
5978 return self._ExecDrbd8Secondary()
5980 return self._ExecDrbd8DiskOnly()
5983 # Deactivate the instance disks if we're replacing them on a down instance
5985 _SafeShutdownInstanceDisks(self.lu, self.instance)
5987 def _CheckVolumeGroup(self, nodes):
5988 self.lu.LogInfo("Checking volume groups")
5990 vgname = self.cfg.GetVGName()
5992 # Make sure volume group exists on all involved nodes
5993 results = self.rpc.call_vg_list(nodes)
5995 raise errors.OpExecError("Can't list volume groups on the nodes")
5999 res.Raise("Error checking node %s" % node)
6000 if vgname not in res.payload:
6001 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6004 def _CheckDisksExistence(self, nodes):
6005 # Check disk existence
6006 for idx, dev in enumerate(self.instance.disks):
6007 if idx not in self.disks:
6011 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6012 self.cfg.SetDiskID(dev, node)
6014 result = self.rpc.call_blockdev_find(node, dev)
6016 msg = result.fail_msg
6017 if msg or not result.payload:
6019 msg = "disk not found"
6020 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6023 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6024 for idx, dev in enumerate(self.instance.disks):
6025 if idx not in self.disks:
6028 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6031 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6033 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6034 " replace disks for instance %s" %
6035 (node_name, self.instance.name))
6037 def _CreateNewStorage(self, node_name):
6038 vgname = self.cfg.GetVGName()
6041 for idx, dev in enumerate(self.instance.disks):
6042 if idx not in self.disks:
6045 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6047 self.cfg.SetDiskID(dev, node_name)
6049 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6050 names = _GenerateUniqueNames(self.lu, lv_names)
6052 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6053 logical_id=(vgname, names[0]))
6054 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6055 logical_id=(vgname, names[1]))
6057 new_lvs = [lv_data, lv_meta]
6058 old_lvs = dev.children
6059 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6061 # we pass force_create=True to force the LVM creation
6062 for new_lv in new_lvs:
6063 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6064 _GetInstanceInfoText(self.instance), False)
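      # For each replaced disk the mapping built above has the shape
      # (hypothetical names):
      #   iv_names["disk/0"] = (drbd_dev,
      #                         [old_data_lv, old_meta_lv],
      #                         [new_data_lv, new_meta_lv])
      # i.e. the DRBD device, its current children and the freshly created
      # replacement LVs, keyed by the disk's iv_name.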
6068 def _CheckDevices(self, node_name, iv_names):
6069 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
6070 self.cfg.SetDiskID(dev, node_name)
6072 result = self.rpc.call_blockdev_find(node_name, dev)
6074 msg = result.fail_msg
6075 if msg or not result.payload:
6077 msg = "disk not found"
6078 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6081 if result.payload.is_degraded:
6082 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6084 def _RemoveOldStorage(self, node_name, iv_names):
6085 for name, (dev, old_lvs, _) in iv_names.iteritems():
6086 self.lu.LogInfo("Remove logical volumes for %s" % name)
6089 self.cfg.SetDiskID(lv, node_name)
6091 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6093 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6094 hint="remove unused LVs manually")
6096 def _ExecDrbd8DiskOnly(self):
6097 """Replace a disk on the primary or secondary for DRBD 8.
6099 The algorithm for replace is quite complicated:
6101 1. for each disk to be replaced:
6103 1. create new LVs on the target node with unique names
6104 1. detach old LVs from the drbd device
6105 1. rename old LVs to name_replaced.<time_t>
6106 1. rename new LVs to old LVs
6107 1. attach the new LVs (with the old names now) to the drbd device
6109 1. wait for sync across all devices
6111 1. for each modified disk:
6113 1. remove old LVs (which have the name name_replaced.<time_t>)
6115 Failures are not very well handled.
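    # Illustrative sketch (hypothetical VG/LV names) of the per-disk swap for
    # a disk whose LVs are "xenvg/aaa.disk0_data" and "xenvg/aaa.disk0_meta":
    #   1. new LVs "xenvg/bbb.disk0_data"/"xenvg/bbb.disk0_meta" are created
    #      on the target node
    #   2. the old LVs are detached from the DRBD device and renamed to
    #      "aaa.disk0_data_replaced-<time_t>" (and likewise for the meta LV)
    #   3. the new LVs take over the old names and are re-attached as the
    #      DRBD device's children
    # After the sync (step 5) the renamed old LVs are removed (step 6).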
6120 # Step: check device activation
6121 self.lu.LogStep(1, steps_total, "Check device existence")
6122 self._CheckDisksExistence([self.other_node, self.target_node])
6123 self._CheckVolumeGroup([self.target_node, self.other_node])
6125 # Step: check other node consistency
6126 self.lu.LogStep(2, steps_total, "Check peer consistency")
6127 self._CheckDisksConsistency(self.other_node,
6128 self.other_node == self.instance.primary_node,
6131 # Step: create new storage
6132 self.lu.LogStep(3, steps_total, "Allocate new storage")
6133 iv_names = self._CreateNewStorage(self.target_node)
6135 # Step: for each lv, detach+rename*2+attach
6136 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6137 for dev, old_lvs, new_lvs in iv_names.itervalues():
6138 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6140 result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs)
6141 result.Raise("Can't detach drbd from local storage on node"
6142 " %s for device %s" % (self.target_node, dev.iv_name))
6144 #cfg.Update(instance)
6146 # ok, we created the new LVs, so now we know we have the needed
6147 # storage; as such, we proceed on the target node to rename
6148 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6149 # using the assumption that logical_id == physical_id (which in
6150 # turn is the unique_id on that node)
6152 # FIXME(iustin): use a better name for the replaced LVs
6153 temp_suffix = int(time.time())
6154 ren_fn = lambda d, suff: (d.physical_id[0],
6155 d.physical_id[1] + "_replaced-%s" % suff)
6157 # Build the rename list based on what LVs exist on the node
6158 rename_old_to_new = []
6159 for to_ren in old_lvs:
6160 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6161 if not result.fail_msg and result.payload:
6163 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6165 self.lu.LogInfo("Renaming the old LVs on the target node")
6166 result = self.rpc.call_blockdev_rename(self.target_node, rename_old_to_new)
6167 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6169 # Now we rename the new LVs to the old LVs
6170 self.lu.LogInfo("Renaming the new LVs on the target node")
6171 rename_new_to_old = [(new, old.physical_id)
6172 for old, new in zip(old_lvs, new_lvs)]
6173 result = self.rpc.call_blockdev_rename(self.target_node, rename_new_to_old)
6174 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6176 for old, new in zip(old_lvs, new_lvs):
6177 new.logical_id = old.logical_id
6178 self.cfg.SetDiskID(new, self.target_node)
6180 for disk in old_lvs:
6181 disk.logical_id = ren_fn(disk, temp_suffix)
6182 self.cfg.SetDiskID(disk, self.target_node)
6184 # Now that the new lvs have the old name, we can add them to the device
6185 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6186 result = self.rpc.call_blockdev_addchildren(self.target_node, dev, new_lvs)
6187 msg = result.fail_msg
6189 for new_lv in new_lvs:
6190 msg2 = self.rpc.call_blockdev_remove(self.target_node, new_lv).fail_msg
6192 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
6193 hint=("cleanup manually the unused logical"
6195 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6197 dev.children = new_lvs
6199 self.cfg.Update(self.instance)
6202 # This can fail as the old devices are degraded and _WaitForSync
6203 # does a combined result over all disks, so we don't check its return value
6204 self.lu.LogStep(5, steps_total, "Sync devices")
6205 _WaitForSync(self.lu, self.instance, unlock=True)
6207 # Check all devices manually
6208 self._CheckDevices(self.instance.primary_node, iv_names)
6210 # Step: remove old storage
6211 self.lu.LogStep(6, steps_total, "Removing old storage")
6212 self._RemoveOldStorage(self.target_node, iv_names)
6214 def _ExecDrbd8Secondary(self):
6215 """Replace the secondary node for DRBD 8.
6217 The algorithm for replace is quite complicated:
6218 - for all disks of the instance:
6219 - create new LVs on the new node with same names
6220 - shutdown the drbd device on the old secondary
6221 - disconnect the drbd network on the primary
6222 - create the drbd device on the new secondary
6223 - network attach the drbd on the primary, using an artifice:
6224 the drbd code for Attach() will connect to the network if it
6225 finds a device which is connected to the good local disks but not network enabled
6227 - wait for sync across all devices
6228 - remove all disks from the old secondary
6230 Failures are not very well handled.
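    # Illustrative example (hypothetical nodes, port and minors) of the
    # logical_id rewrite done in step 4 when moving the secondary from
    # node2 to node3:
    #   old id:   ("node1", "node2", 11000, 0, 3, "secret")
    #   alone id: ("node1", "node3", None,  0, 7, "secret")
    #   net id:   ("node1", "node3", 11000, 0, 7, "secret")
    # The device is first created on the new node with the port-less id and
    # the primary is later re-attached using the networked id.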
6235 # Step: check device activation
6236 self.lu.LogStep(1, steps_total, "Check device existence")
6237 self._CheckDisksExistence([self.instance.primary_node])
6238 self._CheckVolumeGroup([self.instance.primary_node])
6240 # Step: check other node consistency
6241 self.lu.LogStep(2, steps_total, "Check peer consistency")
6242 self._CheckDisksConsistency(self.instance.primary_node, True, True)
6244 # Step: create new storage
6245 self.lu.LogStep(3, steps_total, "Allocate new storage")
6246 for idx, dev in enumerate(self.instance.disks):
6247 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
6248 (self.new_node, idx))
6249 # we pass force_create=True to force LVM creation
6250 for new_lv in dev.children:
6251 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
6252 _GetInstanceInfoText(self.instance), False)
6254 # Step 4: drbd minors and drbd setup changes
6255 # after this, we must manually remove the drbd minors on both the
6256 # error and the success paths
6257 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6258 minors = self.cfg.AllocateDRBDMinor([self.new_node for dev in self.instance.disks],
6260 logging.debug("Allocated minors %r" % (minors,))
6263 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
6264 self.lu.LogInfo("activating a new drbd on %s for disk/%d" % (self.new_node, idx))
6265 # create new devices on new_node; note that we create two IDs:
6266 # one without port, so the drbd will be activated without
6267 # networking information on the new node at this stage, and one
6268 # with network, for the later activation in step 4
6269 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
6270 if self.instance.primary_node == o_node1:
6275 new_alone_id = (self.instance.primary_node, self.new_node, None, p_minor, new_minor, o_secret)
6276 new_net_id = (self.instance.primary_node, self.new_node, o_port, p_minor, new_minor, o_secret)
6278 iv_names[idx] = (dev, dev.children, new_net_id)
6279 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
6281 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
6282 logical_id=new_alone_id,
6283 children=dev.children,
6286 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
6287 _GetInstanceInfoText(self.instance), False)
6288 except errors.GenericError:
6289 self.cfg.ReleaseDRBDMinors(self.instance.name)
6292 # We have new devices, shutdown the drbd on the old secondary
6293 for idx, dev in enumerate(self.instance.disks):
6294 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
6295 self.cfg.SetDiskID(dev, self.target_node)
6296 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
6298 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
6299 "node: %s" % (idx, msg),
6300 hint=("Please cleanup this device manually as"
6301 " soon as possible"))
6303 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
6304 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], self.node_secondary_ip,
6305 self.instance.disks)[self.instance.primary_node]
6307 msg = result.fail_msg
6309 # detaches didn't succeed (unlikely)
6310 self.cfg.ReleaseDRBDMinors(self.instance.name)
6311 raise errors.OpExecError("Can't detach the disks from the network on"
6312 " old node: %s" % (msg,))
6314 # if we managed to detach at least one, we update all the disks of
6315 # the instance to point to the new secondary
6316 self.lu.LogInfo("Updating instance configuration")
6317 for dev, _, new_logical_id in iv_names.itervalues():
6318 dev.logical_id = new_logical_id
6319 self.cfg.SetDiskID(dev, self.instance.primary_node)
6321 self.cfg.Update(self.instance)
6323 # and now perform the drbd attach
6324 self.lu.LogInfo("Attaching primary drbds to new secondary"
6325 " (standalone => connected)")
6326 result = self.rpc.call_drbd_attach_net([self.instance.primary_node, self.new_node], self.node_secondary_ip,
6327 self.instance.disks, self.instance.name,
6329 for to_node, to_result in result.items():
6330 msg = to_result.fail_msg
6332 self.lu.LogWarning("Can't attach drbd disks on node %s: %s", to_node, msg,
6333 hint=("please do a gnt-instance info to see the"
6334 " status of disks"))
6337 # This can fail as the old devices are degraded and _WaitForSync
6338 # does a combined result over all disks, so we don't check its return value
6339 self.lu.LogStep(5, steps_total, "Sync devices")
6340 _WaitForSync(self.lu, self.instance, unlock=True)
6342 # Check all devices manually
6343 self._CheckDevices(self.instance.primary_node, iv_names)
6345 # Step: remove old storage
6346 self.lu.LogStep(6, steps_total, "Removing old storage")
6347 self._RemoveOldStorage(self.target_node, iv_names)
6350 class LUGrowDisk(LogicalUnit):
6351 """Grow a disk of an instance.
6355 HTYPE = constants.HTYPE_INSTANCE
6356 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
6359 def ExpandNames(self):
6360 self._ExpandAndLockInstance()
6361 self.needed_locks[locking.LEVEL_NODE] = []
6362 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6364 def DeclareLocks(self, level):
6365 if level == locking.LEVEL_NODE:
6366 self._LockInstancesNodes()
6368 def BuildHooksEnv(self):
6371 This runs on the master, the primary and all the secondaries.
6375 "DISK": self.op.disk,
6376 "AMOUNT": self.op.amount,
6378 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6380 self.cfg.GetMasterNode(),
6381 self.instance.primary_node,
6385 def CheckPrereq(self):
6386 """Check prerequisites.
6388 This checks that the instance is in the cluster.
6391 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6392 assert instance is not None, \
6393 "Cannot retrieve locked instance %s" % self.op.instance_name
6394 nodenames = list(instance.all_nodes)
6395 for node in nodenames:
6396 _CheckNodeOnline(self, node)
6399 self.instance = instance
6401 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
6402 raise errors.OpPrereqError("Instance's disk layout does not support"
6405 self.disk = instance.FindDisk(self.op.disk)
6407 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6408 instance.hypervisor)
6409 for node in nodenames:
6410 info = nodeinfo[node]
6411 info.Raise("Cannot get current information from node %s" % node)
6412 vg_free = info.payload.get('vg_free', None)
6413 if not isinstance(vg_free, int):
6414 raise errors.OpPrereqError("Can't compute free disk space on"
6416 if self.op.amount > vg_free:
6417 raise errors.OpPrereqError("Not enough disk space on target node %s:"
6418 " %d MiB available, %d MiB required" %
6419 (node, vg_free, self.op.amount))
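      # e.g. (hypothetical numbers) growing a disk by amount=20480 MiB on a
      # node reporting vg_free=10240 MiB is rejected here, before any
      # blockdev_grow RPC is issued in Exec()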
6421 def Exec(self, feedback_fn):
6422 """Execute disk grow.
6425 instance = self.instance
6427 for node in instance.all_nodes:
6428 self.cfg.SetDiskID(disk, node)
6429 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
6430 result.Raise("Grow request failed to node %s" % node)
6431 disk.RecordGrow(self.op.amount)
6432 self.cfg.Update(instance)
6433 if self.op.wait_for_sync:
6434 disk_abort = not _WaitForSync(self, instance)
6436 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
6437 " status.\nPlease check the instance.")
6440 class LUQueryInstanceData(NoHooksLU):
6441 """Query runtime instance data.
6444 _OP_REQP = ["instances", "static"]
6447 def ExpandNames(self):
6448 self.needed_locks = {}
6449 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
6451 if not isinstance(self.op.instances, list):
6452 raise errors.OpPrereqError("Invalid argument type 'instances'")
6454 if self.op.instances:
6455 self.wanted_names = []
6456 for name in self.op.instances:
6457 full_name = self.cfg.ExpandInstanceName(name)
6458 if full_name is None:
6459 raise errors.OpPrereqError("Instance '%s' not known" % name)
6460 self.wanted_names.append(full_name)
6461 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
6463 self.wanted_names = None
6464 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
6466 self.needed_locks[locking.LEVEL_NODE] = []
6467 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6469 def DeclareLocks(self, level):
6470 if level == locking.LEVEL_NODE:
6471 self._LockInstancesNodes()
6473 def CheckPrereq(self):
6474 """Check prerequisites.
6476 This only checks the optional instance list against the existing names.
6479 if self.wanted_names is None:
6480 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
6482 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
6483 in self.wanted_names]
6486 def _ComputeBlockdevStatus(self, node, instance_name, dev):
6487 """Returns the status of a block device
6493 self.cfg.SetDiskID(dev, node)
6495 result = self.rpc.call_blockdev_find(node, dev)
6499 result.Raise("Can't compute disk status for %s" % instance_name)
6501 status = result.payload
6505 return (status.dev_path, status.major, status.minor,
6506 status.sync_percent, status.estimated_time,
6507 status.is_degraded, status.ldisk_status)
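    # The returned tuple has the shape (hypothetical values):
    #   ("/dev/drbd0", 147, 0, 95.2, 120, False, None)
    # i.e. (dev_path, major, minor, sync_percent, estimated_time,
    #       is_degraded, ldisk_status)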
6509 def _ComputeDiskStatus(self, instance, snode, dev):
6510 """Compute block device status.
6513 if dev.dev_type in constants.LDS_DRBD:
6514 # we change the snode then (otherwise we use the one passed in)
6515 if dev.logical_id[0] == instance.primary_node:
6516 snode = dev.logical_id[1]
6518 snode = dev.logical_id[0]
6520 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
6522 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
6525 dev_children = [self._ComputeDiskStatus(instance, snode, child)
6526 for child in dev.children]
6531 "iv_name": dev.iv_name,
6532 "dev_type": dev.dev_type,
6533 "logical_id": dev.logical_id,
6534 "physical_id": dev.physical_id,
6535 "pstatus": dev_pstatus,
6536 "sstatus": dev_sstatus,
6537 "children": dev_children,
6544 def Exec(self, feedback_fn):
6545 """Gather and return data"""
6548 cluster = self.cfg.GetClusterInfo()
6550 for instance in self.wanted_instances:
6551 if not self.op.static:
6552 remote_info = self.rpc.call_instance_info(instance.primary_node,
6554 instance.hypervisor)
6555 remote_info.Raise("Error checking node %s" % instance.primary_node)
6556 remote_info = remote_info.payload
6557 if remote_info and "state" in remote_info:
6560 remote_state = "down"
6563 if instance.admin_up:
6566 config_state = "down"
6568 disks = [self._ComputeDiskStatus(instance, None, device)
6569 for device in instance.disks]
6572 "name": instance.name,
6573 "config_state": config_state,
6574 "run_state": remote_state,
6575 "pnode": instance.primary_node,
6576 "snodes": instance.secondary_nodes,
6578 # this happens to be the same format used for hooks
6579 "nics": _NICListToTuple(self, instance.nics),
6581 "hypervisor": instance.hypervisor,
6582 "network_port": instance.network_port,
6583 "hv_instance": instance.hvparams,
6584 "hv_actual": cluster.FillHV(instance),
6585 "be_instance": instance.beparams,
6586 "be_actual": cluster.FillBE(instance),
6589 result[instance.name] = idict
6594 class LUSetInstanceParams(LogicalUnit):
6595 """Modifies an instances's parameters.
6598 HPATH = "instance-modify"
6599 HTYPE = constants.HTYPE_INSTANCE
6600 _OP_REQP = ["instance_name"]
6603 def CheckArguments(self):
6604 if not hasattr(self.op, 'nics'):
6606 if not hasattr(self.op, 'disks'):
6608 if not hasattr(self.op, 'beparams'):
6609 self.op.beparams = {}
6610 if not hasattr(self.op, 'hvparams'):
6611 self.op.hvparams = {}
6612 self.op.force = getattr(self.op, "force", False)
6613 if not (self.op.nics or self.op.disks or
6614 self.op.hvparams or self.op.beparams):
6615 raise errors.OpPrereqError("No changes submitted")
6619 for disk_op, disk_dict in self.op.disks:
6620 if disk_op == constants.DDM_REMOVE:
6623 elif disk_op == constants.DDM_ADD:
6626 if not isinstance(disk_op, int):
6627 raise errors.OpPrereqError("Invalid disk index")
6628 if not isinstance(disk_dict, dict):
6629 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
6630 raise errors.OpPrereqError(msg)
6632 if disk_op == constants.DDM_ADD:
6633 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
6634 if mode not in constants.DISK_ACCESS_SET:
6635 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
6636 size = disk_dict.get('size', None)
6638 raise errors.OpPrereqError("Required disk parameter size missing")
6641 except ValueError, err:
6642 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
6644 disk_dict['size'] = size
6646 # modification of disk
6647 if 'size' in disk_dict:
6648 raise errors.OpPrereqError("Disk size change not possible, use"
6651 if disk_addremove > 1:
6652 raise errors.OpPrereqError("Only one disk add or remove operation"
6653 " supported at a time")
6657 for nic_op, nic_dict in self.op.nics:
6658 if nic_op == constants.DDM_REMOVE:
6661 elif nic_op == constants.DDM_ADD:
6664 if not isinstance(nic_op, int):
6665 raise errors.OpPrereqError("Invalid nic index")
6666 if not isinstance(nic_dict, dict):
6667 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
6668 raise errors.OpPrereqError(msg)
6670 # nic_dict should be a dict
6671 nic_ip = nic_dict.get('ip', None)
6672 if nic_ip is not None:
6673 if nic_ip.lower() == constants.VALUE_NONE:
6674 nic_dict['ip'] = None
6676 if not utils.IsValidIP(nic_ip):
6677 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
6679 nic_bridge = nic_dict.get('bridge', None)
6680 nic_link = nic_dict.get('link', None)
6681 if nic_bridge and nic_link:
6682 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
6683 " at the same time")
6684 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
6685 nic_dict['bridge'] = None
6686 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
6687 nic_dict['link'] = None
6689 if nic_op == constants.DDM_ADD:
6690 nic_mac = nic_dict.get('mac', None)
6692 nic_dict['mac'] = constants.VALUE_AUTO
6694 if 'mac' in nic_dict:
6695 nic_mac = nic_dict['mac']
6696 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6697 if not utils.IsValidMac(nic_mac):
6698 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6699 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6700 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6701 " modifying an existing nic")
6703 if nic_addremove > 1:
6704 raise errors.OpPrereqError("Only one NIC add or remove operation"
6705 " supported at a time")
6707 def ExpandNames(self):
6708 self._ExpandAndLockInstance()
6709 self.needed_locks[locking.LEVEL_NODE] = []
6710 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6712 def DeclareLocks(self, level):
6713 if level == locking.LEVEL_NODE:
6714 self._LockInstancesNodes()
6716 def BuildHooksEnv(self):
6719 This runs on the master, primary and secondaries.
6723 if constants.BE_MEMORY in self.be_new:
6724 args['memory'] = self.be_new[constants.BE_MEMORY]
6725 if constants.BE_VCPUS in self.be_new:
6726 args['vcpus'] = self.be_new[constants.BE_VCPUS]
6727 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6728 # information at all.
6731 nic_override = dict(self.op.nics)
6732 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6733 for idx, nic in enumerate(self.instance.nics):
6734 if idx in nic_override:
6735 this_nic_override = nic_override[idx]
6737 this_nic_override = {}
6738 if 'ip' in this_nic_override:
6739 ip = this_nic_override['ip']
6742 if 'mac' in this_nic_override:
6743 mac = this_nic_override['mac']
6746 if idx in self.nic_pnew:
6747 nicparams = self.nic_pnew[idx]
6749 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6750 mode = nicparams[constants.NIC_MODE]
6751 link = nicparams[constants.NIC_LINK]
6752 args['nics'].append((ip, mac, mode, link))
6753 if constants.DDM_ADD in nic_override:
6754 ip = nic_override[constants.DDM_ADD].get('ip', None)
6755 mac = nic_override[constants.DDM_ADD]['mac']
6756 nicparams = self.nic_pnew[constants.DDM_ADD]
6757 mode = nicparams[constants.NIC_MODE]
6758 link = nicparams[constants.NIC_LINK]
6759 args['nics'].append((ip, mac, mode, link))
6760 elif constants.DDM_REMOVE in nic_override:
6761 del args['nics'][-1]
6763 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6764 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6767 def _GetUpdatedParams(self, old_params, update_dict,
6768 default_values, parameter_types):
6769 """Return the new params dict for the given params.
6771 @type old_params: dict
6772 @param old_params: old parameters
6773 @type update_dict: dict
6774 @param update_dict: dict containing new parameter values,
6775 or constants.VALUE_DEFAULT to reset the
6776 parameter to its default value
6777 @type default_values: dict
6778 @param default_values: default values for the filled parameters
6779 @type parameter_types: dict
6780 @param parameter_types: dict mapping target dict keys to types
6781 in constants.ENFORCEABLE_TYPES
6782 @rtype: (dict, dict)
6783 @return: (new_parameters, filled_parameters)
6786 params_copy = copy.deepcopy(old_params)
6787 for key, val in update_dict.iteritems():
6788 if val == constants.VALUE_DEFAULT:
6790 del params_copy[key]
6794 params_copy[key] = val
6795 utils.ForceDictType(params_copy, parameter_types)
6796 params_filled = objects.FillDict(default_values, params_copy)
6797 return (params_copy, params_filled)
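  # Worked example (hypothetical parameter names/values): with
  #   old_params     = {"memory": 512, "vcpus": 2}
  #   update_dict    = {"memory": constants.VALUE_DEFAULT, "vcpus": 4}
  #   default_values = {"memory": 128, "vcpus": 1, "auto_balance": True}
  # the method returns
  #   new params:    {"vcpus": 4}
  #   filled params: {"memory": 128, "vcpus": 4, "auto_balance": True}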
6799 def CheckPrereq(self):
6800 """Check prerequisites.
6802 This only checks the instance list against the existing names.
6805 self.force = self.op.force
6807 # checking the new params on the primary/secondary nodes
6809 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6810 cluster = self.cluster = self.cfg.GetClusterInfo()
6811 assert self.instance is not None, \
6812 "Cannot retrieve locked instance %s" % self.op.instance_name
6813 pnode = instance.primary_node
6814 nodelist = list(instance.all_nodes)
6816 # hvparams processing
6817 if self.op.hvparams:
6818 i_hvdict, hv_new = self._GetUpdatedParams(
6819 instance.hvparams, self.op.hvparams,
6820 cluster.hvparams[instance.hypervisor],
6821 constants.HVS_PARAMETER_TYPES)
6823 hypervisor.GetHypervisor(
6824 instance.hypervisor).CheckParameterSyntax(hv_new)
6825 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6826 self.hv_new = hv_new # the new actual values
6827 self.hv_inst = i_hvdict # the new dict (without defaults)
6829 self.hv_new = self.hv_inst = {}
6831 # beparams processing
6832 if self.op.beparams:
6833 i_bedict, be_new = self._GetUpdatedParams(
6834 instance.beparams, self.op.beparams,
6835 cluster.beparams[constants.PP_DEFAULT],
6836 constants.BES_PARAMETER_TYPES)
6837 self.be_new = be_new # the new actual values
6838 self.be_inst = i_bedict # the new dict (without defaults)
6840 self.be_new = self.be_inst = {}
6844 if constants.BE_MEMORY in self.op.beparams and not self.force:
6845 mem_check_list = [pnode]
6846 if be_new[constants.BE_AUTO_BALANCE]:
6847 # either we changed auto_balance to yes or it was from before
6848 mem_check_list.extend(instance.secondary_nodes)
6849 instance_info = self.rpc.call_instance_info(pnode, instance.name,
6850 instance.hypervisor)
6851 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6852 instance.hypervisor)
6853 pninfo = nodeinfo[pnode]
6854 msg = pninfo.fail_msg
6856 # Assume the primary node is unreachable and go ahead
6857 self.warn.append("Can't get info from primary node %s: %s" %
6859 elif not isinstance(pninfo.payload.get('memory_free', None), int):
6860 self.warn.append("Node data from primary node %s doesn't contain"
6861 " free memory information" % pnode)
6862 elif instance_info.fail_msg:
6863 self.warn.append("Can't get instance runtime information: %s" %
6864 instance_info.fail_msg)
6866 if instance_info.payload:
6867 current_mem = int(instance_info.payload['memory'])
6869 # Assume instance not running
6870 # (there is a slight race condition here, but it's not very probable,
6871 # and we have no other way to check)
6873 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6874 pninfo.payload['memory_free'])
6876 raise errors.OpPrereqError("This change will prevent the instance"
6877 " from starting, due to %d MB of memory"
6878 " missing on its primary node" % miss_mem)
6880 if be_new[constants.BE_AUTO_BALANCE]:
6881 for node, nres in nodeinfo.items():
6882 if node not in instance.secondary_nodes:
6886 self.warn.append("Can't get info from secondary node %s: %s" %
6888 elif not isinstance(nres.payload.get('memory_free', None), int):
6889 self.warn.append("Secondary node %s didn't return free"
6890 " memory information" % node)
6891 elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6892 self.warn.append("Not enough memory to failover instance to"
6893 " secondary node %s" % node)
6898 for nic_op, nic_dict in self.op.nics:
6899 if nic_op == constants.DDM_REMOVE:
6900 if not instance.nics:
6901 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6903 if nic_op != constants.DDM_ADD:
6905 if nic_op < 0 or nic_op >= len(instance.nics):
6906 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6908 (nic_op, len(instance.nics)))
6909 old_nic_params = instance.nics[nic_op].nicparams
6910 old_nic_ip = instance.nics[nic_op].ip
6915 update_params_dict = dict([(key, nic_dict[key])
6916 for key in constants.NICS_PARAMETERS
6917 if key in nic_dict])
6919 if 'bridge' in nic_dict:
6920 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6922 new_nic_params, new_filled_nic_params = \
6923 self._GetUpdatedParams(old_nic_params, update_params_dict,
6924 cluster.nicparams[constants.PP_DEFAULT],
6925 constants.NICS_PARAMETER_TYPES)
6926 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6927 self.nic_pinst[nic_op] = new_nic_params
6928 self.nic_pnew[nic_op] = new_filled_nic_params
6929 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6931 if new_nic_mode == constants.NIC_MODE_BRIDGED:
6932 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6933 msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6935 msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6937 self.warn.append(msg)
6939 raise errors.OpPrereqError(msg)
6940 if new_nic_mode == constants.NIC_MODE_ROUTED:
6941 if 'ip' in nic_dict:
6942 nic_ip = nic_dict['ip']
6946 raise errors.OpPrereqError('Cannot set the nic ip to None'
6948 if 'mac' in nic_dict:
6949 nic_mac = nic_dict['mac']
6951 raise errors.OpPrereqError('Cannot set the nic mac to None')
6952 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6953 # otherwise generate the mac
6954 nic_dict['mac'] = self.cfg.GenerateMAC()
6956 # or validate/reserve the current one
6957 if self.cfg.IsMacInUse(nic_mac):
6958 raise errors.OpPrereqError("MAC address %s already in use"
6959 " in cluster" % nic_mac)
6962 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6963 raise errors.OpPrereqError("Disk operations not supported for"
6964 " diskless instances")
6965 for disk_op, disk_dict in self.op.disks:
6966 if disk_op == constants.DDM_REMOVE:
6967 if len(instance.disks) == 1:
6968 raise errors.OpPrereqError("Cannot remove the last disk of"
6970 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6971 ins_l = ins_l[pnode]
6972 msg = ins_l.fail_msg
6974 raise errors.OpPrereqError("Can't contact node %s: %s" %
6976 if instance.name in ins_l.payload:
6977 raise errors.OpPrereqError("Instance is running, can't remove"
6980 if (disk_op == constants.DDM_ADD and
6981 len(instance.disks) >= constants.MAX_DISKS):
6982 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6983 " add more" % constants.MAX_DISKS)
6984 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6986 if disk_op < 0 or disk_op >= len(instance.disks):
6987 raise errors.OpPrereqError("Invalid disk index %s, valid values"
6989 (disk_op, len(instance.disks)))
6993 def Exec(self, feedback_fn):
6994 """Modifies an instance.
6996 All parameters take effect only at the next restart of the instance.
6999 # Process here the warnings from CheckPrereq, as we don't have a
7000 # feedback_fn there.
7001 for warn in self.warn:
7002 feedback_fn("WARNING: %s" % warn)
7005 instance = self.instance
7006 cluster = self.cluster
7008 for disk_op, disk_dict in self.op.disks:
7009 if disk_op == constants.DDM_REMOVE:
7010 # remove the last disk
7011 device = instance.disks.pop()
7012 device_idx = len(instance.disks)
7013 for node, disk in device.ComputeNodeTree(instance.primary_node):
7014 self.cfg.SetDiskID(disk, node)
7015 msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
7017 self.LogWarning("Could not remove disk/%d on node %s: %s,"
7018 " continuing anyway", device_idx, node, msg)
7019 result.append(("disk/%d" % device_idx, "remove"))
7020 elif disk_op == constants.DDM_ADD:
7022 if instance.disk_template == constants.DT_FILE:
7023 file_driver, file_path = instance.disks[0].logical_id
7024 file_path = os.path.dirname(file_path)
7026 file_driver = file_path = None
7027 disk_idx_base = len(instance.disks)
7028 new_disk = _GenerateDiskTemplate(self,
7029 instance.disk_template,
7030 instance.name, instance.primary_node,
7031 instance.secondary_nodes,
7036 instance.disks.append(new_disk)
7037 info = _GetInstanceInfoText(instance)
7039 logging.info("Creating volume %s for instance %s",
7040 new_disk.iv_name, instance.name)
7041 # Note: this needs to be kept in sync with _CreateDisks
7043 for node in instance.all_nodes:
7044 f_create = node == instance.primary_node
7046 _CreateBlockDev(self, node, instance, new_disk,
7047 f_create, info, f_create)
7048 except errors.OpExecError, err:
7049 self.LogWarning("Failed to create volume %s (%s) on"
7051 new_disk.iv_name, new_disk, node, err)
7052 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
7053 (new_disk.size, new_disk.mode)))
7055 # change a given disk
7056 instance.disks[disk_op].mode = disk_dict['mode']
7057 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
7059 for nic_op, nic_dict in self.op.nics:
7060 if nic_op == constants.DDM_REMOVE:
7061 # remove the last nic
7062 del instance.nics[-1]
7063 result.append(("nic.%d" % len(instance.nics), "remove"))
7064 elif nic_op == constants.DDM_ADD:
7065 # mac and bridge should be set, by now
7066 mac = nic_dict['mac']
7067 ip = nic_dict.get('ip', None)
7068 nicparams = self.nic_pinst[constants.DDM_ADD]
7069 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
7070 instance.nics.append(new_nic)
7071 result.append(("nic.%d" % (len(instance.nics) - 1),
7072 "add:mac=%s,ip=%s,mode=%s,link=%s" %
7073 (new_nic.mac, new_nic.ip,
7074 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
7075 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
7078 for key in 'mac', 'ip':
7080 setattr(instance.nics[nic_op], key, nic_dict[key])
7081 if nic_op in self.nic_pnew:
7082 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
7083 for key, val in nic_dict.iteritems():
7084 result.append(("nic.%s/%d" % (key, nic_op), val))
7087 if self.op.hvparams:
7088 instance.hvparams = self.hv_inst
7089 for key, val in self.op.hvparams.iteritems():
7090 result.append(("hv/%s" % key, val))
7093 if self.op.beparams:
7094 instance.beparams = self.be_inst
7095 for key, val in self.op.beparams.iteritems():
7096 result.append(("be/%s" % key, val))
7098 self.cfg.Update(instance)
7103 class LUQueryExports(NoHooksLU):
7104 """Query the exports list
7107 _OP_REQP = ['nodes']
7110 def ExpandNames(self):
7111 self.needed_locks = {}
7112 self.share_locks[locking.LEVEL_NODE] = 1
7113 if not self.op.nodes:
7114 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7116 self.needed_locks[locking.LEVEL_NODE] = \
7117 _GetWantedNodes(self, self.op.nodes)
7119 def CheckPrereq(self):
7120 """Check prerequisites.
7123 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
7125 def Exec(self, feedback_fn):
7126 """Compute the list of all the exported system images.
7129 @return: a dictionary with the structure node->(export-list)
7130 where export-list is a list of the instances exported on
7134 rpcresult = self.rpc.call_export_list(self.nodes)
7136 for node in rpcresult:
7137 if rpcresult[node].fail_msg:
7138 result[node] = False
7140 result[node] = rpcresult[node].payload
7145 class LUExportInstance(LogicalUnit):
7146 """Export an instance to an image in the cluster.
7149 HPATH = "instance-export"
7150 HTYPE = constants.HTYPE_INSTANCE
7151 _OP_REQP = ["instance_name", "target_node", "shutdown"]
7154 def ExpandNames(self):
7155 self._ExpandAndLockInstance()
7156 # FIXME: lock only instance primary and destination node
7158 # Sad but true, for now we have to lock all nodes, as we don't know where
7159 # the previous export might be, and in this LU we search for it and
7160 # remove it from its current node. In the future we could fix this by:
7161 # - making a tasklet to search (share-lock all), then create the new one,
7162 # then one to remove, after
7163 # - removing the removal operation altogether
7164 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7166 def DeclareLocks(self, level):
7167 """Last minute lock declaration."""
7168 # All nodes are locked anyway, so nothing to do here.
7170 def BuildHooksEnv(self):
7173 This will run on the master, primary node and target node.
7177 "EXPORT_NODE": self.op.target_node,
7178 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
7180 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7181 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
7182 self.op.target_node]
7185 def CheckPrereq(self):
7186 """Check prerequisites.
7188 This checks that the instance and node names are valid.
7191 instance_name = self.op.instance_name
7192 self.instance = self.cfg.GetInstanceInfo(instance_name)
7193 assert self.instance is not None, \
7194 "Cannot retrieve locked instance %s" % self.op.instance_name
7195 _CheckNodeOnline(self, self.instance.primary_node)
7197 self.dst_node = self.cfg.GetNodeInfo(
7198 self.cfg.ExpandNodeName(self.op.target_node))
7200 if self.dst_node is None:
7201 # This is wrong node name, not a non-locked node
7202 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
7203 _CheckNodeOnline(self, self.dst_node.name)
7204 _CheckNodeNotDrained(self, self.dst_node.name)
7206 # instance disk type verification
7207 for disk in self.instance.disks:
7208 if disk.dev_type == constants.LD_FILE:
7209 raise errors.OpPrereqError("Export not supported for instances with"
7210 " file-based disks")
7212 def Exec(self, feedback_fn):
7213 """Export an instance to an image in the cluster.
7216 instance = self.instance
7217 dst_node = self.dst_node
7218 src_node = instance.primary_node
7219 if self.op.shutdown:
7220 # shutdown the instance, but not the disks
7221 result = self.rpc.call_instance_shutdown(src_node, instance)
7222 result.Raise("Could not shutdown instance %s on"
7223 " node %s" % (instance.name, src_node))
7225 vgname = self.cfg.GetVGName()
7229 # set the disks ID correctly since call_instance_start needs the
7230 # correct drbd minor to create the symlinks
7231 for disk in instance.disks:
7232 self.cfg.SetDiskID(disk, src_node)
7237 for idx, disk in enumerate(instance.disks):
7238 # result.payload will be a snapshot of an lvm leaf of the one we passed
7239 result = self.rpc.call_blockdev_snapshot(src_node, disk)
7240 msg = result.fail_msg
7242 self.LogWarning("Could not snapshot disk/%s on node %s: %s",
7244 snap_disks.append(False)
7246 disk_id = (vgname, result.payload)
7247 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
7248 logical_id=disk_id, physical_id=disk_id,
7249 iv_name=disk.iv_name)
7250 snap_disks.append(new_dev)
7253 if self.op.shutdown and instance.admin_up:
7254 result = self.rpc.call_instance_start(src_node, instance, None, None)
7255 msg = result.fail_msg
7257 _ShutdownInstanceDisks(self, instance)
7258 raise errors.OpExecError("Could not start instance: %s" % msg)
7260 # TODO: check for size
7262 cluster_name = self.cfg.GetClusterName()
7263 for idx, dev in enumerate(snap_disks):
7265 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
7266 instance, cluster_name, idx)
7267 msg = result.fail_msg
7269 self.LogWarning("Could not export disk/%s from node %s to"
7270 " node %s: %s", idx, src_node, dst_node.name, msg)
7271 dresults.append(False)
7273 dresults.append(True)
7274 msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
7276 self.LogWarning("Could not remove snapshot for disk/%d from node"
7277 " %s: %s", idx, src_node, msg)
7279 dresults.append(False)
7281 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
7283 msg = result.fail_msg
7285 self.LogWarning("Could not finalize export for instance %s"
7286 " on node %s: %s", instance.name, dst_node.name, msg)
7289 nodelist = self.cfg.GetNodeList()
7290 nodelist.remove(dst_node.name)
7292 # on one-node clusters nodelist will be empty after the removal
7293 # if we proceed, the backup would be removed because OpQueryExports
7294 # substitutes an empty list with the full cluster node list.
7295 iname = instance.name
7297 exportlist = self.rpc.call_export_list(nodelist)
7298 for node in exportlist:
7299 if exportlist[node].fail_msg:
7301 if iname in exportlist[node].payload:
7302 msg = self.rpc.call_export_remove(node, iname).fail_msg
7304 self.LogWarning("Could not remove older export for instance %s"
7305 " on node %s: %s", iname, node, msg)
7306 return fin_resu, dresults
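  # The return value pairs the finalize_export status with the per-disk
  # export results; e.g. (hypothetical) (True, [True, False]) means the
  # export was finalized but disk/1 could not be copied to the target node.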
7309 class LURemoveExport(NoHooksLU):
7310 """Remove exports related to the named instance.
7313 _OP_REQP = ["instance_name"]
7316 def ExpandNames(self):
7317 self.needed_locks = {}
7318 # We need all nodes to be locked in order for RemoveExport to work, but we
7319 # don't need to lock the instance itself, as nothing will happen to it (and
7320 # we can remove exports also for a removed instance)
7321 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7323 def CheckPrereq(self):
7324 """Check prerequisites.
7328 def Exec(self, feedback_fn):
7329 """Remove any export.
7332 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
7333 # If the instance was not found we'll try with the name that was passed in.
7334 # This will only work if it was an FQDN, though.
7336 if not instance_name:
7338 instance_name = self.op.instance_name
7340 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
7341 exportlist = self.rpc.call_export_list(locked_nodes)
7343 for node in exportlist:
7344 msg = exportlist[node].fail_msg
7346 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
7348 if instance_name in exportlist[node].payload:
7350 result = self.rpc.call_export_remove(node, instance_name)
7351 msg = result.fail_msg
7353 logging.error("Could not remove export for instance %s"
7354 " on node %s: %s", instance_name, node, msg)
7356 if fqdn_warn and not found:
7357 feedback_fn("Export not found. If trying to remove an export belonging"
7358 " to a deleted instance please use its Fully Qualified"
7362 class TagsLU(NoHooksLU):
7365 This is an abstract class which is the parent of all the other tags LUs.
7369 def ExpandNames(self):
7370 self.needed_locks = {}
7371 if self.op.kind == constants.TAG_NODE:
7372 name = self.cfg.ExpandNodeName(self.op.name)
7374 raise errors.OpPrereqError("Invalid node name (%s)" %
7377 self.needed_locks[locking.LEVEL_NODE] = name
7378 elif self.op.kind == constants.TAG_INSTANCE:
7379 name = self.cfg.ExpandInstanceName(self.op.name)
7381 raise errors.OpPrereqError("Invalid instance name (%s)" %
7384 self.needed_locks[locking.LEVEL_INSTANCE] = name
7386 def CheckPrereq(self):
7387 """Check prerequisites.
7390 if self.op.kind == constants.TAG_CLUSTER:
7391 self.target = self.cfg.GetClusterInfo()
7392 elif self.op.kind == constants.TAG_NODE:
7393 self.target = self.cfg.GetNodeInfo(self.op.name)
7394 elif self.op.kind == constants.TAG_INSTANCE:
7395 self.target = self.cfg.GetInstanceInfo(self.op.name)
7397 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
7401 class LUGetTags(TagsLU):
7402 """Returns the tags of a given object.
7405 _OP_REQP = ["kind", "name"]
7408 def Exec(self, feedback_fn):
7409 """Returns the tag list.
7412 return list(self.target.GetTags())
7415 class LUSearchTags(NoHooksLU):
7416 """Searches the tags for a given pattern.
7419 _OP_REQP = ["pattern"]
7422 def ExpandNames(self):
7423 self.needed_locks = {}
7425 def CheckPrereq(self):
7426 """Check prerequisites.
7428 This checks the pattern passed for validity by compiling it.
7432 self.re = re.compile(self.op.pattern)
7433 except re.error, err:
7434 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
7435 (self.op.pattern, err))
7437 def Exec(self, feedback_fn):
7438 """Returns the tag list.
7442 tgts = [("/cluster", cfg.GetClusterInfo())]
7443 ilist = cfg.GetAllInstancesInfo().values()
7444 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
7445 nlist = cfg.GetAllNodesInfo().values()
7446 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
7448 for path, target in tgts:
7449 for tag in target.GetTags():
7450 if self.re.search(tag):
7451 results.append((path, tag))
7455 class LUAddTags(TagsLU):
7456 """Sets a tag on a given object.
7459 _OP_REQP = ["kind", "name", "tags"]
7462 def CheckPrereq(self):
7463 """Check prerequisites.
7465 This checks the type and length of the tag name and value.
7468 TagsLU.CheckPrereq(self)
7469 for tag in self.op.tags:
7470 objects.TaggableObject.ValidateTag(tag)
7472 def Exec(self, feedback_fn):
7477 for tag in self.op.tags:
7478 self.target.AddTag(tag)
7479 except errors.TagError, err:
7480 raise errors.OpExecError("Error while setting tag: %s" % str(err))
7482 self.cfg.Update(self.target)
7483 except errors.ConfigurationError:
7484 raise errors.OpRetryError("There has been a modification to the"
7485 " config file and the operation has been"
7486 " aborted. Please retry.")
7489 class LUDelTags(TagsLU):
7490 """Delete a list of tags from a given object.
7493 _OP_REQP = ["kind", "name", "tags"]
7496 def CheckPrereq(self):
7497 """Check prerequisites.
7499 This checks that we have the given tag.
7502 TagsLU.CheckPrereq(self)
7503 for tag in self.op.tags:
7504 objects.TaggableObject.ValidateTag(tag)
7505 del_tags = frozenset(self.op.tags)
7506 cur_tags = self.target.GetTags()
7507 if not del_tags <= cur_tags:
7508 diff_tags = del_tags - cur_tags
7509 diff_names = ["'%s'" % tag for tag in diff_tags]
7511 raise errors.OpPrereqError("Tag(s) %s not found" %
7512 (",".join(diff_names)))
7514 def Exec(self, feedback_fn):
7515 """Remove the tag from the object.
7518 for tag in self.op.tags:
7519 self.target.RemoveTag(tag)
7521 self.cfg.Update(self.target)
7522 except errors.ConfigurationError:
7523 raise errors.OpRetryError("There has been a modification to the"
7524 " config file and the operation has been"
7525 " aborted. Please retry.")
7528 class LUTestDelay(NoHooksLU):
7529 """Sleep for a specified amount of time.
7531 This LU sleeps on the master and/or nodes for a specified amount of
7535 _OP_REQP = ["duration", "on_master", "on_nodes"]
7538 def ExpandNames(self):
7539 """Expand names and set required locks.
7541 This expands the node list, if any.
7544 self.needed_locks = {}
7545 if self.op.on_nodes:
7546 # _GetWantedNodes can be used here, but is not always appropriate to use
7547 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
7549 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
7550 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
7552 def CheckPrereq(self):
7553 """Check prerequisites.
7557 def Exec(self, feedback_fn):
7558 """Do the actual sleep.
7561 if self.op.on_master:
7562 if not utils.TestDelay(self.op.duration):
7563 raise errors.OpExecError("Error during master delay test")
7564 if self.op.on_nodes:
7565 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
7566 for node, node_result in result.items():
7567 node_result.Raise("Failure during rpc call to node %s" % node)
7570 class IAllocator(object):
7571 """IAllocator framework.
7573 An IAllocator instance has four sets of attributes:
7574 - cfg that is needed to query the cluster
7575 - input data (all members of the _KEYS class attribute are required)
7576 - four buffer attributes (in|out_data|text), that represent the
7577 input (to the external script) in text and data structure format,
7578 and the output from it, again in two formats
7579 - the result variables from the script (success, info, nodes) for
7584 "mem_size", "disks", "disk_template",
7585 "os", "tags", "nics", "vcpus", "hypervisor",
7591 def __init__(self, cfg, rpc, mode, name, **kwargs):
7594 # init buffer variables
7595 self.in_text = self.out_text = self.in_data = self.out_data = None
7596 # init all input fields so that pylint is happy
7599 self.mem_size = self.disks = self.disk_template = None
7600 self.os = self.tags = self.nics = self.vcpus = None
7601 self.hypervisor = None
7602 self.relocate_from = None
7604 self.required_nodes = None
7605 # init result fields
7606 self.success = self.info = self.nodes = None
7607 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7608 keyset = self._ALLO_KEYS
7609 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
7610 keyset = self._RELO_KEYS
7612 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
7613 " IAllocator" % self.mode)
7615 if key not in keyset:
7616 raise errors.ProgrammerError("Invalid input parameter '%s' to"
7617 " IAllocator" % key)
7618 setattr(self, key, kwargs[key])
7620 if key not in kwargs:
7621 raise errors.ProgrammerError("Missing input parameter '%s' to"
7622 " IAllocator" % key)
7623 self._BuildInputData()
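  # A minimal allocation-mode sketch (hypothetical values and script name),
  # complementing the relocation use in TLReplaceDisks._RunAllocator above:
  #
  #   ial = IAllocator(cfg, rpc, constants.IALLOCATOR_MODE_ALLOC,
  #                    name="instance1.example.com",
  #                    mem_size=512, vcpus=1, os="debian-image",
  #                    disk_template=constants.DT_DRBD8,
  #                    disks=[{"size": 1024, "mode": "rw"}],
  #                    nics=[{"mac": "auto", "ip": None, "bridge": None}],
  #                    tags=[], hypervisor="xen-pvm")
  #   ial.Run("dumb_allocator")
  #   if ial.success:
  #     chosen_nodes = ial.nodes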

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                        hypervisor_name)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload

        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
            cluster_info.nicparams[constants.PP_DEFAULT],
            nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                   }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
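
  # Illustrative sketch only (names and sizes below are made up): after
  # _ComputeClusterData, self.in_data holds per-node and per-instance
  # dicts shaped like
  #
  #   data["nodes"]["node1.example.com"] = {
  #     "tags": [], "primary_ip": "192.0.2.1", "secondary_ip": "192.0.2.101",
  #     "offline": False, "drained": False, "master_candidate": True,
  #     # dynamic values, filled in only for online, non-drained nodes:
  #     "total_memory": 4096, "reserved_memory": 512, "free_memory": 1024,
  #     "total_disk": 102400, "free_disk": 51200, "total_cpus": 4,
  #     "i_pri_memory": 2048, "i_pri_up_memory": 2048,
  #     }
  #   data["instances"]["instance1.example.com"] = {
  #     "tags": [], "admin_up": True, "vcpus": 1, "memory": 512,
  #     "os": "...", "nodes": ["node1.example.com"], "nics": [...],
  #     "disks": [{"size": 10240, "mode": ...}], "disk_template": "plain",
  #     "hypervisor": "...", "disk_space_total": 10240,
  #     }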

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request
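
  # Illustrative sketch only (values are made up): for an allocation request
  # the "request" section handed to the allocator looks like
  #
  #   data["request"] = {
  #     "type": "allocate",
  #     "name": "instance1.example.com",
  #     "disk_template": "drbd",
  #     "tags": [],
  #     "os": "...",
  #     "vcpus": 1,
  #     "memory": 512,
  #     "disks": [{"size": 10240}],
  #     "disk_space_total": ...,   # from _ComputeDiskSize
  #     "nics": [...],
  #     "required_nodes": 2,       # 2 for net-mirrored templates, else 1
  #     }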

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
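
  # Illustrative sketch only (values are made up): a relocation request is
  # much smaller, e.g.
  #
  #   self.in_data["request"] = {
  #     "type": "relocate",
  #     "name": "instance1.example.com",
  #     "disk_space_total": ...,   # from _ComputeDiskSize
  #     "required_nodes": 1,
  #     "relocate_from": ["node2.example.com"],
  #     }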

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
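
  # A minimal sketch of the resulting self.in_text (serializer.Dump emits
  # JSON; values abbreviated):
  #
  #   {"version": ...,
  #    "cluster_name": "cluster.example.com",
  #    "cluster_tags": [],
  #    "enabled_hypervisors": [...],
  #    "nodes": {...},
  #    "instances": {...},
  #    "request": {...}}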

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner
    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()
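
  # Minimal usage sketch (assumes an LU-like context providing self.cfg and
  # self.rpc; the allocator name and the variables are placeholders):
  #
  #   ial = IAllocator(self.cfg, self.rpc,
  #                    mode=constants.IALLOCATOR_MODE_RELOC,
  #                    name=instance_name,
  #                    relocate_from=[old_secondary])
  #   ial.Run("hail")
  #   if not ial.success:
  #     raise errors.OpPrereqError("Can't compute nodes: %s" % ial.info)
  #   new_nodes = ial.nodes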

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and, if successful, save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
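
  # Illustrative sketch of a well-formed allocator reply, i.e. what
  # _ValidateResult accepts after serializer.Load (values are made up):
  #
  #   {"success": True,
  #    "info": "selected nodes ok",
  #    "nodes": ["node1.example.com", "node3.example.com"]}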


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the test direction and mode.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result
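
  # Note: with direction "in" the opcode result is the JSON input that would
  # be handed to the allocator (ial.in_text); with direction "out" the named
  # allocator is actually executed and its raw reply (ial.out_text) is
  # returned without validation, so test scripts can inspect it directly.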