4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement ExpandNames
53 - implement CheckPrereq
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements:
58 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60 Note that all commands require root permissions.
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overridden in derived classes in order to check op
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
93 for attr_name in self._OP_REQP:
94 attr_val = getattr(op, attr_name, None)
96 raise errors.OpPrereqError("Required parameter '%s' missing" %
101 """Returns the SshRunner object
105 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
108 ssh = property(fget=__GetSSH)
110 def CheckArguments(self):
111 """Check syntactic validity for the opcode arguments.
113 This method is for doing a simple syntactic check and ensuring the
114 validity of opcode parameters, without any cluster-related
115 checks. While the same can be accomplished in ExpandNames and/or
116 CheckPrereq, doing these separately is better because:
118 - ExpandNames is left as purely a lock-related function
119 - CheckPrereq is run after we have acquired locks (and possible
122 The function is allowed to change the self.op attribute so that
123 later methods no longer need to worry about missing parameters.
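# A minimal sketch (not part of the original module) of how a derived LU
# might override CheckArguments; the "force" attribute is only an
# illustrative example of an optional opcode parameter:
#
#   def CheckArguments(self):
#     if not hasattr(self.op, "force"):
#       self.op.force = False
#     if not isinstance(self.op.force, bool):
#       raise errors.OpPrereqError("Invalid 'force' parameter")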
128 def ExpandNames(self):
129 """Expand names for this LU.
131 This method is called before starting to execute the opcode, and it should
132 update all the parameters of the opcode to their canonical form (e.g. a
133 short node name must be fully expanded after this method has successfully
134 completed). This way locking, hooks, logging, etc. can work correctly.
136 LUs which implement this method must also populate the self.needed_locks
137 member, as a dict with lock levels as keys, and a list of needed lock names
140 - use an empty dict if you don't need any lock
141 - if you don't need any lock at a particular level omit that level
142 - don't put anything for the BGL level
143 - if you want all locks at a level use locking.ALL_SET as a value
145 If you need to share locks (rather than acquire them exclusively) at one
146 level you can modify self.share_locks, setting a true value (usually 1) for
147 that level. By default locks are not shared.
151 # Acquire all nodes and one instance
152 self.needed_locks = {
153 locking.LEVEL_NODE: locking.ALL_SET,
154 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156 # Acquire just two nodes
157 self.needed_locks = {
158 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
161 self.needed_locks = {} # No, you can't leave it to the default value None
164 # The implementation of this method is mandatory only if the new LU is
165 # concurrent, so that old LUs don't need to be changed all at the same
168 self.needed_locks = {} # Exclusive LUs don't need locks.
170 raise NotImplementedError
172 def DeclareLocks(self, level):
173 """Declare LU locking needs for a level
175 While most LUs can just declare their locking needs at ExpandNames time,
176 sometimes there's the need to calculate some locks after having acquired
177 the ones before. This function is called just before acquiring locks at a
178 particular level, but after acquiring the ones at lower levels, and permits
179 such calculations. It can be used to modify self.needed_locks, and by
180 default it does nothing.
182 This function is only called if you have something already set in
183 self.needed_locks for the level.
185 @param level: Locking level which is going to be locked
186 @type level: member of ganeti.locking.LEVELS
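# Illustrative sketch (an assumption, not original code): an instance LU that
# needs the node locks of its instance could implement DeclareLocks as:
#
#   def DeclareLocks(self, level):
#     if level == locking.LEVEL_NODE:
#       self._LockInstancesNodes()
#
# provided it also sets self.recalculate_locks[locking.LEVEL_NODE] in
# ExpandNames (see _LockInstancesNodes below).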
190 def CheckPrereq(self):
191 """Check prerequisites for this LU.
193 This method should check that the prerequisites for the execution
194 of this LU are fulfilled. It can do internode communication, but
195 it should be idempotent - no cluster or system changes are
198 The method should raise errors.OpPrereqError in case something is
199 not fulfilled. Its return value is ignored.
201 This method should also update all the parameters of the opcode to
202 their canonical form if it hasn't been done by ExpandNames before.
205 raise NotImplementedError
207 def Exec(self, feedback_fn):
210 This method should implement the actual work. It should raise
211 errors.OpExecError for failures that are somewhat dealt with in
215 raise NotImplementedError
217 def BuildHooksEnv(self):
218 """Build hooks environment for this LU.
220 This method should return a three-element tuple consisting of: a dict
221 containing the environment that will be used for running the
222 specific hook for this LU, a list of node names on which the hook
223 should run before the execution, and a list of node names on which
224 the hook should run after the execution.
226 The keys of the dict must not be prefixed with 'GANETI_' as this will
227 be handled in the hooks runner. Also note additional keys will be
228 added by the hooks runner. If the LU doesn't define any
229 environment, an empty dict (and not None) should be returned.
231 If there are no nodes for a list, return an empty list (and not None).
233 Note that if the HPATH for a LU class is None, this function will
237 raise NotImplementedError
239 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240 """Notify the LU about the results of its hooks.
242 This method is called every time a hooks phase is executed, and notifies
243 the Logical Unit about the hooks' result. The LU can then use it to alter
244 its result based on the hooks. By default the method does nothing and the
245 previous result is passed back unchanged but any LU can define it if it
246 wants to use the local cluster hook-scripts somehow.
248 @param phase: one of L{constants.HOOKS_PHASE_POST} or
249 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250 @param hook_results: the results of the multi-node hooks rpc call
251 @param feedback_fn: function used to send feedback back to the caller
252 @param lu_result: the previous Exec result this LU had, or None
254 @return: the new Exec result, based on the previous result
260 def _ExpandAndLockInstance(self):
261 """Helper function to expand and lock an instance.
263 Many LUs that work on an instance take its name in self.op.instance_name
264 and need to expand it and then declare the expanded name for locking. This
265 function does it, and then updates self.op.instance_name to the expanded
266 name. It also initializes needed_locks as a dict, if this hasn't been done
270 if self.needed_locks is None:
271 self.needed_locks = {}
273 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274 "_ExpandAndLockInstance called with instance-level locks set"
275 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276 if expanded_name is None:
277 raise errors.OpPrereqError("Instance '%s' not known" %
278 self.op.instance_name)
279 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280 self.op.instance_name = expanded_name
282 def _LockInstancesNodes(self, primary_only=False):
283 """Helper function to declare instances' nodes for locking.
285 This function should be called after locking one or more instances to lock
286 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287 with all primary or secondary nodes for instances already locked and
288 present in self.needed_locks[locking.LEVEL_INSTANCE].
290 It should be called from DeclareLocks, and for safety only works if
291 self.recalculate_locks[locking.LEVEL_NODE] is set.
293 In the future it may grow parameters to just lock some instance's nodes, or
294 to just lock primaries or secondary nodes, if needed.
296 It should be called in DeclareLocks in a way similar to::
298 if level == locking.LEVEL_NODE:
299 self._LockInstancesNodes()
301 @type primary_only: boolean
302 @param primary_only: only lock primary nodes of locked instances
305 assert locking.LEVEL_NODE in self.recalculate_locks, \
306 "_LockInstancesNodes helper function called with no nodes to recalculate"
308 # TODO: check if we've really been called with the instance locks held
310 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311 # future we might want to have different behaviors depending on the value
312 # of self.recalculate_locks[locking.LEVEL_NODE]
314 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315 instance = self.context.cfg.GetInstanceInfo(instance_name)
316 wanted_nodes.append(instance.primary_node)
318 wanted_nodes.extend(instance.secondary_nodes)
320 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325 del self.recalculate_locks[locking.LEVEL_NODE]
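# Illustrative sketch (an assumption): the ExpandNames side of the pattern
# used with DeclareLocks/_LockInstancesNodes above, requesting that the node
# level be recomputed (LOCKS_REPLACE) from the instance that was locked:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE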
328 class NoHooksLU(LogicalUnit):
329 """Simple LU which runs no hooks.
331 This LU is intended as a parent for other LogicalUnits which will
332 run no hooks, in order to reduce duplicate code.
339 def _GetWantedNodes(lu, nodes):
340 """Returns list of checked and expanded node names.
342 @type lu: L{LogicalUnit}
343 @param lu: the logical unit on whose behalf we execute
345 @param nodes: list of node names or None for all nodes
347 @return: the list of nodes, sorted
348 @raise errors.OpPrereqError: if the nodes parameter is wrong type
351 if not isinstance(nodes, list):
352 raise errors.OpPrereqError("Invalid argument type 'nodes'")
355 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356 " non-empty list of nodes whose name is to be expanded.")
360 node = lu.cfg.ExpandNodeName(name)
362 raise errors.OpPrereqError("No such node name '%s'" % name)
365 return utils.NiceSort(wanted)
368 def _GetWantedInstances(lu, instances):
369 """Returns list of checked and expanded instance names.
371 @type lu: L{LogicalUnit}
372 @param lu: the logical unit on whose behalf we execute
373 @type instances: list
374 @param instances: list of instance names or None for all instances
376 @return: the list of instances, sorted
377 @raise errors.OpPrereqError: if the instances parameter is wrong type
378 @raise errors.OpPrereqError: if any of the passed instances is not found
381 if not isinstance(instances, list):
382 raise errors.OpPrereqError("Invalid argument type 'instances'")
387 for name in instances:
388 instance = lu.cfg.ExpandInstanceName(name)
390 raise errors.OpPrereqError("No such instance name '%s'" % name)
391 wanted.append(instance)
394 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
398 def _CheckOutputFields(static, dynamic, selected):
399 """Checks whether all selected fields are valid.
401 @type static: L{utils.FieldSet}
402 @param static: static fields set
403 @type dynamic: L{utils.FieldSet}
404 @param dynamic: dynamic fields set
411 delta = f.NonMatching(selected)
413 raise errors.OpPrereqError("Unknown output fields selected: %s"
417 def _CheckBooleanOpField(op, name):
418 """Validates boolean opcode parameters.
420 This will ensure that an opcode parameter is either a boolean value,
421 or None (but that it always exists).
424 val = getattr(op, name, None)
425 if not (val is None or isinstance(val, bool)):
426 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428 setattr(op, name, val)
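# Example usage (hypothetical attribute name): a LU would typically call
#   _CheckBooleanOpField(self.op, "ignore_secondaries")
# from CheckPrereq or CheckArguments so later code can rely on the attribute
# existing and being either None or a bool.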
431 def _CheckNodeOnline(lu, node):
432 """Ensure that a given node is online.
434 @param lu: the LU on behalf of which we make the check
435 @param node: the node to check
436 @raise errors.OpPrereqError: if the node is offline
439 if lu.cfg.GetNodeInfo(node).offline:
440 raise errors.OpPrereqError("Can't use offline node %s" % node)
443 def _CheckNodeNotDrained(lu, node):
444 """Ensure that a given node is not drained.
446 @param lu: the LU on behalf of which we make the check
447 @param node: the node to check
448 @raise errors.OpPrereqError: if the node is drained
451 if lu.cfg.GetNodeInfo(node).drained:
452 raise errors.OpPrereqError("Can't use drained node %s" % node)
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456 memory, vcpus, nics, disk_template, disks):
457 """Builds instance related env variables for hooks
459 This builds the hook environment from individual variables.
462 @param name: the name of the instance
463 @type primary_node: string
464 @param primary_node: the name of the instance's primary node
465 @type secondary_nodes: list
466 @param secondary_nodes: list of secondary nodes as strings
467 @type os_type: string
468 @param os_type: the name of the instance's OS
469 @type status: boolean
470 @param status: the should_run status of the instance
472 @param memory: the memory size of the instance
474 @param vcpus: the count of VCPUs the instance has
476 @param nics: list of tuples (ip, bridge, mac) representing
477 the NICs the instance has
478 @type disk_template: string
479 @param disk_template: the disk template of the instance
481 @param disks: the list of (size, mode) pairs
483 @return: the hook environment for this instance
492 "INSTANCE_NAME": name,
493 "INSTANCE_PRIMARY": primary_node,
494 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495 "INSTANCE_OS_TYPE": os_type,
496 "INSTANCE_STATUS": str_status,
497 "INSTANCE_MEMORY": memory,
498 "INSTANCE_VCPUS": vcpus,
499 "INSTANCE_DISK_TEMPLATE": disk_template,
503 nic_count = len(nics)
504 for idx, (ip, bridge, mac) in enumerate(nics):
507 env["INSTANCE_NIC%d_IP" % idx] = ip
508 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
509 env["INSTANCE_NIC%d_MAC" % idx] = mac
513 env["INSTANCE_NIC_COUNT"] = nic_count
516 disk_count = len(disks)
517 for idx, (size, mode) in enumerate(disks):
518 env["INSTANCE_DISK%d_SIZE" % idx] = size
519 env["INSTANCE_DISK%d_MODE" % idx] = mode
523 env["INSTANCE_DISK_COUNT"] = disk_count
528 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
529 """Builds instance related env variables for hooks from an object.
531 @type lu: L{LogicalUnit}
532 @param lu: the logical unit on whose behalf we execute
533 @type instance: L{objects.Instance}
534 @param instance: the instance for which we should build the
537 @param override: dictionary with key/values that will override
540 @return: the hook environment dictionary
543 bep = lu.cfg.GetClusterInfo().FillBE(instance)
545 'name': instance.name,
546 'primary_node': instance.primary_node,
547 'secondary_nodes': instance.secondary_nodes,
548 'os_type': instance.os,
549 'status': instance.admin_up,
550 'memory': bep[constants.BE_MEMORY],
551 'vcpus': bep[constants.BE_VCPUS],
552 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
553 'disk_template': instance.disk_template,
554 'disks': [(disk.size, disk.mode) for disk in instance.disks],
557 args.update(override)
558 return _BuildInstanceHookEnv(**args)
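# Hedged usage sketch: a LU that wants to report a different value in its
# hooks environment could pass an override dict, e.g.
#   env = _BuildInstanceHookEnvByObject(self, instance,
#                                       override={"status": False})
# any key given in override replaces the value derived from the instance.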
561 def _AdjustCandidatePool(lu):
562 """Adjust the candidate pool after node operations.
565 mod_list = lu.cfg.MaintainCandidatePool()
567 lu.LogInfo("Promoted nodes to master candidate role: %s",
568 ", ".join(node.name for node in mod_list))
569 for name in mod_list:
570 lu.context.ReaddNode(name)
571 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
573 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
577 def _CheckInstanceBridgesExist(lu, instance):
578 """Check that the brigdes needed by an instance exist.
581 # check bridges existance
582 brlist = [nic.bridge for nic in instance.nics]
583 result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
586 raise errors.OpPrereqError("One or more target bridges %s does not"
587 " exist on destination node '%s'" %
588 (brlist, instance.primary_node))
591 class LUDestroyCluster(NoHooksLU):
592 """Logical unit for destroying the cluster.
597 def CheckPrereq(self):
598 """Check prerequisites.
600 This checks whether the cluster is empty.
602 Any errors are signalled by raising errors.OpPrereqError.
605 master = self.cfg.GetMasterNode()
607 nodelist = self.cfg.GetNodeList()
608 if len(nodelist) != 1 or nodelist[0] != master:
609 raise errors.OpPrereqError("There are still %d node(s) in"
610 " this cluster." % (len(nodelist) - 1))
611 instancelist = self.cfg.GetInstanceList()
613 raise errors.OpPrereqError("There are still %d instance(s) in"
614 " this cluster." % len(instancelist))
616 def Exec(self, feedback_fn):
617 """Destroys the cluster.
620 master = self.cfg.GetMasterNode()
621 result = self.rpc.call_node_stop_master(master, False)
624 raise errors.OpExecError("Could not disable the master role")
625 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
626 utils.CreateBackup(priv_key)
627 utils.CreateBackup(pub_key)
631 class LUVerifyCluster(LogicalUnit):
632 """Verifies the cluster status.
635 HPATH = "cluster-verify"
636 HTYPE = constants.HTYPE_CLUSTER
637 _OP_REQP = ["skip_checks"]
640 def ExpandNames(self):
641 self.needed_locks = {
642 locking.LEVEL_NODE: locking.ALL_SET,
643 locking.LEVEL_INSTANCE: locking.ALL_SET,
645 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
647 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
648 node_result, feedback_fn, master_files,
650 """Run multiple tests against a node.
654 - compares ganeti version
655 - checks vg existence and size > 20G
656 - checks config file checksum
657 - checks ssh to other nodes
659 @type nodeinfo: L{objects.Node}
660 @param nodeinfo: the node to check
661 @param file_list: required list of files
662 @param local_cksum: dictionary of local files and their checksums
663 @param node_result: the results from the node
664 @param feedback_fn: function used to accumulate results
665 @param master_files: list of files that only masters should have
666 @param drbd_map: the used DRBD minors for this node, in
667 form of minor: (instance, must_exist) which correspond to instances
668 and their running status
669 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
674 # main result, node_result should be a non-empty dict
675 if not node_result or not isinstance(node_result, dict):
676 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
679 # compares ganeti version
680 local_version = constants.PROTOCOL_VERSION
681 remote_version = node_result.get('version', None)
682 if not (remote_version and isinstance(remote_version, (list, tuple)) and
683 len(remote_version) == 2):
684 feedback_fn(" - ERROR: connection to %s failed" % (node))
687 if local_version != remote_version[0]:
688 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
689 " node %s %s" % (local_version, node, remote_version[0]))
692 # node seems compatible, we can actually try to look into its results
696 # full package version
697 if constants.RELEASE_VERSION != remote_version[1]:
698 feedback_fn(" - WARNING: software version mismatch: master %s,"
700 (constants.RELEASE_VERSION, node, remote_version[1]))
702 # checks vg existence and size > 20G
703 if vg_name is not None:
704 vglist = node_result.get(constants.NV_VGLIST, None)
706 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
710 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
711 constants.MIN_VG_SIZE)
713 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
716 # checks config file checksum
718 remote_cksum = node_result.get(constants.NV_FILELIST, None)
719 if not isinstance(remote_cksum, dict):
721 feedback_fn(" - ERROR: node hasn't returned file checksum data")
723 for file_name in file_list:
724 node_is_mc = nodeinfo.master_candidate
725 must_have_file = file_name not in master_files
726 if file_name not in remote_cksum:
727 if node_is_mc or must_have_file:
729 feedback_fn(" - ERROR: file '%s' missing" % file_name)
730 elif remote_cksum[file_name] != local_cksum[file_name]:
731 if node_is_mc or must_have_file:
733 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
735 # not candidate and this is not a must-have file
737 feedback_fn(" - ERROR: non master-candidate has old/wrong file"
740 # all good, except non-master/non-must have combination
741 if not node_is_mc and not must_have_file:
742 feedback_fn(" - ERROR: file '%s' should not exist on non master"
743 " candidates" % file_name)
747 if constants.NV_NODELIST not in node_result:
749 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
751 if node_result[constants.NV_NODELIST]:
753 for node in node_result[constants.NV_NODELIST]:
754 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
755 (node, node_result[constants.NV_NODELIST][node]))
757 if constants.NV_NODENETTEST not in node_result:
759 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
761 if node_result[constants.NV_NODENETTEST]:
763 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
765 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
766 (node, node_result[constants.NV_NODENETTEST][node]))
768 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
769 if isinstance(hyp_result, dict):
770 for hv_name, hv_result in hyp_result.iteritems():
771 if hv_result is not None:
772 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
773 (hv_name, hv_result))
775 # check used drbd list
776 if vg_name is not None:
777 used_minors = node_result.get(constants.NV_DRBDLIST, [])
778 if not isinstance(used_minors, (tuple, list)):
779 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
782 for minor, (iname, must_exist) in drbd_map.items():
783 if minor not in used_minors and must_exist:
784 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
785 " not active" % (minor, iname))
787 for minor in used_minors:
788 if minor not in drbd_map:
789 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
795 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
796 node_instance, feedback_fn, n_offline):
797 """Verify an instance.
799 This function checks to see if the required block devices are
800 available on the instance's node.
805 node_current = instanceconfig.primary_node
808 instanceconfig.MapLVsByNode(node_vol_should)
810 for node in node_vol_should:
811 if node in n_offline:
812 # ignore missing volumes on offline nodes
814 for volume in node_vol_should[node]:
815 if node not in node_vol_is or volume not in node_vol_is[node]:
816 feedback_fn(" - ERROR: volume %s missing on node %s" %
820 if instanceconfig.admin_up:
821 if ((node_current not in node_instance or
822 not instance in node_instance[node_current]) and
823 node_current not in n_offline):
824 feedback_fn(" - ERROR: instance %s not running on node %s" %
825 (instance, node_current))
828 for node in node_instance:
829 if node != node_current:
830 if instance in node_instance[node]:
831 feedback_fn(" - ERROR: instance %s should not run on node %s" %
837 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
838 """Verify if there are any unknown volumes in the cluster.
840 The .os, .swap and backup volumes are ignored. All other volumes are
846 for node in node_vol_is:
847 for volume in node_vol_is[node]:
848 if node not in node_vol_should or volume not in node_vol_should[node]:
849 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
854 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
855 """Verify the list of running instances.
857 This checks what instances are running but unknown to the cluster.
861 for node in node_instance:
862 for runninginstance in node_instance[node]:
863 if runninginstance not in instancelist:
864 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
865 (runninginstance, node))
869 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
870 """Verify N+1 Memory Resilience.
872 Check that if one single node dies we can still start all the instances it
878 for node, nodeinfo in node_info.iteritems():
879 # This code checks that every node which is now listed as secondary has
880 # enough memory to host all instances it is supposed to should a single
881 # other node in the cluster fail.
882 # FIXME: not ready for failover to an arbitrary node
883 # FIXME: does not support file-backed instances
884 # WARNING: we currently take into account down instances as well as up
885 # ones, considering that even if they're down someone might want to start
886 # them even in the event of a node failure.
887 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
889 for instance in instances:
890 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
891 if bep[constants.BE_AUTO_BALANCE]:
892 needed_mem += bep[constants.BE_MEMORY]
893 if nodeinfo['mfree'] < needed_mem:
894 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
895 " failovers should node %s fail" % (node, prinode))
899 def CheckPrereq(self):
900 """Check prerequisites.
902 Transform the list of checks we're going to skip into a set and check that
903 all its members are valid.
906 self.skip_set = frozenset(self.op.skip_checks)
907 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
908 raise errors.OpPrereqError("Invalid checks to be skipped specified")
910 def BuildHooksEnv(self):
913 Cluster-Verify hooks are run only in the post phase; their failure is
914 logged in the verify output and makes the verification fail.
917 all_nodes = self.cfg.GetNodeList()
919 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
921 for node in self.cfg.GetAllNodesInfo().values():
922 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
924 return env, [], all_nodes
926 def Exec(self, feedback_fn):
927 """Verify integrity of cluster, performing various test on nodes.
931 feedback_fn("* Verifying global settings")
932 for msg in self.cfg.VerifyConfig():
933 feedback_fn(" - ERROR: %s" % msg)
935 vg_name = self.cfg.GetVGName()
936 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
937 nodelist = utils.NiceSort(self.cfg.GetNodeList())
938 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
939 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
940 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
941 for iname in instancelist)
942 i_non_redundant = [] # Non redundant instances
943 i_non_a_balanced = [] # Non auto-balanced instances
944 n_offline = [] # List of offline nodes
945 n_drained = [] # List of nodes being drained
951 # FIXME: verify OS list
953 master_files = [constants.CLUSTER_CONF_FILE]
955 file_names = ssconf.SimpleStore().GetFileList()
956 file_names.append(constants.SSL_CERT_FILE)
957 file_names.append(constants.RAPI_CERT_FILE)
958 file_names.extend(master_files)
960 local_checksums = utils.FingerprintFiles(file_names)
962 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
963 node_verify_param = {
964 constants.NV_FILELIST: file_names,
965 constants.NV_NODELIST: [node.name for node in nodeinfo
966 if not node.offline],
967 constants.NV_HYPERVISOR: hypervisors,
968 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
969 node.secondary_ip) for node in nodeinfo
970 if not node.offline],
971 constants.NV_INSTANCELIST: hypervisors,
972 constants.NV_VERSION: None,
973 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
975 if vg_name is not None:
976 node_verify_param[constants.NV_VGLIST] = None
977 node_verify_param[constants.NV_LVLIST] = vg_name
978 node_verify_param[constants.NV_DRBDLIST] = None
979 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
980 self.cfg.GetClusterName())
982 cluster = self.cfg.GetClusterInfo()
983 master_node = self.cfg.GetMasterNode()
984 all_drbd_map = self.cfg.ComputeDRBDMap()
986 for node_i in nodeinfo:
988 nresult = all_nvinfo[node].data
991 feedback_fn("* Skipping offline node %s" % (node,))
992 n_offline.append(node)
995 if node == master_node:
997 elif node_i.master_candidate:
998 ntype = "master candidate"
1001 n_drained.append(node)
1004 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1006 if all_nvinfo[node].failed or not isinstance(nresult, dict):
1007 feedback_fn(" - ERROR: connection to %s failed" % (node,))
1012 for minor, instance in all_drbd_map[node].items():
1013 if instance not in instanceinfo:
1014 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1016 # ghost instance should not be running, but otherwise we
1017 # don't give double warnings (both ghost instance and
1018 # unallocated minor in use)
1019 node_drbd[minor] = (instance, False)
1021 instance = instanceinfo[instance]
1022 node_drbd[minor] = (instance.name, instance.admin_up)
1023 result = self._VerifyNode(node_i, file_names, local_checksums,
1024 nresult, feedback_fn, master_files,
1028 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1030 node_volume[node] = {}
1031 elif isinstance(lvdata, basestring):
1032 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1033 (node, utils.SafeEncode(lvdata)))
1035 node_volume[node] = {}
1036 elif not isinstance(lvdata, dict):
1037 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1041 node_volume[node] = lvdata
1044 idata = nresult.get(constants.NV_INSTANCELIST, None)
1045 if not isinstance(idata, list):
1046 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1051 node_instance[node] = idata
1054 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1055 if not isinstance(nodeinfo, dict):
1056 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1062 "mfree": int(nodeinfo['memory_free']),
1065 # dictionary holding all instances this node is secondary for,
1066 # grouped by their primary node. Each key is a cluster node, and each
1067 # value is a list of instances which have the key as primary and the
1068 # current node as secondary. This is handy to calculate N+1 memory
1069 # availability if you can only failover from a primary to its
1071 "sinst-by-pnode": {},
1073 # FIXME: devise a free space model for file based instances as well
1074 if vg_name is not None:
1075 if (constants.NV_VGLIST not in nresult or
1076 vg_name not in nresult[constants.NV_VGLIST]):
1077 feedback_fn(" - ERROR: node %s didn't return data for the"
1078 " volume group '%s' - it is either missing or broken" %
1082 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1083 except (ValueError, KeyError):
1084 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1085 " from node %s" % (node,))
1089 node_vol_should = {}
1091 for instance in instancelist:
1092 feedback_fn("* Verifying instance %s" % instance)
1093 inst_config = instanceinfo[instance]
1094 result = self._VerifyInstance(instance, inst_config, node_volume,
1095 node_instance, feedback_fn, n_offline)
1097 inst_nodes_offline = []
1099 inst_config.MapLVsByNode(node_vol_should)
1101 instance_cfg[instance] = inst_config
1103 pnode = inst_config.primary_node
1104 if pnode in node_info:
1105 node_info[pnode]['pinst'].append(instance)
1106 elif pnode not in n_offline:
1107 feedback_fn(" - ERROR: instance %s, connection to primary node"
1108 " %s failed" % (instance, pnode))
1111 if pnode in n_offline:
1112 inst_nodes_offline.append(pnode)
1114 # If the instance is non-redundant we cannot survive losing its primary
1115 # node, so we are not N+1 compliant. On the other hand we have no disk
1116 # templates with more than one secondary so that situation is not well
1118 # FIXME: does not support file-backed instances
1119 if len(inst_config.secondary_nodes) == 0:
1120 i_non_redundant.append(instance)
1121 elif len(inst_config.secondary_nodes) > 1:
1122 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1125 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1126 i_non_a_balanced.append(instance)
1128 for snode in inst_config.secondary_nodes:
1129 if snode in node_info:
1130 node_info[snode]['sinst'].append(instance)
1131 if pnode not in node_info[snode]['sinst-by-pnode']:
1132 node_info[snode]['sinst-by-pnode'][pnode] = []
1133 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1134 elif snode not in n_offline:
1135 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1136 " %s failed" % (instance, snode))
1138 if snode in n_offline:
1139 inst_nodes_offline.append(snode)
1141 if inst_nodes_offline:
1142 # warn that the instance lives on offline nodes, and set bad=True
1143 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1144 ", ".join(inst_nodes_offline))
1147 feedback_fn("* Verifying orphan volumes")
1148 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1152 feedback_fn("* Verifying remaining instances")
1153 result = self._VerifyOrphanInstances(instancelist, node_instance,
1157 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1158 feedback_fn("* Verifying N+1 Memory redundancy")
1159 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1162 feedback_fn("* Other Notes")
1164 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1165 % len(i_non_redundant))
1167 if i_non_a_balanced:
1168 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1169 % len(i_non_a_balanced))
1172 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1175 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1179 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1180 """Analize the post-hooks' result
1182 This method analyses the hook result, handles it, and sends some
1183 nicely-formatted feedback back to the user.
1185 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1186 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1187 @param hooks_results: the results of the multi-node hooks rpc call
1188 @param feedback_fn: function used to send feedback back to the caller
1189 @param lu_result: previous Exec result
1190 @return: the new Exec result, based on the previous result
1194 # We only really run POST phase hooks, and are only interested in
1196 if phase == constants.HOOKS_PHASE_POST:
1197 # Used to change hooks' output to proper indentation
1198 indent_re = re.compile('^', re.M)
1199 feedback_fn("* Hooks Results")
1200 if not hooks_results:
1201 feedback_fn(" - ERROR: general communication failure")
1204 for node_name in hooks_results:
1205 show_node_header = True
1206 res = hooks_results[node_name]
1207 if res.failed or res.data is False or not isinstance(res.data, list):
1209 # no need to warn or set fail return value
1211 feedback_fn(" Communication failure in hooks execution")
1214 for script, hkr, output in res.data:
1215 if hkr == constants.HKR_FAIL:
1216 # The node header is only shown once, if there are
1217 # failing hooks on that node
1218 if show_node_header:
1219 feedback_fn(" Node %s:" % node_name)
1220 show_node_header = False
1221 feedback_fn(" ERROR: Script %s failed, output:" % script)
1222 output = indent_re.sub(' ', output)
1223 feedback_fn("%s" % output)
1229 class LUVerifyDisks(NoHooksLU):
1230 """Verifies the cluster disks status.
1236 def ExpandNames(self):
1237 self.needed_locks = {
1238 locking.LEVEL_NODE: locking.ALL_SET,
1239 locking.LEVEL_INSTANCE: locking.ALL_SET,
1241 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1243 def CheckPrereq(self):
1244 """Check prerequisites.
1246 This has no prerequisites.
1251 def Exec(self, feedback_fn):
1252 """Verify integrity of cluster disks.
1255 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1257 vg_name = self.cfg.GetVGName()
1258 nodes = utils.NiceSort(self.cfg.GetNodeList())
1259 instances = [self.cfg.GetInstanceInfo(name)
1260 for name in self.cfg.GetInstanceList()]
1263 for inst in instances:
1265 if (not inst.admin_up or
1266 inst.disk_template not in constants.DTS_NET_MIRROR):
1268 inst.MapLVsByNode(inst_lvs)
1269 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1270 for node, vol_list in inst_lvs.iteritems():
1271 for vol in vol_list:
1272 nv_dict[(node, vol)] = inst
1277 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1282 lvs = node_lvs[node]
1285 self.LogWarning("Connection to node %s failed: %s" %
1289 if isinstance(lvs, basestring):
1290 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1291 res_nlvm[node] = lvs
1293 elif not isinstance(lvs, dict):
1294 logging.warning("Connection to node %s failed or invalid data"
1296 res_nodes.append(node)
1299 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1300 inst = nv_dict.pop((node, lv_name), None)
1301 if (not lv_online and inst is not None
1302 and inst.name not in res_instances):
1303 res_instances.append(inst.name)
1305 # any leftover items in nv_dict are missing LVs, let's arrange the
1307 for key, inst in nv_dict.iteritems():
1308 if inst.name not in res_missing:
1309 res_missing[inst.name] = []
1310 res_missing[inst.name].append(key)
1315 class LURenameCluster(LogicalUnit):
1316 """Rename the cluster.
1319 HPATH = "cluster-rename"
1320 HTYPE = constants.HTYPE_CLUSTER
1323 def BuildHooksEnv(self):
1328 "OP_TARGET": self.cfg.GetClusterName(),
1329 "NEW_NAME": self.op.name,
1331 mn = self.cfg.GetMasterNode()
1332 return env, [mn], [mn]
1334 def CheckPrereq(self):
1335 """Verify that the passed name is a valid one.
1338 hostname = utils.HostInfo(self.op.name)
1340 new_name = hostname.name
1341 self.ip = new_ip = hostname.ip
1342 old_name = self.cfg.GetClusterName()
1343 old_ip = self.cfg.GetMasterIP()
1344 if new_name == old_name and new_ip == old_ip:
1345 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1346 " cluster has changed")
1347 if new_ip != old_ip:
1348 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1349 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1350 " reachable on the network. Aborting." %
1353 self.op.name = new_name
1355 def Exec(self, feedback_fn):
1356 """Rename the cluster.
1359 clustername = self.op.name
1362 # shutdown the master IP
1363 master = self.cfg.GetMasterNode()
1364 result = self.rpc.call_node_stop_master(master, False)
1365 if result.failed or not result.data:
1366 raise errors.OpExecError("Could not disable the master role")
1369 cluster = self.cfg.GetClusterInfo()
1370 cluster.cluster_name = clustername
1371 cluster.master_ip = ip
1372 self.cfg.Update(cluster)
1374 # update the known hosts file
1375 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1376 node_list = self.cfg.GetNodeList()
1378 node_list.remove(master)
1381 result = self.rpc.call_upload_file(node_list,
1382 constants.SSH_KNOWN_HOSTS_FILE)
1383 for to_node, to_result in result.iteritems():
1384 msg = to_result.RemoteFailMsg()
1386 msg = ("Copy of file %s to node %s failed: %s" %
1387 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1388 self.proc.LogWarning(msg)
1391 result = self.rpc.call_node_start_master(master, False)
1392 if result.failed or not result.data:
1393 self.LogWarning("Could not re-enable the master role on"
1394 " the master, please restart manually.")
1397 def _RecursiveCheckIfLVMBased(disk):
1398 """Check if the given disk or its children are lvm-based.
1400 @type disk: L{objects.Disk}
1401 @param disk: the disk to check
1403 @return: boolean indicating whether a LD_LV dev_type was found or not
1407 for chdisk in disk.children:
1408 if _RecursiveCheckIfLVMBased(chdisk):
1410 return disk.dev_type == constants.LD_LV
1413 class LUSetClusterParams(LogicalUnit):
1414 """Change the parameters of the cluster.
1417 HPATH = "cluster-modify"
1418 HTYPE = constants.HTYPE_CLUSTER
1422 def CheckArguments(self):
1426 if not hasattr(self.op, "candidate_pool_size"):
1427 self.op.candidate_pool_size = None
1428 if self.op.candidate_pool_size is not None:
1430 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1431 except (ValueError, TypeError), err:
1432 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1434 if self.op.candidate_pool_size < 1:
1435 raise errors.OpPrereqError("At least one master candidate needed")
1437 def ExpandNames(self):
1438 # FIXME: in the future maybe other cluster params won't require checking on
1439 # all nodes to be modified.
1440 self.needed_locks = {
1441 locking.LEVEL_NODE: locking.ALL_SET,
1443 self.share_locks[locking.LEVEL_NODE] = 1
1445 def BuildHooksEnv(self):
1450 "OP_TARGET": self.cfg.GetClusterName(),
1451 "NEW_VG_NAME": self.op.vg_name,
1453 mn = self.cfg.GetMasterNode()
1454 return env, [mn], [mn]
1456 def CheckPrereq(self):
1457 """Check prerequisites.
1459 This checks whether the given params don't conflict and
1460 if the given volume group is valid.
1463 if self.op.vg_name is not None and not self.op.vg_name:
1464 instances = self.cfg.GetAllInstancesInfo().values()
1465 for inst in instances:
1466 for disk in inst.disks:
1467 if _RecursiveCheckIfLVMBased(disk):
1468 raise errors.OpPrereqError("Cannot disable lvm storage while"
1469 " lvm-based instances exist")
1471 node_list = self.acquired_locks[locking.LEVEL_NODE]
1473 # if vg_name not None, checks given volume group on all nodes
1475 vglist = self.rpc.call_vg_list(node_list)
1476 for node in node_list:
1477 if vglist[node].failed:
1478 # ignoring down node
1479 self.LogWarning("Node %s unreachable/error, ignoring" % node)
1481 vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1483 constants.MIN_VG_SIZE)
1485 raise errors.OpPrereqError("Error on node '%s': %s" %
1488 self.cluster = cluster = self.cfg.GetClusterInfo()
1489 # validate params changes
1490 if self.op.beparams:
1491 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1492 self.new_beparams = objects.FillDict(
1493 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1495 if self.op.nicparams:
1496 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1497 self.new_nicparams = objects.FillDict(
1498 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1499 objects.NIC.CheckParameterSyntax(self.new_nicparams)
1501 # hypervisor list/parameters
1502 self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1503 if self.op.hvparams:
1504 if not isinstance(self.op.hvparams, dict):
1505 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1506 for hv_name, hv_dict in self.op.hvparams.items():
1507 if hv_name not in self.new_hvparams:
1508 self.new_hvparams[hv_name] = hv_dict
1510 self.new_hvparams[hv_name].update(hv_dict)
1512 if self.op.enabled_hypervisors is not None:
1513 self.hv_list = self.op.enabled_hypervisors
1515 self.hv_list = cluster.enabled_hypervisors
1517 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1518 # either the enabled list has changed, or the parameters have, validate
1519 for hv_name, hv_params in self.new_hvparams.items():
1520 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1521 (self.op.enabled_hypervisors and
1522 hv_name in self.op.enabled_hypervisors)):
1523 # either this is a new hypervisor, or its parameters have changed
1524 hv_class = hypervisor.GetHypervisor(hv_name)
1525 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1526 hv_class.CheckParameterSyntax(hv_params)
1527 _CheckHVParams(self, node_list, hv_name, hv_params)
1529 def Exec(self, feedback_fn):
1530 """Change the parameters of the cluster.
1533 if self.op.vg_name is not None:
1534 new_volume = self.op.vg_name
1537 if new_volume != self.cfg.GetVGName():
1538 self.cfg.SetVGName(new_volume)
1540 feedback_fn("Cluster LVM configuration already in desired"
1541 " state, not changing")
1542 if self.op.hvparams:
1543 self.cluster.hvparams = self.new_hvparams
1544 if self.op.enabled_hypervisors is not None:
1545 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1546 if self.op.beparams:
1547 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1548 if self.op.nicparams:
1549 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1551 if self.op.candidate_pool_size is not None:
1552 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1554 self.cfg.Update(self.cluster)
1556 # we want to update nodes after the cluster so that if any errors
1557 # happen, we have recorded and saved the cluster info
1558 if self.op.candidate_pool_size is not None:
1559 _AdjustCandidatePool(self)
1562 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1563 """Distribute additional files which are part of the cluster configuration.
1565 ConfigWriter takes care of distributing the config and ssconf files, but
1566 there are more files which should be distributed to all nodes. This function
1567 makes sure those are copied.
1569 @param lu: calling logical unit
1570 @param additional_nodes: list of nodes not in the config to distribute to
1573 # 1. Gather target nodes
1574 myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1575 dist_nodes = lu.cfg.GetNodeList()
1576 if additional_nodes is not None:
1577 dist_nodes.extend(additional_nodes)
1578 if myself.name in dist_nodes:
1579 dist_nodes.remove(myself.name)
1580 # 2. Gather files to distribute
1581 dist_files = set([constants.ETC_HOSTS,
1582 constants.SSH_KNOWN_HOSTS_FILE,
1583 constants.RAPI_CERT_FILE,
1584 constants.RAPI_USERS_FILE,
1587 enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1588 for hv_name in enabled_hypervisors:
1589 hv_class = hypervisor.GetHypervisor(hv_name)
1590 dist_files.update(hv_class.GetAncillaryFiles())
1592 # 3. Perform the files upload
1593 for fname in dist_files:
1594 if os.path.exists(fname):
1595 result = lu.rpc.call_upload_file(dist_nodes, fname)
1596 for to_node, to_result in result.items():
1597 msg = to_result.RemoteFailMsg()
1599 msg = ("Copy of file %s to node %s failed: %s" %
1600 (fname, to_node, msg))
1601 lu.proc.LogWarning(msg)
1604 class LURedistributeConfig(NoHooksLU):
1605 """Force the redistribution of cluster configuration.
1607 This is a very simple LU.
1613 def ExpandNames(self):
1614 self.needed_locks = {
1615 locking.LEVEL_NODE: locking.ALL_SET,
1617 self.share_locks[locking.LEVEL_NODE] = 1
1619 def CheckPrereq(self):
1620 """Check prerequisites.
1624 def Exec(self, feedback_fn):
1625 """Redistribute the configuration.
1628 self.cfg.Update(self.cfg.GetClusterInfo())
1629 _RedistributeAncillaryFiles(self)
1632 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1633 """Sleep and poll for an instance's disk to sync.
1636 if not instance.disks:
1640 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1642 node = instance.primary_node
1644 for dev in instance.disks:
1645 lu.cfg.SetDiskID(dev, node)
1651 cumul_degraded = False
1652 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1653 if rstats.failed or not rstats.data:
1654 lu.LogWarning("Can't get any data from node %s", node)
1657 raise errors.RemoteError("Can't contact node %s for mirror data,"
1658 " aborting." % node)
1661 rstats = rstats.data
1663 for i, mstat in enumerate(rstats):
1665 lu.LogWarning("Can't compute data for node %s/%s",
1666 node, instance.disks[i].iv_name)
1668 # we ignore the ldisk parameter
1669 perc_done, est_time, is_degraded, _ = mstat
1670 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1671 if perc_done is not None:
1673 if est_time is not None:
1674 rem_time = "%d estimated seconds remaining" % est_time
1677 rem_time = "no time estimate"
1678 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1679 (instance.disks[i].iv_name, perc_done, rem_time))
1683 time.sleep(min(60, max_time))
1686 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1687 return not cumul_degraded
1690 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1691 """Check that mirrors are not degraded.
1693 The ldisk parameter, if True, will change the test from the
1694 is_degraded attribute (which represents overall non-ok status for
1695 the device(s)) to the ldisk (representing the local storage status).
1698 lu.cfg.SetDiskID(dev, node)
1705 if on_primary or dev.AssembleOnSecondary():
1706 rstats = lu.rpc.call_blockdev_find(node, dev)
1707 msg = rstats.RemoteFailMsg()
1709 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1711 elif not rstats.payload:
1712 lu.LogWarning("Can't find disk on node %s", node)
1715 result = result and (not rstats.payload[idx])
1717 for child in dev.children:
1718 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1723 class LUDiagnoseOS(NoHooksLU):
1724 """Logical unit for OS diagnose/query.
1727 _OP_REQP = ["output_fields", "names"]
1729 _FIELDS_STATIC = utils.FieldSet()
1730 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1732 def ExpandNames(self):
1734 raise errors.OpPrereqError("Selective OS query not supported")
1736 _CheckOutputFields(static=self._FIELDS_STATIC,
1737 dynamic=self._FIELDS_DYNAMIC,
1738 selected=self.op.output_fields)
1740 # Lock all nodes, in shared mode
1741 # Temporary removal of locks, should be reverted later
1742 # TODO: reintroduce locks when they are lighter-weight
1743 self.needed_locks = {}
1744 #self.share_locks[locking.LEVEL_NODE] = 1
1745 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1747 def CheckPrereq(self):
1748 """Check prerequisites.
1753 def _DiagnoseByOS(node_list, rlist):
1754 """Remaps a per-node return list into an a per-os per-node dictionary
1756 @param node_list: a list with the names of all nodes
1757 @param rlist: a map with node names as keys and OS objects as values
1760 @return: a dictionary with osnames as keys and as value another map, with
1761 nodes as keys and list of OS objects as values, eg::
1763 {"debian-etch": {"node1": [<object>,...],
1764 "node2": [<object>,]}
1769 # we build here the list of nodes that didn't fail the RPC (at RPC
1770 # level), so that nodes with a non-responding node daemon don't
1771 # make all OSes invalid
1772 good_nodes = [node_name for node_name in rlist
1773 if not rlist[node_name].failed]
1774 for node_name, nr in rlist.iteritems():
1775 if nr.failed or not nr.data:
1777 for os_obj in nr.data:
1778 if os_obj.name not in all_os:
1779 # build a list of nodes for this os containing empty lists
1780 # for each node in node_list
1781 all_os[os_obj.name] = {}
1782 for nname in good_nodes:
1783 all_os[os_obj.name][nname] = []
1784 all_os[os_obj.name][node_name].append(os_obj)
1787 def Exec(self, feedback_fn):
1788 """Compute the list of OSes.
1791 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1792 node_data = self.rpc.call_os_diagnose(valid_nodes)
1793 if node_data is False:
1794 raise errors.OpExecError("Can't gather the list of OSes")
1795 pol = self._DiagnoseByOS(valid_nodes, node_data)
1797 for os_name, os_data in pol.iteritems():
1799 for field in self.op.output_fields:
1802 elif field == "valid":
1803 val = utils.all([osl and osl[0] for osl in os_data.values()])
1804 elif field == "node_status":
1806 for node_name, nos_list in os_data.iteritems():
1807 val[node_name] = [(v.status, v.path) for v in nos_list]
1809 raise errors.ParameterError(field)
1816 class LURemoveNode(LogicalUnit):
1817 """Logical unit for removing a node.
1820 HPATH = "node-remove"
1821 HTYPE = constants.HTYPE_NODE
1822 _OP_REQP = ["node_name"]
1824 def BuildHooksEnv(self):
1827 This doesn't run on the target node in the pre phase as a failed
1828 node would then be impossible to remove.
1832 "OP_TARGET": self.op.node_name,
1833 "NODE_NAME": self.op.node_name,
1835 all_nodes = self.cfg.GetNodeList()
1836 all_nodes.remove(self.op.node_name)
1837 return env, all_nodes, all_nodes
1839 def CheckPrereq(self):
1840 """Check prerequisites.
1843 - the node exists in the configuration
1844 - it does not have primary or secondary instances
1845 - it's not the master
1847 Any errors are signalled by raising errors.OpPrereqError.
1850 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1852 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1854 instance_list = self.cfg.GetInstanceList()
1856 masternode = self.cfg.GetMasterNode()
1857 if node.name == masternode:
1858 raise errors.OpPrereqError("Node is the master node,"
1859 " you need to failover first.")
1861 for instance_name in instance_list:
1862 instance = self.cfg.GetInstanceInfo(instance_name)
1863 if node.name in instance.all_nodes:
1864 raise errors.OpPrereqError("Instance %s is still running on the node,"
1865 " please remove first." % instance_name)
1866 self.op.node_name = node.name
1869 def Exec(self, feedback_fn):
1870 """Removes the node from the cluster.
1874 logging.info("Stopping the node daemon and removing configs from node %s",
1877 self.context.RemoveNode(node.name)
1879 self.rpc.call_node_leave_cluster(node.name)
1881 # Promote nodes to master candidate as needed
1882 _AdjustCandidatePool(self)
1885 class LUQueryNodes(NoHooksLU):
1886 """Logical unit for querying nodes.
1889 _OP_REQP = ["output_fields", "names", "use_locking"]
1891 _FIELDS_DYNAMIC = utils.FieldSet(
1893 "mtotal", "mnode", "mfree",
1895 "ctotal", "cnodes", "csockets",
1898 _FIELDS_STATIC = utils.FieldSet(
1899 "name", "pinst_cnt", "sinst_cnt",
1900 "pinst_list", "sinst_list",
1901 "pip", "sip", "tags",
1909 def ExpandNames(self):
1910 _CheckOutputFields(static=self._FIELDS_STATIC,
1911 dynamic=self._FIELDS_DYNAMIC,
1912 selected=self.op.output_fields)
1914 self.needed_locks = {}
1915 self.share_locks[locking.LEVEL_NODE] = 1
1918 self.wanted = _GetWantedNodes(self, self.op.names)
1920 self.wanted = locking.ALL_SET
1922 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1923 self.do_locking = self.do_node_query and self.op.use_locking
1925 # if we don't request only static fields, we need to lock the nodes
1926 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1929 def CheckPrereq(self):
1930 """Check prerequisites.
1933 # The validation of the node list is done in the _GetWantedNodes,
1934 # if non empty, and if empty, there's no validation to do
1937 def Exec(self, feedback_fn):
1938 """Computes the list of nodes and their attributes.
1941 all_info = self.cfg.GetAllNodesInfo()
1943 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1944 elif self.wanted != locking.ALL_SET:
1945 nodenames = self.wanted
1946 missing = set(nodenames).difference(all_info.keys())
1948 raise errors.OpExecError(
1949 "Some nodes were removed before retrieving their data: %s" % missing)
1951 nodenames = all_info.keys()
1953 nodenames = utils.NiceSort(nodenames)
1954 nodelist = [all_info[name] for name in nodenames]
1956 # begin data gathering
1958 if self.do_node_query:
1960 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1961 self.cfg.GetHypervisorType())
1962 for name in nodenames:
1963 nodeinfo = node_data[name]
1964 if not nodeinfo.failed and nodeinfo.data:
1965 nodeinfo = nodeinfo.data
1966 fn = utils.TryConvert
1968 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1969 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1970 "mfree": fn(int, nodeinfo.get('memory_free', None)),
1971 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1972 "dfree": fn(int, nodeinfo.get('vg_free', None)),
1973 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1974 "bootid": nodeinfo.get('bootid', None),
1975 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1976 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1979 live_data[name] = {}
1981 live_data = dict.fromkeys(nodenames, {})
1983 node_to_primary = dict([(name, set()) for name in nodenames])
1984 node_to_secondary = dict([(name, set()) for name in nodenames])
1986 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1987 "sinst_cnt", "sinst_list"))
1988 if inst_fields & frozenset(self.op.output_fields):
1989 instancelist = self.cfg.GetInstanceList()
1991 for instance_name in instancelist:
1992 inst = self.cfg.GetInstanceInfo(instance_name)
1993 if inst.primary_node in node_to_primary:
1994 node_to_primary[inst.primary_node].add(inst.name)
1995 for secnode in inst.secondary_nodes:
1996 if secnode in node_to_secondary:
1997 node_to_secondary[secnode].add(inst.name)
1999 master_node = self.cfg.GetMasterNode()
2001 # end data gathering
2004 for node in nodelist:
2006 for field in self.op.output_fields:
2009 elif field == "pinst_list":
2010 val = list(node_to_primary[node.name])
2011 elif field == "sinst_list":
2012 val = list(node_to_secondary[node.name])
2013 elif field == "pinst_cnt":
2014 val = len(node_to_primary[node.name])
2015 elif field == "sinst_cnt":
2016 val = len(node_to_secondary[node.name])
2017 elif field == "pip":
2018 val = node.primary_ip
2019 elif field == "sip":
2020 val = node.secondary_ip
2021 elif field == "tags":
2022 val = list(node.GetTags())
2023 elif field == "serial_no":
2024 val = node.serial_no
2025 elif field == "master_candidate":
2026 val = node.master_candidate
2027 elif field == "master":
2028 val = node.name == master_node
2029 elif field == "offline":
2031 elif field == "drained":
2033 elif self._FIELDS_DYNAMIC.Matches(field):
2034 val = live_data[node.name].get(field, None)
2036 raise errors.ParameterError(field)
2037 node_output.append(val)
2038 output.append(node_output)
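
# Illustrative sketch, not part of the original module: LUQueryNodes.Exec
# builds one row per node, with one value per requested field, in the same
# order as self.op.output_fields.  A hypothetical consumer could zip the rows
# back into dictionaries like this:
def _ExampleRowsToDicts(output_fields, rows):
  """Turn query rows back into per-node dictionaries.

  Example (made-up values):
    _ExampleRowsToDicts(["name", "pinst_cnt"], [["node1", 2], ["node2", 0]])
    returns [{"name": "node1", "pinst_cnt": 2}, {"name": "node2", "pinst_cnt": 0}]

  """
  return [dict(zip(output_fields, row)) for row in rows]
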
2043 class LUQueryNodeVolumes(NoHooksLU):
2044 """Logical unit for getting volumes on node(s).
2047 _OP_REQP = ["nodes", "output_fields"]
2049 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2050 _FIELDS_STATIC = utils.FieldSet("node")
2052 def ExpandNames(self):
2053 _CheckOutputFields(static=self._FIELDS_STATIC,
2054 dynamic=self._FIELDS_DYNAMIC,
2055 selected=self.op.output_fields)
2057 self.needed_locks = {}
2058 self.share_locks[locking.LEVEL_NODE] = 1
2059 if not self.op.nodes:
2060 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2062 self.needed_locks[locking.LEVEL_NODE] = \
2063 _GetWantedNodes(self, self.op.nodes)
2065 def CheckPrereq(self):
2066 """Check prerequisites.
2068 This checks that the fields required are valid output fields.
2071 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2073 def Exec(self, feedback_fn):
2074 """Computes the list of nodes and their attributes.
2077 nodenames = self.nodes
2078 volumes = self.rpc.call_node_volumes(nodenames)
2080 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2081 in self.cfg.GetInstanceList()]
2083 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2086 for node in nodenames:
2087 if node not in volumes or volumes[node].failed or not volumes[node].data:
2090 node_vols = volumes[node].data[:]
2091 node_vols.sort(key=lambda vol: vol['dev'])
2093 for vol in node_vols:
2095 for field in self.op.output_fields:
2098 elif field == "phys":
2102 elif field == "name":
2104 elif field == "size":
2105 val = int(float(vol['size']))
2106 elif field == "instance":
2108 if node not in lv_by_node[inst]:
2110 if vol['name'] in lv_by_node[inst][node]:
2116 raise errors.ParameterError(field)
2117 node_output.append(str(val))
2119 output.append(node_output)
2124 class LUAddNode(LogicalUnit):
2125 """Logical unit for adding node to the cluster.
2129 HTYPE = constants.HTYPE_NODE
2130 _OP_REQP = ["node_name"]
2132 def BuildHooksEnv(self):
2135 This will run on all nodes before, and on all nodes + the new node after.
2139 "OP_TARGET": self.op.node_name,
2140 "NODE_NAME": self.op.node_name,
2141 "NODE_PIP": self.op.primary_ip,
2142 "NODE_SIP": self.op.secondary_ip,
2144 nodes_0 = self.cfg.GetNodeList()
2145 nodes_1 = nodes_0 + [self.op.node_name, ]
2146 return env, nodes_0, nodes_1
2148 def CheckPrereq(self):
2149 """Check prerequisites.
2152 - the new node is not already in the config
2154 - its parameters (single/dual homed) match the cluster
2156 Any errors are signalled by raising errors.OpPrereqError.
2159 node_name = self.op.node_name
2162 dns_data = utils.HostInfo(node_name)
2164 node = dns_data.name
2165 primary_ip = self.op.primary_ip = dns_data.ip
2166 secondary_ip = getattr(self.op, "secondary_ip", None)
2167 if secondary_ip is None:
2168 secondary_ip = primary_ip
2169 if not utils.IsValidIP(secondary_ip):
2170 raise errors.OpPrereqError("Invalid secondary IP given")
2171 self.op.secondary_ip = secondary_ip
2173 node_list = cfg.GetNodeList()
2174 if not self.op.readd and node in node_list:
2175 raise errors.OpPrereqError("Node %s is already in the configuration" %
2177 elif self.op.readd and node not in node_list:
2178 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2180 for existing_node_name in node_list:
2181 existing_node = cfg.GetNodeInfo(existing_node_name)
2183 if self.op.readd and node == existing_node_name:
2184 if (existing_node.primary_ip != primary_ip or
2185 existing_node.secondary_ip != secondary_ip):
2186 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2187 " address configuration as before")
2190 if (existing_node.primary_ip == primary_ip or
2191 existing_node.secondary_ip == primary_ip or
2192 existing_node.primary_ip == secondary_ip or
2193 existing_node.secondary_ip == secondary_ip):
2194 raise errors.OpPrereqError("New node ip address(es) conflict with"
2195 " existing node %s" % existing_node.name)
2197 # check that the type of the node (single versus dual homed) is the
2198 # same as for the master
2199 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2200 master_singlehomed = myself.secondary_ip == myself.primary_ip
2201 newbie_singlehomed = secondary_ip == primary_ip
2202 if master_singlehomed != newbie_singlehomed:
2203 if master_singlehomed:
2204 raise errors.OpPrereqError("The master has no private ip but the"
2205 " new node has one")
2207 raise errors.OpPrereqError("The master has a private ip but the"
2208 " new node doesn't have one")
2210 # checks reachability
2211 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2212 raise errors.OpPrereqError("Node not reachable by ping")
2214 if not newbie_singlehomed:
2215 # check reachability from my secondary ip to newbie's secondary ip
2216 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2217 source=myself.secondary_ip):
2218 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2219 " based ping to noded port")
2221 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2222 mc_now, _ = self.cfg.GetMasterCandidateStats()
2223 master_candidate = mc_now < cp_size
2225 self.new_node = objects.Node(name=node,
2226 primary_ip=primary_ip,
2227 secondary_ip=secondary_ip,
2228 master_candidate=master_candidate,
2229 offline=False, drained=False)
2231 def Exec(self, feedback_fn):
2232 """Adds the new node to the cluster.
2235 new_node = self.new_node
2236 node = new_node.name
2238 # check connectivity
2239 result = self.rpc.call_version([node])[node]
2242 if constants.PROTOCOL_VERSION == result.data:
2243 logging.info("Communication to node %s fine, sw version %s match",
2246 raise errors.OpExecError("Version mismatch master version %s,"
2247 " node version %s" %
2248 (constants.PROTOCOL_VERSION, result.data))
2250 raise errors.OpExecError("Cannot get version from the new node")
2253 logging.info("Copy ssh key to node %s", node)
2254 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2256 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2257 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2263 keyarray.append(f.read())
2267 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2269 keyarray[3], keyarray[4], keyarray[5])
2271 msg = result.RemoteFailMsg()
2273 raise errors.OpExecError("Cannot transfer ssh keys to the"
2274 " new node: %s" % msg)
2276 # Add node to our /etc/hosts, and add key to known_hosts
2277 if self.cfg.GetClusterInfo().modify_etc_hosts:
2278 utils.AddHostToEtcHosts(new_node.name)
2280 if new_node.secondary_ip != new_node.primary_ip:
2281 result = self.rpc.call_node_has_ip_address(new_node.name,
2282 new_node.secondary_ip)
2283 if result.failed or not result.data:
2284 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2285 " you gave (%s). Please fix and re-run this"
2286 " command." % new_node.secondary_ip)
2288 node_verify_list = [self.cfg.GetMasterNode()]
2289 node_verify_param = {
2291 # TODO: do a node-net-test as well?
2294 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2295 self.cfg.GetClusterName())
2296 for verifier in node_verify_list:
2297 if result[verifier].failed or not result[verifier].data:
2298 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2299 " for remote verification" % verifier)
2300 if result[verifier].data['nodelist']:
2301 for failed in result[verifier].data['nodelist']:
2302 feedback_fn("ssh/hostname verification failed %s -> %s" %
2303 (verifier, result[verifier].data['nodelist'][failed]))
2304 raise errors.OpExecError("ssh/hostname verification failed.")
2307 _RedistributeAncillaryFiles(self)
2308 self.context.ReaddNode(new_node)
2310 _RedistributeAncillaryFiles(self, additional_nodes=[node])
2311 self.context.AddNode(new_node)
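
# Illustrative sketch, not part of the original module: the single- versus
# dual-homed consistency rule enforced by LUAddNode.CheckPrereq above,
# extracted into a stand-alone helper.  The function name and the plain string
# return value are hypothetical; the rule itself mirrors the code above.
def _ExampleCheckHomedness(master_primary_ip, master_secondary_ip,
                           new_primary_ip, new_secondary_ip):
  """Return an error string if the new node's homedness differs, else None."""
  master_singlehomed = master_secondary_ip == master_primary_ip
  newbie_singlehomed = new_secondary_ip == new_primary_ip
  if master_singlehomed and not newbie_singlehomed:
    return "The master has no private ip but the new node has one"
  if newbie_singlehomed and not master_singlehomed:
    return "The master has a private ip but the new node doesn't have one"
  return None
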
2314 class LUSetNodeParams(LogicalUnit):
2315 """Modifies the parameters of a node.
2318 HPATH = "node-modify"
2319 HTYPE = constants.HTYPE_NODE
2320 _OP_REQP = ["node_name"]
2323 def CheckArguments(self):
2324 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2325 if node_name is None:
2326 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2327 self.op.node_name = node_name
2328 _CheckBooleanOpField(self.op, 'master_candidate')
2329 _CheckBooleanOpField(self.op, 'offline')
2330 _CheckBooleanOpField(self.op, 'drained')
2331 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2332 if all_mods.count(None) == 3:
2333 raise errors.OpPrereqError("Please pass at least one modification")
2334 if all_mods.count(True) > 1:
2335 raise errors.OpPrereqError("Can't set the node into more than one"
2336 " state at the same time")
2338 def ExpandNames(self):
2339 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2341 def BuildHooksEnv(self):
2344 This runs on the master node.
2348 "OP_TARGET": self.op.node_name,
2349 "MASTER_CANDIDATE": str(self.op.master_candidate),
2350 "OFFLINE": str(self.op.offline),
2351 "DRAINED": str(self.op.drained),
2353 nl = [self.cfg.GetMasterNode(),
2357 def CheckPrereq(self):
2358 """Check prerequisites.
2360 This only checks the instance list against the existing names.
2363 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2365 if ((self.op.master_candidate == False or self.op.offline == True or
2366 self.op.drained == True) and node.master_candidate):
2367 # we will demote the node from master_candidate
2368 if self.op.node_name == self.cfg.GetMasterNode():
2369 raise errors.OpPrereqError("The master node has to be a"
2370 " master candidate, online and not drained")
2371 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2372 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2373 if num_candidates <= cp_size:
2374 msg = ("Not enough master candidates (desired"
2375 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2377 self.LogWarning(msg)
2379 raise errors.OpPrereqError(msg)
2381 if (self.op.master_candidate == True and
2382 ((node.offline and not self.op.offline == False) or
2383 (node.drained and not self.op.drained == False))):
2384 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2385 " to master_candidate" % node.name)
2389 def Exec(self, feedback_fn):
2398 if self.op.offline is not None:
2399 node.offline = self.op.offline
2400 result.append(("offline", str(self.op.offline)))
2401 if self.op.offline == True:
2402 if node.master_candidate:
2403 node.master_candidate = False
2405 result.append(("master_candidate", "auto-demotion due to offline"))
2407 node.drained = False
2408 result.append(("drained", "clear drained status due to offline"))
2410 if self.op.master_candidate is not None:
2411 node.master_candidate = self.op.master_candidate
2413 result.append(("master_candidate", str(self.op.master_candidate)))
2414 if self.op.master_candidate == False:
2415 rrc = self.rpc.call_node_demote_from_mc(node.name)
2416 msg = rrc.RemoteFailMsg()
2418 self.LogWarning("Node failed to demote itself: %s" % msg)
2420 if self.op.drained is not None:
2421 node.drained = self.op.drained
2422 result.append(("drained", str(self.op.drained)))
2423 if self.op.drained == True:
2424 if node.master_candidate:
2425 node.master_candidate = False
2427 result.append(("master_candidate", "auto-demotion due to drain"))
2429 node.offline = False
2430 result.append(("offline", "clear offline status due to drain"))
2432 # this will trigger configuration file update, if needed
2433 self.cfg.Update(node)
2434 # this will trigger job queue propagation or cleanup
2436 self.context.ReaddNode(node)
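
# Illustrative sketch, not part of the original module: the flag rules that
# LUSetNodeParams.CheckArguments enforces -- at least one of the three
# modifications must be given, and at most one of them may be True at a time.
# The helper name and its string return value are hypothetical.
def _ExampleValidateNodeMods(offline, drained, master_candidate):
  """Return an error string for an invalid flag combination, else None."""
  mods = [offline, drained, master_candidate]
  if mods.count(None) == len(mods):
    return "Please pass at least one modification"
  if mods.count(True) > 1:
    return "Can't set the node into more than one state at the same time"
  return None
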
2441 class LUPowercycleNode(NoHooksLU):
2442 """Powercycles a node.
2445 _OP_REQP = ["node_name", "force"]
2448 def CheckArguments(self):
2449 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2450 if node_name is None:
2451 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2452 self.op.node_name = node_name
2453 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2454 raise errors.OpPrereqError("The node is the master and the force"
2455 " parameter was not set")
2457 def ExpandNames(self):
2458 """Locking for PowercycleNode.
2460 This is a last-resort option and shouldn't block on other
2461 jobs. Therefore, we grab no locks.
2464 self.needed_locks = {}
2466 def CheckPrereq(self):
2467 """Check prerequisites.
2469 This LU has no prereqs.
2474 def Exec(self, feedback_fn):
2478 result = self.rpc.call_node_powercycle(self.op.node_name,
2479 self.cfg.GetHypervisorType())
2480 msg = result.RemoteFailMsg()
2482 raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2483 return result.payload
2486 class LUQueryClusterInfo(NoHooksLU):
2487 """Query cluster configuration.
2493 def ExpandNames(self):
2494 self.needed_locks = {}
2496 def CheckPrereq(self):
2497 """No prerequsites needed for this LU.
2502 def Exec(self, feedback_fn):
2503 """Return cluster config.
2506 cluster = self.cfg.GetClusterInfo()
2508 "software_version": constants.RELEASE_VERSION,
2509 "protocol_version": constants.PROTOCOL_VERSION,
2510 "config_version": constants.CONFIG_VERSION,
2511 "os_api_version": constants.OS_API_VERSION,
2512 "export_version": constants.EXPORT_VERSION,
2513 "architecture": (platform.architecture()[0], platform.machine()),
2514 "name": cluster.cluster_name,
2515 "master": cluster.master_node,
2516 "default_hypervisor": cluster.default_hypervisor,
2517 "enabled_hypervisors": cluster.enabled_hypervisors,
2518 "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2519 for hypervisor in cluster.enabled_hypervisors]),
2520 "beparams": cluster.beparams,
2521 "nicparams": cluster.nicparams,
2522 "candidate_pool_size": cluster.candidate_pool_size,
2523 "default_bridge": cluster.default_bridge,
2524 "master_netdev": cluster.master_netdev,
2525 "volume_group_name": cluster.volume_group_name,
2526 "file_storage_dir": cluster.file_storage_dir,
2532 class LUQueryConfigValues(NoHooksLU):
2533 """Return configuration values.
2538 _FIELDS_DYNAMIC = utils.FieldSet()
2539 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2541 def ExpandNames(self):
2542 self.needed_locks = {}
2544 _CheckOutputFields(static=self._FIELDS_STATIC,
2545 dynamic=self._FIELDS_DYNAMIC,
2546 selected=self.op.output_fields)
2548 def CheckPrereq(self):
2549 """No prerequisites.
2554 def Exec(self, feedback_fn):
2555 """Dump a representation of the cluster config to the standard output.
2559 for field in self.op.output_fields:
2560 if field == "cluster_name":
2561 entry = self.cfg.GetClusterName()
2562 elif field == "master_node":
2563 entry = self.cfg.GetMasterNode()
2564 elif field == "drain_flag":
2565 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2567 raise errors.ParameterError(field)
2568 values.append(entry)
2572 class LUActivateInstanceDisks(NoHooksLU):
2573 """Bring up an instance's disks.
2576 _OP_REQP = ["instance_name"]
2579 def ExpandNames(self):
2580 self._ExpandAndLockInstance()
2581 self.needed_locks[locking.LEVEL_NODE] = []
2582 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2584 def DeclareLocks(self, level):
2585 if level == locking.LEVEL_NODE:
2586 self._LockInstancesNodes()
2588 def CheckPrereq(self):
2589 """Check prerequisites.
2591 This checks that the instance is in the cluster.
2594 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2595 assert self.instance is not None, \
2596 "Cannot retrieve locked instance %s" % self.op.instance_name
2597 _CheckNodeOnline(self, self.instance.primary_node)
2599 def Exec(self, feedback_fn):
2600 """Activate the disks.
2603 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2605 raise errors.OpExecError("Cannot activate block devices")
2610 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2611 """Prepare the block devices for an instance.
2613 This sets up the block devices on all nodes.
2615 @type lu: L{LogicalUnit}
2616 @param lu: the logical unit on whose behalf we execute
2617 @type instance: L{objects.Instance}
2618 @param instance: the instance for whose disks we assemble
2619 @type ignore_secondaries: boolean
2620 @param ignore_secondaries: if true, errors on secondary nodes
2621 won't result in an error return from the function
2622 @return: False if the operation failed, otherwise a list of
2623 (host, instance_visible_name, node_visible_name)
2624 with the mapping from node devices to instance devices
2629 iname = instance.name
2630 # With the two-pass mechanism we try to reduce the window of
2631 # opportunity for the race condition of switching DRBD to primary
2632 # before the handshake has occurred, but we do not eliminate it
2634 # The proper fix would be to wait (with some limits) until the
2635 # connection has been made and drbd transitions from WFConnection
2636 # into any other network-connected state (Connected, SyncTarget,
2639 # 1st pass, assemble on all nodes in secondary mode
2640 for inst_disk in instance.disks:
2641 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2642 lu.cfg.SetDiskID(node_disk, node)
2643 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2644 msg = result.RemoteFailMsg()
2646 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2647 " (is_primary=False, pass=1): %s",
2648 inst_disk.iv_name, node, msg)
2649 if not ignore_secondaries:
2652 # FIXME: race condition on drbd migration to primary
2654 # 2nd pass, do only the primary node
2655 for inst_disk in instance.disks:
2656 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2657 if node != instance.primary_node:
2659 lu.cfg.SetDiskID(node_disk, node)
2660 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2661 msg = result.RemoteFailMsg()
2663 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2664 " (is_primary=True, pass=2): %s",
2665 inst_disk.iv_name, node, msg)
2667 device_info.append((instance.primary_node, inst_disk.iv_name,
2670 # leave the disks configured for the primary node
2671 # this is a workaround that would be fixed better by
2672 # improving the logical/physical id handling
2673 for disk in instance.disks:
2674 lu.cfg.SetDiskID(disk, instance.primary_node)
2676 return disks_ok, device_info
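
# Illustrative sketch, not part of the original module: the two-pass scheme
# used by _AssembleInstanceDisks above, reduced to its control flow.  The
# (node, disk) pairs and the assemble_fn callback are hypothetical stand-ins
# for the ComputeNodeTree walk and the call_blockdev_assemble RPC.
def _ExampleTwoPassAssemble(node_disk_pairs, primary_node, assemble_fn):
  """Assemble disks everywhere as secondary first, then on the primary.

  @param node_disk_pairs: iterable of (node_name, disk) pairs
  @param primary_node: name of the instance's primary node
  @param assemble_fn: callable taking (node_name, disk, as_primary)

  """
  # 1st pass: every node, read-only/secondary mode
  for node, disk in node_disk_pairs:
    assemble_fn(node, disk, False)
  # 2nd pass: only the primary node, now in primary (writable) mode
  for node, disk in node_disk_pairs:
    if node == primary_node:
      assemble_fn(node, disk, True)
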
2679 def _StartInstanceDisks(lu, instance, force):
2680 """Start the disks of an instance.
2683 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2684 ignore_secondaries=force)
2686 _ShutdownInstanceDisks(lu, instance)
2687 if force is not None and not force:
2688 lu.proc.LogWarning("", hint="If the message above refers to a"
2690 " you can retry the operation using '--force'.")
2691 raise errors.OpExecError("Disk consistency error")
2694 class LUDeactivateInstanceDisks(NoHooksLU):
2695 """Shutdown an instance's disks.
2698 _OP_REQP = ["instance_name"]
2701 def ExpandNames(self):
2702 self._ExpandAndLockInstance()
2703 self.needed_locks[locking.LEVEL_NODE] = []
2704 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2706 def DeclareLocks(self, level):
2707 if level == locking.LEVEL_NODE:
2708 self._LockInstancesNodes()
2710 def CheckPrereq(self):
2711 """Check prerequisites.
2713 This checks that the instance is in the cluster.
2716 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2717 assert self.instance is not None, \
2718 "Cannot retrieve locked instance %s" % self.op.instance_name
2720 def Exec(self, feedback_fn):
2721 """Deactivate the disks
2724 instance = self.instance
2725 _SafeShutdownInstanceDisks(self, instance)
2728 def _SafeShutdownInstanceDisks(lu, instance):
2729 """Shutdown block devices of an instance.
2731 This function checks if an instance is running, before calling
2732 _ShutdownInstanceDisks.
2735 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2736 [instance.hypervisor])
2737 ins_l = ins_l[instance.primary_node]
2738 if ins_l.failed or not isinstance(ins_l.data, list):
2739 raise errors.OpExecError("Can't contact node '%s'" %
2740 instance.primary_node)
2742 if instance.name in ins_l.data:
2743 raise errors.OpExecError("Instance is running, can't shutdown"
2746 _ShutdownInstanceDisks(lu, instance)
2749 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2750 """Shutdown block devices of an instance.
2752 This does the shutdown on all nodes of the instance.
2754 If ignore_primary is false, errors on the primary node are
2759 for disk in instance.disks:
2760 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2761 lu.cfg.SetDiskID(top_disk, node)
2762 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2763 msg = result.RemoteFailMsg()
2765 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2766 disk.iv_name, node, msg)
2767 if not ignore_primary or node != instance.primary_node:
2772 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2773 """Checks if a node has enough free memory.
2775 This function checks if a given node has the needed amount of free
2776 memory. In case the node has less memory or we cannot get the
2777 information from the node, this function raises an OpPrereqError
2780 @type lu: C{LogicalUnit}
2781 @param lu: a logical unit from which we get configuration data
2783 @param node: the node to check
2784 @type reason: C{str}
2785 @param reason: string to use in the error message
2786 @type requested: C{int}
2787 @param requested: the amount of memory in MiB to check for
2788 @type hypervisor_name: C{str}
2789 @param hypervisor_name: the hypervisor to ask for memory stats
2790 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2791 we cannot check the node
2794 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2795 nodeinfo[node].Raise()
2796 free_mem = nodeinfo[node].data.get('memory_free')
2797 if not isinstance(free_mem, int):
2798 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2799 " was '%s'" % (node, free_mem))
2800 if requested > free_mem:
2801 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2802 " needed %s MiB, available %s MiB" %
2803 (node, reason, requested, free_mem))
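
# Illustrative usage sketch, not part of the original module: how the LUs in
# this file invoke _CheckNodeFreeMemory before starting or failing over an
# instance (see LUStartupInstance.CheckPrereq and LUFailoverInstance
# .CheckPrereq).  The wrapper name is hypothetical.
def _ExampleMemoryPrecheck(lu, instance):
  """Check that the instance's primary node can hold its configured memory."""
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  _CheckNodeFreeMemory(lu, instance.primary_node,
                       "starting instance %s" % instance.name,
                       bep[constants.BE_MEMORY], instance.hypervisor)
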
2806 class LUStartupInstance(LogicalUnit):
2807 """Starts an instance.
2810 HPATH = "instance-start"
2811 HTYPE = constants.HTYPE_INSTANCE
2812 _OP_REQP = ["instance_name", "force"]
2815 def ExpandNames(self):
2816 self._ExpandAndLockInstance()
2818 def BuildHooksEnv(self):
2821 This runs on master, primary and secondary nodes of the instance.
2825 "FORCE": self.op.force,
2827 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2828 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2831 def CheckPrereq(self):
2832 """Check prerequisites.
2834 This checks that the instance is in the cluster.
2837 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2838 assert self.instance is not None, \
2839 "Cannot retrieve locked instance %s" % self.op.instance_name
2842 self.beparams = getattr(self.op, "beparams", {})
2844 if not isinstance(self.beparams, dict):
2845 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2846 " dict" % (type(self.beparams), ))
2847 # fill the beparams dict
2848 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2849 self.op.beparams = self.beparams
2852 self.hvparams = getattr(self.op, "hvparams", {})
2854 if not isinstance(self.hvparams, dict):
2855 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2856 " dict" % (type(self.hvparams), ))
2858 # check hypervisor parameter syntax (locally)
2859 cluster = self.cfg.GetClusterInfo()
2860 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2861 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2863 filled_hvp.update(self.hvparams)
2864 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2865 hv_type.CheckParameterSyntax(filled_hvp)
2866 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2867 self.op.hvparams = self.hvparams
2869 _CheckNodeOnline(self, instance.primary_node)
2871 bep = self.cfg.GetClusterInfo().FillBE(instance)
2872 # check bridge existence
2873 _CheckInstanceBridgesExist(self, instance)
2875 remote_info = self.rpc.call_instance_info(instance.primary_node,
2877 instance.hypervisor)
2879 if not remote_info.data:
2880 _CheckNodeFreeMemory(self, instance.primary_node,
2881 "starting instance %s" % instance.name,
2882 bep[constants.BE_MEMORY], instance.hypervisor)
2884 def Exec(self, feedback_fn):
2885 """Start the instance.
2888 instance = self.instance
2889 force = self.op.force
2891 self.cfg.MarkInstanceUp(instance.name)
2893 node_current = instance.primary_node
2895 _StartInstanceDisks(self, instance, force)
2897 result = self.rpc.call_instance_start(node_current, instance,
2898 self.hvparams, self.beparams)
2899 msg = result.RemoteFailMsg()
2901 _ShutdownInstanceDisks(self, instance)
2902 raise errors.OpExecError("Could not start instance: %s" % msg)
2905 class LURebootInstance(LogicalUnit):
2906 """Reboot an instance.
2909 HPATH = "instance-reboot"
2910 HTYPE = constants.HTYPE_INSTANCE
2911 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2914 def ExpandNames(self):
2915 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2916 constants.INSTANCE_REBOOT_HARD,
2917 constants.INSTANCE_REBOOT_FULL]:
2918 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2919 (constants.INSTANCE_REBOOT_SOFT,
2920 constants.INSTANCE_REBOOT_HARD,
2921 constants.INSTANCE_REBOOT_FULL))
2922 self._ExpandAndLockInstance()
2924 def BuildHooksEnv(self):
2927 This runs on master, primary and secondary nodes of the instance.
2931 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2932 "REBOOT_TYPE": self.op.reboot_type,
2934 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2935 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2938 def CheckPrereq(self):
2939 """Check prerequisites.
2941 This checks that the instance is in the cluster.
2944 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2945 assert self.instance is not None, \
2946 "Cannot retrieve locked instance %s" % self.op.instance_name
2948 _CheckNodeOnline(self, instance.primary_node)
2950 # check bridge existence
2951 _CheckInstanceBridgesExist(self, instance)
2953 def Exec(self, feedback_fn):
2954 """Reboot the instance.
2957 instance = self.instance
2958 ignore_secondaries = self.op.ignore_secondaries
2959 reboot_type = self.op.reboot_type
2961 node_current = instance.primary_node
2963 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2964 constants.INSTANCE_REBOOT_HARD]:
2965 for disk in instance.disks:
2966 self.cfg.SetDiskID(disk, node_current)
2967 result = self.rpc.call_instance_reboot(node_current, instance,
2969 msg = result.RemoteFailMsg()
2971 raise errors.OpExecError("Could not reboot instance: %s" % msg)
2973 result = self.rpc.call_instance_shutdown(node_current, instance)
2974 msg = result.RemoteFailMsg()
2976 raise errors.OpExecError("Could not shutdown instance for"
2977 " full reboot: %s" % msg)
2978 _ShutdownInstanceDisks(self, instance)
2979 _StartInstanceDisks(self, instance, ignore_secondaries)
2980 result = self.rpc.call_instance_start(node_current, instance, None, None)
2981 msg = result.RemoteFailMsg()
2983 _ShutdownInstanceDisks(self, instance)
2984 raise errors.OpExecError("Could not start instance for"
2985 " full reboot: %s" % msg)
2987 self.cfg.MarkInstanceUp(instance.name)
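
# Illustrative sketch, not part of the original module: what LURebootInstance
# .Exec above does for each reboot type, grounded only in the code visible
# here.  The helper name is hypothetical.
def _ExampleRebootPlan(reboot_type):
  """Return a one-line description of the reboot strategy for a given type."""
  if reboot_type in (constants.INSTANCE_REBOOT_SOFT,
                     constants.INSTANCE_REBOOT_HARD):
    return "reboot in place via call_instance_reboot on the primary node"
  if reboot_type == constants.INSTANCE_REBOOT_FULL:
    return ("shutdown the instance, deactivate and reactivate its disks,"
            " then start it again")
  raise ValueError("unknown reboot type: %s" % reboot_type)
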
2990 class LUShutdownInstance(LogicalUnit):
2991 """Shutdown an instance.
2994 HPATH = "instance-stop"
2995 HTYPE = constants.HTYPE_INSTANCE
2996 _OP_REQP = ["instance_name"]
2999 def ExpandNames(self):
3000 self._ExpandAndLockInstance()
3002 def BuildHooksEnv(self):
3005 This runs on master, primary and secondary nodes of the instance.
3008 env = _BuildInstanceHookEnvByObject(self, self.instance)
3009 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3012 def CheckPrereq(self):
3013 """Check prerequisites.
3015 This checks that the instance is in the cluster.
3018 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3019 assert self.instance is not None, \
3020 "Cannot retrieve locked instance %s" % self.op.instance_name
3021 _CheckNodeOnline(self, self.instance.primary_node)
3023 def Exec(self, feedback_fn):
3024 """Shutdown the instance.
3027 instance = self.instance
3028 node_current = instance.primary_node
3029 self.cfg.MarkInstanceDown(instance.name)
3030 result = self.rpc.call_instance_shutdown(node_current, instance)
3031 msg = result.RemoteFailMsg()
3033 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3035 _ShutdownInstanceDisks(self, instance)
3038 class LUReinstallInstance(LogicalUnit):
3039 """Reinstall an instance.
3042 HPATH = "instance-reinstall"
3043 HTYPE = constants.HTYPE_INSTANCE
3044 _OP_REQP = ["instance_name"]
3047 def ExpandNames(self):
3048 self._ExpandAndLockInstance()
3050 def BuildHooksEnv(self):
3053 This runs on master, primary and secondary nodes of the instance.
3056 env = _BuildInstanceHookEnvByObject(self, self.instance)
3057 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3060 def CheckPrereq(self):
3061 """Check prerequisites.
3063 This checks that the instance is in the cluster and is not running.
3066 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3067 assert instance is not None, \
3068 "Cannot retrieve locked instance %s" % self.op.instance_name
3069 _CheckNodeOnline(self, instance.primary_node)
3071 if instance.disk_template == constants.DT_DISKLESS:
3072 raise errors.OpPrereqError("Instance '%s' has no disks" %
3073 self.op.instance_name)
3074 if instance.admin_up:
3075 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3076 self.op.instance_name)
3077 remote_info = self.rpc.call_instance_info(instance.primary_node,
3079 instance.hypervisor)
3081 if remote_info.data:
3082 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3083 (self.op.instance_name,
3084 instance.primary_node))
3086 self.op.os_type = getattr(self.op, "os_type", None)
3087 if self.op.os_type is not None:
3089 pnode = self.cfg.GetNodeInfo(
3090 self.cfg.ExpandNodeName(instance.primary_node))
3092 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3094 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3096 if not isinstance(result.data, objects.OS):
3097 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3098 " primary node" % self.op.os_type)
3100 self.instance = instance
3102 def Exec(self, feedback_fn):
3103 """Reinstall the instance.
3106 inst = self.instance
3108 if self.op.os_type is not None:
3109 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3110 inst.os = self.op.os_type
3111 self.cfg.Update(inst)
3113 _StartInstanceDisks(self, inst, None)
3115 feedback_fn("Running the instance OS create scripts...")
3116 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3117 msg = result.RemoteFailMsg()
3119 raise errors.OpExecError("Could not install OS for instance %s"
3121 (inst.name, inst.primary_node, msg))
3123 _ShutdownInstanceDisks(self, inst)
3126 class LURenameInstance(LogicalUnit):
3127 """Rename an instance.
3130 HPATH = "instance-rename"
3131 HTYPE = constants.HTYPE_INSTANCE
3132 _OP_REQP = ["instance_name", "new_name"]
3134 def BuildHooksEnv(self):
3137 This runs on master, primary and secondary nodes of the instance.
3140 env = _BuildInstanceHookEnvByObject(self, self.instance)
3141 env["INSTANCE_NEW_NAME"] = self.op.new_name
3142 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3145 def CheckPrereq(self):
3146 """Check prerequisites.
3148 This checks that the instance is in the cluster and is not running.
3151 instance = self.cfg.GetInstanceInfo(
3152 self.cfg.ExpandInstanceName(self.op.instance_name))
3153 if instance is None:
3154 raise errors.OpPrereqError("Instance '%s' not known" %
3155 self.op.instance_name)
3156 _CheckNodeOnline(self, instance.primary_node)
3158 if instance.admin_up:
3159 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3160 self.op.instance_name)
3161 remote_info = self.rpc.call_instance_info(instance.primary_node,
3163 instance.hypervisor)
3165 if remote_info.data:
3166 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3167 (self.op.instance_name,
3168 instance.primary_node))
3169 self.instance = instance
3171 # new name verification
3172 name_info = utils.HostInfo(self.op.new_name)
3174 self.op.new_name = new_name = name_info.name
3175 instance_list = self.cfg.GetInstanceList()
3176 if new_name in instance_list:
3177 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3180 if not getattr(self.op, "ignore_ip", False):
3181 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3182 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3183 (name_info.ip, new_name))
3186 def Exec(self, feedback_fn):
3187 """Reinstall the instance.
3190 inst = self.instance
3191 old_name = inst.name
3193 if inst.disk_template == constants.DT_FILE:
3194 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3196 self.cfg.RenameInstance(inst.name, self.op.new_name)
3197 # Change the instance lock. This is definitely safe while we hold the BGL
3198 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3199 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3201 # re-read the instance from the configuration after rename
3202 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3204 if inst.disk_template == constants.DT_FILE:
3205 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3206 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3207 old_file_storage_dir,
3208 new_file_storage_dir)
3211 raise errors.OpExecError("Could not connect to node '%s' to rename"
3212 " directory '%s' to '%s' (but the instance"
3213 " has been renamed in Ganeti)" % (
3214 inst.primary_node, old_file_storage_dir,
3215 new_file_storage_dir))
3217 if not result.data[0]:
3218 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3219 " (but the instance has been renamed in"
3220 " Ganeti)" % (old_file_storage_dir,
3221 new_file_storage_dir))
3223 _StartInstanceDisks(self, inst, None)
3225 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3227 msg = result.RemoteFailMsg()
3229 msg = ("Could not run OS rename script for instance %s on node %s"
3230 " (but the instance has been renamed in Ganeti): %s" %
3231 (inst.name, inst.primary_node, msg))
3232 self.proc.LogWarning(msg)
3234 _ShutdownInstanceDisks(self, inst)
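
# Illustrative sketch, not part of the original module: the rename sequence
# implemented by LURenameInstance.Exec above, reduced to named steps in the
# order visible in the code.  The helper name is hypothetical.
def _ExampleRenameSteps():
  """Return the ordered, high-level steps of an instance rename."""
  return [
    "rename the instance in the cluster configuration",
    "swap the per-instance lock from the old name to the new one",
    "for file-based disks: rename the file storage directory on the primary",
    "activate the disks and run the OS rename script on the primary node",
    "deactivate the disks again",
  ]
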
3237 class LURemoveInstance(LogicalUnit):
3238 """Remove an instance.
3241 HPATH = "instance-remove"
3242 HTYPE = constants.HTYPE_INSTANCE
3243 _OP_REQP = ["instance_name", "ignore_failures"]
3246 def ExpandNames(self):
3247 self._ExpandAndLockInstance()
3248 self.needed_locks[locking.LEVEL_NODE] = []
3249 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3251 def DeclareLocks(self, level):
3252 if level == locking.LEVEL_NODE:
3253 self._LockInstancesNodes()
3255 def BuildHooksEnv(self):
3258 This runs on master, primary and secondary nodes of the instance.
3261 env = _BuildInstanceHookEnvByObject(self, self.instance)
3262 nl = [self.cfg.GetMasterNode()]
3265 def CheckPrereq(self):
3266 """Check prerequisites.
3268 This checks that the instance is in the cluster.
3271 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3272 assert self.instance is not None, \
3273 "Cannot retrieve locked instance %s" % self.op.instance_name
3275 def Exec(self, feedback_fn):
3276 """Remove the instance.
3279 instance = self.instance
3280 logging.info("Shutting down instance %s on node %s",
3281 instance.name, instance.primary_node)
3283 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3284 msg = result.RemoteFailMsg()
3286 if self.op.ignore_failures:
3287 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3289 raise errors.OpExecError("Could not shutdown instance %s on"
3291 (instance.name, instance.primary_node, msg))
3293 logging.info("Removing block devices for instance %s", instance.name)
3295 if not _RemoveDisks(self, instance):
3296 if self.op.ignore_failures:
3297 feedback_fn("Warning: can't remove instance's disks")
3299 raise errors.OpExecError("Can't remove instance's disks")
3301 logging.info("Removing instance %s out of cluster config", instance.name)
3303 self.cfg.RemoveInstance(instance.name)
3304 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3307 class LUQueryInstances(NoHooksLU):
3308 """Logical unit for querying instances.
3311 _OP_REQP = ["output_fields", "names", "use_locking"]
3313 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3315 "disk_template", "ip", "mac", "bridge",
3316 "sda_size", "sdb_size", "vcpus", "tags",
3317 "network_port", "beparams",
3318 r"(disk)\.(size)/([0-9]+)",
3319 r"(disk)\.(sizes)", "disk_usage",
3320 r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3321 r"(nic)\.(macs|ips|bridges)",
3322 r"(disk|nic)\.(count)",
3323 "serial_no", "hypervisor", "hvparams",] +
3325 for name in constants.HVS_PARAMETERS] +
3327 for name in constants.BES_PARAMETERS])
3328 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3331 def ExpandNames(self):
3332 _CheckOutputFields(static=self._FIELDS_STATIC,
3333 dynamic=self._FIELDS_DYNAMIC,
3334 selected=self.op.output_fields)
3336 self.needed_locks = {}
3337 self.share_locks[locking.LEVEL_INSTANCE] = 1
3338 self.share_locks[locking.LEVEL_NODE] = 1
3341 self.wanted = _GetWantedInstances(self, self.op.names)
3343 self.wanted = locking.ALL_SET
3345 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3346 self.do_locking = self.do_node_query and self.op.use_locking
3348 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3349 self.needed_locks[locking.LEVEL_NODE] = []
3350 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3352 def DeclareLocks(self, level):
3353 if level == locking.LEVEL_NODE and self.do_locking:
3354 self._LockInstancesNodes()
3356 def CheckPrereq(self):
3357 """Check prerequisites.
3362 def Exec(self, feedback_fn):
3363 """Computes the list of nodes and their attributes.
3366 all_info = self.cfg.GetAllInstancesInfo()
3367 if self.wanted == locking.ALL_SET:
3368 # caller didn't specify instance names, so ordering is not important
3370 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3372 instance_names = all_info.keys()
3373 instance_names = utils.NiceSort(instance_names)
3375 # caller did specify names, so we must keep the ordering
3377 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3379 tgt_set = all_info.keys()
3380 missing = set(self.wanted).difference(tgt_set)
3382 raise errors.OpExecError("Some instances were removed before"
3383 " retrieving their data: %s" % missing)
3384 instance_names = self.wanted
3386 instance_list = [all_info[iname] for iname in instance_names]
3388 # begin data gathering
3390 nodes = frozenset([inst.primary_node for inst in instance_list])
3391 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3395 if self.do_node_query:
3397 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3399 result = node_data[name]
3401 # offline nodes will be in both lists
3402 off_nodes.append(name)
3404 bad_nodes.append(name)
3407 live_data.update(result.data)
3408 # else no instance is alive
3410 live_data = dict([(name, {}) for name in instance_names])
3412 # end data gathering
3417 for instance in instance_list:
3419 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3420 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3421 for field in self.op.output_fields:
3422 st_match = self._FIELDS_STATIC.Matches(field)
3427 elif field == "pnode":
3428 val = instance.primary_node
3429 elif field == "snodes":
3430 val = list(instance.secondary_nodes)
3431 elif field == "admin_state":
3432 val = instance.admin_up
3433 elif field == "oper_state":
3434 if instance.primary_node in bad_nodes:
3437 val = bool(live_data.get(instance.name))
3438 elif field == "status":
3439 if instance.primary_node in off_nodes:
3440 val = "ERROR_nodeoffline"
3441 elif instance.primary_node in bad_nodes:
3442 val = "ERROR_nodedown"
3444 running = bool(live_data.get(instance.name))
3446 if instance.admin_up:
3451 if instance.admin_up:
3455 elif field == "oper_ram":
3456 if instance.primary_node in bad_nodes:
3458 elif instance.name in live_data:
3459 val = live_data[instance.name].get("memory", "?")
3462 elif field == "disk_template":
3463 val = instance.disk_template
3465 val = instance.nics[0].ip
3466 elif field == "bridge":
3467 val = instance.nics[0].bridge
3468 elif field == "mac":
3469 val = instance.nics[0].mac
3470 elif field == "sda_size" or field == "sdb_size":
3471 idx = ord(field[2]) - ord('a')
3473 val = instance.FindDisk(idx).size
3474 except errors.OpPrereqError:
3476 elif field == "disk_usage": # total disk usage per node
3477 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3478 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3479 elif field == "tags":
3480 val = list(instance.GetTags())
3481 elif field == "serial_no":
3482 val = instance.serial_no
3483 elif field == "network_port":
3484 val = instance.network_port
3485 elif field == "hypervisor":
3486 val = instance.hypervisor
3487 elif field == "hvparams":
3489 elif (field.startswith(HVPREFIX) and
3490 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3491 val = i_hv.get(field[len(HVPREFIX):], None)
3492 elif field == "beparams":
3494 elif (field.startswith(BEPREFIX) and
3495 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3496 val = i_be.get(field[len(BEPREFIX):], None)
3497 elif st_match and st_match.groups():
3498 # matches a variable list
3499 st_groups = st_match.groups()
3500 if st_groups and st_groups[0] == "disk":
3501 if st_groups[1] == "count":
3502 val = len(instance.disks)
3503 elif st_groups[1] == "sizes":
3504 val = [disk.size for disk in instance.disks]
3505 elif st_groups[1] == "size":
3507 val = instance.FindDisk(st_groups[2]).size
3508 except errors.OpPrereqError:
3511 assert False, "Unhandled disk parameter"
3512 elif st_groups[0] == "nic":
3513 if st_groups[1] == "count":
3514 val = len(instance.nics)
3515 elif st_groups[1] == "macs":
3516 val = [nic.mac for nic in instance.nics]
3517 elif st_groups[1] == "ips":
3518 val = [nic.ip for nic in instance.nics]
3519 elif st_groups[1] == "bridges":
3520 val = [nic.bridge for nic in instance.nics]
3523 nic_idx = int(st_groups[2])
3524 if nic_idx >= len(instance.nics):
3527 if st_groups[1] == "mac":
3528 val = instance.nics[nic_idx].mac
3529 elif st_groups[1] == "ip":
3530 val = instance.nics[nic_idx].ip
3531 elif st_groups[1] == "bridge":
3532 val = instance.nics[nic_idx].bridge
3534 assert False, "Unhandled NIC parameter"
3536 assert False, "Unhandled variable parameter"
3538 raise errors.ParameterError(field)
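
# Illustrative sketch, not part of the original module: the parameterized
# query fields accepted by LUQueryInstances (for example "disk.size/0" or
# "nic.mac/1") split into the groups that Exec above dispatches on.  The
# regular expression is a simplified stand-in for the FieldSet patterns
# declared in _FIELDS_STATIC; the helper name is hypothetical.
def _ExampleParseInstanceField(field):
  """Split a field like 'nic.mac/1' into ('nic', 'mac', 1), or return None."""
  import re
  match = re.match(r"^(disk|nic)\.(\w+)(?:/([0-9]+))?$", field)
  if not match:
    return None
  kind, attr, idx = match.groups()
  if idx is not None:
    idx = int(idx)
  return (kind, attr, idx)

# _ExampleParseInstanceField("disk.size/0") -> ("disk", "size", 0)
# _ExampleParseInstanceField("nic.macs")    -> ("nic", "macs", None)
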
3545 class LUFailoverInstance(LogicalUnit):
3546 """Failover an instance.
3549 HPATH = "instance-failover"
3550 HTYPE = constants.HTYPE_INSTANCE
3551 _OP_REQP = ["instance_name", "ignore_consistency"]
3554 def ExpandNames(self):
3555 self._ExpandAndLockInstance()
3556 self.needed_locks[locking.LEVEL_NODE] = []
3557 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3559 def DeclareLocks(self, level):
3560 if level == locking.LEVEL_NODE:
3561 self._LockInstancesNodes()
3563 def BuildHooksEnv(self):
3566 This runs on master, primary and secondary nodes of the instance.
3570 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3572 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3573 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3576 def CheckPrereq(self):
3577 """Check prerequisites.
3579 This checks that the instance is in the cluster.
3582 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3583 assert self.instance is not None, \
3584 "Cannot retrieve locked instance %s" % self.op.instance_name
3586 bep = self.cfg.GetClusterInfo().FillBE(instance)
3587 if instance.disk_template not in constants.DTS_NET_MIRROR:
3588 raise errors.OpPrereqError("Instance's disk layout is not"
3589 " network mirrored, cannot failover.")
3591 secondary_nodes = instance.secondary_nodes
3592 if not secondary_nodes:
3593 raise errors.ProgrammerError("no secondary node but using "
3594 "a mirrored disk template")
3596 target_node = secondary_nodes[0]
3597 _CheckNodeOnline(self, target_node)
3598 _CheckNodeNotDrained(self, target_node)
3599 # check memory requirements on the secondary node
3600 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3601 instance.name, bep[constants.BE_MEMORY],
3602 instance.hypervisor)
3604 # check bridge existence
3605 brlist = [nic.bridge for nic in instance.nics]
3606 result = self.rpc.call_bridges_exist(target_node, brlist)
3609 raise errors.OpPrereqError("One or more target bridges %s does not"
3610 " exist on destination node '%s'" %
3611 (brlist, target_node))
3613 def Exec(self, feedback_fn):
3614 """Failover an instance.
3616 The failover is done by shutting it down on its present node and
3617 starting it on the secondary.
3620 instance = self.instance
3622 source_node = instance.primary_node
3623 target_node = instance.secondary_nodes[0]
3625 feedback_fn("* checking disk consistency between source and target")
3626 for dev in instance.disks:
3627 # for drbd, these are drbd over lvm
3628 if not _CheckDiskConsistency(self, dev, target_node, False):
3629 if instance.admin_up and not self.op.ignore_consistency:
3630 raise errors.OpExecError("Disk %s is degraded on target node,"
3631 " aborting failover." % dev.iv_name)
3633 feedback_fn("* shutting down instance on source node")
3634 logging.info("Shutting down instance %s on node %s",
3635 instance.name, source_node)
3637 result = self.rpc.call_instance_shutdown(source_node, instance)
3638 msg = result.RemoteFailMsg()
3640 if self.op.ignore_consistency:
3641 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3642 " Proceeding anyway. Please make sure node"
3643 " %s is down. Error details: %s",
3644 instance.name, source_node, source_node, msg)
3646 raise errors.OpExecError("Could not shutdown instance %s on"
3648 (instance.name, source_node, msg))
3650 feedback_fn("* deactivating the instance's disks on source node")
3651 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3652 raise errors.OpExecError("Can't shut down the instance's disks.")
3654 instance.primary_node = target_node
3655 # distribute new instance config to the other nodes
3656 self.cfg.Update(instance)
3658 # Only start the instance if it's marked as up
3659 if instance.admin_up:
3660 feedback_fn("* activating the instance's disks on target node")
3661 logging.info("Starting instance %s on node %s",
3662 instance.name, target_node)
3664 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3665 ignore_secondaries=True)
3667 _ShutdownInstanceDisks(self, instance)
3668 raise errors.OpExecError("Can't activate the instance's disks")
3670 feedback_fn("* starting the instance on the target node")
3671 result = self.rpc.call_instance_start(target_node, instance, None, None)
3672 msg = result.RemoteFailMsg()
3674 _ShutdownInstanceDisks(self, instance)
3675 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3676 (instance.name, target_node, msg))
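
# Illustrative sketch, not part of the original module: the failover sequence
# implemented by LUFailoverInstance.Exec above, reduced to named steps in the
# order visible in the code.  The helper name is hypothetical.
def _ExampleFailoverSteps():
  """Return the ordered, high-level steps of an instance failover."""
  return [
    "check disk consistency on the target (secondary) node",
    "shutdown the instance on the source (primary) node",
    "deactivate the instance's disks on the source node",
    "point primary_node at the target node in the configuration",
    "if the instance was marked up: activate its disks and start it on the"
    " target node",
  ]
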
3679 class LUMigrateInstance(LogicalUnit):
3680 """Migrate an instance.
3682 This is migration without shutting down, compared to the failover,
3683 which is done with shutdown.
3686 HPATH = "instance-migrate"
3687 HTYPE = constants.HTYPE_INSTANCE
3688 _OP_REQP = ["instance_name", "live", "cleanup"]
3692 def ExpandNames(self):
3693 self._ExpandAndLockInstance()
3694 self.needed_locks[locking.LEVEL_NODE] = []
3695 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3697 def DeclareLocks(self, level):
3698 if level == locking.LEVEL_NODE:
3699 self._LockInstancesNodes()
3701 def BuildHooksEnv(self):
3704 This runs on master, primary and secondary nodes of the instance.
3707 env = _BuildInstanceHookEnvByObject(self, self.instance)
3708 env["MIGRATE_LIVE"] = self.op.live
3709 env["MIGRATE_CLEANUP"] = self.op.cleanup
3710 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3713 def CheckPrereq(self):
3714 """Check prerequisites.
3716 This checks that the instance is in the cluster.
3719 instance = self.cfg.GetInstanceInfo(
3720 self.cfg.ExpandInstanceName(self.op.instance_name))
3721 if instance is None:
3722 raise errors.OpPrereqError("Instance '%s' not known" %
3723 self.op.instance_name)
3725 if instance.disk_template != constants.DT_DRBD8:
3726 raise errors.OpPrereqError("Instance's disk layout is not"
3727 " drbd8, cannot migrate.")
3729 secondary_nodes = instance.secondary_nodes
3730 if not secondary_nodes:
3731 raise errors.ConfigurationError("No secondary node but using"
3732 " drbd8 disk template")
3734 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3736 target_node = secondary_nodes[0]
3737 # check memory requirements on the secondary node
3738 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3739 instance.name, i_be[constants.BE_MEMORY],
3740 instance.hypervisor)
3742 # check bridge existence
3743 brlist = [nic.bridge for nic in instance.nics]
3744 result = self.rpc.call_bridges_exist(target_node, brlist)
3745 if result.failed or not result.data:
3746 raise errors.OpPrereqError("One or more target bridges %s does not"
3747 " exist on destination node '%s'" %
3748 (brlist, target_node))
3750 if not self.op.cleanup:
3751 _CheckNodeNotDrained(self, target_node)
3752 result = self.rpc.call_instance_migratable(instance.primary_node,
3754 msg = result.RemoteFailMsg()
3756 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3759 self.instance = instance
3761 def _WaitUntilSync(self):
3762 """Poll with custom rpc for disk sync.
3764 This uses our own step-based rpc call.
3767 self.feedback_fn("* wait until resync is done")
3771 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3773 self.instance.disks)
3775 for node, nres in result.items():
3776 msg = nres.RemoteFailMsg()
3778 raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3780 node_done, node_percent = nres.payload
3781 all_done = all_done and node_done
3782 if node_percent is not None:
3783 min_percent = min(min_percent, node_percent)
3785 if min_percent < 100:
3786 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3789 def _EnsureSecondary(self, node):
3790 """Demote a node to secondary.
3793 self.feedback_fn("* switching node %s to secondary mode" % node)
3795 for dev in self.instance.disks:
3796 self.cfg.SetDiskID(dev, node)
3798 result = self.rpc.call_blockdev_close(node, self.instance.name,
3799 self.instance.disks)
3800 msg = result.RemoteFailMsg()
3802 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3803 " error %s" % (node, msg))
3805 def _GoStandalone(self):
3806 """Disconnect from the network.
3809 self.feedback_fn("* changing into standalone mode")
3810 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3811 self.instance.disks)
3812 for node, nres in result.items():
3813 msg = nres.RemoteFailMsg()
3815 raise errors.OpExecError("Cannot disconnect disks node %s,"
3816 " error %s" % (node, msg))
3818 def _GoReconnect(self, multimaster):
3819 """Reconnect to the network.
3825 msg = "single-master"
3826 self.feedback_fn("* changing disks into %s mode" % msg)
3827 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3828 self.instance.disks,
3829 self.instance.name, multimaster)
3830 for node, nres in result.items():
3831 msg = nres.RemoteFailMsg()
3833 raise errors.OpExecError("Cannot change disks config on node %s,"
3834 " error: %s" % (node, msg))
3836 def _ExecCleanup(self):
3837 """Try to cleanup after a failed migration.
3839 The cleanup is done by:
3840 - check that the instance is running only on one node
3841 (and update the config if needed)
3842 - change disks on its secondary node to secondary
3843 - wait until disks are fully synchronized
3844 - disconnect from the network
3845 - change disks into single-master mode
3846 - wait again until disks are fully synchronized
3849 instance = self.instance
3850 target_node = self.target_node
3851 source_node = self.source_node
3853 # check running on only one node
3854 self.feedback_fn("* checking where the instance actually runs"
3855 " (if this hangs, the hypervisor might be in"
3857 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3858 for node, result in ins_l.items():
3860 if not isinstance(result.data, list):
3861 raise errors.OpExecError("Can't contact node '%s'" % node)
3863 runningon_source = instance.name in ins_l[source_node].data
3864 runningon_target = instance.name in ins_l[target_node].data
3866 if runningon_source and runningon_target:
3867 raise errors.OpExecError("Instance seems to be running on two nodes,"
3868 " or the hypervisor is confused. You will have"
3869 " to ensure manually that it runs only on one"
3870 " and restart this operation.")
3872 if not (runningon_source or runningon_target):
3873 raise errors.OpExecError("Instance does not seem to be running at all."
3874 " In this case, it's safer to repair by"
3875 " running 'gnt-instance stop' to ensure disk"
3876 " shutdown, and then restarting it.")
3878 if runningon_target:
3879 # the migration has actually succeeded, we need to update the config
3880 self.feedback_fn("* instance running on secondary node (%s),"
3881 " updating config" % target_node)
3882 instance.primary_node = target_node
3883 self.cfg.Update(instance)
3884 demoted_node = source_node
3886 self.feedback_fn("* instance confirmed to be running on its"
3887 " primary node (%s)" % source_node)
3888 demoted_node = target_node
3890 self._EnsureSecondary(demoted_node)
3892 self._WaitUntilSync()
3893 except errors.OpExecError:
3894 # we ignore here errors, since if the device is standalone, it
3895 # won't be able to sync
3897 self._GoStandalone()
3898 self._GoReconnect(False)
3899 self._WaitUntilSync()
3901 self.feedback_fn("* done")
3903 def _RevertDiskStatus(self):
3904 """Try to revert the disk status after a failed migration.
3907 target_node = self.target_node
3909 self._EnsureSecondary(target_node)
3910 self._GoStandalone()
3911 self._GoReconnect(False)
3912 self._WaitUntilSync()
3913 except errors.OpExecError, err:
3914 self.LogWarning("Migration failed and I can't reconnect the"
3915 " drives: error '%s'\n"
3916 "Please look and recover the instance status" %
3919 def _AbortMigration(self):
3920 """Call the hypervisor code to abort a started migration.
3923 instance = self.instance
3924 target_node = self.target_node
3925 migration_info = self.migration_info
3927 abort_result = self.rpc.call_finalize_migration(target_node,
3931 abort_msg = abort_result.RemoteFailMsg()
3933 logging.error("Aborting migration failed on target node %s: %s" %
3934 (target_node, abort_msg))
3935 # Don't raise an exception here, as we still have to try to revert the
3936 # disk status, even if this step failed.
3938 def _ExecMigration(self):
3939 """Migrate an instance.
3941 The migration is done by:
3942 - change the disks into dual-master mode
3943 - wait until disks are fully synchronized again
3944 - migrate the instance
3945 - change disks on the new secondary node (the old primary) to secondary
3946 - wait until disks are fully synchronized
3947 - change disks into single-master mode
3950 instance = self.instance
3951 target_node = self.target_node
3952 source_node = self.source_node
3954 self.feedback_fn("* checking disk consistency between source and target")
3955 for dev in instance.disks:
3956 if not _CheckDiskConsistency(self, dev, target_node, False):
3957 raise errors.OpExecError("Disk %s is degraded or not fully"
3958 " synchronized on target node,"
3959 " aborting migration." % dev.iv_name)
3961 # First get the migration information from the remote node
3962 result = self.rpc.call_migration_info(source_node, instance)
3963 msg = result.RemoteFailMsg()
3964 if msg:
3965 log_err = ("Failed fetching source migration information from %s: %s" %
3966 (source_node, msg))
3967 logging.error(log_err)
3968 raise errors.OpExecError(log_err)
3970 self.migration_info = migration_info = result.payload
3972 # Then switch the disks to master/master mode
3973 self._EnsureSecondary(target_node)
3974 self._GoStandalone()
3975 self._GoReconnect(True)
3976 self._WaitUntilSync()
3978 self.feedback_fn("* preparing %s to accept the instance" % target_node)
3979 result = self.rpc.call_accept_instance(target_node,
3982 self.nodes_ip[target_node])
3984 msg = result.RemoteFailMsg()
3986 logging.error("Instance pre-migration failed, trying to revert"
3987 " disk status: %s", msg)
3988 self._AbortMigration()
3989 self._RevertDiskStatus()
3990 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3991 (instance.name, msg))
3993 self.feedback_fn("* migrating instance to %s" % target_node)
3995 result = self.rpc.call_instance_migrate(source_node, instance,
3996 self.nodes_ip[target_node],
3998 msg = result.RemoteFailMsg()
4000 logging.error("Instance migration failed, trying to revert"
4001 " disk status: %s", msg)
4002 self._AbortMigration()
4003 self._RevertDiskStatus()
4004 raise errors.OpExecError("Could not migrate instance %s: %s" %
4005 (instance.name, msg))
4008 instance.primary_node = target_node
4009 # distribute new instance config to the other nodes
4010 self.cfg.Update(instance)
4012 result = self.rpc.call_finalize_migration(target_node,
4016 msg = result.RemoteFailMsg()
4018 logging.error("Instance migration succeeded, but finalization failed:"
4020 raise errors.OpExecError("Could not finalize instance migration: %s" %
4023 self._EnsureSecondary(source_node)
4024 self._WaitUntilSync()
4025 self._GoStandalone()
4026 self._GoReconnect(False)
4027 self._WaitUntilSync()
4029 self.feedback_fn("* done")
4031 def Exec(self, feedback_fn):
4032 """Perform the migration.
4035 self.feedback_fn = feedback_fn
4037 self.source_node = self.instance.primary_node
4038 self.target_node = self.instance.secondary_nodes[0]
4039 self.all_nodes = [self.source_node, self.target_node]
4041 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4042 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4044 if self.op.cleanup:
4045 return self._ExecCleanup()
4046 else:
4047 return self._ExecMigration()
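# --- Illustrative sketch (added commentary, not part of the original code) ---
# The choice between the two workers above comes from the opcode driving this
# LU.  Assuming the opcode class is opcodes.OpMigrateInstance with the fields
# "instance_name", "live" and "cleanup" (class and field names are an
# assumption here, inferred from the self.op usage), a caller would submit e.g.:
#
#   op = opcodes.OpMigrateInstance(instance_name="instance1.example.com",
#                                  live=True, cleanup=False)
#
# cleanup=True dispatches to _ExecCleanup() instead, repairing a previously
# failed migration (one running copy, disks re-synced, single-master mode).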
4050 def _CreateBlockDev(lu, node, instance, device, force_create,
4052 """Create a tree of block devices on a given node.
4054 If this device type has to be created on secondaries, create it and
4057 If not, just recurse to children keeping the same 'force' value.
4059 @param lu: the lu on whose behalf we execute
4060 @param node: the node on which to create the device
4061 @type instance: L{objects.Instance}
4062 @param instance: the instance which owns the device
4063 @type device: L{objects.Disk}
4064 @param device: the device to create
4065 @type force_create: boolean
4066 @param force_create: whether to force creation of this device; this
4067 will be changed to True whenever we find a device whose
4068 CreateOnSecondary() method returns True
4069 @param info: the extra 'metadata' we should attach to the device
4070 (this will be represented as a LVM tag)
4071 @type force_open: boolean
4072 @param force_open: this parameter will be passed to the
4073 L{backend.BlockdevCreate} function where it specifies
4074 whether we run on primary or not, and it affects both
4075 the child assembly and the device's own Open() execution
4078 if device.CreateOnSecondary():
4082 for child in device.children:
4083 _CreateBlockDev(lu, node, instance, child, force_create,
4086 if not force_create:
4089 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
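# Added note (illustration only): force_create propagates downwards.  As soon
# as a device in the tree reports CreateOnSecondary(), the recursion above is
# entered with force_create=True for the whole subtree, so e.g. the LVs
# backing a network-mirrored disk are created on the secondary node as well,
# while a purely local template only ends up creating devices on the primary.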
4092 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4093 """Create a single block device on a given node.
4095 This will not recurse over children of the device, so they must be
4098 @param lu: the lu on whose behalf we execute
4099 @param node: the node on which to create the device
4100 @type instance: L{objects.Instance}
4101 @param instance: the instance which owns the device
4102 @type device: L{objects.Disk}
4103 @param device: the device to create
4104 @param info: the extra 'metadata' we should attach to the device
4105 (this will be represented as a LVM tag)
4106 @type force_open: boolean
4107 @param force_open: this parameter will be passed to the
4108 L{backend.BlockdevCreate} function where it specifies
4109 whether we run on primary or not, and it affects both
4110 the child assembly and the device's own Open() execution
4113 lu.cfg.SetDiskID(device, node)
4114 result = lu.rpc.call_blockdev_create(node, device, device.size,
4115 instance.name, force_open, info)
4116 msg = result.RemoteFailMsg()
4117 if msg:
4118 raise errors.OpExecError("Can't create block device %s on"
4119 " node %s for instance %s: %s" %
4120 (device, node, instance.name, msg))
4121 if device.physical_id is None:
4122 device.physical_id = result.payload
4125 def _GenerateUniqueNames(lu, exts):
4126 """Generate suitable LV names.
4128 This will generate logical volume names for the given instance.
4133 new_id = lu.cfg.GenerateUniqueID()
4134 results.append("%s%s" % (new_id, val))
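# Added note (illustration only; the exact ID format is an assumption): for
# exts == [".disk0", ".disk1"] this yields one LV name per extension, roughly
#   ["<unique-id>.disk0", "<unique-id>.disk1"]
# where <unique-id> comes from cfg.GenerateUniqueID().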
4138 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4140 """Generate a drbd8 device complete with its children.
4143 port = lu.cfg.AllocatePort()
4144 vgname = lu.cfg.GetVGName()
4145 shared_secret = lu.cfg.GenerateDRBDSecret()
4146 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4147 logical_id=(vgname, names[0]))
4148 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4149 logical_id=(vgname, names[1]))
4150 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4151 logical_id=(primary, secondary, port,
4154 children=[dev_data, dev_meta],
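# Added note (illustration only, sizes are hypothetical): the object built
# above is a two-level tree.  The DRBD8 device spans the primary and the
# secondary node and carries two LV children: names[0] holds the data (the
# full requested size) and names[1] a fixed 128 MB metadata volume, e.g.:
#
#   drbd8, 10240 MB (minors, port and secret in logical_id)
#     +- lv <unique-id>.disk0_data, 10240 MB
#     +- lv <unique-id>.disk0_meta,   128 MB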
4159 def _GenerateDiskTemplate(lu, template_name,
4160 instance_name, primary_node,
4161 secondary_nodes, disk_info,
4162 file_storage_dir, file_driver,
4164 """Generate the entire disk layout for a given template type.
4167 #TODO: compute space requirements
4169 vgname = lu.cfg.GetVGName()
4170 disk_count = len(disk_info)
4172 if template_name == constants.DT_DISKLESS:
4174 elif template_name == constants.DT_PLAIN:
4175 if len(secondary_nodes) != 0:
4176 raise errors.ProgrammerError("Wrong template configuration")
4178 names = _GenerateUniqueNames(lu, [".disk%d" % i
4179 for i in range(disk_count)])
4180 for idx, disk in enumerate(disk_info):
4181 disk_index = idx + base_index
4182 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4183 logical_id=(vgname, names[idx]),
4184 iv_name="disk/%d" % disk_index,
4186 disks.append(disk_dev)
4187 elif template_name == constants.DT_DRBD8:
4188 if len(secondary_nodes) != 1:
4189 raise errors.ProgrammerError("Wrong template configuration")
4190 remote_node = secondary_nodes[0]
4191 minors = lu.cfg.AllocateDRBDMinor(
4192 [primary_node, remote_node] * len(disk_info), instance_name)
4195 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4196 for i in range(disk_count)]):
4197 names.append(lv_prefix + "_data")
4198 names.append(lv_prefix + "_meta")
4199 for idx, disk in enumerate(disk_info):
4200 disk_index = idx + base_index
4201 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4202 disk["size"], names[idx*2:idx*2+2],
4203 "disk/%d" % disk_index,
4204 minors[idx*2], minors[idx*2+1])
4205 disk_dev.mode = disk["mode"]
4206 disks.append(disk_dev)
4207 elif template_name == constants.DT_FILE:
4208 if len(secondary_nodes) != 0:
4209 raise errors.ProgrammerError("Wrong template configuration")
4211 for idx, disk in enumerate(disk_info):
4212 disk_index = idx + base_index
4213 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4214 iv_name="disk/%d" % disk_index,
4215 logical_id=(file_driver,
4216 "%s/disk%d" % (file_storage_dir,
4219 disks.append(disk_dev)
4221 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
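# Added note (illustration only, example values are hypothetical): disk_info
# is a list of dicts as pre-built by the calling LU, e.g.
#   [{"size": 10240, "mode": constants.DISK_RDWR}, {"size": 2048, "mode": ...}]
# For DT_PLAIN each entry becomes a single LV, for DT_DRBD8 a DRBD8 tree as
# built by _GenerateDRBD8Branch, and for DT_FILE a file-backed disk under
# file_storage_dir; the assembled objects.Disk list is handed back to the
# caller (LUCreateInstance and the disk-modification code).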
4225 def _GetInstanceInfoText(instance):
4226 """Compute the text that should be added to the disk's metadata.
4229 return "originstname+%s" % instance.name
4232 def _CreateDisks(lu, instance):
4233 """Create all disks for an instance.
4235 This abstracts away some work from AddInstance.
4237 @type lu: L{LogicalUnit}
4238 @param lu: the logical unit on whose behalf we execute
4239 @type instance: L{objects.Instance}
4240 @param instance: the instance whose disks we should create
4242 @return: the success of the creation
4245 info = _GetInstanceInfoText(instance)
4246 pnode = instance.primary_node
4248 if instance.disk_template == constants.DT_FILE:
4249 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4250 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4252 if result.failed or not result.data:
4253 raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4255 if not result.data[0]:
4256 raise errors.OpExecError("Failed to create directory '%s'" %
4257 file_storage_dir)
4259 # Note: this needs to be kept in sync with adding of disks in
4260 # LUSetInstanceParams
4261 for device in instance.disks:
4262 logging.info("Creating volume %s for instance %s",
4263 device.iv_name, instance.name)
4265 for node in instance.all_nodes:
4266 f_create = node == pnode
4267 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4270 def _RemoveDisks(lu, instance):
4271 """Remove all disks for an instance.
4273 This abstracts away some work from `AddInstance()` and
4274 `RemoveInstance()`. Note that in case some of the devices couldn't
4275 be removed, the removal will continue with the other ones (compare
4276 with `_CreateDisks()`).
4278 @type lu: L{LogicalUnit}
4279 @param lu: the logical unit on whose behalf we execute
4280 @type instance: L{objects.Instance}
4281 @param instance: the instance whose disks we should remove
4283 @return: the success of the removal
4286 logging.info("Removing block devices for instance %s", instance.name)
4289 for device in instance.disks:
4290 for node, disk in device.ComputeNodeTree(instance.primary_node):
4291 lu.cfg.SetDiskID(disk, node)
4292 msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4294 lu.LogWarning("Could not remove block device %s on node %s,"
4295 " continuing anyway: %s", device.iv_name, node, msg)
4298 if instance.disk_template == constants.DT_FILE:
4299 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4300 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4301 file_storage_dir)
4302 if result.failed or not result.data:
4303 logging.error("Could not remove directory '%s'", file_storage_dir)
4309 def _ComputeDiskSize(disk_template, disks):
4310 """Compute disk size requirements in the volume group
4313 # Required free disk space as a function of disk and swap space
4315 constants.DT_DISKLESS: None,
4316 constants.DT_PLAIN: sum(d["size"] for d in disks),
4317 # 128 MB are added for drbd metadata for each disk
4318 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4319 constants.DT_FILE: None,
4322 if disk_template not in req_size_dict:
4323 raise errors.ProgrammerError("Disk template '%s' size requirement"
4324 " is unknown" % disk_template)
4326 return req_size_dict[disk_template]
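# Added note (worked example, hypothetical sizes): for two disks of 10240 MB
# and 2048 MB the volume group must provide
#   DT_PLAIN: 10240 + 2048               = 12288 MB
#   DT_DRBD8: (10240 + 128) + (2048 + 128) = 12544 MB
# while DT_DISKLESS and DT_FILE need no space in the volume group (None).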
4329 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4330 """Hypervisor parameter validation.
4332 This function abstracts the hypervisor parameter validation to be
4333 used in both instance create and instance modify.
4335 @type lu: L{LogicalUnit}
4336 @param lu: the logical unit for which we check
4337 @type nodenames: list
4338 @param nodenames: the list of nodes on which we should check
4339 @type hvname: string
4340 @param hvname: the name of the hypervisor we should use
4341 @type hvparams: dict
4342 @param hvparams: the parameters which we need to check
4343 @raise errors.OpPrereqError: if the parameters are not valid
4346 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4349 for node in nodenames:
4353 msg = info.RemoteFailMsg()
4355 raise errors.OpPrereqError("Hypervisor parameter validation"
4356 " failed on node %s: %s" % (node, msg))
4359 class LUCreateInstance(LogicalUnit):
4360 """Create an instance.
4363 HPATH = "instance-add"
4364 HTYPE = constants.HTYPE_INSTANCE
4365 _OP_REQP = ["instance_name", "disks", "disk_template",
4367 "wait_for_sync", "ip_check", "nics",
4368 "hvparams", "beparams"]
4371 def _ExpandNode(self, node):
4372 """Expands and checks one node name.
4375 node_full = self.cfg.ExpandNodeName(node)
4376 if node_full is None:
4377 raise errors.OpPrereqError("Unknown node %s" % node)
4380 def ExpandNames(self):
4381 """ExpandNames for CreateInstance.
4383 Figure out the right locks for instance creation.
4386 self.needed_locks = {}
4388 # set optional parameters to none if they don't exist
4389 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4390 if not hasattr(self.op, attr):
4391 setattr(self.op, attr, None)
4393 # cheap checks, mostly valid constants given
4395 # verify creation mode
4396 if self.op.mode not in (constants.INSTANCE_CREATE,
4397 constants.INSTANCE_IMPORT):
4398 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4401 # disk template and mirror node verification
4402 if self.op.disk_template not in constants.DISK_TEMPLATES:
4403 raise errors.OpPrereqError("Invalid disk template name")
4405 if self.op.hypervisor is None:
4406 self.op.hypervisor = self.cfg.GetHypervisorType()
4408 cluster = self.cfg.GetClusterInfo()
4409 enabled_hvs = cluster.enabled_hypervisors
4410 if self.op.hypervisor not in enabled_hvs:
4411 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4412 " cluster (%s)" % (self.op.hypervisor,
4413 ",".join(enabled_hvs)))
4415 # check hypervisor parameter syntax (locally)
4416 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4417 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4419 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4420 hv_type.CheckParameterSyntax(filled_hvp)
4422 # fill and remember the beparams dict
4423 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4424 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4427 #### instance parameters check
4429 # instance name verification
4430 hostname1 = utils.HostInfo(self.op.instance_name)
4431 self.op.instance_name = instance_name = hostname1.name
4433 # this is just a preventive check, but someone might still add this
4434 # instance in the meantime, and creation will fail at lock-add time
4435 if instance_name in self.cfg.GetInstanceList():
4436 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4439 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4443 for idx, nic in enumerate(self.op.nics):
4444 nic_mode_req = nic.get("mode", None)
4445 nic_mode = nic_mode_req
4446 if nic_mode is None:
4447 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4449 # in routed mode, for the first nic, the default ip is 'auto'
4450 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4451 default_ip_mode = constants.VALUE_AUTO
4453 default_ip_mode = constants.VALUE_NONE
4455 # ip validity checks
4456 ip = nic.get("ip", default_ip_mode)
4457 if ip is None or ip.lower() == constants.VALUE_NONE:
4459 elif ip.lower() == constants.VALUE_AUTO:
4460 nic_ip = hostname1.ip
4462 if not utils.IsValidIP(ip):
4463 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4464 " like a valid IP" % ip)
4467 # TODO: check the ip for uniqueness !!
4468 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4469 raise errors.OpPrereqError("Routed nic mode requires an ip address")
4471 # MAC address verification
4472 mac = nic.get("mac", constants.VALUE_AUTO)
4473 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4474 if not utils.IsValidMac(mac.lower()):
4475 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4477 # bridge verification
4478 bridge = nic.get("bridge", None)
4479 link = nic.get("link", None)
4481 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4482 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4483 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4489 nicparams[constants.NIC_MODE] = nic_mode_req
4491 nicparams[constants.NIC_LINK] = link
4493 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4495 objects.NIC.CheckParameterSyntax(check_params)
4496 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4498 # disk checks/pre-build
4500 for disk in self.op.disks:
4501 mode = disk.get("mode", constants.DISK_RDWR)
4502 if mode not in constants.DISK_ACCESS_SET:
4503 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4505 size = disk.get("size", None)
4506 if size is None:
4507 raise errors.OpPrereqError("Missing disk size")
4508 try:
4509 size = int(size)
4510 except ValueError:
4511 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4512 self.disks.append({"size": size, "mode": mode})
4514 # used in CheckPrereq for ip ping check
4515 self.check_ip = hostname1.ip
4517 # file storage checks
4518 if (self.op.file_driver and
4519 not self.op.file_driver in constants.FILE_DRIVER):
4520 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4521 self.op.file_driver)
4523 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4524 raise errors.OpPrereqError("File storage directory path not absolute")
4526 ### Node/iallocator related checks
4527 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4528 raise errors.OpPrereqError("One and only one of iallocator and primary"
4529 " node must be given")
4531 if self.op.iallocator:
4532 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4534 self.op.pnode = self._ExpandNode(self.op.pnode)
4535 nodelist = [self.op.pnode]
4536 if self.op.snode is not None:
4537 self.op.snode = self._ExpandNode(self.op.snode)
4538 nodelist.append(self.op.snode)
4539 self.needed_locks[locking.LEVEL_NODE] = nodelist
4541 # in case of import lock the source node too
4542 if self.op.mode == constants.INSTANCE_IMPORT:
4543 src_node = getattr(self.op, "src_node", None)
4544 src_path = getattr(self.op, "src_path", None)
4546 if src_path is None:
4547 self.op.src_path = src_path = self.op.instance_name
4549 if src_node is None:
4550 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4551 self.op.src_node = None
4552 if os.path.isabs(src_path):
4553 raise errors.OpPrereqError("Importing an instance from an absolute"
4554 " path requires a source node option.")
4556 self.op.src_node = src_node = self._ExpandNode(src_node)
4557 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4558 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4559 if not os.path.isabs(src_path):
4560 self.op.src_path = src_path = \
4561 os.path.join(constants.EXPORT_DIR, src_path)
4563 else: # INSTANCE_CREATE
4564 if getattr(self.op, "os_type", None) is None:
4565 raise errors.OpPrereqError("No guest OS specified")
4567 def _RunAllocator(self):
4568 """Run the allocator based on input opcode.
4571 nics = [n.ToDict() for n in self.nics]
4572 ial = IAllocator(self,
4573 mode=constants.IALLOCATOR_MODE_ALLOC,
4574 name=self.op.instance_name,
4575 disk_template=self.op.disk_template,
4578 vcpus=self.be_full[constants.BE_VCPUS],
4579 mem_size=self.be_full[constants.BE_MEMORY],
4582 hypervisor=self.op.hypervisor,
4585 ial.Run(self.op.iallocator)
4588 raise errors.OpPrereqError("Can't compute nodes using"
4589 " iallocator '%s': %s" % (self.op.iallocator,
4591 if len(ial.nodes) != ial.required_nodes:
4592 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4593 " of nodes (%s), required %s" %
4594 (self.op.iallocator, len(ial.nodes),
4595 ial.required_nodes))
4596 self.op.pnode = ial.nodes[0]
4597 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4598 self.op.instance_name, self.op.iallocator,
4599 ", ".join(ial.nodes))
4600 if ial.required_nodes == 2:
4601 self.op.snode = ial.nodes[1]
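# Added note (illustration only): for a DRBD8 instance the allocator is
# expected to return two node names, the first becoming the primary and the
# second the secondary; for non-mirrored templates required_nodes is 1 and
# only self.op.pnode is filled in above.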
4603 def BuildHooksEnv(self):
4606 This runs on master, primary and secondary nodes of the instance.
4610 "ADD_MODE": self.op.mode,
4612 if self.op.mode == constants.INSTANCE_IMPORT:
4613 env["SRC_NODE"] = self.op.src_node
4614 env["SRC_PATH"] = self.op.src_path
4615 env["SRC_IMAGES"] = self.src_images
4617 env.update(_BuildInstanceHookEnv(
4618 name=self.op.instance_name,
4619 primary_node=self.op.pnode,
4620 secondary_nodes=self.secondaries,
4621 status=self.op.start,
4622 os_type=self.op.os_type,
4623 memory=self.be_full[constants.BE_MEMORY],
4624 vcpus=self.be_full[constants.BE_VCPUS],
4625 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4626 disk_template=self.op.disk_template,
4627 disks=[(d["size"], d["mode"]) for d in self.disks],
4630 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4635 def CheckPrereq(self):
4636 """Check prerequisites.
4639 if (not self.cfg.GetVGName() and
4640 self.op.disk_template not in constants.DTS_NOT_LVM):
4641 raise errors.OpPrereqError("Cluster does not support lvm-based"
4644 if self.op.mode == constants.INSTANCE_IMPORT:
4645 src_node = self.op.src_node
4646 src_path = self.op.src_path
4648 if src_node is None:
4649 exp_list = self.rpc.call_export_list(
4650 self.acquired_locks[locking.LEVEL_NODE])
4652 for node in exp_list:
4653 if not exp_list[node].failed and src_path in exp_list[node].data:
4655 self.op.src_node = src_node = node
4656 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4660 raise errors.OpPrereqError("No export found for relative path %s" %
4663 _CheckNodeOnline(self, src_node)
4664 result = self.rpc.call_export_info(src_node, src_path)
4667 raise errors.OpPrereqError("No export found in dir %s" % src_path)
4669 export_info = result.data
4670 if not export_info.has_section(constants.INISECT_EXP):
4671 raise errors.ProgrammerError("Corrupted export config")
4673 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4674 if (int(ei_version) != constants.EXPORT_VERSION):
4675 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4676 (ei_version, constants.EXPORT_VERSION))
4678 # Check that the new instance doesn't have fewer disks than the export
4679 instance_disks = len(self.disks)
4680 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4681 if instance_disks < export_disks:
4682 raise errors.OpPrereqError("Not enough disks to import."
4683 " (instance: %d, export: %d)" %
4684 (instance_disks, export_disks))
4686 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4688 for idx in range(export_disks):
4689 option = 'disk%d_dump' % idx
4690 if export_info.has_option(constants.INISECT_INS, option):
4691 # FIXME: are the old os-es, disk sizes, etc. useful?
4692 export_name = export_info.get(constants.INISECT_INS, option)
4693 image = os.path.join(src_path, export_name)
4694 disk_images.append(image)
4696 disk_images.append(False)
4698 self.src_images = disk_images
4700 old_name = export_info.get(constants.INISECT_INS, 'name')
4701 # FIXME: int() here could throw a ValueError on broken exports
4702 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4703 if self.op.instance_name == old_name:
4704 for idx, nic in enumerate(self.nics):
4705 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4706 nic_mac_ini = 'nic%d_mac' % idx
4707 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4709 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4710 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4711 if self.op.start and not self.op.ip_check:
4712 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4713 " adding an instance in start mode")
4715 if self.op.ip_check:
4716 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4717 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4718 (self.check_ip, self.op.instance_name))
4720 #### mac address generation
4721 # By generating the MAC address here, both the allocator and the hooks get
4722 # the real final MAC address rather than the 'auto' or 'generate' value.
4723 # There is a race condition between the generation and the instance object
4724 # creation, which means that we know the mac is valid now, but we're not
4725 # sure it will be when we actually add the instance. If things go bad
4726 # adding the instance will abort because of a duplicate mac, and the
4727 # creation job will fail.
4728 for nic in self.nics:
4729 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4730 nic.mac = self.cfg.GenerateMAC()
4734 if self.op.iallocator is not None:
4735 self._RunAllocator()
4737 #### node related checks
4739 # check primary node
4740 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4741 assert self.pnode is not None, \
4742 "Cannot retrieve locked node %s" % self.op.pnode
4744 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4747 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4750 self.secondaries = []
4752 # mirror node verification
4753 if self.op.disk_template in constants.DTS_NET_MIRROR:
4754 if self.op.snode is None:
4755 raise errors.OpPrereqError("The networked disk templates need"
4756 " a mirror node")
4757 if self.op.snode == pnode.name:
4758 raise errors.OpPrereqError("The secondary node cannot be"
4759 " the primary node.")
4760 _CheckNodeOnline(self, self.op.snode)
4761 _CheckNodeNotDrained(self, self.op.snode)
4762 self.secondaries.append(self.op.snode)
4764 nodenames = [pnode.name] + self.secondaries
4766 req_size = _ComputeDiskSize(self.op.disk_template,
4769 # Check lv size requirements
4770 if req_size is not None:
4771 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4773 for node in nodenames:
4774 info = nodeinfo[node]
4778 raise errors.OpPrereqError("Cannot get current information"
4779 " from node '%s'" % node)
4780 vg_free = info.get('vg_free', None)
4781 if not isinstance(vg_free, int):
4782 raise errors.OpPrereqError("Can't compute free disk space on"
4784 if req_size > info['vg_free']:
4785 raise errors.OpPrereqError("Not enough disk space on target node %s."
4786 " %d MB available, %d MB required" %
4787 (node, info['vg_free'], req_size))
4789 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4792 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4794 if not isinstance(result.data, objects.OS):
4795 raise errors.OpPrereqError("OS '%s' not in supported os list for"
4796 " primary node" % self.op.os_type)
4798 # bridge check on primary node
4799 bridges = [n.bridge for n in self.nics]
4800 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4803 raise errors.OpPrereqError("One of the target bridges '%s' does not"
4804 " exist on destination node '%s'" %
4805 (",".join(bridges), pnode.name))
4807 # memory check on primary node
4809 _CheckNodeFreeMemory(self, self.pnode.name,
4810 "creating instance %s" % self.op.instance_name,
4811 self.be_full[constants.BE_MEMORY],
4814 def Exec(self, feedback_fn):
4815 """Create and add the instance to the cluster.
4818 instance = self.op.instance_name
4819 pnode_name = self.pnode.name
4821 ht_kind = self.op.hypervisor
4822 if ht_kind in constants.HTS_REQ_PORT:
4823 network_port = self.cfg.AllocatePort()
4827 ##if self.op.vnc_bind_address is None:
4828 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4830 # this is needed because os.path.join does not accept None arguments
4831 if self.op.file_storage_dir is None:
4832 string_file_storage_dir = ""
4834 string_file_storage_dir = self.op.file_storage_dir
4836 # build the full file storage dir path
4837 file_storage_dir = os.path.normpath(os.path.join(
4838 self.cfg.GetFileStorageDir(),
4839 string_file_storage_dir, instance))
4842 disks = _GenerateDiskTemplate(self,
4843 self.op.disk_template,
4844 instance, pnode_name,
4848 self.op.file_driver,
4851 iobj = objects.Instance(name=instance, os=self.op.os_type,
4852 primary_node=pnode_name,
4853 nics=self.nics, disks=disks,
4854 disk_template=self.op.disk_template,
4856 network_port=network_port,
4857 beparams=self.op.beparams,
4858 hvparams=self.op.hvparams,
4859 hypervisor=self.op.hypervisor,
4862 feedback_fn("* creating instance disks...")
4864 _CreateDisks(self, iobj)
4865 except errors.OpExecError:
4866 self.LogWarning("Device creation failed, reverting...")
4868 _RemoveDisks(self, iobj)
4870 self.cfg.ReleaseDRBDMinors(instance)
4873 feedback_fn("adding instance %s to cluster config" % instance)
4875 self.cfg.AddInstance(iobj)
4876 # Declare that we don't want to remove the instance lock anymore, as we've
4877 # added the instance to the config
4878 del self.remove_locks[locking.LEVEL_INSTANCE]
4879 # Unlock all the nodes
4880 if self.op.mode == constants.INSTANCE_IMPORT:
4881 nodes_keep = [self.op.src_node]
4882 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4883 if node != self.op.src_node]
4884 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4885 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4887 self.context.glm.release(locking.LEVEL_NODE)
4888 del self.acquired_locks[locking.LEVEL_NODE]
4890 if self.op.wait_for_sync:
4891 disk_abort = not _WaitForSync(self, iobj)
4892 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4893 # make sure the disks are not degraded (still sync-ing is ok)
4895 feedback_fn("* checking mirrors status")
4896 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4901 _RemoveDisks(self, iobj)
4902 self.cfg.RemoveInstance(iobj.name)
4903 # Make sure the instance lock gets removed
4904 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4905 raise errors.OpExecError("There are some degraded disks for"
4906 " this instance")
4908 feedback_fn("creating os for instance %s on node %s" %
4909 (instance, pnode_name))
4911 if iobj.disk_template != constants.DT_DISKLESS:
4912 if self.op.mode == constants.INSTANCE_CREATE:
4913 feedback_fn("* running the instance OS create scripts...")
4914 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4915 msg = result.RemoteFailMsg()
4916 if msg:
4917 raise errors.OpExecError("Could not add os for instance %s"
4918 " on node %s: %s" %
4919 (instance, pnode_name, msg))
4921 elif self.op.mode == constants.INSTANCE_IMPORT:
4922 feedback_fn("* running the instance OS import scripts...")
4923 src_node = self.op.src_node
4924 src_images = self.src_images
4925 cluster_name = self.cfg.GetClusterName()
4926 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4927 src_node, src_images,
4929 import_result.Raise()
4930 for idx, result in enumerate(import_result.data):
4932 self.LogWarning("Could not import the image %s for instance"
4933 " %s, disk %d, on node %s" %
4934 (src_images[idx], instance, idx, pnode_name))
4936 # also checked in the prereq part
4937 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4941 iobj.admin_up = True
4942 self.cfg.Update(iobj)
4943 logging.info("Starting instance %s on node %s", instance, pnode_name)
4944 feedback_fn("* starting instance...")
4945 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4946 msg = result.RemoteFailMsg()
4947 if msg:
4948 raise errors.OpExecError("Could not start instance: %s" % msg)
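# --- Illustrative summary (added commentary, not part of the original code) ---
# Exec() above therefore performs, in order: disk creation, registration of
# the instance in the configuration, an optional wait for the initial mirror
# sync, OS installation (create scripts or import, depending on self.op.mode)
# and finally an optional instance start when self.op.start is set.  A failure
# while creating the disks triggers a rollback that removes whatever was
# created, and degraded disks after the sync check likewise lead to disk
# removal and de-registration of the instance.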
4951 class LUConnectConsole(NoHooksLU):
4952 """Connect to an instance's console.
4954 This is somewhat special in that it returns the command line that
4955 you need to run on the master node in order to connect to the
4956 console.
4959 _OP_REQP = ["instance_name"]
4962 def ExpandNames(self):
4963 self._ExpandAndLockInstance()
4965 def CheckPrereq(self):
4966 """Check prerequisites.
4968 This checks that the instance is in the cluster.
4971 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4972 assert self.instance is not None, \
4973 "Cannot retrieve locked instance %s" % self.op.instance_name
4974 _CheckNodeOnline(self, self.instance.primary_node)
4976 def Exec(self, feedback_fn):
4977 """Connect to the console of an instance
4980 instance = self.instance
4981 node = instance.primary_node
4983 node_insts = self.rpc.call_instance_list([node],
4984 [instance.hypervisor])[node]
4987 if instance.name not in node_insts.data:
4988 raise errors.OpExecError("Instance %s is not running." % instance.name)
4990 logging.debug("Connecting to console of %s on %s", instance.name, node)
4992 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4993 cluster = self.cfg.GetClusterInfo()
4994 # beparams and hvparams are passed separately, to avoid editing the
4995 # instance and then saving the defaults in the instance itself.
4996 hvparams = cluster.FillHV(instance)
4997 beparams = cluster.FillBE(instance)
4998 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5001 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
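# Added note (illustration only): the returned value is a full SSH command
# line that the client is expected to exec on the master node; the inner
# console_cmd depends on the hypervisor (for Xen, for instance, something
# along the lines of "xm console <instance>" -- the exact command is an
# assumption here), wrapped by SshRunner.BuildCmd with a forced TTY.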
5004 class LUReplaceDisks(LogicalUnit):
5005 """Replace the disks of an instance.
5008 HPATH = "mirrors-replace"
5009 HTYPE = constants.HTYPE_INSTANCE
5010 _OP_REQP = ["instance_name", "mode", "disks"]
5013 def CheckArguments(self):
5014 if not hasattr(self.op, "remote_node"):
5015 self.op.remote_node = None
5016 if not hasattr(self.op, "iallocator"):
5017 self.op.iallocator = None
5019 # check for valid parameter combination
5020 cnt = [self.op.remote_node, self.op.iallocator].count(None)
5021 if self.op.mode == constants.REPLACE_DISK_CHG:
5023 raise errors.OpPrereqError("When changing the secondary either an"
5024 " iallocator script must be used or the"
5027 raise errors.OpPrereqError("Give either the iallocator or the new"
5028 " secondary, not both")
5029 else: # not replacing the secondary
5031 raise errors.OpPrereqError("The iallocator and new node options can"
5032 " be used only when changing the"
5035 def ExpandNames(self):
5036 self._ExpandAndLockInstance()
5038 if self.op.iallocator is not None:
5039 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5040 elif self.op.remote_node is not None:
5041 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5042 if remote_node is None:
5043 raise errors.OpPrereqError("Node '%s' not known" %
5044 self.op.remote_node)
5045 self.op.remote_node = remote_node
5046 # Warning: do not remove the locking of the new secondary here
5047 # unless DRBD8.AddChildren is changed to work in parallel;
5048 # currently it doesn't since parallel invocations of
5049 # FindUnusedMinor will conflict
5050 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5051 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5053 self.needed_locks[locking.LEVEL_NODE] = []
5054 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5056 def DeclareLocks(self, level):
5057 # If we're not already locking all nodes in the set we have to declare the
5058 # instance's primary/secondary nodes.
5059 if (level == locking.LEVEL_NODE and
5060 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5061 self._LockInstancesNodes()
5063 def _RunAllocator(self):
5064 """Compute a new secondary node using an IAllocator.
5067 ial = IAllocator(self,
5068 mode=constants.IALLOCATOR_MODE_RELOC,
5069 name=self.op.instance_name,
5070 relocate_from=[self.sec_node])
5072 ial.Run(self.op.iallocator)
5075 raise errors.OpPrereqError("Can't compute nodes using"
5076 " iallocator '%s': %s" % (self.op.iallocator,
5078 if len(ial.nodes) != ial.required_nodes:
5079 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5080 " of nodes (%s), required %s" %
5081 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
5082 self.op.remote_node = ial.nodes[0]
5083 self.LogInfo("Selected new secondary for the instance: %s",
5084 self.op.remote_node)
5086 def BuildHooksEnv(self):
5089 This runs on the master, the primary and all the secondaries.
5093 "MODE": self.op.mode,
5094 "NEW_SECONDARY": self.op.remote_node,
5095 "OLD_SECONDARY": self.instance.secondary_nodes[0],
5097 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5099 self.cfg.GetMasterNode(),
5100 self.instance.primary_node,
5102 if self.op.remote_node is not None:
5103 nl.append(self.op.remote_node)
5106 def CheckPrereq(self):
5107 """Check prerequisites.
5109 This checks that the instance is in the cluster.
5112 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5113 assert instance is not None, \
5114 "Cannot retrieve locked instance %s" % self.op.instance_name
5115 self.instance = instance
5117 if instance.disk_template != constants.DT_DRBD8:
5118 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5121 if len(instance.secondary_nodes) != 1:
5122 raise errors.OpPrereqError("The instance has a strange layout,"
5123 " expected one secondary but found %d" %
5124 len(instance.secondary_nodes))
5126 self.sec_node = instance.secondary_nodes[0]
5128 if self.op.iallocator is not None:
5129 self._RunAllocator()
5131 remote_node = self.op.remote_node
5132 if remote_node is not None:
5133 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5134 assert self.remote_node_info is not None, \
5135 "Cannot retrieve locked node %s" % remote_node
5137 self.remote_node_info = None
5138 if remote_node == instance.primary_node:
5139 raise errors.OpPrereqError("The specified node is the primary node of"
5141 elif remote_node == self.sec_node:
5142 raise errors.OpPrereqError("The specified node is already the"
5143 " secondary node of the instance.")
5145 if self.op.mode == constants.REPLACE_DISK_PRI:
5146 n1 = self.tgt_node = instance.primary_node
5147 n2 = self.oth_node = self.sec_node
5148 elif self.op.mode == constants.REPLACE_DISK_SEC:
5149 n1 = self.tgt_node = self.sec_node
5150 n2 = self.oth_node = instance.primary_node
5151 elif self.op.mode == constants.REPLACE_DISK_CHG:
5152 n1 = self.new_node = remote_node
5153 n2 = self.oth_node = instance.primary_node
5154 self.tgt_node = self.sec_node
5155 _CheckNodeNotDrained(self, remote_node)
5157 raise errors.ProgrammerError("Unhandled disk replace mode")
5159 _CheckNodeOnline(self, n1)
5160 _CheckNodeOnline(self, n2)
5162 if not self.op.disks:
5163 self.op.disks = range(len(instance.disks))
5165 for disk_idx in self.op.disks:
5166 instance.FindDisk(disk_idx)
5168 def _ExecD8DiskOnly(self, feedback_fn):
5169 """Replace a disk on the primary or secondary for drbd8.
5171 The algorithm for replace is quite complicated:
5173 1. for each disk to be replaced:
5175 1. create new LVs on the target node with unique names
5176 1. detach old LVs from the drbd device
5177 1. rename old LVs to name_replaced.<time_t>
5178 1. rename new LVs to old LVs
5179 1. attach the new LVs (with the old names now) to the drbd device
5181 1. wait for sync across all devices
5183 1. for each modified disk:
5185 1. remove old LVs (which have the name name_replaced.<time_t>)
5187 Failures are not very well handled.
5191 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5192 instance = self.instance
5194 vgname = self.cfg.GetVGName()
5197 tgt_node = self.tgt_node
5198 oth_node = self.oth_node
5200 # Step: check device activation
5201 self.proc.LogStep(1, steps_total, "check device existence")
5202 info("checking volume groups")
5203 my_vg = cfg.GetVGName()
5204 results = self.rpc.call_vg_list([oth_node, tgt_node])
5206 raise errors.OpExecError("Can't list volume groups on the nodes")
5207 for node in oth_node, tgt_node:
5209 if res.failed or not res.data or my_vg not in res.data:
5210 raise errors.OpExecError("Volume group '%s' not found on %s" %
5212 for idx, dev in enumerate(instance.disks):
5213 if idx not in self.op.disks:
5215 for node in tgt_node, oth_node:
5216 info("checking disk/%d on %s" % (idx, node))
5217 cfg.SetDiskID(dev, node)
5218 result = self.rpc.call_blockdev_find(node, dev)
5219 msg = result.RemoteFailMsg()
5220 if not msg and not result.payload:
5221 msg = "disk not found"
5223 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5226 # Step: check other node consistency
5227 self.proc.LogStep(2, steps_total, "check peer consistency")
5228 for idx, dev in enumerate(instance.disks):
5229 if idx not in self.op.disks:
5231 info("checking disk/%d consistency on %s" % (idx, oth_node))
5232 if not _CheckDiskConsistency(self, dev, oth_node,
5233 oth_node==instance.primary_node):
5234 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5235 " to replace disks on this node (%s)" %
5236 (oth_node, tgt_node))
5238 # Step: create new storage
5239 self.proc.LogStep(3, steps_total, "allocate new storage")
5240 for idx, dev in enumerate(instance.disks):
5241 if idx not in self.op.disks:
5244 cfg.SetDiskID(dev, tgt_node)
5245 lv_names = [".disk%d_%s" % (idx, suf)
5246 for suf in ["data", "meta"]]
5247 names = _GenerateUniqueNames(self, lv_names)
5248 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5249 logical_id=(vgname, names[0]))
5250 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5251 logical_id=(vgname, names[1]))
5252 new_lvs = [lv_data, lv_meta]
5253 old_lvs = dev.children
5254 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5255 info("creating new local storage on %s for %s" %
5256 (tgt_node, dev.iv_name))
5257 # we pass force_create=True to force the LVM creation
5258 for new_lv in new_lvs:
5259 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5260 _GetInstanceInfoText(instance), False)
5262 # Step: for each lv, detach+rename*2+attach
5263 self.proc.LogStep(4, steps_total, "change drbd configuration")
5264 for dev, old_lvs, new_lvs in iv_names.itervalues():
5265 info("detaching %s drbd from local storage" % dev.iv_name)
5266 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5267 msg = result.RemoteFailMsg()
5269 raise errors.OpExecError("Can't detach drbd from local storage on node"
5270 " %s for device %s: %s" %
5271 (tgt_node, dev.iv_name, msg))
5273 #cfg.Update(instance)
5275 # ok, we created the new LVs, so now we know we have the needed
5276 # storage; as such, we proceed on the target node to rename
5277 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5278 # using the assumption that logical_id == physical_id (which in
5279 # turn is the unique_id on that node)
5281 # FIXME(iustin): use a better name for the replaced LVs
5282 temp_suffix = int(time.time())
5283 ren_fn = lambda d, suff: (d.physical_id[0],
5284 d.physical_id[1] + "_replaced-%s" % suff)
5285 # build the rename list based on what LVs exist on the node
5287 for to_ren in old_lvs:
5288 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5289 if not result.RemoteFailMsg() and result.payload:
5291 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5293 info("renaming the old LVs on the target node")
5294 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5295 msg = result.RemoteFailMsg()
5297 raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5299 # now we rename the new LVs to the old LVs
5300 info("renaming the new LVs on the target node")
5301 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5302 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5303 msg = result.RemoteFailMsg()
5305 raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5308 for old, new in zip(old_lvs, new_lvs):
5309 new.logical_id = old.logical_id
5310 cfg.SetDiskID(new, tgt_node)
5312 for disk in old_lvs:
5313 disk.logical_id = ren_fn(disk, temp_suffix)
5314 cfg.SetDiskID(disk, tgt_node)
5316 # now that the new lvs have the old name, we can add them to the device
5317 info("adding new mirror component on %s" % tgt_node)
5318 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5319 msg = result.RemoteFailMsg()
5321 for new_lv in new_lvs:
5322 msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5324 warning("Can't rollback device %s: %s", dev, msg,
5325 hint="cleanup manually the unused logical volumes")
5326 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5328 dev.children = new_lvs
5329 cfg.Update(instance)
5331 # Step: wait for sync
5333 # this can fail as the old devices are degraded and _WaitForSync
5334 # does a combined result over all disks, so we don't check its
5335 # return value
5336 self.proc.LogStep(5, steps_total, "sync devices")
5337 _WaitForSync(self, instance, unlock=True)
5339 # so check manually all the devices
5340 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5341 cfg.SetDiskID(dev, instance.primary_node)
5342 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5343 msg = result.RemoteFailMsg()
5344 if not msg and not result.payload:
5345 msg = "disk not found"
5347 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5349 if result.payload[5]:
5350 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5352 # Step: remove old storage
5353 self.proc.LogStep(6, steps_total, "removing old storage")
5354 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5355 info("remove logical volumes for %s" % name)
5357 cfg.SetDiskID(lv, tgt_node)
5358 msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5360 warning("Can't remove old LV: %s" % msg,
5361 hint="manually remove unused LVs")
5364 def _ExecD8Secondary(self, feedback_fn):
5365 """Replace the secondary node for drbd8.
5367 The algorithm for replace is quite complicated:
5368 - for all disks of the instance:
5369 - create new LVs on the new node with same names
5370 - shutdown the drbd device on the old secondary
5371 - disconnect the drbd network on the primary
5372 - create the drbd device on the new secondary
5373 - network attach the drbd on the primary, using an artifice:
5374 the drbd code for Attach() will connect to the network if it
5375 finds a device which is connected to the good local disks but
5376 not network enabled
5377 - wait for sync across all devices
5378 - remove all disks from the old secondary
5380 Failures are not very well handled.
5384 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5385 instance = self.instance
5389 old_node = self.tgt_node
5390 new_node = self.new_node
5391 pri_node = instance.primary_node
5393 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5394 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5395 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5398 # Step: check device activation
5399 self.proc.LogStep(1, steps_total, "check device existence")
5400 info("checking volume groups")
5401 my_vg = cfg.GetVGName()
5402 results = self.rpc.call_vg_list([pri_node, new_node])
5403 for node in pri_node, new_node:
5405 if res.failed or not res.data or my_vg not in res.data:
5406 raise errors.OpExecError("Volume group '%s' not found on %s" %
5408 for idx, dev in enumerate(instance.disks):
5409 if idx not in self.op.disks:
5411 info("checking disk/%d on %s" % (idx, pri_node))
5412 cfg.SetDiskID(dev, pri_node)
5413 result = self.rpc.call_blockdev_find(pri_node, dev)
5414 msg = result.RemoteFailMsg()
5415 if not msg and not result.payload:
5416 msg = "disk not found"
5418 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5419 (idx, pri_node, msg))
5421 # Step: check other node consistency
5422 self.proc.LogStep(2, steps_total, "check peer consistency")
5423 for idx, dev in enumerate(instance.disks):
5424 if idx not in self.op.disks:
5426 info("checking disk/%d consistency on %s" % (idx, pri_node))
5427 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5428 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5429 " unsafe to replace the secondary" %
5432 # Step: create new storage
5433 self.proc.LogStep(3, steps_total, "allocate new storage")
5434 for idx, dev in enumerate(instance.disks):
5435 info("adding new local storage on %s for disk/%d" %
5437 # we pass force_create=True to force LVM creation
5438 for new_lv in dev.children:
5439 _CreateBlockDev(self, new_node, instance, new_lv, True,
5440 _GetInstanceInfoText(instance), False)
5442 # Step 4: drbd minors and drbd setup changes
5443 # after this, we must manually remove the drbd minors on both the
5444 # error and the success paths
5445 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5447 logging.debug("Allocated minors %s" % (minors,))
5448 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5449 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5451 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5452 # create new devices on new_node; note that we create two IDs:
5453 # one without port, so the drbd will be activated without
5454 # networking information on the new node at this stage, and one
5455 # with network, for the latter activation in step 4
5456 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5457 if pri_node == o_node1:
5462 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5463 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5465 iv_names[idx] = (dev, dev.children, new_net_id)
5466 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5468 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5469 logical_id=new_alone_id,
5470 children=dev.children)
5471 try:
5472 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5473 _GetInstanceInfoText(instance), False)
5474 except errors.GenericError:
5475 self.cfg.ReleaseDRBDMinors(instance.name)
5478 for idx, dev in enumerate(instance.disks):
5479 # we have new devices, shutdown the drbd on the old secondary
5480 info("shutting down drbd for disk/%d on old node" % idx)
5481 cfg.SetDiskID(dev, old_node)
5482 msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5484 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5486 hint="Please cleanup this device manually as soon as possible")
5488 info("detaching primary drbds from the network (=> standalone)")
5489 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5490 instance.disks)[pri_node]
5492 msg = result.RemoteFailMsg()
5494 # detaches didn't succeed (unlikely)
5495 self.cfg.ReleaseDRBDMinors(instance.name)
5496 raise errors.OpExecError("Can't detach the disks from the network on"
5497 " old node: %s" % (msg,))
5499 # if we managed to detach at least one, we update all the disks of
5500 # the instance to point to the new secondary
5501 info("updating instance configuration")
5502 for dev, _, new_logical_id in iv_names.itervalues():
5503 dev.logical_id = new_logical_id
5504 cfg.SetDiskID(dev, pri_node)
5505 cfg.Update(instance)
5507 # and now perform the drbd attach
5508 info("attaching primary drbds to new secondary (standalone => connected)")
5509 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5510 instance.disks, instance.name,
5512 for to_node, to_result in result.items():
5513 msg = to_result.RemoteFailMsg()
5515 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5516 hint="please do a gnt-instance info to see the"
5517 " status of disks")
5519 # this can fail as the old devices are degraded and _WaitForSync
5520 # does a combined result over all disks, so we don't check its
5521 # return value
5522 self.proc.LogStep(5, steps_total, "sync devices")
5523 _WaitForSync(self, instance, unlock=True)
5525 # so check manually all the devices
5526 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5527 cfg.SetDiskID(dev, pri_node)
5528 result = self.rpc.call_blockdev_find(pri_node, dev)
5529 msg = result.RemoteFailMsg()
5530 if not msg and not result.payload:
5531 msg = "disk not found"
5533 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5535 if result.payload[5]:
5536 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5538 self.proc.LogStep(6, steps_total, "removing old storage")
5539 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5540 info("remove logical volumes for disk/%d" % idx)
5542 cfg.SetDiskID(lv, old_node)
5543 msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5545 warning("Can't remove LV on old secondary: %s", msg,
5546 hint="Cleanup stale volumes by hand")
5548 def Exec(self, feedback_fn):
5549 """Execute disk replacement.
5551 This dispatches the disk replacement to the appropriate handler.
5554 instance = self.instance
5556 # Activate the instance disks if we're replacing them on a down instance
5557 if not instance.admin_up:
5558 _StartInstanceDisks(self, instance, True)
5560 if self.op.mode == constants.REPLACE_DISK_CHG:
5561 fn = self._ExecD8Secondary
5563 fn = self._ExecD8DiskOnly
5565 ret = fn(feedback_fn)
5567 # Deactivate the instance disks if we're replacing them on a down instance
5568 if not instance.admin_up:
5569 _SafeShutdownInstanceDisks(self, instance)
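# --- Illustrative sketch (added commentary, not part of the original code) ---
# The three replacement modes map onto the two workers above roughly like:
#   REPLACE_DISK_PRI / REPLACE_DISK_SEC -> _ExecD8DiskOnly (rebuild the LVs on
#       the primary resp. secondary node, keeping both nodes)
#   REPLACE_DISK_CHG                    -> _ExecD8Secondary (move the mirror
#       to a new secondary, given via remote_node or an iallocator)
# e.g. an opcode carrying mode=constants.REPLACE_DISK_CHG, disks=[] (meaning
# all disks) and remote_node="node3.example.com" would relocate the mirror
# (the node name here is a made-up example).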
5574 class LUGrowDisk(LogicalUnit):
5575 """Grow a disk of an instance.
5579 HTYPE = constants.HTYPE_INSTANCE
5580 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5583 def ExpandNames(self):
5584 self._ExpandAndLockInstance()
5585 self.needed_locks[locking.LEVEL_NODE] = []
5586 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5588 def DeclareLocks(self, level):
5589 if level == locking.LEVEL_NODE:
5590 self._LockInstancesNodes()
5592 def BuildHooksEnv(self):
5595 This runs on the master, the primary and all the secondaries.
5599 "DISK": self.op.disk,
5600 "AMOUNT": self.op.amount,
5602 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5604 self.cfg.GetMasterNode(),
5605 self.instance.primary_node,
5609 def CheckPrereq(self):
5610 """Check prerequisites.
5612 This checks that the instance is in the cluster.
5615 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5616 assert instance is not None, \
5617 "Cannot retrieve locked instance %s" % self.op.instance_name
5618 nodenames = list(instance.all_nodes)
5619 for node in nodenames:
5620 _CheckNodeOnline(self, node)
5623 self.instance = instance
5625 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5626 raise errors.OpPrereqError("Instance's disk layout does not support"
5627 " growing.")
5629 self.disk = instance.FindDisk(self.op.disk)
5631 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5632 instance.hypervisor)
5633 for node in nodenames:
5634 info = nodeinfo[node]
5635 if info.failed or not info.data:
5636 raise errors.OpPrereqError("Cannot get current information"
5637 " from node '%s'" % node)
5638 vg_free = info.data.get('vg_free', None)
5639 if not isinstance(vg_free, int):
5640 raise errors.OpPrereqError("Can't compute free disk space on"
5642 if self.op.amount > vg_free:
5643 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5644 " %d MiB available, %d MiB required" %
5645 (node, vg_free, self.op.amount))
5647 def Exec(self, feedback_fn):
5648 """Execute disk grow.
5651 instance = self.instance
5653 for node in instance.all_nodes:
5654 self.cfg.SetDiskID(disk, node)
5655 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5656 msg = result.RemoteFailMsg()
5657 if msg:
5658 raise errors.OpExecError("Grow request failed on node %s: %s" %
5659 (node, msg))
5660 disk.RecordGrow(self.op.amount)
5661 self.cfg.Update(instance)
5662 if self.op.wait_for_sync:
5663 disk_abort = not _WaitForSync(self, instance)
5665 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5666 " status.\nPlease check the instance.")
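# Added note (illustration only, example values are hypothetical): growing
# disk 0 of an instance by 2048 MB would arrive here with op.disk=0 and
# op.amount=2048; the grow RPC is issued on every node holding the disk, the
# new size is recorded in the configuration via RecordGrow(), and with
# wait_for_sync=True the LU waits for the mirror to catch up before returning.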
5669 class LUQueryInstanceData(NoHooksLU):
5670 """Query runtime instance data.
5673 _OP_REQP = ["instances", "static"]
5676 def ExpandNames(self):
5677 self.needed_locks = {}
5678 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5680 if not isinstance(self.op.instances, list):
5681 raise errors.OpPrereqError("Invalid argument type 'instances'")
5683 if self.op.instances:
5684 self.wanted_names = []
5685 for name in self.op.instances:
5686 full_name = self.cfg.ExpandInstanceName(name)
5687 if full_name is None:
5688 raise errors.OpPrereqError("Instance '%s' not known" % name)
5689 self.wanted_names.append(full_name)
5690 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5692 self.wanted_names = None
5693 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5695 self.needed_locks[locking.LEVEL_NODE] = []
5696 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5698 def DeclareLocks(self, level):
5699 if level == locking.LEVEL_NODE:
5700 self._LockInstancesNodes()
5702 def CheckPrereq(self):
5703 """Check prerequisites.
5705 This only checks the optional instance list against the existing names.
5708 if self.wanted_names is None:
5709 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5711 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5712 in self.wanted_names]
5715 def _ComputeDiskStatus(self, instance, snode, dev):
5716 """Compute block device status.
5719 static = self.op.static
5721 self.cfg.SetDiskID(dev, instance.primary_node)
5722 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5723 if dev_pstatus.offline:
5726 msg = dev_pstatus.RemoteFailMsg()
5728 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5729 (instance.name, msg))
5730 dev_pstatus = dev_pstatus.payload
5734 if dev.dev_type in constants.LDS_DRBD:
5735 # we change the snode then (otherwise we use the one passed in)
5736 if dev.logical_id[0] == instance.primary_node:
5737 snode = dev.logical_id[1]
5739 snode = dev.logical_id[0]
5741 if snode and not static:
5742 self.cfg.SetDiskID(dev, snode)
5743 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5744 if dev_sstatus.offline:
5747 msg = dev_sstatus.RemoteFailMsg()
5749 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5750 (instance.name, msg))
5751 dev_sstatus = dev_sstatus.payload
5756 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5757 for child in dev.children]
5762 "iv_name": dev.iv_name,
5763 "dev_type": dev.dev_type,
5764 "logical_id": dev.logical_id,
5765 "physical_id": dev.physical_id,
5766 "pstatus": dev_pstatus,
5767 "sstatus": dev_sstatus,
5768 "children": dev_children,
5774 def Exec(self, feedback_fn):
5775 """Gather and return data"""
5778 cluster = self.cfg.GetClusterInfo()
5780 for instance in self.wanted_instances:
5781 if not self.op.static:
5782 remote_info = self.rpc.call_instance_info(instance.primary_node,
5784 instance.hypervisor)
5786 remote_info = remote_info.data
5787 if remote_info and "state" in remote_info:
5790 remote_state = "down"
5793 if instance.admin_up:
5796 config_state = "down"
5798 disks = [self._ComputeDiskStatus(instance, None, device)
5799 for device in instance.disks]
5802 "name": instance.name,
5803 "config_state": config_state,
5804 "run_state": remote_state,
5805 "pnode": instance.primary_node,
5806 "snodes": instance.secondary_nodes,
5808 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5810 "hypervisor": instance.hypervisor,
5811 "network_port": instance.network_port,
5812 "hv_instance": instance.hvparams,
5813 "hv_actual": cluster.FillHV(instance),
5814 "be_instance": instance.beparams,
5815 "be_actual": cluster.FillBE(instance),
5818 result[instance.name] = idict
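    # Illustrative sketch (assumed example values, not part of the original
    # source): the mapping built above associates each instance name with a
    # dict such as
    #   {"name": "inst1.example.com", "config_state": "up", "run_state": "up",
    #    "pnode": "node1.example.com", "snodes": ["node2.example.com"],
    #    "nics": [("aa:00:00:11:22:33", "10.0.0.5", "xen-br0")], ...}
    # with the remaining keys filled in as shown above.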
5823 class LUSetInstanceParams(LogicalUnit):
5824 """Modifies an instances's parameters.
5827 HPATH = "instance-modify"
5828 HTYPE = constants.HTYPE_INSTANCE
5829 _OP_REQP = ["instance_name"]
5832 def CheckArguments(self):
5833 if not hasattr(self.op, 'nics'):
5835 if not hasattr(self.op, 'disks'):
5837 if not hasattr(self.op, 'beparams'):
5838 self.op.beparams = {}
5839 if not hasattr(self.op, 'hvparams'):
5840 self.op.hvparams = {}
5841 self.op.force = getattr(self.op, "force", False)
5842 if not (self.op.nics or self.op.disks or
5843 self.op.hvparams or self.op.beparams):
5844 raise errors.OpPrereqError("No changes submitted")
5848 for disk_op, disk_dict in self.op.disks:
5849 if disk_op == constants.DDM_REMOVE:
5852 elif disk_op == constants.DDM_ADD:
5855 if not isinstance(disk_op, int):
5856 raise errors.OpPrereqError("Invalid disk index")
5857 if disk_op == constants.DDM_ADD:
5858 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5859 if mode not in constants.DISK_ACCESS_SET:
5860 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5861 size = disk_dict.get('size', None)
5863 raise errors.OpPrereqError("Required disk parameter size missing")
5866 except ValueError, err:
5867 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5869 disk_dict['size'] = size
5871 # modification of disk
5872 if 'size' in disk_dict:
5873 raise errors.OpPrereqError("Disk size change not possible, use"
5876 if disk_addremove > 1:
5877 raise errors.OpPrereqError("Only one disk add or remove operation"
5878 " supported at a time")
5882 for nic_op, nic_dict in self.op.nics:
5883 if nic_op == constants.DDM_REMOVE:
5886 elif nic_op == constants.DDM_ADD:
5889 if not isinstance(nic_op, int):
5890 raise errors.OpPrereqError("Invalid nic index")
5892 # nic_dict should be a dict
5893 nic_ip = nic_dict.get('ip', None)
5894 if nic_ip is not None:
5895 if nic_ip.lower() == constants.VALUE_NONE:
5896 nic_dict['ip'] = None
5898 if not utils.IsValidIP(nic_ip):
5899 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5901 nic_bridge = nic_dict.get('bridge', None)
5902 nic_link = nic_dict.get('link', None)
5903 if nic_bridge and nic_link:
5904 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5905 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5906 nic_dict['bridge'] = None
5907 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5908 nic_dict['link'] = None
5910 if nic_op == constants.DDM_ADD:
5911 nic_mac = nic_dict.get('mac', None)
5913 nic_dict['mac'] = constants.VALUE_AUTO
5915 if 'mac' in nic_dict:
5916 nic_mac = nic_dict['mac']
5917 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5918 if not utils.IsValidMac(nic_mac):
5919 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5920 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5921 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5922 " modifying an existing nic")
5924 if nic_addremove > 1:
5925 raise errors.OpPrereqError("Only one NIC add or remove operation"
5926 " supported at a time")
5928 def ExpandNames(self):
5929 self._ExpandAndLockInstance()
5930 self.needed_locks[locking.LEVEL_NODE] = []
5931 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5933 def DeclareLocks(self, level):
5934 if level == locking.LEVEL_NODE:
5935 self._LockInstancesNodes()
5937 def BuildHooksEnv(self):
5940 This runs on the master, primary and secondaries.
5944 if constants.BE_MEMORY in self.be_new:
5945 args['memory'] = self.be_new[constants.BE_MEMORY]
5946 if constants.BE_VCPUS in self.be_new:
5947 args['vcpus'] = self.be_new[constants.BE_VCPUS]
5948 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5949 # information at all.
5952 nic_override = dict(self.op.nics)
5953 for idx, nic in enumerate(self.instance.nics):
5954 if idx in nic_override:
5955 this_nic_override = nic_override[idx]
5957 this_nic_override = {}
5958 if 'ip' in this_nic_override:
5959 ip = this_nic_override['ip']
5962 if 'bridge' in this_nic_override:
5963 bridge = this_nic_override['bridge']
5966 if 'mac' in this_nic_override:
5967 mac = this_nic_override['mac']
5970 args['nics'].append((ip, bridge, mac))
5971 if constants.DDM_ADD in nic_override:
5972 ip = nic_override[constants.DDM_ADD].get('ip', None)
5973 bridge = nic_override[constants.DDM_ADD]['bridge']
5974 mac = nic_override[constants.DDM_ADD]['mac']
5975 args['nics'].append((ip, bridge, mac))
5976 elif constants.DDM_REMOVE in nic_override:
5977 del args['nics'][-1]
5979 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5980 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5983 def _GetUpdatedParams(self, old_params, update_dict,
5984 default_values, parameter_types):
5985 """Return the new params dict for the given params.
5987 @type old_params: dict
5988 @param old_params: old parameters
5989 @type update_dict: dict
5990 @param update_dict: dict containing new parameter values,
5991 or constants.VALUE_DEFAULT to reset the
5992 parameter to its default value
5993 @type default_values: dict
5994 @param default_values: default values for the filled parameters
5995 @type parameter_types: dict
5996 @param parameter_types: dict mapping target dict keys to types
5997 in constants.ENFORCEABLE_TYPES
5998 @rtype: (dict, dict)
5999 @return: (new_parameters, filled_parameters)
6002 params_copy = copy.deepcopy(old_params)
6003 for key, val in update_dict.iteritems():
6004 if val == constants.VALUE_DEFAULT:
6006 del params_copy[key]
6010 params_copy[key] = val
6011 utils.ForceDictType(params_copy, parameter_types)
6012 params_filled = objects.FillDict(default_values, params_copy)
6013 return (params_copy, params_filled)
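  # Illustrative example (assumed values, not in the original source) of the
  # merge performed above, leaving aside the type enforcement step:
  #   old_params     = {"memory": 512}
  #   update_dict    = {"memory": constants.VALUE_DEFAULT, "vcpus": 2}
  #   default_values = {"memory": 128, "vcpus": 1}
  #   -> new dict (without defaults): {"vcpus": 2}
  #   -> filled dict:                 {"memory": 128, "vcpus": 2}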
6015 def CheckPrereq(self):
6016 """Check prerequisites.
6018 This only checks the instance list against the existing names.
6021 force = self.force = self.op.force
6023 # checking the new params on the primary/secondary nodes
6025 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6026 cluster = self.cluster = self.cfg.GetClusterInfo()
6027 assert self.instance is not None, \
6028 "Cannot retrieve locked instance %s" % self.op.instance_name
6029 pnode = instance.primary_node
6030 nodelist = list(instance.all_nodes)
6032 # hvparams processing
6033 if self.op.hvparams:
6034 i_hvdict, hv_new = self._GetUpdatedParams(
6035 instance.hvparams, self.op.hvparams,
6036 cluster.hvparams[instance.hypervisor],
6037 constants.HVS_PARAMETER_TYPES)
6039 hypervisor.GetHypervisor(
6040 instance.hypervisor).CheckParameterSyntax(hv_new)
6041 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6042 self.hv_new = hv_new # the new actual values
6043 self.hv_inst = i_hvdict # the new dict (without defaults)
6045 self.hv_new = self.hv_inst = {}
6047 # beparams processing
6048 if self.op.beparams:
6049 i_bedict, be_new = self._GetUpdatedParams(
6050 instance.beparams, self.op.beparams,
6051 cluster.beparams[constants.PP_DEFAULT],
6052 constants.BES_PARAMETER_TYPES)
6053 self.be_new = be_new # the new actual values
6054 self.be_inst = i_bedict # the new dict (without defaults)
6056 self.be_new = self.be_inst = {}
6060 if constants.BE_MEMORY in self.op.beparams and not self.force:
6061 mem_check_list = [pnode]
6062 if be_new[constants.BE_AUTO_BALANCE]:
6063 # either we changed auto_balance to yes or it was from before
6064 mem_check_list.extend(instance.secondary_nodes)
6065 instance_info = self.rpc.call_instance_info(pnode, instance.name,
6066 instance.hypervisor)
6067 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6068 instance.hypervisor)
6069 if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6070 # Assume the primary node is unreachable and go ahead
6071 self.warn.append("Can't get info from primary node %s" % pnode)
6073 if not instance_info.failed and instance_info.data:
6074 current_mem = int(instance_info.data['memory'])
6076 # Assume instance not running
6077 # (there is a slight race condition here, but it's not very probable,
6078 # and we have no other way to check)
6080 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6081 nodeinfo[pnode].data['memory_free'])
6083 raise errors.OpPrereqError("This change will prevent the instance"
6084 " from starting, due to %d MB of memory"
6085 " missing on its primary node" % miss_mem)
6087 if be_new[constants.BE_AUTO_BALANCE]:
6088 for node, nres in nodeinfo.iteritems():
6089 if node not in instance.secondary_nodes:
6091 if nres.failed or not isinstance(nres.data, dict):
6092 self.warn.append("Can't get info from secondary node %s" % node)
6093 elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6094 self.warn.append("Not enough memory to failover instance to"
6095 " secondary node %s" % node)
6100 for nic_op, nic_dict in self.op.nics:
6101 if nic_op == constants.DDM_REMOVE:
6102 if not instance.nics:
6103 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6105 if nic_op != constants.DDM_ADD:
6107 if nic_op < 0 or nic_op >= len(instance.nics):
6108 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6110 (nic_op, len(instance.nics)))
6111 old_nic_params = instance.nics[nic_op].nicparams
6112 old_nic_ip = instance.nics[nic_op].ip
6117 update_params_dict = dict([(key, nic_dict[key])
6118 for key in constants.NICS_PARAMETERS
6119 if key in nic_dict])
6121 if 'bridge' in nic_dict:
6122 update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6124 new_nic_params, new_filled_nic_params = \
6125 self._GetUpdatedParams(old_nic_params, update_params_dict,
6126 cluster.nicparams[constants.PP_DEFAULT],
6127 constants.NICS_PARAMETER_TYPES)
6128 objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6129 self.nic_pinst[nic_op] = new_nic_params
6130 self.nic_pnew[nic_op] = new_filled_nic_params
6131 new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6133 if new_nic_mode == constants.NIC_MODE_BRIDGED:
6134 nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6135 result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
6138 msg = ("Bridge '%s' doesn't exist on one of"
6139 " the instance nodes" % nic_bridge)
6141 self.warn.append(msg)
6143 raise errors.OpPrereqError(msg)
6144 if new_nic_mode == constants.NIC_MODE_ROUTED:
6145 if 'ip' in nic_dict:
6146 nic_ip = nic_dict['ip']
6150 raise errors.OpPrereqError('Cannot set the nic ip to None'
6152 if 'mac' in nic_dict:
6153 nic_mac = nic_dict['mac']
6155 raise errors.OpPrereqError('Cannot set the nic mac to None')
6156 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6157 # otherwise generate the mac
6158 nic_dict['mac'] = self.cfg.GenerateMAC()
6160 # or validate/reserve the current one
6161 if self.cfg.IsMacInUse(nic_mac):
6162 raise errors.OpPrereqError("MAC address %s already in use"
6163 " in cluster" % nic_mac)
6166 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6167 raise errors.OpPrereqError("Disk operations not supported for"
6168 " diskless instances")
6169 for disk_op, disk_dict in self.op.disks:
6170 if disk_op == constants.DDM_REMOVE:
6171 if len(instance.disks) == 1:
6172 raise errors.OpPrereqError("Cannot remove the last disk of"
6174 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6175 ins_l = ins_l[pnode]
6176 if ins_l.failed or not isinstance(ins_l.data, list):
6177 raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6178 if instance.name in ins_l.data:
6179 raise errors.OpPrereqError("Instance is running, can't remove"
6182 if (disk_op == constants.DDM_ADD and
6183 len(instance.disks) >= constants.MAX_DISKS):
6184 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6185 " add more" % constants.MAX_DISKS)
6186 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6188 if disk_op < 0 or disk_op >= len(instance.disks):
6189 raise errors.OpPrereqError("Invalid disk index %s, valid values"
6191 (disk_op, len(instance.disks)))
6195 def Exec(self, feedback_fn):
6196 """Modifies an instance.
6198 All parameters take effect only at the next restart of the instance.
6201 # Process here the warnings from CheckPrereq, as we don't have a
6202 # feedback_fn there.
6203 for warn in self.warn:
6204 feedback_fn("WARNING: %s" % warn)
6207 instance = self.instance
6208 cluster = self.cluster
6210 for disk_op, disk_dict in self.op.disks:
6211 if disk_op == constants.DDM_REMOVE:
6212 # remove the last disk
6213 device = instance.disks.pop()
6214 device_idx = len(instance.disks)
6215 for node, disk in device.ComputeNodeTree(instance.primary_node):
6216 self.cfg.SetDiskID(disk, node)
6217 msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6219 self.LogWarning("Could not remove disk/%d on node %s: %s,"
6220 " continuing anyway", device_idx, node, msg)
6221 result.append(("disk/%d" % device_idx, "remove"))
6222 elif disk_op == constants.DDM_ADD:
6224 if instance.disk_template == constants.DT_FILE:
6225 file_driver, file_path = instance.disks[0].logical_id
6226 file_path = os.path.dirname(file_path)
6228 file_driver = file_path = None
6229 disk_idx_base = len(instance.disks)
6230 new_disk = _GenerateDiskTemplate(self,
6231 instance.disk_template,
6232 instance.name, instance.primary_node,
6233 instance.secondary_nodes,
6238 instance.disks.append(new_disk)
6239 info = _GetInstanceInfoText(instance)
6241 logging.info("Creating volume %s for instance %s",
6242 new_disk.iv_name, instance.name)
6243 # Note: this needs to be kept in sync with _CreateDisks
6245 for node in instance.all_nodes:
6246 f_create = node == instance.primary_node
6248 _CreateBlockDev(self, node, instance, new_disk,
6249 f_create, info, f_create)
6250 except errors.OpExecError, err:
6251 self.LogWarning("Failed to create volume %s (%s) on"
6253 new_disk.iv_name, new_disk, node, err)
6254 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6255 (new_disk.size, new_disk.mode)))
6257 # change a given disk
6258 instance.disks[disk_op].mode = disk_dict['mode']
6259 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6261 for nic_op, nic_dict in self.op.nics:
6262 if nic_op == constants.DDM_REMOVE:
6263 # remove the last nic
6264 del instance.nics[-1]
6265 result.append(("nic.%d" % len(instance.nics), "remove"))
6266 elif nic_op == constants.DDM_ADD:
6267 # mac and bridge should be set by now
6268 mac = nic_dict['mac']
6269 ip = nic_dict.get('ip', None)
6270 nicparams = self.nic_pinst[constants.DDM_ADD]
6271 new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6272 instance.nics.append(new_nic)
6273 result.append(("nic.%d" % (len(instance.nics) - 1),
6274 "add:mac=%s,ip=%s,mode=%s,link=%s" %
6275 (new_nic.mac, new_nic.ip,
6276 self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6277 self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6280 for key in 'mac', 'ip':
6282 setattr(instance.nics[nic_op], key, nic_dict[key])
6283 if nic_op in self.nic_pnew:
6284 instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6285 for key, val in nic_dict.iteritems():
6286 result.append(("nic.%s/%d" % (key, nic_op), val))
6289 if self.op.hvparams:
6290 instance.hvparams = self.hv_inst
6291 for key, val in self.op.hvparams.iteritems():
6292 result.append(("hv/%s" % key, val))
6295 if self.op.beparams:
6296 instance.beparams = self.be_inst
6297 for key, val in self.op.beparams.iteritems():
6298 result.append(("be/%s" % key, val))
6300 self.cfg.Update(instance)
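    # Illustrative sketch (assumed example, not part of the original module):
    # the result list collected above holds (parameter, new value) pairs such
    # as [("disk/1", "remove"), ("nic.ip/0", "10.0.0.9"), ("be/memory", 512)],
    # which the caller can show to the user as a change summary.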
6305 class LUQueryExports(NoHooksLU):
6306 """Query the exports list
6309 _OP_REQP = ['nodes']
6312 def ExpandNames(self):
6313 self.needed_locks = {}
6314 self.share_locks[locking.LEVEL_NODE] = 1
6315 if not self.op.nodes:
6316 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6318 self.needed_locks[locking.LEVEL_NODE] = \
6319 _GetWantedNodes(self, self.op.nodes)
6321 def CheckPrereq(self):
6322 """Check prerequisites.
6325 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6327 def Exec(self, feedback_fn):
6328 """Compute the list of all the exported system images.
6331 @return: a dictionary with the structure node->(export-list)
6332 where export-list is a list of the instances exported on
6336 rpcresult = self.rpc.call_export_list(self.nodes)
6338 for node in rpcresult:
6339 if rpcresult[node].failed:
6340 result[node] = False
6342 result[node] = rpcresult[node].data
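    # Illustrative sketch (assumed example, not part of the original source):
    # the dictionary built above maps each queried node to its export list,
    # or to False when the RPC to that node failed, e.g.
    #   {"node1.example.com": ["inst1.example.com", "inst2.example.com"],
    #    "node2.example.com": False}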
6347 class LUExportInstance(LogicalUnit):
6348 """Export an instance to an image in the cluster.
6351 HPATH = "instance-export"
6352 HTYPE = constants.HTYPE_INSTANCE
6353 _OP_REQP = ["instance_name", "target_node", "shutdown"]
6356 def ExpandNames(self):
6357 self._ExpandAndLockInstance()
6358 # FIXME: lock only instance primary and destination node
6360 # Sad but true, for now we have to lock all nodes, as we don't know where
6361 # the previous export might be, and in this LU we search for it and
6362 # remove it from its current node. In the future we could fix this by:
6363 # - making a tasklet to search (share-lock all), then create the new one,
6364 # then one to remove, after
6365 # - removing the removal operation altogether
6366 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6368 def DeclareLocks(self, level):
6369 """Last minute lock declaration."""
6370 # All nodes are locked anyway, so nothing to do here.
6372 def BuildHooksEnv(self):
6375 This will run on the master, primary node and target node.
6379 "EXPORT_NODE": self.op.target_node,
6380 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6382 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6383 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6384 self.op.target_node]
6387 def CheckPrereq(self):
6388 """Check prerequisites.
6390 This checks that the instance and node names are valid.
6393 instance_name = self.op.instance_name
6394 self.instance = self.cfg.GetInstanceInfo(instance_name)
6395 assert self.instance is not None, \
6396 "Cannot retrieve locked instance %s" % self.op.instance_name
6397 _CheckNodeOnline(self, self.instance.primary_node)
6399 self.dst_node = self.cfg.GetNodeInfo(
6400 self.cfg.ExpandNodeName(self.op.target_node))
6402 if self.dst_node is None:
6403 # This is a wrong node name, not a non-locked node
6404 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6405 _CheckNodeOnline(self, self.dst_node.name)
6406 _CheckNodeNotDrained(self, self.dst_node.name)
6408 # instance disk type verification
6409 for disk in self.instance.disks:
6410 if disk.dev_type == constants.LD_FILE:
6411 raise errors.OpPrereqError("Export not supported for instances with"
6412 " file-based disks")
6414 def Exec(self, feedback_fn):
6415 """Export an instance to an image in the cluster.
6418 instance = self.instance
6419 dst_node = self.dst_node
6420 src_node = instance.primary_node
6421 if self.op.shutdown:
6422 # shutdown the instance, but not the disks
6423 result = self.rpc.call_instance_shutdown(src_node, instance)
6424 msg = result.RemoteFailMsg()
6426 raise errors.OpExecError("Could not shutdown instance %s on"
6428 (instance.name, src_node, msg))
6430 vgname = self.cfg.GetVGName()
6434 # set the disks ID correctly since call_instance_start needs the
6435 # correct drbd minor to create the symlinks
6436 for disk in instance.disks:
6437 self.cfg.SetDiskID(disk, src_node)
6440 for disk in instance.disks:
6441 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6442 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6443 if new_dev_name.failed or not new_dev_name.data:
6444 self.LogWarning("Could not snapshot block device %s on node %s",
6445 disk.logical_id[1], src_node)
6446 snap_disks.append(False)
6448 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6449 logical_id=(vgname, new_dev_name.data),
6450 physical_id=(vgname, new_dev_name.data),
6451 iv_name=disk.iv_name)
6452 snap_disks.append(new_dev)
6455 if self.op.shutdown and instance.admin_up:
6456 result = self.rpc.call_instance_start(src_node, instance, None, None)
6457 msg = result.RemoteFailMsg()
6459 _ShutdownInstanceDisks(self, instance)
6460 raise errors.OpExecError("Could not start instance: %s" % msg)
6462 # TODO: check for size
6464 cluster_name = self.cfg.GetClusterName()
6465 for idx, dev in enumerate(snap_disks):
6467 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6468 instance, cluster_name, idx)
6469 if result.failed or not result.data:
6470 self.LogWarning("Could not export block device %s from node %s to"
6471 " node %s", dev.logical_id[1], src_node,
6473 msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6475 self.LogWarning("Could not remove snapshot block device %s from node"
6476 " %s: %s", dev.logical_id[1], src_node, msg)
6478 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6479 if result.failed or not result.data:
6480 self.LogWarning("Could not finalize export for instance %s on node %s",
6481 instance.name, dst_node.name)
6483 nodelist = self.cfg.GetNodeList()
6484 nodelist.remove(dst_node.name)
6486 # on one-node clusters nodelist will be empty after the removal
6487 # if we proceed, the backup would be removed because OpQueryExports
6488 # substitutes an empty list with the full cluster node list.
6490 exportlist = self.rpc.call_export_list(nodelist)
6491 for node in exportlist:
6492 if exportlist[node].failed:
6494 if instance.name in exportlist[node].data:
6495 if not self.rpc.call_export_remove(node, instance.name):
6496 self.LogWarning("Could not remove older export for instance %s"
6497 " on node %s", instance.name, node)
6500 class LURemoveExport(NoHooksLU):
6501 """Remove exports related to the named instance.
6504 _OP_REQP = ["instance_name"]
6507 def ExpandNames(self):
6508 self.needed_locks = {}
6509 # We need all nodes to be locked in order for RemoveExport to work, but we
6510 # don't need to lock the instance itself, as nothing will happen to it (and
6511 # we can remove exports also for a removed instance)
6512 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6514 def CheckPrereq(self):
6515 """Check prerequisites.
6519 def Exec(self, feedback_fn):
6520 """Remove any export.
6523 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6524 # If the instance was not found we'll try with the name that was passed in.
6525 # This will only work if it was an FQDN, though.
6527 if not instance_name:
6529 instance_name = self.op.instance_name
6531 exportlist = self.rpc.call_export_list(self.acquired_locks[
6532 locking.LEVEL_NODE])
6534 for node in exportlist:
6535 if exportlist[node].failed:
6536 self.LogWarning("Failed to query node %s, continuing" % node)
6538 if instance_name in exportlist[node].data:
6540 result = self.rpc.call_export_remove(node, instance_name)
6541 if result.failed or not result.data:
6542 logging.error("Could not remove export for instance %s"
6543 " on node %s", instance_name, node)
6545 if fqdn_warn and not found:
6546 feedback_fn("Export not found. If trying to remove an export belonging"
6547 " to a deleted instance please use its Fully Qualified"
6551 class TagsLU(NoHooksLU):
6554 This is an abstract class which is the parent of all the other tags LUs.
6558 def ExpandNames(self):
6559 self.needed_locks = {}
6560 if self.op.kind == constants.TAG_NODE:
6561 name = self.cfg.ExpandNodeName(self.op.name)
6563 raise errors.OpPrereqError("Invalid node name (%s)" %
6566 self.needed_locks[locking.LEVEL_NODE] = name
6567 elif self.op.kind == constants.TAG_INSTANCE:
6568 name = self.cfg.ExpandInstanceName(self.op.name)
6570 raise errors.OpPrereqError("Invalid instance name (%s)" %
6573 self.needed_locks[locking.LEVEL_INSTANCE] = name
6575 def CheckPrereq(self):
6576 """Check prerequisites.
6579 if self.op.kind == constants.TAG_CLUSTER:
6580 self.target = self.cfg.GetClusterInfo()
6581 elif self.op.kind == constants.TAG_NODE:
6582 self.target = self.cfg.GetNodeInfo(self.op.name)
6583 elif self.op.kind == constants.TAG_INSTANCE:
6584 self.target = self.cfg.GetInstanceInfo(self.op.name)
6586 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6590 class LUGetTags(TagsLU):
6591 """Returns the tags of a given object.
6594 _OP_REQP = ["kind", "name"]
6597 def Exec(self, feedback_fn):
6598 """Returns the tag list.
6601 return list(self.target.GetTags())
6604 class LUSearchTags(NoHooksLU):
6605 """Searches the tags for a given pattern.
6608 _OP_REQP = ["pattern"]
6611 def ExpandNames(self):
6612 self.needed_locks = {}
6614 def CheckPrereq(self):
6615 """Check prerequisites.
6617 This checks the pattern passed for validity by compiling it.
6621 self.re = re.compile(self.op.pattern)
6622 except re.error, err:
6623 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6624 (self.op.pattern, err))
6626 def Exec(self, feedback_fn):
6627 """Returns the tag list.
6631 tgts = [("/cluster", cfg.GetClusterInfo())]
6632 ilist = cfg.GetAllInstancesInfo().values()
6633 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6634 nlist = cfg.GetAllNodesInfo().values()
6635 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6637 for path, target in tgts:
6638 for tag in target.GetTags():
6639 if self.re.search(tag):
6640 results.append((path, tag))
6644 class LUAddTags(TagsLU):
6645 """Sets a tag on a given object.
6648 _OP_REQP = ["kind", "name", "tags"]
6651 def CheckPrereq(self):
6652 """Check prerequisites.
6654 This checks the type and length of the tag name and value.
6657 TagsLU.CheckPrereq(self)
6658 for tag in self.op.tags:
6659 objects.TaggableObject.ValidateTag(tag)
6661 def Exec(self, feedback_fn):
6666 for tag in self.op.tags:
6667 self.target.AddTag(tag)
6668 except errors.TagError, err:
6669 raise errors.OpExecError("Error while setting tag: %s" % str(err))
6671 self.cfg.Update(self.target)
6672 except errors.ConfigurationError:
6673 raise errors.OpRetryError("There has been a modification to the"
6674 " config file and the operation has been"
6675 " aborted. Please retry.")
6678 class LUDelTags(TagsLU):
6679 """Delete a list of tags from a given object.
6682 _OP_REQP = ["kind", "name", "tags"]
6685 def CheckPrereq(self):
6686 """Check prerequisites.
6688 This checks that we have the given tag.
6691 TagsLU.CheckPrereq(self)
6692 for tag in self.op.tags:
6693 objects.TaggableObject.ValidateTag(tag)
6694 del_tags = frozenset(self.op.tags)
6695 cur_tags = self.target.GetTags()
6696 if not del_tags <= cur_tags:
6697 diff_tags = del_tags - cur_tags
6698 diff_names = ["'%s'" % tag for tag in diff_tags]
6700 raise errors.OpPrereqError("Tag(s) %s not found" %
6701 (",".join(diff_names)))
6703 def Exec(self, feedback_fn):
6704 """Remove the tag from the object.
6707 for tag in self.op.tags:
6708 self.target.RemoveTag(tag)
6710 self.cfg.Update(self.target)
6711 except errors.ConfigurationError:
6712 raise errors.OpRetryError("There has been a modification to the"
6713 " config file and the operation has been"
6714 " aborted. Please retry.")
6717 class LUTestDelay(NoHooksLU):
6718 """Sleep for a specified amount of time.
6720 This LU sleeps on the master and/or nodes for a specified amount of
6724 _OP_REQP = ["duration", "on_master", "on_nodes"]
6727 def ExpandNames(self):
6728 """Expand names and set required locks.
6730 This expands the node list, if any.
6733 self.needed_locks = {}
6734 if self.op.on_nodes:
6735 # _GetWantedNodes can be used here, but is not always appropriate to use
6736 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6738 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6739 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6741 def CheckPrereq(self):
6742 """Check prerequisites.
6746 def Exec(self, feedback_fn):
6747 """Do the actual sleep.
6750 if self.op.on_master:
6751 if not utils.TestDelay(self.op.duration):
6752 raise errors.OpExecError("Error during master delay test")
6753 if self.op.on_nodes:
6754 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6756 raise errors.OpExecError("Complete failure from rpc call")
6757 for node, node_result in result.items():
6759 if not node_result.data:
6760 raise errors.OpExecError("Failure during rpc call to node %s,"
6761 " result: %s" % (node, node_result.data))
6764 class IAllocator(object):
6765 """IAllocator framework.
6767 An IAllocator instance has four sets of attributes:
6768 - cfg that is needed to query the cluster
6769 - input data (all members of the _KEYS class attribute are required)
6770 - four buffer attributes (in_data/out_data, in_text/out_text), that represent the
6771 input (to the external script) in text and data structure format,
6772 and the output from it, again in two formats
6773 - the result variables from the script (success, info, nodes) for
6778 "mem_size", "disks", "disk_template",
6779 "os", "tags", "nics", "vcpus", "hypervisor",
6785 def __init__(self, lu, mode, name, **kwargs):
6787 # init buffer variables
6788 self.in_text = self.out_text = self.in_data = self.out_data = None
6789 # init all input fields so that pylint is happy
6792 self.mem_size = self.disks = self.disk_template = None
6793 self.os = self.tags = self.nics = self.vcpus = None
6794 self.hypervisor = None
6795 self.relocate_from = None
6797 self.required_nodes = None
6798 # init result fields
6799 self.success = self.info = self.nodes = None
6800 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6801 keyset = self._ALLO_KEYS
6802 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6803 keyset = self._RELO_KEYS
6805 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6806 " IAllocator" % self.mode)
6808 if key not in keyset:
6809 raise errors.ProgrammerError("Invalid input parameter '%s' to"
6810 " IAllocator" % key)
6811 setattr(self, key, kwargs[key])
6813 if key not in kwargs:
6814 raise errors.ProgrammerError("Missing input parameter '%s' to"
6815 " IAllocator" % key)
6816 self._BuildInputData()
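  # Illustrative usage sketch (an assumption for clarity, not part of the
  # original code): an allocation request would be built and run roughly as
  #   ial = IAllocator(self, constants.IALLOCATOR_MODE_ALLOC,
  #                    "inst1.example.com", mem_size=512, disks=[...],
  #                    disk_template=constants.DT_DRBD8, os="debian-etch",
  #                    tags=[], nics=[...], vcpus=1,
  #                    hypervisor=self.op.hypervisor)
  #   ial.Run("hail")
  # where "hail" stands for whatever external allocator script is installed.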
6818 def _ComputeClusterData(self):
6819 """Compute the generic allocator input data.
6821 This is the data that is independent of the actual operation.
6825 cluster_info = cfg.GetClusterInfo()
6828 "version": constants.IALLOCATOR_VERSION,
6829 "cluster_name": cfg.GetClusterName(),
6830 "cluster_tags": list(cluster_info.GetTags()),
6831 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6832 # we don't have job IDs
6834 iinfo = cfg.GetAllInstancesInfo().values()
6835 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6839 node_list = cfg.GetNodeList()
6841 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6842 hypervisor_name = self.hypervisor
6843 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6844 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6846 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6848 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6849 cluster_info.enabled_hypervisors)
6850 for nname, nresult in node_data.items():
6851 # first fill in static (config-based) values
6852 ninfo = cfg.GetNodeInfo(nname)
6854 "tags": list(ninfo.GetTags()),
6855 "primary_ip": ninfo.primary_ip,
6856 "secondary_ip": ninfo.secondary_ip,
6857 "offline": ninfo.offline,
6858 "drained": ninfo.drained,
6859 "master_candidate": ninfo.master_candidate,
6862 if not ninfo.offline:
6864 if not isinstance(nresult.data, dict):
6865 raise errors.OpExecError("Can't get data for node %s" % nname)
6866 remote_info = nresult.data
6867 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6868 'vg_size', 'vg_free', 'cpu_total']:
6869 if attr not in remote_info:
6870 raise errors.OpExecError("Node '%s' didn't return attribute"
6871 " '%s'" % (nname, attr))
6873 remote_info[attr] = int(remote_info[attr])
6874 except ValueError, err:
6875 raise errors.OpExecError("Node '%s' returned invalid value"
6876 " for '%s': %s" % (nname, attr, err))
6877 # compute memory used by primary instances
6878 i_p_mem = i_p_up_mem = 0
6879 for iinfo, beinfo in i_list:
6880 if iinfo.primary_node == nname:
6881 i_p_mem += beinfo[constants.BE_MEMORY]
6882 if iinfo.name not in node_iinfo[nname].data:
6885 i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6886 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6887 remote_info['memory_free'] -= max(0, i_mem_diff)
6890 i_p_up_mem += beinfo[constants.BE_MEMORY]
6892 # compute memory used by instances
6894 "total_memory": remote_info['memory_total'],
6895 "reserved_memory": remote_info['memory_dom0'],
6896 "free_memory": remote_info['memory_free'],
6897 "total_disk": remote_info['vg_size'],
6898 "free_disk": remote_info['vg_free'],
6899 "total_cpus": remote_info['cpu_total'],
6900 "i_pri_memory": i_p_mem,
6901 "i_pri_up_memory": i_p_up_mem,
6905 node_results[nname] = pnr
6906 data["nodes"] = node_results
6910 for iinfo, beinfo in i_list:
6911 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6912 for n in iinfo.nics]
6914 "tags": list(iinfo.GetTags()),
6915 "admin_up": iinfo.admin_up,
6916 "vcpus": beinfo[constants.BE_VCPUS],
6917 "memory": beinfo[constants.BE_MEMORY],
6919 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6921 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6922 "disk_template": iinfo.disk_template,
6923 "hypervisor": iinfo.hypervisor,
6925 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6927 instance_data[iinfo.name] = pir
6929 data["instances"] = instance_data
6933 def _AddNewInstance(self):
6934 """Add new instance data to allocator structure.
6936 This in combination with _ComputeClusterData will create the
6937 correct structure needed as input for the allocator.
6939 The checks for the completeness of the opcode must have already been
6945 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6947 if self.disk_template in constants.DTS_NET_MIRROR:
6948 self.required_nodes = 2
6950 self.required_nodes = 1
6954 "disk_template": self.disk_template,
6957 "vcpus": self.vcpus,
6958 "memory": self.mem_size,
6959 "disks": self.disks,
6960 "disk_space_total": disk_space,
6962 "required_nodes": self.required_nodes,
6964 data["request"] = request
6966 def _AddRelocateInstance(self):
6967 """Add relocate instance data to allocator structure.
6969 This in combination with _ComputeClusterData will create the
6970 correct structure needed as input for the allocator.
6972 The checks for the completeness of the opcode must have already been
6976 instance = self.lu.cfg.GetInstanceInfo(self.name)
6977 if instance is None:
6978 raise errors.ProgrammerError("Unknown instance '%s' passed to"
6979 " IAllocator" % self.name)
6981 if instance.disk_template not in constants.DTS_NET_MIRROR:
6982 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6984 if len(instance.secondary_nodes) != 1:
6985 raise errors.OpPrereqError("Instance has not exactly one secondary node")
6987 self.required_nodes = 1
6988 disk_sizes = [{'size': disk.size} for disk in instance.disks]
6989 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6994 "disk_space_total": disk_space,
6995 "required_nodes": self.required_nodes,
6996 "relocate_from": self.relocate_from,
6998 self.in_data["request"] = request
7000 def _BuildInputData(self):
7001 """Build input data structures.
7004 self._ComputeClusterData()
7006 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7007 self._AddNewInstance()
7009 self._AddRelocateInstance()
7011 self.in_text = serializer.Dump(self.in_data)
7013 def Run(self, name, validate=True, call_fn=None):
7014 """Run an instance allocator and return the results.
7018 call_fn = self.lu.rpc.call_iallocator_runner
7021 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7024 if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7025 raise errors.OpExecError("Invalid result from master iallocator runner")
7027 rcode, stdout, stderr, fail = result.data
7029 if rcode == constants.IARUN_NOTFOUND:
7030 raise errors.OpExecError("Can't find allocator '%s'" % name)
7031 elif rcode == constants.IARUN_FAILURE:
7032 raise errors.OpExecError("Instance allocator call failed: %s,"
7033 " output: %s" % (fail, stdout+stderr))
7034 self.out_text = stdout
7036 self._ValidateResult()
7038 def _ValidateResult(self):
7039 """Process the allocator results.
7041 This will process and if successful save the result in
7042 self.out_data and the other parameters.
7046 rdict = serializer.Load(self.out_text)
7047 except Exception, err:
7048 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7050 if not isinstance(rdict, dict):
7051 raise errors.OpExecError("Can't parse iallocator results: not a dict")
7053 for key in "success", "info", "nodes":
7054 if key not in rdict:
7055 raise errors.OpExecError("Can't parse iallocator results:"
7056 " missing key '%s'" % key)
7057 setattr(self, key, rdict[key])
7059 if not isinstance(rdict["nodes"], list):
7060 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7062 self.out_data = rdict
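    # Illustrative sketch (assumed example, not part of the original module)
    # of a well-formed allocator reply accepted by the checks above:
    #   {"success": True, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node2.example.com"]}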
7065 class LUTestAllocator(NoHooksLU):
7066 """Run allocator tests.
7068 This LU runs the allocator tests.
7071 _OP_REQP = ["direction", "mode", "name"]
7073 def CheckPrereq(self):
7074 """Check prerequisites.
7076 This checks the opcode parameters depending on the direction and mode of the test.
7079 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7080 for attr in ["name", "mem_size", "disks", "disk_template",
7081 "os", "tags", "nics", "vcpus"]:
7082 if not hasattr(self.op, attr):
7083 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7085 iname = self.cfg.ExpandInstanceName(self.op.name)
7086 if iname is not None:
7087 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7089 if not isinstance(self.op.nics, list):
7090 raise errors.OpPrereqError("Invalid parameter 'nics'")
7091 for row in self.op.nics:
7092 if (not isinstance(row, dict) or
7095 "bridge" not in row):
7096 raise errors.OpPrereqError("Invalid contents of the"
7097 " 'nics' parameter")
7098 if not isinstance(self.op.disks, list):
7099 raise errors.OpPrereqError("Invalid parameter 'disks'")
7100 for row in self.op.disks:
7101 if (not isinstance(row, dict) or
7102 "size" not in row or
7103 not isinstance(row["size"], int) or
7104 "mode" not in row or
7105 row["mode"] not in ['r', 'w']):
7106 raise errors.OpPrereqError("Invalid contents of the"
7107 " 'disks' parameter")
7108 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7109 self.op.hypervisor = self.cfg.GetHypervisorType()
7110 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7111 if not hasattr(self.op, "name"):
7112 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7113 fname = self.cfg.ExpandInstanceName(self.op.name)
7115 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7117 self.op.name = fname
7118 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7120 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7123 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7124 if not hasattr(self.op, "allocator") or self.op.allocator is None:
7125 raise errors.OpPrereqError("Missing allocator name")
7126 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7127 raise errors.OpPrereqError("Wrong allocator test '%s'" %
7130 def Exec(self, feedback_fn):
7131 """Run the allocator test.
7134 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7135 ial = IAllocator(self,
7138 mem_size=self.op.mem_size,
7139 disks=self.op.disks,
7140 disk_template=self.op.disk_template,
7144 vcpus=self.op.vcpus,
7145 hypervisor=self.op.hypervisor,
7148 ial = IAllocator(self,
7151 relocate_from=list(self.relocate_from),
7154 if self.op.direction == constants.IALLOCATOR_DIR_IN:
7155 result = ial.in_text
7157 ial.Run(self.op.allocator, validate=False)
7158 result = ial.out_text