# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
import os
import time
import re
import logging

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
101 """Returns the SshRunner object
105 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
108 ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note that additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes to return, use an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None

    @return: the new Exec result, based on the previous result

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primary or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check whether we really have been called with the instance locks
    # held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
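
  # Illustrative sketch (an editorial addition, not part of the original
  # code): a concurrent LU typically pairs _LockInstancesNodes with its
  # ExpandNames/DeclareLocks like this, for a hypothetical LU operating on
  # self.op.instance_name:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()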


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes

  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
                                 " non-empty list of nodes whose name is to be"
                                 " expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)
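
# Usage note (editorial addition): LUs resolve their opcode's node list with
# this helper; for example, LUQueryNodes.ExpandNames below does
#   self.wanted = _GetWantedNodes(self, self.op.names)
# which expands short names and raises OpPrereqError on unknown ones.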


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances

  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())

  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)
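
# Illustrative usage (an assumption, not from the original code): an LU whose
# opcode carries an optional boolean 'force' flag could normalize it in
# CheckArguments with
#   _CheckBooleanOpField(self.op, "force")
# after which self.op.force is guaranteed to exist and be a bool or None.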


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)
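
# Usage sketch (editorial addition): instance LUs typically guard their
# target nodes in CheckPrereq with these helpers, e.g.
#   _CheckNodeOnline(self, instance.primary_node)
#   _CheckNodeNotDrained(self, target_node)
# where 'target_node' is whatever node the hypothetical LU acts on.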


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @param memory: the memory size of the instance
  @param vcpus: the count of VCPUs the instance has
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @param disks: the list of (size, mode) pairs

  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_MAC" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  return env


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @param override: dictionary with key/values that will override
      our values

  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
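
# Illustrative usage (an assumption, not from this excerpt): an instance LU's
# BuildHooksEnv can delegate to this helper, optionally overriding selected
# values, e.g.
#   env = _BuildInstanceHookEnvByObject(self, self.instance,
#                                       override={"status": True})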


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  if result.failed or not result.data:
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))
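
# Usage sketch (editorial addition): LUs that (re)start an instance call this
# in CheckPrereq, e.g.
#   _CheckInstanceBridgesExist(self, instance)
# so a missing bridge is reported before any cluster state is changed.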


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn(" - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn(" - ERROR: connection to %s failed" % (node,))
      return True

    if local_version != remote_version[0]:
      feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn(" - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                    (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn(" - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn(" - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)
            bad = True

    # checks ssh to other nodes
    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn(" - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
           not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)
962 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
963 node_verify_param = {
964 constants.NV_FILELIST: file_names,
965 constants.NV_NODELIST: [node.name for node in nodeinfo
966 if not node.offline],
967 constants.NV_HYPERVISOR: hypervisors,
968 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
969 node.secondary_ip) for node in nodeinfo
970 if not node.offline],
971 constants.NV_INSTANCELIST: hypervisors,
972 constants.NV_VERSION: None,
973 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
975 if vg_name is not None:
976 node_verify_param[constants.NV_VGLIST] = None
977 node_verify_param[constants.NV_LVLIST] = vg_name
978 node_verify_param[constants.NV_DRBDLIST] = None
979 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
980 self.cfg.GetClusterName())
982 cluster = self.cfg.GetClusterInfo()
983 master_node = self.cfg.GetMasterNode()
984 all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue
      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
          }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn(" - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn(" - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True
1147 feedback_fn("* Verifying orphan volumes")
1148 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1152 feedback_fn("* Verifying remaining instances")
1153 result = self._VerifyOrphanInstances(instancelist, node_instance,
1157 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1158 feedback_fn("* Verifying N+1 Memory redundancy")
1159 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1162 feedback_fn("* Other Notes")
1164 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1165 % len(i_non_redundant))
1167 if i_non_a_balanced:
1168 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1169 % len(i_non_a_balanced))
1172 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1175 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn(" - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn(" Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn(" Node %s:" % node_name)
                show_node_header = False
              feedback_fn(" ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

    return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      lvs = node_lvs[node]
      if lvs.failed:
        self.LogWarning("Connection to node %s failed: %s" %
                        (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.RemoteFailMsg()
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)
    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check

  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
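
# Illustrative usage (grounded in LUSetClusterParams.CheckPrereq below):
# refusing to disable LVM storage while LVM-backed disks exist:
#   for disk in inst.disks:
#     if _RecursiveCheckIfLVMBased(disk):
#       raise errors.OpPrereqError("Cannot disable lvm storage while"
#                                  " lvm-based instances exist")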


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters.

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = objects.FillDict(
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)

    # hypervisor list/parameters
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size

    self.cfg.Update(self.cluster)

    # we want to update nodes after the cluster so that if any errors
    # happen, we have recorded and saved the cluster info
    if self.op.candidate_pool_size is not None:
      _AdjustCandidatePool(self)


def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetNodeList()
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)
  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                   ])
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())
  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.RemoteFailMsg()
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)


class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo())
    _RedistributeAncillaryFiles(self)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if rstats.failed or not rstats.data:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.data
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
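
# Usage sketch (an assumption, not from this excerpt): LUs that create or
# activate mirrored disks typically wait for the initial sync with
#   disk_abort = not _WaitForSync(lu, instance)
# and then decide whether to roll back based on disk_abort.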


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.RemoteFailMsg()
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      result = result and (not rstats.payload[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
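
# Usage sketch (an assumption, not from this excerpt): disk-replacement code
# can verify each device before touching it with
#   if not _CheckDiskConsistency(lu, dev, node, on_primary=False):
#     raise errors.OpExecError("Disk %s is degraded" % dev.iv_name)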


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].failed]
    for node_name, nr in rlist.iteritems():
      if nr.failed or not nr.data:
        continue
      for os_obj in nr.data:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in good_nodes:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    "master_candidate",
    "master",
    "offline",
    "drained",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in _GetWantedNodes if the list
    # is non-empty; if it's empty there's no validation to do

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.failed and nodeinfo.data:
          nodeinfo = nodeinfo.data
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering
2004 for node in nodelist:
2006 for field in self.op.output_fields:
2009 elif field == "pinst_list":
2010 val = list(node_to_primary[node.name])
2011 elif field == "sinst_list":
2012 val = list(node_to_secondary[node.name])
2013 elif field == "pinst_cnt":
2014 val = len(node_to_primary[node.name])
2015 elif field == "sinst_cnt":
2016 val = len(node_to_secondary[node.name])
2017 elif field == "pip":
2018 val = node.primary_ip
2019 elif field == "sip":
2020 val = node.secondary_ip
2021 elif field == "tags":
2022 val = list(node.GetTags())
2023 elif field == "serial_no":
2024 val = node.serial_no
2025 elif field == "master_candidate":
2026 val = node.master_candidate
2027 elif field == "master":
2028 val = node.name == master_node
2029 elif field == "offline":
2031 elif field == "drained":
2033 elif self._FIELDS_DYNAMIC.Matches(field):
2034 val = live_data[node.name].get(field, None)
2036 raise errors.ParameterError(field)
2037 node_output.append(val)
2038 output.append(node_output)
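# The static/dynamic field split above decides whether live data is needed:
# only when a requested field is not covered by the static set does the LU
# issue node RPCs. A minimal sketch of that decision (hypothetical helper,
# not used by this module; the field names are an assumed subset):
def _ExampleNeedsLiveData(output_fields):
  static = utils.FieldSet("name", "pinst_cnt", "sinst_cnt", "pip", "sip")
  # NonMatching() returns the requested fields not matched by the set; a
  # non-empty result means the node daemons must be queried for live data
  return bool(static.NonMatching(output_fields))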
2043 class LUQueryNodeVolumes(NoHooksLU):
2044 """Logical unit for getting volumes on node(s).
2047 _OP_REQP = ["nodes", "output_fields"]
2049 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2050 _FIELDS_STATIC = utils.FieldSet("node")
2052 def ExpandNames(self):
2053 _CheckOutputFields(static=self._FIELDS_STATIC,
2054 dynamic=self._FIELDS_DYNAMIC,
2055 selected=self.op.output_fields)
2057 self.needed_locks = {}
2058 self.share_locks[locking.LEVEL_NODE] = 1
2059 if not self.op.nodes:
2060 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2062 self.needed_locks[locking.LEVEL_NODE] = \
2063 _GetWantedNodes(self, self.op.nodes)
2065 def CheckPrereq(self):
2066 """Check prerequisites.
2068 This checks that the required fields are valid output fields.
2071 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2073 def Exec(self, feedback_fn):
2074 """Computes the list of nodes and their attributes.
2077 nodenames = self.nodes
2078 volumes = self.rpc.call_node_volumes(nodenames)
2080 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2081 in self.cfg.GetInstanceList()]
2083 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2086 for node in nodenames:
2087 if node not in volumes or volumes[node].failed or not volumes[node].data:
2090 node_vols = volumes[node].data[:]
2091 node_vols.sort(key=lambda vol: vol['dev'])
2093 for vol in node_vols:
2095 for field in self.op.output_fields:
2098 elif field == "phys":
2102 elif field == "name":
2104 elif field == "size":
2105 val = int(float(vol['size']))
2106 elif field == "instance":
2108 if node not in lv_by_node[inst]:
2110 if vol['name'] in lv_by_node[inst][node]:
2116 raise errors.ParameterError(field)
2117 node_output.append(str(val))
2119 output.append(node_output)
2124 class LUAddNode(LogicalUnit):
2125 """Logical unit for adding node to the cluster.
2129 HTYPE = constants.HTYPE_NODE
2130 _OP_REQP = ["node_name"]
2132 def BuildHooksEnv(self):
2135 This will run on all nodes before, and on all nodes + the new node after.
2139 "OP_TARGET": self.op.node_name,
2140 "NODE_NAME": self.op.node_name,
2141 "NODE_PIP": self.op.primary_ip,
2142 "NODE_SIP": self.op.secondary_ip,
2144 nodes_0 = self.cfg.GetNodeList()
2145 nodes_1 = nodes_0 + [self.op.node_name, ]
2146 return env, nodes_0, nodes_1
2148 def CheckPrereq(self):
2149 """Check prerequisites.
2152 - the new node is not already in the config
2154 - its parameters (single/dual homed) match the cluster
2156 Any errors are signalled by raising errors.OpPrereqError.
2159 node_name = self.op.node_name
2162 dns_data = utils.HostInfo(node_name)
2164 node = dns_data.name
2165 primary_ip = self.op.primary_ip = dns_data.ip
2166 secondary_ip = getattr(self.op, "secondary_ip", None)
2167 if secondary_ip is None:
2168 secondary_ip = primary_ip
2169 if not utils.IsValidIP(secondary_ip):
2170 raise errors.OpPrereqError("Invalid secondary IP given")
2171 self.op.secondary_ip = secondary_ip
2173 node_list = cfg.GetNodeList()
2174 if not self.op.readd and node in node_list:
2175 raise errors.OpPrereqError("Node %s is already in the configuration" %
2177 elif self.op.readd and node not in node_list:
2178 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2180 for existing_node_name in node_list:
2181 existing_node = cfg.GetNodeInfo(existing_node_name)
2183 if self.op.readd and node == existing_node_name:
2184 if (existing_node.primary_ip != primary_ip or
2185 existing_node.secondary_ip != secondary_ip):
2186 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2187 " address configuration as before")
2190 if (existing_node.primary_ip == primary_ip or
2191 existing_node.secondary_ip == primary_ip or
2192 existing_node.primary_ip == secondary_ip or
2193 existing_node.secondary_ip == secondary_ip):
2194 raise errors.OpPrereqError("New node ip address(es) conflict with"
2195 " existing node %s" % existing_node.name)
2197 # check that the type of the node (single versus dual homed) is the
2198 # same as for the master
2199 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2200 master_singlehomed = myself.secondary_ip == myself.primary_ip
2201 newbie_singlehomed = secondary_ip == primary_ip
2202 if master_singlehomed != newbie_singlehomed:
2203 if master_singlehomed:
2204 raise errors.OpPrereqError("The master has no private ip but the"
2205 " new node has one")
2207 raise errors.OpPrereqError("The master has a private ip but the"
2208 " new node doesn't have one")
2210 # check reachability
2211 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2212 raise errors.OpPrereqError("Node not reachable by ping")
2214 if not newbie_singlehomed:
2215 # check reachability from my secondary ip to newbie's secondary ip
2216 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2217 source=myself.secondary_ip):
2218 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2219 " based ping to noded port")
2221 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2222 mc_now, _ = self.cfg.GetMasterCandidateStats()
2223 master_candidate = mc_now < cp_size
2225 self.new_node = objects.Node(name=node,
2226 primary_ip=primary_ip,
2227 secondary_ip=secondary_ip,
2228 master_candidate=master_candidate,
2229 offline=False, drained=False)
2231 def Exec(self, feedback_fn):
2232 """Adds the new node to the cluster.
2235 new_node = self.new_node
2236 node = new_node.name
2238 # check connectivity
2239 result = self.rpc.call_version([node])[node]
2242 if constants.PROTOCOL_VERSION == result.data:
2243 logging.info("Communication to node %s fine, sw version %s match",
2246 raise errors.OpExecError("Version mismatch: master version %s,"
2247 " node version %s" %
2248 (constants.PROTOCOL_VERSION, result.data))
2250 raise errors.OpExecError("Cannot get version from the new node")
2253 logging.info("Copy ssh key to node %s", node)
2254 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2256 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2257 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2263 keyarray.append(f.read())
2267 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2269 keyarray[3], keyarray[4], keyarray[5])
2271 msg = result.RemoteFailMsg()
2273 raise errors.OpExecError("Cannot transfer ssh keys to the"
2274 " new node: %s" % msg)
2276 # Add node to our /etc/hosts, and add key to known_hosts
2277 if self.cfg.GetClusterInfo().modify_etc_hosts:
2278 utils.AddHostToEtcHosts(new_node.name)
2280 if new_node.secondary_ip != new_node.primary_ip:
2281 result = self.rpc.call_node_has_ip_address(new_node.name,
2282 new_node.secondary_ip)
2283 if result.failed or not result.data:
2284 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2285 " you gave (%s). Please fix and re-run this"
2286 " command." % new_node.secondary_ip)
2288 node_verify_list = [self.cfg.GetMasterNode()]
2289 node_verify_param = {
2291 # TODO: do a node-net-test as well?
2294 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2295 self.cfg.GetClusterName())
2296 for verifier in node_verify_list:
2297 if result[verifier].failed or not result[verifier].data:
2298 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2299 " for remote verification" % verifier)
2300 if result[verifier].data['nodelist']:
2301 for failed in result[verifier].data['nodelist']:
2302 feedback_fn("ssh/hostname verification failed %s -> %s" %
2303 (verifier, result[verifier].data['nodelist'][failed]))
2304 raise errors.OpExecError("ssh/hostname verification failed.")
2307 _RedistributeAncillaryFiles(self)
2308 self.context.ReaddNode(new_node)
2310 _RedistributeAncillaryFiles(self, additional_nodes=node)
2311 self.context.AddNode(new_node)
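# A hedged sketch of the homing rule enforced in LUAddNode.CheckPrereq above:
# the new node and the master must agree on whether a separate secondary
# (replication) network is in use. Hypothetical helper, not used here:
def _ExampleHomingMatches(master_node, primary_ip, secondary_ip):
  master_singlehomed = master_node.secondary_ip == master_node.primary_ip
  newbie_singlehomed = secondary_ip == primary_ip
  # both single-homed or both dual-homed is acceptable; a mix is not
  return master_singlehomed == newbie_singlehomed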
2314 class LUSetNodeParams(LogicalUnit):
2315 """Modifies the parameters of a node.
2318 HPATH = "node-modify"
2319 HTYPE = constants.HTYPE_NODE
2320 _OP_REQP = ["node_name"]
2323 def CheckArguments(self):
2324 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2325 if node_name is None:
2326 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2327 self.op.node_name = node_name
2328 _CheckBooleanOpField(self.op, 'master_candidate')
2329 _CheckBooleanOpField(self.op, 'offline')
2330 _CheckBooleanOpField(self.op, 'drained')
2331 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2332 if all_mods.count(None) == 3:
2333 raise errors.OpPrereqError("Please pass at least one modification")
2334 if all_mods.count(True) > 1:
2335 raise errors.OpPrereqError("Can't set the node into more than one"
2336 " state at the same time")
2338 def ExpandNames(self):
2339 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2341 def BuildHooksEnv(self):
2344 This runs on the master node.
2348 "OP_TARGET": self.op.node_name,
2349 "MASTER_CANDIDATE": str(self.op.master_candidate),
2350 "OFFLINE": str(self.op.offline),
2351 "DRAINED": str(self.op.drained),
2353 nl = [self.cfg.GetMasterNode(),
2357 def CheckPrereq(self):
2358 """Check prerequisites.
2360 This checks the node's current state against the requested modifications.
2363 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2365 if ((self.op.master_candidate == False or self.op.offline == True or
2366 self.op.drained == True) and node.master_candidate):
2367 # we will demote the node from master_candidate
2368 if self.op.node_name == self.cfg.GetMasterNode():
2369 raise errors.OpPrereqError("The master node has to be a"
2370 " master candidate, online and not drained")
2371 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2372 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2373 if num_candidates <= cp_size:
2374 msg = ("Not enough master candidates (desired"
2375 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2377 self.LogWarning(msg)
2379 raise errors.OpPrereqError(msg)
2381 if (self.op.master_candidate == True and
2382 ((node.offline and not self.op.offline == False) or
2383 (node.drained and not self.op.drained == False))):
2384 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2385 " to master_candidate" % node.name)
2389 def Exec(self, feedback_fn):
2398 if self.op.offline is not None:
2399 node.offline = self.op.offline
2400 result.append(("offline", str(self.op.offline)))
2401 if self.op.offline == True:
2402 if node.master_candidate:
2403 node.master_candidate = False
2405 result.append(("master_candidate", "auto-demotion due to offline"))
2407 node.drained = False
2408 result.append(("drained", "clear drained status due to offline"))
2410 if self.op.master_candidate is not None:
2411 node.master_candidate = self.op.master_candidate
2413 result.append(("master_candidate", str(self.op.master_candidate)))
2414 if self.op.master_candidate == False:
2415 rrc = self.rpc.call_node_demote_from_mc(node.name)
2416 msg = rrc.RemoteFailMsg()
2418 self.LogWarning("Node failed to demote itself: %s" % msg)
2420 if self.op.drained is not None:
2421 node.drained = self.op.drained
2422 result.append(("drained", str(self.op.drained)))
2423 if self.op.drained == True:
2424 if node.master_candidate:
2425 node.master_candidate = False
2427 result.append(("master_candidate", "auto-demotion due to drain"))
2429 node.offline = False
2430 result.append(("offline", "clear offline status due to drain"))
2432 # this will trigger configuration file update, if needed
2433 self.cfg.Update(node)
2434 # this will trigger job queue propagation or cleanup
2436 self.context.ReaddNode(node)
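# Illustrative restatement of the argument rule from CheckArguments above:
# at least one of the three flags must be given, and at most one of them may
# be True at a time (hypothetical helper, not called by this module):
def _ExampleValidateNodeMods(offline, drained, master_candidate):
  mods = [offline, drained, master_candidate]
  if mods.count(None) == 3:
    raise errors.OpPrereqError("Please pass at least one modification")
  if mods.count(True) > 1:
    raise errors.OpPrereqError("Can't set the node into more than one"
                               " state at the same time")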
2441 class LUPowercycleNode(NoHooksLU):
2442 """Powercycles a node.
2445 _OP_REQP = ["node_name", "force"]
2448 def CheckArguments(self):
2449 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2450 if node_name is None:
2451 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2452 self.op.node_name = node_name
2453 if node_name == self.cfg.GetMasterNode() and not self.op.force:
2454 raise errors.OpPrereqError("The node is the master and the force"
2455 " parameter was not set")
2457 def ExpandNames(self):
2458 """Locking for PowercycleNode.
2460 This is a last-resort option and shouldn't block on other
2461 jobs. Therefore, we grab no locks.
2464 self.needed_locks = {}
2466 def CheckPrereq(self):
2467 """Check prerequisites.
2469 This LU has no prereqs.
2474 def Exec(self, feedback_fn):
2478 result = self.rpc.call_node_powercycle(self.op.node_name,
2479 self.cfg.GetHypervisorType())
2480 msg = result.RemoteFailMsg()
2482 raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2483 return result.payload
2486 class LUQueryClusterInfo(NoHooksLU):
2487 """Query cluster configuration.
2493 def ExpandNames(self):
2494 self.needed_locks = {}
2496 def CheckPrereq(self):
2497 """No prerequsites needed for this LU.
2502 def Exec(self, feedback_fn):
2503 """Return cluster config.
2506 cluster = self.cfg.GetClusterInfo()
2508 "software_version": constants.RELEASE_VERSION,
2509 "protocol_version": constants.PROTOCOL_VERSION,
2510 "config_version": constants.CONFIG_VERSION,
2511 "os_api_version": constants.OS_API_VERSION,
2512 "export_version": constants.EXPORT_VERSION,
2513 "architecture": (platform.architecture()[0], platform.machine()),
2514 "name": cluster.cluster_name,
2515 "master": cluster.master_node,
2516 "default_hypervisor": cluster.default_hypervisor,
2517 "enabled_hypervisors": cluster.enabled_hypervisors,
2518 "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2519 for hypervisor in cluster.enabled_hypervisors]),
2520 "beparams": cluster.beparams,
2521 "nicparams": cluster.nicparams,
2522 "candidate_pool_size": cluster.candidate_pool_size,
2523 "default_bridge": cluster.default_bridge,
2524 "master_netdev": cluster.master_netdev,
2525 "volume_group_name": cluster.volume_group_name,
2526 "file_storage_dir": cluster.file_storage_dir,
2532 class LUQueryConfigValues(NoHooksLU):
2533 """Return configuration values.
2538 _FIELDS_DYNAMIC = utils.FieldSet()
2539 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2541 def ExpandNames(self):
2542 self.needed_locks = {}
2544 _CheckOutputFields(static=self._FIELDS_STATIC,
2545 dynamic=self._FIELDS_DYNAMIC,
2546 selected=self.op.output_fields)
2548 def CheckPrereq(self):
2549 """No prerequisites.
2554 def Exec(self, feedback_fn):
2555 """Dump a representation of the cluster config to the standard output.
2559 for field in self.op.output_fields:
2560 if field == "cluster_name":
2561 entry = self.cfg.GetClusterName()
2562 elif field == "master_node":
2563 entry = self.cfg.GetMasterNode()
2564 elif field == "drain_flag":
2565 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2567 raise errors.ParameterError(field)
2568 values.append(entry)
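# Hedged usage sketch for this LU: a client would submit an opcode selecting
# any of the static fields declared above, e.g. (assuming the
# OpQueryConfigValues opcode accepts an output_fields argument):
#
#   op = opcodes.OpQueryConfigValues(output_fields=["cluster_name",
#                                                   "master_node"])
#
# The result is a list with one entry per requested field, in order.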
2572 class LUActivateInstanceDisks(NoHooksLU):
2573 """Bring up an instance's disks.
2576 _OP_REQP = ["instance_name"]
2579 def ExpandNames(self):
2580 self._ExpandAndLockInstance()
2581 self.needed_locks[locking.LEVEL_NODE] = []
2582 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2584 def DeclareLocks(self, level):
2585 if level == locking.LEVEL_NODE:
2586 self._LockInstancesNodes()
2588 def CheckPrereq(self):
2589 """Check prerequisites.
2591 This checks that the instance is in the cluster.
2594 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2595 assert self.instance is not None, \
2596 "Cannot retrieve locked instance %s" % self.op.instance_name
2597 _CheckNodeOnline(self, self.instance.primary_node)
2599 def Exec(self, feedback_fn):
2600 """Activate the disks.
2603 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2605 raise errors.OpExecError("Cannot activate block devices")
2610 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2611 """Prepare the block devices for an instance.
2613 This sets up the block devices on all nodes.
2615 @type lu: L{LogicalUnit}
2616 @param lu: the logical unit on whose behalf we execute
2617 @type instance: L{objects.Instance}
2618 @param instance: the instance for whose disks we assemble
2619 @type ignore_secondaries: boolean
2620 @param ignore_secondaries: if true, errors on secondary nodes
2621 won't result in an error return from the function
2622 @return: a pair (disks_ok, device_info), where device_info is a list of
2623 (host, instance_visible_name, node_visible_name) tuples
2624 with the mapping from node devices to instance devices
2629 iname = instance.name
2630 # With the two-pass mechanism we try to reduce the window of
2631 # opportunity for the race condition of switching DRBD to primary
2632 # before handshaking occurred, but we do not eliminate it
2634 # The proper fix would be to wait (with some limits) until the
2635 # connection has been made and drbd transitions from WFConnection
2636 # into any other network-connected state (Connected, SyncTarget,
2639 # 1st pass, assemble on all nodes in secondary mode
2640 for inst_disk in instance.disks:
2641 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2642 lu.cfg.SetDiskID(node_disk, node)
2643 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2644 msg = result.RemoteFailMsg()
2646 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2647 " (is_primary=False, pass=1): %s",
2648 inst_disk.iv_name, node, msg)
2649 if not ignore_secondaries:
2652 # FIXME: race condition on drbd migration to primary
2654 # 2nd pass, do only the primary node
2655 for inst_disk in instance.disks:
2656 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2657 if node != instance.primary_node:
2659 lu.cfg.SetDiskID(node_disk, node)
2660 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2661 msg = result.RemoteFailMsg()
2663 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2664 " (is_primary=True, pass=2): %s",
2665 inst_disk.iv_name, node, msg)
2667 device_info.append((instance.primary_node, inst_disk.iv_name,
2670 # leave the disks configured for the primary node
2671 # this is a workaround that would be fixed better by
2672 # improving the logical/physical id handling
2673 for disk in instance.disks:
2674 lu.cfg.SetDiskID(disk, instance.primary_node)
2676 return disks_ok, device_info
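# Schematic, hedged restatement of the two-pass assembly above (hypothetical
# helper, not called by this module; error handling omitted):
def _ExampleTwoPassAssemble(lu, instance):
  # pass 1: assemble on every node in secondary (read-only) mode, so DRBD
  # peers can handshake before either side is promoted
  for disk in instance.disks:
    for node, node_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      lu.rpc.call_blockdev_assemble(node, node_disk, instance.name, False)
  # pass 2: re-assemble on the primary node only, this time with
  # is_primary=True
  for disk in instance.disks:
    for node, node_disk in disk.ComputeNodeTree(instance.primary_node):
      if node == instance.primary_node:
        lu.cfg.SetDiskID(node_disk, node)
        lu.rpc.call_blockdev_assemble(node, node_disk, instance.name, True)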
2679 def _StartInstanceDisks(lu, instance, force):
2680 """Start the disks of an instance.
2683 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2684 ignore_secondaries=force)
2686 _ShutdownInstanceDisks(lu, instance)
2687 if force is not None and not force:
2688 lu.proc.LogWarning("", hint="If the message above refers to a"
2689 " secondary node,"
2690 " you can retry the operation using '--force'.")
2691 raise errors.OpExecError("Disk consistency error")
2694 class LUDeactivateInstanceDisks(NoHooksLU):
2695 """Shutdown an instance's disks.
2698 _OP_REQP = ["instance_name"]
2701 def ExpandNames(self):
2702 self._ExpandAndLockInstance()
2703 self.needed_locks[locking.LEVEL_NODE] = []
2704 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2706 def DeclareLocks(self, level):
2707 if level == locking.LEVEL_NODE:
2708 self._LockInstancesNodes()
2710 def CheckPrereq(self):
2711 """Check prerequisites.
2713 This checks that the instance is in the cluster.
2716 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2717 assert self.instance is not None, \
2718 "Cannot retrieve locked instance %s" % self.op.instance_name
2720 def Exec(self, feedback_fn):
2721 """Deactivate the disks
2724 instance = self.instance
2725 _SafeShutdownInstanceDisks(self, instance)
2728 def _SafeShutdownInstanceDisks(lu, instance):
2729 """Shutdown block devices of an instance.
2731 This function checks if an instance is running before calling
2732 _ShutdownInstanceDisks.
2735 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2736 [instance.hypervisor])
2737 ins_l = ins_l[instance.primary_node]
2738 if ins_l.failed or not isinstance(ins_l.data, list):
2739 raise errors.OpExecError("Can't contact node '%s'" %
2740 instance.primary_node)
2742 if instance.name in ins_l.data:
2743 raise errors.OpExecError("Instance is running, can't shutdown"
2746 _ShutdownInstanceDisks(lu, instance)
2749 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2750 """Shutdown block devices of an instance.
2752 This does the shutdown on all nodes of the instance.
2754 If ignore_primary is false, errors on the primary node make the function return False.
2759 for disk in instance.disks:
2760 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2761 lu.cfg.SetDiskID(top_disk, node)
2762 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2763 msg = result.RemoteFailMsg()
2765 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2766 disk.iv_name, node, msg)
2767 if not ignore_primary or node != instance.primary_node:
2772 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2773 """Checks if a node has enough free memory.
2775 This function checks if a given node has the needed amount of free
2776 memory. In case the node has less memory or we cannot get the
2777 information from the node, this function raises an OpPrereqError
2780 @type lu: C{LogicalUnit}
2781 @param lu: a logical unit from which we get configuration data
2783 @param node: the node to check
2784 @type reason: C{str}
2785 @param reason: string to use in the error message
2786 @type requested: C{int}
2787 @param requested: the amount of memory in MiB to check for
2788 @type hypervisor_name: C{str}
2789 @param hypervisor_name: the hypervisor to ask for memory stats
2790 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2791 we cannot check the node
2794 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2795 nodeinfo[node].Raise()
2796 free_mem = nodeinfo[node].data.get('memory_free')
2797 if not isinstance(free_mem, int):
2798 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2799 " was '%s'" % (node, free_mem))
2800 if requested > free_mem:
2801 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2802 " needed %s MiB, available %s MiB" %
2803 (node, reason, requested, free_mem))
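# Typical usage, mirroring the start-instance path below: verify that the
# primary node can hold the instance's configured memory before starting it
# (sketch only; bep comes from FillBE as in LUStartupInstance.CheckPrereq):
#
#   bep = self.cfg.GetClusterInfo().FillBE(instance)
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)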
2806 class LUStartupInstance(LogicalUnit):
2807 """Starts an instance.
2810 HPATH = "instance-start"
2811 HTYPE = constants.HTYPE_INSTANCE
2812 _OP_REQP = ["instance_name", "force"]
2815 def ExpandNames(self):
2816 self._ExpandAndLockInstance()
2818 def BuildHooksEnv(self):
2821 This runs on master, primary and secondary nodes of the instance.
2825 "FORCE": self.op.force,
2827 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2828 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2831 def CheckPrereq(self):
2832 """Check prerequisites.
2834 This checks that the instance is in the cluster.
2837 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2838 assert self.instance is not None, \
2839 "Cannot retrieve locked instance %s" % self.op.instance_name
2842 self.beparams = getattr(self.op, "beparams", {})
2844 if not isinstance(self.beparams, dict):
2845 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2846 " dict" % (type(self.beparams), ))
2847 # fill the beparams dict
2848 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2849 self.op.beparams = self.beparams
2852 self.hvparams = getattr(self.op, "hvparams", {})
2854 if not isinstance(self.hvparams, dict):
2855 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2856 " dict" % (type(self.hvparams), ))
2858 # check hypervisor parameter syntax (locally)
2859 cluster = self.cfg.GetClusterInfo()
2860 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2861 filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2863 filled_hvp.update(self.hvparams)
2864 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2865 hv_type.CheckParameterSyntax(filled_hvp)
2866 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2867 self.op.hvparams = self.hvparams
2869 _CheckNodeOnline(self, instance.primary_node)
2871 bep = self.cfg.GetClusterInfo().FillBE(instance)
2872 # check bridge existence
2873 _CheckInstanceBridgesExist(self, instance)
2875 remote_info = self.rpc.call_instance_info(instance.primary_node,
2877 instance.hypervisor)
2879 if not remote_info.data:
2880 _CheckNodeFreeMemory(self, instance.primary_node,
2881 "starting instance %s" % instance.name,
2882 bep[constants.BE_MEMORY], instance.hypervisor)
2884 def Exec(self, feedback_fn):
2885 """Start the instance.
2888 instance = self.instance
2889 force = self.op.force
2891 self.cfg.MarkInstanceUp(instance.name)
2893 node_current = instance.primary_node
2895 _StartInstanceDisks(self, instance, force)
2897 result = self.rpc.call_instance_start(node_current, instance,
2898 self.hvparams, self.beparams)
2899 msg = result.RemoteFailMsg()
2901 _ShutdownInstanceDisks(self, instance)
2902 raise errors.OpExecError("Could not start instance: %s" % msg)
2905 class LURebootInstance(LogicalUnit):
2906 """Reboot an instance.
2909 HPATH = "instance-reboot"
2910 HTYPE = constants.HTYPE_INSTANCE
2911 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2914 def ExpandNames(self):
2915 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2916 constants.INSTANCE_REBOOT_HARD,
2917 constants.INSTANCE_REBOOT_FULL]:
2918 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2919 (constants.INSTANCE_REBOOT_SOFT,
2920 constants.INSTANCE_REBOOT_HARD,
2921 constants.INSTANCE_REBOOT_FULL))
2922 self._ExpandAndLockInstance()
2924 def BuildHooksEnv(self):
2927 This runs on master, primary and secondary nodes of the instance.
2931 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2932 "REBOOT_TYPE": self.op.reboot_type,
2934 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2935 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2938 def CheckPrereq(self):
2939 """Check prerequisites.
2941 This checks that the instance is in the cluster.
2944 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2945 assert self.instance is not None, \
2946 "Cannot retrieve locked instance %s" % self.op.instance_name
2948 _CheckNodeOnline(self, instance.primary_node)
2950 # check bridge existence
2951 _CheckInstanceBridgesExist(self, instance)
2953 def Exec(self, feedback_fn):
2954 """Reboot the instance.
2957 instance = self.instance
2958 ignore_secondaries = self.op.ignore_secondaries
2959 reboot_type = self.op.reboot_type
2961 node_current = instance.primary_node
2963 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2964 constants.INSTANCE_REBOOT_HARD]:
2965 for disk in instance.disks:
2966 self.cfg.SetDiskID(disk, node_current)
2967 result = self.rpc.call_instance_reboot(node_current, instance,
2969 msg = result.RemoteFailMsg()
2971 raise errors.OpExecError("Could not reboot instance: %s" % msg)
2973 result = self.rpc.call_instance_shutdown(node_current, instance)
2974 msg = result.RemoteFailMsg()
2976 raise errors.OpExecError("Could not shutdown instance for"
2977 " full reboot: %s" % msg)
2978 _ShutdownInstanceDisks(self, instance)
2979 _StartInstanceDisks(self, instance, ignore_secondaries)
2980 result = self.rpc.call_instance_start(node_current, instance, None, None)
2981 msg = result.RemoteFailMsg()
2983 _ShutdownInstanceDisks(self, instance)
2984 raise errors.OpExecError("Could not start instance for"
2985 " full reboot: %s" % msg)
2987 self.cfg.MarkInstanceUp(instance.name)
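# Hedged summary of the reboot strategies implemented above (illustrative
# helper, not used by this module): soft and hard reboots are delegated to
# the hypervisor on the primary node, while a full reboot is emulated as a
# shutdown, disk re-assembly and fresh start.
def _ExampleRebootStrategy(reboot_type):
  if reboot_type in (constants.INSTANCE_REBOOT_SOFT,
                     constants.INSTANCE_REBOOT_HARD):
    return "hypervisor-level reboot on the primary node"
  elif reboot_type == constants.INSTANCE_REBOOT_FULL:
    return "shutdown + disk re-assembly + start"
  else:
    raise errors.ParameterError("unknown reboot type '%s'" % reboot_type)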
2990 class LUShutdownInstance(LogicalUnit):
2991 """Shutdown an instance.
2994 HPATH = "instance-stop"
2995 HTYPE = constants.HTYPE_INSTANCE
2996 _OP_REQP = ["instance_name"]
2999 def ExpandNames(self):
3000 self._ExpandAndLockInstance()
3002 def BuildHooksEnv(self):
3005 This runs on master, primary and secondary nodes of the instance.
3008 env = _BuildInstanceHookEnvByObject(self, self.instance)
3009 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3012 def CheckPrereq(self):
3013 """Check prerequisites.
3015 This checks that the instance is in the cluster.
3018 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3019 assert self.instance is not None, \
3020 "Cannot retrieve locked instance %s" % self.op.instance_name
3021 _CheckNodeOnline(self, self.instance.primary_node)
3023 def Exec(self, feedback_fn):
3024 """Shutdown the instance.
3027 instance = self.instance
3028 node_current = instance.primary_node
3029 self.cfg.MarkInstanceDown(instance.name)
3030 result = self.rpc.call_instance_shutdown(node_current, instance)
3031 msg = result.RemoteFailMsg()
3033 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3035 _ShutdownInstanceDisks(self, instance)
3038 class LUReinstallInstance(LogicalUnit):
3039 """Reinstall an instance.
3042 HPATH = "instance-reinstall"
3043 HTYPE = constants.HTYPE_INSTANCE
3044 _OP_REQP = ["instance_name"]
3047 def ExpandNames(self):
3048 self._ExpandAndLockInstance()
3050 def BuildHooksEnv(self):
3053 This runs on master, primary and secondary nodes of the instance.
3056 env = _BuildInstanceHookEnvByObject(self, self.instance)
3057 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3060 def CheckPrereq(self):
3061 """Check prerequisites.
3063 This checks that the instance is in the cluster and is not running.
3066 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3067 assert instance is not None, \
3068 "Cannot retrieve locked instance %s" % self.op.instance_name
3069 _CheckNodeOnline(self, instance.primary_node)
3071 if instance.disk_template == constants.DT_DISKLESS:
3072 raise errors.OpPrereqError("Instance '%s' has no disks" %
3073 self.op.instance_name)
3074 if instance.admin_up:
3075 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3076 self.op.instance_name)
3077 remote_info = self.rpc.call_instance_info(instance.primary_node,
3079 instance.hypervisor)
3081 if remote_info.data:
3082 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3083 (self.op.instance_name,
3084 instance.primary_node))
3086 self.op.os_type = getattr(self.op, "os_type", None)
3087 if self.op.os_type is not None:
3089 pnode = self.cfg.GetNodeInfo(
3090 self.cfg.ExpandNodeName(instance.primary_node))
3092 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3094 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3096 if not isinstance(result.data, objects.OS):
3097 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3098 " primary node" % self.op.os_type)
3100 self.instance = instance
3102 def Exec(self, feedback_fn):
3103 """Reinstall the instance.
3106 inst = self.instance
3108 if self.op.os_type is not None:
3109 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3110 inst.os = self.op.os_type
3111 self.cfg.Update(inst)
3113 _StartInstanceDisks(self, inst, None)
3115 feedback_fn("Running the instance OS create scripts...")
3116 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3117 msg = result.RemoteFailMsg()
3119 raise errors.OpExecError("Could not install OS for instance %s"
3120 " on node %s: %s" %
3121 (inst.name, inst.primary_node, msg))
3123 _ShutdownInstanceDisks(self, inst)
3126 class LURenameInstance(LogicalUnit):
3127 """Rename an instance.
3130 HPATH = "instance-rename"
3131 HTYPE = constants.HTYPE_INSTANCE
3132 _OP_REQP = ["instance_name", "new_name"]
3134 def BuildHooksEnv(self):
3137 This runs on master, primary and secondary nodes of the instance.
3140 env = _BuildInstanceHookEnvByObject(self, self.instance)
3141 env["INSTANCE_NEW_NAME"] = self.op.new_name
3142 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3145 def CheckPrereq(self):
3146 """Check prerequisites.
3148 This checks that the instance is in the cluster and is not running.
3151 instance = self.cfg.GetInstanceInfo(
3152 self.cfg.ExpandInstanceName(self.op.instance_name))
3153 if instance is None:
3154 raise errors.OpPrereqError("Instance '%s' not known" %
3155 self.op.instance_name)
3156 _CheckNodeOnline(self, instance.primary_node)
3158 if instance.admin_up:
3159 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3160 self.op.instance_name)
3161 remote_info = self.rpc.call_instance_info(instance.primary_node,
3163 instance.hypervisor)
3165 if remote_info.data:
3166 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3167 (self.op.instance_name,
3168 instance.primary_node))
3169 self.instance = instance
3171 # new name verification
3172 name_info = utils.HostInfo(self.op.new_name)
3174 self.op.new_name = new_name = name_info.name
3175 instance_list = self.cfg.GetInstanceList()
3176 if new_name in instance_list:
3177 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3180 if not getattr(self.op, "ignore_ip", False):
3181 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3182 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3183 (name_info.ip, new_name))
3186 def Exec(self, feedback_fn):
3187 """Reinstall the instance.
3190 inst = self.instance
3191 old_name = inst.name
3193 if inst.disk_template == constants.DT_FILE:
3194 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3196 self.cfg.RenameInstance(inst.name, self.op.new_name)
3197 # Change the instance lock. This is definitely safe while we hold the BGL
3198 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3199 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3201 # re-read the instance from the configuration after rename
3202 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3204 if inst.disk_template == constants.DT_FILE:
3205 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3206 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3207 old_file_storage_dir,
3208 new_file_storage_dir)
3211 raise errors.OpExecError("Could not connect to node '%s' to rename"
3212 " directory '%s' to '%s' (but the instance"
3213 " has been renamed in Ganeti)" % (
3214 inst.primary_node, old_file_storage_dir,
3215 new_file_storage_dir))
3217 if not result.data[0]:
3218 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3219 " (but the instance has been renamed in"
3220 " Ganeti)" % (old_file_storage_dir,
3221 new_file_storage_dir))
3223 _StartInstanceDisks(self, inst, None)
3225 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3227 msg = result.RemoteFailMsg()
3229 msg = ("Could not run OS rename script for instance %s on node %s"
3230 " (but the instance has been renamed in Ganeti): %s" %
3231 (inst.name, inst.primary_node, msg))
3232 self.proc.LogWarning(msg)
3234 _ShutdownInstanceDisks(self, inst)
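# Condensed, hedged outline of the rename sequence above (comments only;
# error handling unchanged): the cluster config is renamed first, the
# instance lock is moved, file-based storage is renamed on the primary node
# if applicable, and finally the OS-level rename script runs:
#
#   self.cfg.RenameInstance(old_name, new_name)
#   self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
#   self.context.glm.add(locking.LEVEL_INSTANCE, new_name)
#   if inst.disk_template == constants.DT_FILE:
#     self.rpc.call_file_storage_dir_rename(...)   # move the backing files
#   self.rpc.call_instance_run_rename(...)         # OS-level rename hook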
3237 class LURemoveInstance(LogicalUnit):
3238 """Remove an instance.
3241 HPATH = "instance-remove"
3242 HTYPE = constants.HTYPE_INSTANCE
3243 _OP_REQP = ["instance_name", "ignore_failures"]
3246 def ExpandNames(self):
3247 self._ExpandAndLockInstance()
3248 self.needed_locks[locking.LEVEL_NODE] = []
3249 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3251 def DeclareLocks(self, level):
3252 if level == locking.LEVEL_NODE:
3253 self._LockInstancesNodes()
3255 def BuildHooksEnv(self):
3258 This runs on master, primary and secondary nodes of the instance.
3261 env = _BuildInstanceHookEnvByObject(self, self.instance)
3262 nl = [self.cfg.GetMasterNode()]
3265 def CheckPrereq(self):
3266 """Check prerequisites.
3268 This checks that the instance is in the cluster.
3271 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3272 assert self.instance is not None, \
3273 "Cannot retrieve locked instance %s" % self.op.instance_name
3275 def Exec(self, feedback_fn):
3276 """Remove the instance.
3279 instance = self.instance
3280 logging.info("Shutting down instance %s on node %s",
3281 instance.name, instance.primary_node)
3283 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3284 msg = result.RemoteFailMsg()
3286 if self.op.ignore_failures:
3287 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3289 raise errors.OpExecError("Could not shutdown instance %s on"
3290 " node %s: %s" %
3291 (instance.name, instance.primary_node, msg))
3293 logging.info("Removing block devices for instance %s", instance.name)
3295 if not _RemoveDisks(self, instance):
3296 if self.op.ignore_failures:
3297 feedback_fn("Warning: can't remove instance's disks")
3299 raise errors.OpExecError("Can't remove instance's disks")
3301 logging.info("Removing instance %s from the cluster config", instance.name)
3303 self.cfg.RemoveInstance(instance.name)
3304 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3307 class LUQueryInstances(NoHooksLU):
3308 """Logical unit for querying instances.
3311 _OP_REQP = ["output_fields", "names", "use_locking"]
3313 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3315 "disk_template", "ip", "mac", "bridge",
3316 "sda_size", "sdb_size", "vcpus", "tags",
3317 "network_port", "beparams",
3318 r"(disk)\.(size)/([0-9]+)",
3319 r"(disk)\.(sizes)", "disk_usage",
3320 r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3321 r"(nic)\.(macs|ips|bridges)",
3322 r"(disk|nic)\.(count)",
3323 "serial_no", "hypervisor", "hvparams",] +
3325 for name in constants.HVS_PARAMETERS] +
3327 for name in constants.BES_PARAMETERS])
3328 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3331 def ExpandNames(self):
3332 _CheckOutputFields(static=self._FIELDS_STATIC,
3333 dynamic=self._FIELDS_DYNAMIC,
3334 selected=self.op.output_fields)
3336 self.needed_locks = {}
3337 self.share_locks[locking.LEVEL_INSTANCE] = 1
3338 self.share_locks[locking.LEVEL_NODE] = 1
3341 self.wanted = _GetWantedInstances(self, self.op.names)
3343 self.wanted = locking.ALL_SET
3345 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3346 self.do_locking = self.do_node_query and self.op.use_locking
3348 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3349 self.needed_locks[locking.LEVEL_NODE] = []
3350 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3352 def DeclareLocks(self, level):
3353 if level == locking.LEVEL_NODE and self.do_locking:
3354 self._LockInstancesNodes()
3356 def CheckPrereq(self):
3357 """Check prerequisites.
3362 def Exec(self, feedback_fn):
3363 """Computes the list of nodes and their attributes.
3366 all_info = self.cfg.GetAllInstancesInfo()
3367 if self.wanted == locking.ALL_SET:
3368 # caller didn't specify instance names, so ordering is not important
3370 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3372 instance_names = all_info.keys()
3373 instance_names = utils.NiceSort(instance_names)
3375 # caller did specify names, so we must keep the ordering
3377 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3379 tgt_set = all_info.keys()
3380 missing = set(self.wanted).difference(tgt_set)
3382 raise errors.OpExecError("Some instances were removed before"
3383 " retrieving their data: %s" % missing)
3384 instance_names = self.wanted
3386 instance_list = [all_info[iname] for iname in instance_names]
3388 # begin data gathering
3390 nodes = frozenset([inst.primary_node for inst in instance_list])
3391 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3395 if self.do_node_query:
3397 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3399 result = node_data[name]
3401 # offline nodes will be in both lists
3402 off_nodes.append(name)
3404 bad_nodes.append(name)
3407 live_data.update(result.data)
3408 # else no instance is alive
3410 live_data = dict([(name, {}) for name in instance_names])
3412 # end data gathering
3417 for instance in instance_list:
3419 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3420 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3421 for field in self.op.output_fields:
3422 st_match = self._FIELDS_STATIC.Matches(field)
3427 elif field == "pnode":
3428 val = instance.primary_node
3429 elif field == "snodes":
3430 val = list(instance.secondary_nodes)
3431 elif field == "admin_state":
3432 val = instance.admin_up
3433 elif field == "oper_state":
3434 if instance.primary_node in bad_nodes:
3437 val = bool(live_data.get(instance.name))
3438 elif field == "status":
3439 if instance.primary_node in off_nodes:
3440 val = "ERROR_nodeoffline"
3441 elif instance.primary_node in bad_nodes:
3442 val = "ERROR_nodedown"
3444 running = bool(live_data.get(instance.name))
3446 if instance.admin_up:
3451 if instance.admin_up:
3455 elif field == "oper_ram":
3456 if instance.primary_node in bad_nodes:
3458 elif instance.name in live_data:
3459 val = live_data[instance.name].get("memory", "?")
3462 elif field == "disk_template":
3463 val = instance.disk_template
3465 val = instance.nics[0].ip
3466 elif field == "bridge":
3467 val = instance.nics[0].bridge
3468 elif field == "mac":
3469 val = instance.nics[0].mac
3470 elif field == "sda_size" or field == "sdb_size":
3471 idx = ord(field[2]) - ord('a')
3473 val = instance.FindDisk(idx).size
3474 except errors.OpPrereqError:
3476 elif field == "disk_usage": # total disk usage per node
3477 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3478 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3479 elif field == "tags":
3480 val = list(instance.GetTags())
3481 elif field == "serial_no":
3482 val = instance.serial_no
3483 elif field == "network_port":
3484 val = instance.network_port
3485 elif field == "hypervisor":
3486 val = instance.hypervisor
3487 elif field == "hvparams":
3489 elif (field.startswith(HVPREFIX) and
3490 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3491 val = i_hv.get(field[len(HVPREFIX):], None)
3492 elif field == "beparams":
3494 elif (field.startswith(BEPREFIX) and
3495 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3496 val = i_be.get(field[len(BEPREFIX):], None)
3497 elif st_match and st_match.groups():
3498 # matches a variable list
3499 st_groups = st_match.groups()
3500 if st_groups and st_groups[0] == "disk":
3501 if st_groups[1] == "count":
3502 val = len(instance.disks)
3503 elif st_groups[1] == "sizes":
3504 val = [disk.size for disk in instance.disks]
3505 elif st_groups[1] == "size":
3507 val = instance.FindDisk(st_groups[2]).size
3508 except errors.OpPrereqError:
3511 assert False, "Unhandled disk parameter"
3512 elif st_groups[0] == "nic":
3513 if st_groups[1] == "count":
3514 val = len(instance.nics)
3515 elif st_groups[1] == "macs":
3516 val = [nic.mac for nic in instance.nics]
3517 elif st_groups[1] == "ips":
3518 val = [nic.ip for nic in instance.nics]
3519 elif st_groups[1] == "bridges":
3520 val = [nic.bridge for nic in instance.nics]
3523 nic_idx = int(st_groups[2])
3524 if nic_idx >= len(instance.nics):
3527 if st_groups[1] == "mac":
3528 val = instance.nics[nic_idx].mac
3529 elif st_groups[1] == "ip":
3530 val = instance.nics[nic_idx].ip
3531 elif st_groups[1] == "bridge":
3532 val = instance.nics[nic_idx].bridge
3534 assert False, "Unhandled NIC parameter"
3536 assert False, "Unhandled variable parameter"
3538 raise errors.ParameterError(field)
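# Sketch of how the parameterized fields above (e.g. "disk.size/0" or
# "nic.mac/1") are decoded: the FieldSet regexes capture groups which Exec
# then inspects via Matches(). Hypothetical helper mirroring that logic:
def _ExampleDecodeField(field):
  fields = utils.FieldSet(r"(disk)\.(size)/([0-9]+)",
                          r"(nic)\.(mac|ip|bridge)/([0-9]+)")
  match = fields.Matches(field)
  if match and match.groups():
    kind, attr, idx = match.groups()
    return kind, attr, int(idx)
  return None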
3545 class LUFailoverInstance(LogicalUnit):
3546 """Failover an instance.
3549 HPATH = "instance-failover"
3550 HTYPE = constants.HTYPE_INSTANCE
3551 _OP_REQP = ["instance_name", "ignore_consistency"]
3554 def ExpandNames(self):
3555 self._ExpandAndLockInstance()
3556 self.needed_locks[locking.LEVEL_NODE] = []
3557 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3559 def DeclareLocks(self, level):
3560 if level == locking.LEVEL_NODE:
3561 self._LockInstancesNodes()
3563 def BuildHooksEnv(self):
3566 This runs on master, primary and secondary nodes of the instance.
3570 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3572 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3573 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3576 def CheckPrereq(self):
3577 """Check prerequisites.
3579 This checks that the instance is in the cluster.
3582 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3583 assert self.instance is not None, \
3584 "Cannot retrieve locked instance %s" % self.op.instance_name
3586 bep = self.cfg.GetClusterInfo().FillBE(instance)
3587 if instance.disk_template not in constants.DTS_NET_MIRROR:
3588 raise errors.OpPrereqError("Instance's disk layout is not"
3589 " network mirrored, cannot failover.")
3591 secondary_nodes = instance.secondary_nodes
3592 if not secondary_nodes:
3593 raise errors.ProgrammerError("no secondary node but using "
3594 "a mirrored disk template")
3596 target_node = secondary_nodes[0]
3597 _CheckNodeOnline(self, target_node)
3598 _CheckNodeNotDrained(self, target_node)
3599 # check memory requirements on the secondary node
3600 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3601 instance.name, bep[constants.BE_MEMORY],
3602 instance.hypervisor)
3604 # check bridge existence
3605 brlist = [nic.bridge for nic in instance.nics]
3606 result = self.rpc.call_bridges_exist(target_node, brlist)
3609 raise errors.OpPrereqError("One or more target bridges %s do not"
3610 " exist on destination node '%s'" %
3611 (brlist, target_node))
3613 def Exec(self, feedback_fn):
3614 """Failover an instance.
3616 The failover is done by shutting it down on its present node and
3617 starting it on the secondary.
3620 instance = self.instance
3622 source_node = instance.primary_node
3623 target_node = instance.secondary_nodes[0]
3625 feedback_fn("* checking disk consistency between source and target")
3626 for dev in instance.disks:
3627 # for drbd, these are drbd over lvm
3628 if not _CheckDiskConsistency(self, dev, target_node, False):
3629 if instance.admin_up and not self.op.ignore_consistency:
3630 raise errors.OpExecError("Disk %s is degraded on target node,"
3631 " aborting failover." % dev.iv_name)
3633 feedback_fn("* shutting down instance on source node")
3634 logging.info("Shutting down instance %s on node %s",
3635 instance.name, source_node)
3637 result = self.rpc.call_instance_shutdown(source_node, instance)
3638 msg = result.RemoteFailMsg()
3640 if self.op.ignore_consistency:
3641 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3642 " Proceeding anyway. Please make sure node"
3643 " %s is down. Error details: %s",
3644 instance.name, source_node, source_node, msg)
3646 raise errors.OpExecError("Could not shutdown instance %s on"
3647 " node %s: %s" %
3648 (instance.name, source_node, msg))
3650 feedback_fn("* deactivating the instance's disks on source node")
3651 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3652 raise errors.OpExecError("Can't shut down the instance's disks.")
3654 instance.primary_node = target_node
3655 # distribute new instance config to the other nodes
3656 self.cfg.Update(instance)
3658 # Only start the instance if it's marked as up
3659 if instance.admin_up:
3660 feedback_fn("* activating the instance's disks on target node")
3661 logging.info("Starting instance %s on node %s",
3662 instance.name, target_node)
3664 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3665 ignore_secondaries=True)
3667 _ShutdownInstanceDisks(self, instance)
3668 raise errors.OpExecError("Can't activate the instance's disks")
3670 feedback_fn("* starting the instance on the target node")
3671 result = self.rpc.call_instance_start(target_node, instance, None, None)
3672 msg = result.RemoteFailMsg()
3674 _ShutdownInstanceDisks(self, instance)
3675 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3676 (instance.name, target_node, msg))
3679 class LUMigrateInstance(LogicalUnit):
3680 """Migrate an instance.
3682 This is migration without shutting the instance down, as opposed to
3683 failover, which is done with a shutdown.
3686 HPATH = "instance-migrate"
3687 HTYPE = constants.HTYPE_INSTANCE
3688 _OP_REQP = ["instance_name", "live", "cleanup"]
3692 def ExpandNames(self):
3693 self._ExpandAndLockInstance()
3694 self.needed_locks[locking.LEVEL_NODE] = []
3695 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3697 def DeclareLocks(self, level):
3698 if level == locking.LEVEL_NODE:
3699 self._LockInstancesNodes()
3701 def BuildHooksEnv(self):
3704 This runs on master, primary and secondary nodes of the instance.
3707 env = _BuildInstanceHookEnvByObject(self, self.instance)
3708 env["MIGRATE_LIVE"] = self.op.live
3709 env["MIGRATE_CLEANUP"] = self.op.cleanup
3710 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3713 def CheckPrereq(self):
3714 """Check prerequisites.
3716 This checks that the instance is in the cluster.
3719 instance = self.cfg.GetInstanceInfo(
3720 self.cfg.ExpandInstanceName(self.op.instance_name))
3721 if instance is None:
3722 raise errors.OpPrereqError("Instance '%s' not known" %
3723 self.op.instance_name)
3725 if instance.disk_template != constants.DT_DRBD8:
3726 raise errors.OpPrereqError("Instance's disk layout is not"
3727 " drbd8, cannot migrate.")
3729 secondary_nodes = instance.secondary_nodes
3730 if not secondary_nodes:
3731 raise errors.ConfigurationError("No secondary node but using"
3732 " drbd8 disk template")
3734 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3736 target_node = secondary_nodes[0]
3737 # check memory requirements on the secondary node
3738 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3739 instance.name, i_be[constants.BE_MEMORY],
3740 instance.hypervisor)
3742 # check bridge existence
3743 brlist = [nic.bridge for nic in instance.nics]
3744 result = self.rpc.call_bridges_exist(target_node, brlist)
3745 if result.failed or not result.data:
3746 raise errors.OpPrereqError("One or more target bridges %s do not"
3747 " exist on destination node '%s'" %
3748 (brlist, target_node))
3750 if not self.op.cleanup:
3751 _CheckNodeNotDrained(self, target_node)
3752 result = self.rpc.call_instance_migratable(instance.primary_node,
3754 msg = result.RemoteFailMsg()
3756 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3759 self.instance = instance
3761 def _WaitUntilSync(self):
3762 """Poll with custom rpc for disk sync.
3764 This uses our own step-based rpc call.
3767 self.feedback_fn("* wait until resync is done")
3771 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3773 self.instance.disks)
3775 for node, nres in result.items():
3776 msg = nres.RemoteFailMsg()
3778 raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3780 node_done, node_percent = nres.payload
3781 all_done = all_done and node_done
3782 if node_percent is not None:
3783 min_percent = min(min_percent, node_percent)
3785 if min_percent < 100:
3786 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3789 def _EnsureSecondary(self, node):
3790 """Demote a node to secondary.
3793 self.feedback_fn("* switching node %s to secondary mode" % node)
3795 for dev in self.instance.disks:
3796 self.cfg.SetDiskID(dev, node)
3798 result = self.rpc.call_blockdev_close(node, self.instance.name,
3799 self.instance.disks)
3800 msg = result.RemoteFailMsg()
3802 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3803 " error %s" % (node, msg))
3805 def _GoStandalone(self):
3806 """Disconnect from the network.
3809 self.feedback_fn("* changing into standalone mode")
3810 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3811 self.instance.disks)
3812 for node, nres in result.items():
3813 msg = nres.RemoteFailMsg()
3815 raise errors.OpExecError("Cannot disconnect disks on node %s,"
3816 " error %s" % (node, msg))
3818 def _GoReconnect(self, multimaster):
3819 """Reconnect to the network.
3825 msg = "single-master"
3826 self.feedback_fn("* changing disks into %s mode" % msg)
3827 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3828 self.instance.disks,
3829 self.instance.name, multimaster)
3830 for node, nres in result.items():
3831 msg = nres.RemoteFailMsg()
3833 raise errors.OpExecError("Cannot change disks config on node %s,"
3834 " error: %s" % (node, msg))
3836 def _ExecCleanup(self):
3837 """Try to cleanup after a failed migration.
3839 The cleanup is done by:
3840 - check that the instance is running only on one node
3841 (and update the config if needed)
3842 - change disks on its secondary node to secondary
3843 - wait until disks are fully synchronized
3844 - disconnect from the network
3845 - change disks into single-master mode
3846 - wait again until disks are fully synchronized
3849 instance = self.instance
3850 target_node = self.target_node
3851 source_node = self.source_node
3853 # check running on only one node
3854 self.feedback_fn("* checking where the instance actually runs"
3855 " (if this hangs, the hypervisor might be in"
3857 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3858 for node, result in ins_l.items():
3860 if not isinstance(result.data, list):
3861 raise errors.OpExecError("Can't contact node '%s'" % node)
3863 runningon_source = instance.name in ins_l[source_node].data
3864 runningon_target = instance.name in ins_l[target_node].data
3866 if runningon_source and runningon_target:
3867 raise errors.OpExecError("Instance seems to be running on two nodes,"
3868 " or the hypervisor is confused. You will have"
3869 " to ensure manually that it runs only on one"
3870 " and restart this operation.")
3872 if not (runningon_source or runningon_target):
3873 raise errors.OpExecError("Instance does not seem to be running at all."
3874 " In this case, it's safer to repair by"
3875 " running 'gnt-instance stop' to ensure disk"
3876 " shutdown, and then restarting it.")
3878 if runningon_target:
3879 # the migration has actually succeeded, we need to update the config
3880 self.feedback_fn("* instance running on secondary node (%s),"
3881 " updating config" % target_node)
3882 instance.primary_node = target_node
3883 self.cfg.Update(instance)
3884 demoted_node = source_node
3886 self.feedback_fn("* instance confirmed to be running on its"
3887 " primary node (%s)" % source_node)
3888 demoted_node = target_node
3890 self._EnsureSecondary(demoted_node)
3892 self._WaitUntilSync()
3893 except errors.OpExecError:
3894 # we ignore errors here, since if the device is standalone, it
3895 # won't be able to sync
3897 self._GoStandalone()
3898 self._GoReconnect(False)
3899 self._WaitUntilSync()
3901 self.feedback_fn("* done")
3903 def _RevertDiskStatus(self):
3904 """Try to revert the disk status after a failed migration.
3907 target_node = self.target_node
3909 self._EnsureSecondary(target_node)
3910 self._GoStandalone()
3911 self._GoReconnect(False)
3912 self._WaitUntilSync()
3913 except errors.OpExecError, err:
3914 self.LogWarning("Migration failed and I can't reconnect the"
3915 " drives: error '%s'\n"
3916 "Please look and recover the instance status" %
3919 def _AbortMigration(self):
3920 """Call the hypervisor code to abort a started migration.
3923 instance = self.instance
3924 target_node = self.target_node
3925 migration_info = self.migration_info
3927 abort_result = self.rpc.call_finalize_migration(target_node,
3931 abort_msg = abort_result.RemoteFailMsg()
3933 logging.error("Aborting migration failed on target node %s: %s" %
3934 (target_node, abort_msg))
3935 # Don't raise an exception here, as we still have to try to revert the
3936 # disk status, even if this step failed.
3938 def _ExecMigration(self):
3939 """Migrate an instance.
3941 The migration is done by:
3942 - change the disks into dual-master mode
3943 - wait until disks are fully synchronized again
3944 - migrate the instance
3945 - change disks on the new secondary node (the old primary) to secondary
3946 - wait until disks are fully synchronized
3947 - change disks into single-master mode
3949 """
3950 instance = self.instance
3951 target_node = self.target_node
3952 source_node = self.source_node
3954 self.feedback_fn("* checking disk consistency between source and target")
3955 for dev in instance.disks:
3956 if not _CheckDiskConsistency(self, dev, target_node, False):
3957 raise errors.OpExecError("Disk %s is degraded or not fully"
3958 " synchronized on target node,"
3959 " aborting migration." % dev.iv_name)
3961 # First get the migration information from the remote node
3962 result = self.rpc.call_migration_info(source_node, instance)
3963 msg = result.RemoteFailMsg()
3964 if msg:
3965 log_err = ("Failed fetching source migration information from %s: %s" %
3966 (source_node, msg))
3967 logging.error(log_err)
3968 raise errors.OpExecError(log_err)
3970 self.migration_info = migration_info = result.payload
3972 # Then switch the disks to master/master mode
3973 self._EnsureSecondary(target_node)
3974 self._GoStandalone()
3975 self._GoReconnect(True)
3976 self._WaitUntilSync()
3978 self.feedback_fn("* preparing %s to accept the instance" % target_node)
3979 result = self.rpc.call_accept_instance(target_node,
3980 instance,
3981 migration_info,
3982 self.nodes_ip[target_node])
3984 msg = result.RemoteFailMsg()
3985 if msg:
3986 logging.error("Instance pre-migration failed, trying to revert"
3987 " disk status: %s", msg)
3988 self._AbortMigration()
3989 self._RevertDiskStatus()
3990 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3991 (instance.name, msg))
3993 self.feedback_fn("* migrating instance to %s" % target_node)
3995 result = self.rpc.call_instance_migrate(source_node, instance,
3996 self.nodes_ip[target_node],
3997 self.op.live)
3998 msg = result.RemoteFailMsg()
3999 if msg:
4000 logging.error("Instance migration failed, trying to revert"
4001 " disk status: %s", msg)
4002 self._AbortMigration()
4003 self._RevertDiskStatus()
4004 raise errors.OpExecError("Could not migrate instance %s: %s" %
4005 (instance.name, msg))
4008 instance.primary_node = target_node
4009 # distribute new instance config to the other nodes
4010 self.cfg.Update(instance)
4012 result = self.rpc.call_finalize_migration(target_node,
4013 instance,
4014 migration_info,
4015 True)
4016 msg = result.RemoteFailMsg()
4017 if msg:
4018 logging.error("Instance migration succeeded, but finalization failed:"
4019 " %s" % msg)
4020 raise errors.OpExecError("Could not finalize instance migration: %s" %
4021 msg)
4023 self._EnsureSecondary(source_node)
4024 self._WaitUntilSync()
4025 self._GoStandalone()
4026 self._GoReconnect(False)
4027 self._WaitUntilSync()
4029 self.feedback_fn("* done")
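# Summary of the disk-mode transitions driven by this method (a sketch of the
# calls above, in order):
#
#   self._EnsureSecondary(target_node)   # target starts out as secondary
#   self._GoStandalone()                 # disconnect the DRBD network
#   self._GoReconnect(True)              # reconnect in dual-master mode
#   self._WaitUntilSync()                # ... the instance migrates here ...
#   self._EnsureSecondary(source_node)   # demote the old primary
#   self._WaitUntilSync()
#   self._GoStandalone()
#   self._GoReconnect(False)             # back to single-master mode
#   self._WaitUntilSync()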
4031 def Exec(self, feedback_fn):
4032 """Perform the migration.
4034 """
4035 self.feedback_fn = feedback_fn
4037 self.source_node = self.instance.primary_node
4038 self.target_node = self.instance.secondary_nodes[0]
4039 self.all_nodes = [self.source_node, self.target_node]
4040 self.nodes_ip = {
4041 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4042 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4043 }
4044 if self.op.cleanup:
4045 return self._ExecCleanup()
4046 else:
4047 return self._ExecMigration()
4050 def _CreateBlockDev(lu, node, instance, device, force_create,
4051 info, force_open):
4052 """Create a tree of block devices on a given node.
4054 If this device type has to be created on secondaries, create it and
4055 all its children.
4057 If not, just recurse to children keeping the same 'force' value.
4059 @param lu: the lu on whose behalf we execute
4060 @param node: the node on which to create the device
4061 @type instance: L{objects.Instance}
4062 @param instance: the instance which owns the device
4063 @type device: L{objects.Disk}
4064 @param device: the device to create
4065 @type force_create: boolean
4066 @param force_create: whether to force creation of this device; this
4067 will be changed to True whenever we find a device which has
4068 CreateOnSecondary() attribute
4069 @param info: the extra 'metadata' we should attach to the device
4070 (this will be represented as a LVM tag)
4071 @type force_open: boolean
4072 @param force_open: this parameter will be passed to the
4073 L{backend.BlockdevCreate} function where it specifies
4074 whether we run on primary or not, and it affects both
4075 the child assembly and the device's own Open() execution
4077 """
4078 if device.CreateOnSecondary():
4079 force_create = True
4081 if device.children:
4082 for child in device.children:
4083 _CreateBlockDev(lu, node, instance, child, force_create,
4084 info, force_open)
4086 if not force_create:
4087 return
4089 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4092 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4093 """Create a single block device on a given node.
4095 This will not recurse over children of the device, so they must be
4096 created in advance.
4098 @param lu: the lu on whose behalf we execute
4099 @param node: the node on which to create the device
4100 @type instance: L{objects.Instance}
4101 @param instance: the instance which owns the device
4102 @type device: L{objects.Disk}
4103 @param device: the device to create
4104 @param info: the extra 'metadata' we should attach to the device
4105 (this will be represented as a LVM tag)
4106 @type force_open: boolean
4107 @param force_open: this parameter will be passed to the
4108 L{backend.BlockdevCreate} function where it specifies
4109 whether we run on primary or not, and it affects both
4110 the child assembly and the device's own Open() execution
4112 """
4113 lu.cfg.SetDiskID(device, node)
4114 result = lu.rpc.call_blockdev_create(node, device, device.size,
4115 instance.name, force_open, info)
4116 msg = result.RemoteFailMsg()
4117 if msg:
4118 raise errors.OpExecError("Can't create block device %s on"
4119 " node %s for instance %s: %s" %
4120 (device, node, instance.name, msg))
4121 if device.physical_id is None:
4122 device.physical_id = result.payload
4125 def _GenerateUniqueNames(lu, exts):
4126 """Generate a suitable LV name.
4128 This will generate a logical volume name for the given instance.
4130 """
4131 results = []
4132 for val in exts:
4133 new_id = lu.cfg.GenerateUniqueID()
4134 results.append("%s%s" % (new_id, val))
4135 return results
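# Example (sketch): _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
# returns something like ["<id1>.disk0_data", "<id2>.disk0_meta"], where each
# extension gets its own freshly generated unique ID as prefix.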
4138 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4139 p_minor, s_minor):
4140 """Generate a drbd8 device complete with its children.
4142 """
4143 port = lu.cfg.AllocatePort()
4144 vgname = lu.cfg.GetVGName()
4145 shared_secret = lu.cfg.GenerateDRBDSecret()
4146 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4147 logical_id=(vgname, names[0]))
4148 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4149 logical_id=(vgname, names[1]))
4150 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4151 logical_id=(primary, secondary, port,
4152 p_minor, s_minor,
4153 shared_secret),
4154 children=[dev_data, dev_meta],
4155 iv_name=iv_name)
4156 return drbd_dev
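# The resulting device tree for one disk looks like this (sketch):
#
#   LD_DRBD8 (size, iv_name, logical_id=(primary, secondary, port,
#                                        p_minor, s_minor, shared_secret))
#    |- LD_LV data volume: size MB, logical_id=(vgname, names[0])
#    `- LD_LV metadata volume: 128 MB, logical_id=(vgname, names[1])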
4159 def _GenerateDiskTemplate(lu, template_name,
4160 instance_name, primary_node,
4161 secondary_nodes, disk_info,
4162 file_storage_dir, file_driver,
4163 base_index):
4164 """Generate the entire disk layout for a given template type.
4166 """
4167 #TODO: compute space requirements
4169 vgname = lu.cfg.GetVGName()
4170 disk_count = len(disk_info)
4171 disks = []
4172 if template_name == constants.DT_DISKLESS:
4173 pass
4174 elif template_name == constants.DT_PLAIN:
4175 if len(secondary_nodes) != 0:
4176 raise errors.ProgrammerError("Wrong template configuration")
4178 names = _GenerateUniqueNames(lu, [".disk%d" % i
4179 for i in range(disk_count)])
4180 for idx, disk in enumerate(disk_info):
4181 disk_index = idx + base_index
4182 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4183 logical_id=(vgname, names[idx]),
4184 iv_name="disk/%d" % disk_index,
4185 mode=disk["mode"])
4186 disks.append(disk_dev)
4187 elif template_name == constants.DT_DRBD8:
4188 if len(secondary_nodes) != 1:
4189 raise errors.ProgrammerError("Wrong template configuration")
4190 remote_node = secondary_nodes[0]
4191 minors = lu.cfg.AllocateDRBDMinor(
4192 [primary_node, remote_node] * len(disk_info), instance_name)
4194 names = []
4195 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4196 for i in range(disk_count)]):
4197 names.append(lv_prefix + "_data")
4198 names.append(lv_prefix + "_meta")
4199 for idx, disk in enumerate(disk_info):
4200 disk_index = idx + base_index
4201 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4202 disk["size"], names[idx*2:idx*2+2],
4203 "disk/%d" % disk_index,
4204 minors[idx*2], minors[idx*2+1])
4205 disk_dev.mode = disk["mode"]
4206 disks.append(disk_dev)
4207 elif template_name == constants.DT_FILE:
4208 if len(secondary_nodes) != 0:
4209 raise errors.ProgrammerError("Wrong template configuration")
4211 for idx, disk in enumerate(disk_info):
4212 disk_index = idx + base_index
4213 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4214 iv_name="disk/%d" % disk_index,
4215 logical_id=(file_driver,
4216 "%s/disk%d" % (file_storage_dir,
4217 disk_index)),
4218 mode=disk["mode"])
4219 disks.append(disk_dev)
4220 else:
4221 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4222 return disks
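# Usage sketch, with hypothetical node and instance names:
#
#   disks = _GenerateDiskTemplate(lu, constants.DT_DRBD8,
#                                 "inst1.example.com", "node1", ["node2"],
#                                 [{"size": 1024, "mode": "rw"},
#                                  {"size": 512, "mode": "rw"}],
#                                 None, None, 0)
#
# would return two LD_DRBD8 devices named "disk/0" and "disk/1", each backed
# by a data LV and a 128 MB metadata LV on both nodes.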
4225 def _GetInstanceInfoText(instance):
4226 """Compute the text that should be added to the disk's metadata.
4228 """
4229 return "originstname+%s" % instance.name
4232 def _CreateDisks(lu, instance):
4233 """Create all disks for an instance.
4235 This abstracts away some work from AddInstance.
4237 @type lu: L{LogicalUnit}
4238 @param lu: the logical unit on whose behalf we execute
4239 @type instance: L{objects.Instance}
4240 @param instance: the instance whose disks we should create
4242 @return: the success of the creation
4244 """
4245 info = _GetInstanceInfoText(instance)
4246 pnode = instance.primary_node
4248 if instance.disk_template == constants.DT_FILE:
4249 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4250 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4252 if result.failed or not result.data:
4253 raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4255 if not result.data[0]:
4256 raise errors.OpExecError("Failed to create directory '%s'" %
4257 file_storage_dir)
4259 # Note: this needs to be kept in sync with adding of disks in
4260 # LUSetInstanceParams
4261 for device in instance.disks:
4262 logging.info("Creating volume %s for instance %s",
4263 device.iv_name, instance.name)
4265 for node in instance.all_nodes:
4266 f_create = node == pnode
4267 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4270 def _RemoveDisks(lu, instance):
4271 """Remove all disks for an instance.
4273 This abstracts away some work from `AddInstance()` and
4274 `RemoveInstance()`. Note that in case some of the devices couldn't
4275 be removed, the removal will continue with the other ones (compare
4276 with `_CreateDisks()`).
4278 @type lu: L{LogicalUnit}
4279 @param lu: the logical unit on whose behalf we execute
4280 @type instance: L{objects.Instance}
4281 @param instance: the instance whose disks we should remove
4283 @return: the success of the removal
4285 """
4286 logging.info("Removing block devices for instance %s", instance.name)
4288 result = True
4289 for device in instance.disks:
4290 for node, disk in device.ComputeNodeTree(instance.primary_node):
4291 lu.cfg.SetDiskID(disk, node)
4292 msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4293 if msg:
4294 lu.LogWarning("Could not remove block device %s on node %s,"
4295 " continuing anyway: %s", device.iv_name, node, msg)
4296 result = False
4298 if instance.disk_template == constants.DT_FILE:
4299 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4300 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4301 file_storage_dir)
4302 if result.failed or not result.data:
4303 logging.error("Could not remove directory '%s'", file_storage_dir)
4304 result = False
4306 return result
4309 def _ComputeDiskSize(disk_template, disks):
4310 """Compute disk size requirements in the volume group
4312 """
4313 # Required free disk space as a function of disk and swap space
4314 req_size_dict = {
4315 constants.DT_DISKLESS: None,
4316 constants.DT_PLAIN: sum(d["size"] for d in disks),
4317 # 128 MB are added for drbd metadata for each disk
4318 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4319 constants.DT_FILE: None,
4320 }
4322 if disk_template not in req_size_dict:
4323 raise errors.ProgrammerError("Disk template '%s' size requirement"
4324 " is unknown" % disk_template)
4326 return req_size_dict[disk_template]
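# Example (sketch): two DRBD8 disks of 1024 MB and 2048 MB require
#   (1024 + 128) + (2048 + 128) = 3328 MB
# of free volume group space, while the same disks as DT_PLAIN require only
# 1024 + 2048 = 3072 MB; DT_DISKLESS and DT_FILE need no VG space (None).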
4329 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4330 """Hypervisor parameter validation.
4332 This function abstracts the hypervisor parameter validation to be
4333 used in both instance create and instance modify.
4335 @type lu: L{LogicalUnit}
4336 @param lu: the logical unit for which we check
4337 @type nodenames: list
4338 @param nodenames: the list of nodes on which we should check
4339 @type hvname: string
4340 @param hvname: the name of the hypervisor we should use
4341 @type hvparams: dict
4342 @param hvparams: the parameters which we need to check
4343 @raise errors.OpPrereqError: if the parameters are not valid
4345 """
4346 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4347 hvname,
4348 hvparams)
4349 for node in nodenames:
4350 info = hvinfo[node]
4351 if info.offline:
4352 continue
4353 msg = info.RemoteFailMsg()
4354 if msg:
4355 raise errors.OpPrereqError("Hypervisor parameter validation"
4356 " failed on node %s: %s" % (node, msg))
4359 class LUCreateInstance(LogicalUnit):
4360 """Create an instance.
4362 """
4363 HPATH = "instance-add"
4364 HTYPE = constants.HTYPE_INSTANCE
4365 _OP_REQP = ["instance_name", "disks", "disk_template",
4366 "mode", "start",
4367 "wait_for_sync", "ip_check", "nics",
4368 "hvparams", "beparams"]
4369 REQ_BGL = False
4371 def _ExpandNode(self, node):
4372 """Expands and checks one node name.
4374 """
4375 node_full = self.cfg.ExpandNodeName(node)
4376 if node_full is None:
4377 raise errors.OpPrereqError("Unknown node %s" % node)
4378 return node_full
4380 def ExpandNames(self):
4381 """ExpandNames for CreateInstance.
4383 Figure out the right locks for instance creation.
4385 """
4386 self.needed_locks = {}
4388 # set optional parameters to none if they don't exist
4389 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4390 if not hasattr(self.op, attr):
4391 setattr(self.op, attr, None)
4393 # cheap checks, mostly valid constants given
4395 # verify creation mode
4396 if self.op.mode not in (constants.INSTANCE_CREATE,
4397 constants.INSTANCE_IMPORT):
4398 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4399 self.op.mode)
4401 # disk template and mirror node verification
4402 if self.op.disk_template not in constants.DISK_TEMPLATES:
4403 raise errors.OpPrereqError("Invalid disk template name")
4405 if self.op.hypervisor is None:
4406 self.op.hypervisor = self.cfg.GetHypervisorType()
4408 cluster = self.cfg.GetClusterInfo()
4409 enabled_hvs = cluster.enabled_hypervisors
4410 if self.op.hypervisor not in enabled_hvs:
4411 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4412 " cluster (%s)" % (self.op.hypervisor,
4413 ",".join(enabled_hvs)))
4415 # check hypervisor parameter syntax (locally)
4416 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4417 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4418 self.op.hvparams)
4419 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4420 hv_type.CheckParameterSyntax(filled_hvp)
4422 # fill and remember the beparams dict
4423 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4424 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4425 self.op.beparams)
4427 #### instance parameters check
4429 # instance name verification
4430 hostname1 = utils.HostInfo(self.op.instance_name)
4431 self.op.instance_name = instance_name = hostname1.name
4433 # this is just a preventive check, but someone might still add this
4434 # instance in the meantime, and creation will fail at lock-add time
4435 if instance_name in self.cfg.GetInstanceList():
4436 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4437 instance_name)
4439 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4441 # NIC buildup
4442 self.nics = []
4443 for nic in self.op.nics:
4444 # ip validity checks
4445 ip = nic.get("ip", None)
4446 if ip is None or ip.lower() == "none":
4447 nic_ip = None
4448 elif ip.lower() == constants.VALUE_AUTO:
4449 nic_ip = hostname1.ip
4450 else:
4451 if not utils.IsValidIP(ip):
4452 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4453 " like a valid IP" % ip)
4454 nic_ip = ip
4456 # MAC address verification
4457 mac = nic.get("mac", constants.VALUE_AUTO)
4458 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4459 if not utils.IsValidMac(mac.lower()):
4460 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4461 mac)
4462 # bridge verification
4463 bridge = nic.get("bridge", None)
4464 if bridge is None:
4465 bridge = self.cfg.GetDefBridge()
4466 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4468 # disk checks/pre-build
4469 self.disks = []
4470 for disk in self.op.disks:
4471 mode = disk.get("mode", constants.DISK_RDWR)
4472 if mode not in constants.DISK_ACCESS_SET:
4473 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4474 mode)
4475 size = disk.get("size", None)
4476 if size is None:
4477 raise errors.OpPrereqError("Missing disk size")
4478 try:
4479 size = int(size)
4480 except ValueError:
4481 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4482 self.disks.append({"size": size, "mode": mode})
4484 # used in CheckPrereq for ip ping check
4485 self.check_ip = hostname1.ip
4487 # file storage checks
4488 if (self.op.file_driver and
4489 self.op.file_driver not in constants.FILE_DRIVER):
4490 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4491 self.op.file_driver)
4493 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4494 raise errors.OpPrereqError("File storage directory path must be relative")
4496 ### Node/iallocator related checks
4497 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4498 raise errors.OpPrereqError("One and only one of iallocator and primary"
4499 " node must be given")
4501 if self.op.iallocator:
4502 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4503 else:
4504 self.op.pnode = self._ExpandNode(self.op.pnode)
4505 nodelist = [self.op.pnode]
4506 if self.op.snode is not None:
4507 self.op.snode = self._ExpandNode(self.op.snode)
4508 nodelist.append(self.op.snode)
4509 self.needed_locks[locking.LEVEL_NODE] = nodelist
4511 # in case of import lock the source node too
4512 if self.op.mode == constants.INSTANCE_IMPORT:
4513 src_node = getattr(self.op, "src_node", None)
4514 src_path = getattr(self.op, "src_path", None)
4516 if src_path is None:
4517 self.op.src_path = src_path = self.op.instance_name
4519 if src_node is None:
4520 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4521 self.op.src_node = None
4522 if os.path.isabs(src_path):
4523 raise errors.OpPrereqError("Importing an instance from an absolute"
4524 " path requires a source node option.")
4525 else:
4526 self.op.src_node = src_node = self._ExpandNode(src_node)
4527 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4528 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4529 if not os.path.isabs(src_path):
4530 self.op.src_path = src_path = \
4531 os.path.join(constants.EXPORT_DIR, src_path)
4533 else: # INSTANCE_CREATE
4534 if getattr(self.op, "os_type", None) is None:
4535 raise errors.OpPrereqError("No guest OS specified")
4537 def _RunAllocator(self):
4538 """Run the allocator based on input opcode.
4540 """
4541 nics = [n.ToDict() for n in self.nics]
4542 ial = IAllocator(self,
4543 mode=constants.IALLOCATOR_MODE_ALLOC,
4544 name=self.op.instance_name,
4545 disk_template=self.op.disk_template,
4546 tags=[],
4547 os=self.op.os_type,
4548 vcpus=self.be_full[constants.BE_VCPUS],
4549 mem_size=self.be_full[constants.BE_MEMORY],
4550 disks=self.disks,
4551 nics=nics,
4552 hypervisor=self.op.hypervisor,
4553 )
4555 ial.Run(self.op.iallocator)
4557 if not ial.success:
4558 raise errors.OpPrereqError("Can't compute nodes using"
4559 " iallocator '%s': %s" % (self.op.iallocator,
4560 ial.info))
4561 if len(ial.nodes) != ial.required_nodes:
4562 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4563 " of nodes (%s), required %s" %
4564 (self.op.iallocator, len(ial.nodes),
4565 ial.required_nodes))
4566 self.op.pnode = ial.nodes[0]
4567 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4568 self.op.instance_name, self.op.iallocator,
4569 ", ".join(ial.nodes))
4570 if ial.required_nodes == 2:
4571 self.op.snode = ial.nodes[1]
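# For example (sketch): for a DRBD8 instance, an allocator answer of
# ial.nodes == ["node3", "node4"] with required_nodes == 2 results in
# self.op.pnode = "node3" and self.op.snode = "node4".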
4573 def BuildHooksEnv(self):
4574 """Build hooks env.
4576 This runs on master, primary and secondary nodes of the instance.
4578 """
4579 env = {
4580 "ADD_MODE": self.op.mode,
4581 }
4582 if self.op.mode == constants.INSTANCE_IMPORT:
4583 env["SRC_NODE"] = self.op.src_node
4584 env["SRC_PATH"] = self.op.src_path
4585 env["SRC_IMAGES"] = self.src_images
4587 env.update(_BuildInstanceHookEnv(
4588 name=self.op.instance_name,
4589 primary_node=self.op.pnode,
4590 secondary_nodes=self.secondaries,
4591 status=self.op.start,
4592 os_type=self.op.os_type,
4593 memory=self.be_full[constants.BE_MEMORY],
4594 vcpus=self.be_full[constants.BE_VCPUS],
4595 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4596 disk_template=self.op.disk_template,
4597 disks=[(d["size"], d["mode"]) for d in self.disks],
4598 ))
4600 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4601 self.secondaries)
4602 return env, nl, nl
4605 def CheckPrereq(self):
4606 """Check prerequisites.
4608 """
4609 if (not self.cfg.GetVGName() and
4610 self.op.disk_template not in constants.DTS_NOT_LVM):
4611 raise errors.OpPrereqError("Cluster does not support lvm-based"
4612 " instances")
4614 if self.op.mode == constants.INSTANCE_IMPORT:
4615 src_node = self.op.src_node
4616 src_path = self.op.src_path
4618 if src_node is None:
4619 exp_list = self.rpc.call_export_list(
4620 self.acquired_locks[locking.LEVEL_NODE])
4621 found = False
4622 for node in exp_list:
4623 if not exp_list[node].failed and src_path in exp_list[node].data:
4624 found = True
4625 self.op.src_node = src_node = node
4626 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4627 src_path)
4628 break
4629 if not found:
4630 raise errors.OpPrereqError("No export found for relative path %s" %
4631 src_path)
4633 _CheckNodeOnline(self, src_node)
4634 result = self.rpc.call_export_info(src_node, src_path)
4635 result.Raise()
4636 if not result.data:
4637 raise errors.OpPrereqError("No export found in dir %s" % src_path)
4639 export_info = result.data
4640 if not export_info.has_section(constants.INISECT_EXP):
4641 raise errors.ProgrammerError("Corrupted export config")
4643 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4644 if (int(ei_version) != constants.EXPORT_VERSION):
4645 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4646 (ei_version, constants.EXPORT_VERSION))
4648 # Check that the new instance doesn't have less disks than the export
4649 instance_disks = len(self.disks)
4650 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4651 if instance_disks < export_disks:
4652 raise errors.OpPrereqError("Not enough disks to import."
4653 " (instance: %d, export: %d)" %
4654 (instance_disks, export_disks))
4656 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4657 disk_images = []
4658 for idx in range(export_disks):
4659 option = 'disk%d_dump' % idx
4660 if export_info.has_option(constants.INISECT_INS, option):
4661 # FIXME: are the old os-es, disk sizes, etc. useful?
4662 export_name = export_info.get(constants.INISECT_INS, option)
4663 image = os.path.join(src_path, export_name)
4664 disk_images.append(image)
4665 else:
4666 disk_images.append(False)
4668 self.src_images = disk_images
4670 old_name = export_info.get(constants.INISECT_INS, 'name')
4671 # FIXME: int() here could throw a ValueError on broken exports
4672 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4673 if self.op.instance_name == old_name:
4674 for idx, nic in enumerate(self.nics):
4675 if nic.mac == constants.VALUE_AUTO and exp_nic_count > idx:
4676 nic_mac_ini = 'nic%d_mac' % idx
4677 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4679 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4680 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4681 if self.op.start and not self.op.ip_check:
4682 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4683 " adding an instance in start mode")
4685 if self.op.ip_check:
4686 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4687 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4688 (self.check_ip, self.op.instance_name))
4690 #### mac address generation
4691 # By generating here the mac address both the allocator and the hooks get
4692 # the real final mac address rather than the 'auto' or 'generate' value.
4693 # There is a race condition between the generation and the instance object
4694 # creation, which means that we know the mac is valid now, but we're not
4695 # sure it will be when we actually add the instance. If things go bad
4696 # adding the instance will abort because of a duplicate mac, and the
4697 # creation job will fail.
4698 for nic in self.nics:
4699 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4700 nic.mac = self.cfg.GenerateMAC()
4702 #### allocator run
4704 if self.op.iallocator is not None:
4705 self._RunAllocator()
4707 #### node related checks
4709 # check primary node
4710 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4711 assert self.pnode is not None, \
4712 "Cannot retrieve locked node %s" % self.op.pnode
4713 if pnode.offline:
4714 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4715 pnode.name)
4716 if pnode.drained:
4717 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4718 pnode.name)
4720 self.secondaries = []
4722 # mirror node verification
4723 if self.op.disk_template in constants.DTS_NET_MIRROR:
4724 if self.op.snode is None:
4725 raise errors.OpPrereqError("The networked disk templates need"
4726 " a mirror node")
4727 if self.op.snode == pnode.name:
4728 raise errors.OpPrereqError("The secondary node cannot be"
4729 " the primary node.")
4730 _CheckNodeOnline(self, self.op.snode)
4731 _CheckNodeNotDrained(self, self.op.snode)
4732 self.secondaries.append(self.op.snode)
4734 nodenames = [pnode.name] + self.secondaries
4736 req_size = _ComputeDiskSize(self.op.disk_template,
4737 self.disks)
4739 # Check lv size requirements
4740 if req_size is not None:
4741 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4742 self.op.hypervisor)
4743 for node in nodenames:
4744 info = nodeinfo[node]
4745 info.Raise()
4746 info = info.data
4747 if not info:
4748 raise errors.OpPrereqError("Cannot get current information"
4749 " from node '%s'" % node)
4750 vg_free = info.get('vg_free', None)
4751 if not isinstance(vg_free, int):
4752 raise errors.OpPrereqError("Can't compute free disk space on"
4753 " node %s" % node)
4754 if req_size > info['vg_free']:
4755 raise errors.OpPrereqError("Not enough disk space on target node %s."
4756 " %d MB available, %d MB required" %
4757 (node, info['vg_free'], req_size))
4759 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4761 # os verification
4762 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4763 result.Raise()
4764 if not isinstance(result.data, objects.OS):
4765 raise errors.OpPrereqError("OS '%s' not in supported os list for"
4766 " primary node" % self.op.os_type)
4768 # bridge check on primary node
4769 bridges = [n.bridge for n in self.nics]
4770 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4771 result.Raise()
4772 if not result.data:
4773 raise errors.OpPrereqError("One of the target bridges '%s' does not"
4774 " exist on destination node '%s'" %
4775 (",".join(bridges), pnode.name))
4777 # memory check on primary node
4778 if self.op.start:
4779 _CheckNodeFreeMemory(self, self.pnode.name,
4780 "creating instance %s" % self.op.instance_name,
4781 self.be_full[constants.BE_MEMORY],
4782 self.op.hypervisor)
4784 def Exec(self, feedback_fn):
4785 """Create and add the instance to the cluster.
4787 """
4788 instance = self.op.instance_name
4789 pnode_name = self.pnode.name
4791 ht_kind = self.op.hypervisor
4792 if ht_kind in constants.HTS_REQ_PORT:
4793 network_port = self.cfg.AllocatePort()
4794 else:
4795 network_port = None
4797 ##if self.op.vnc_bind_address is None:
4798 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4800 # this is needed because os.path.join does not accept None arguments
4801 if self.op.file_storage_dir is None:
4802 string_file_storage_dir = ""
4803 else:
4804 string_file_storage_dir = self.op.file_storage_dir
4806 # build the full file storage dir path
4807 file_storage_dir = os.path.normpath(os.path.join(
4808 self.cfg.GetFileStorageDir(),
4809 string_file_storage_dir, instance))
4812 disks = _GenerateDiskTemplate(self,
4813 self.op.disk_template,
4814 instance, pnode_name,
4815 self.secondaries,
4816 self.disks,
4817 file_storage_dir,
4818 self.op.file_driver,
4819 0)
4821 iobj = objects.Instance(name=instance, os=self.op.os_type,
4822 primary_node=pnode_name,
4823 nics=self.nics, disks=disks,
4824 disk_template=self.op.disk_template,
4825 admin_up=False,
4826 network_port=network_port,
4827 beparams=self.op.beparams,
4828 hvparams=self.op.hvparams,
4829 hypervisor=self.op.hypervisor,
4830 )
4832 feedback_fn("* creating instance disks...")
4833 try:
4834 _CreateDisks(self, iobj)
4835 except errors.OpExecError:
4836 self.LogWarning("Device creation failed, reverting...")
4837 try:
4838 _RemoveDisks(self, iobj)
4839 finally:
4840 self.cfg.ReleaseDRBDMinors(instance)
4841 raise
4843 feedback_fn("adding instance %s to cluster config" % instance)
4845 self.cfg.AddInstance(iobj)
4846 # Declare that we don't want to remove the instance lock anymore, as we've
4847 # added the instance to the config
4848 del self.remove_locks[locking.LEVEL_INSTANCE]
4849 # Unlock all the nodes
4850 if self.op.mode == constants.INSTANCE_IMPORT:
4851 nodes_keep = [self.op.src_node]
4852 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4853 if node != self.op.src_node]
4854 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4855 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4856 else:
4857 self.context.glm.release(locking.LEVEL_NODE)
4858 del self.acquired_locks[locking.LEVEL_NODE]
4860 if self.op.wait_for_sync:
4861 disk_abort = not _WaitForSync(self, iobj)
4862 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4863 # make sure the disks are not degraded (still sync-ing is ok)
4864 time.sleep(15)
4865 feedback_fn("* checking mirrors status")
4866 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4867 else:
4868 disk_abort = False
4870 if disk_abort:
4871 _RemoveDisks(self, iobj)
4872 self.cfg.RemoveInstance(iobj.name)
4873 # Make sure the instance lock gets removed
4874 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4875 raise errors.OpExecError("There are some degraded disks for"
4876 " this instance")
4878 feedback_fn("creating os for instance %s on node %s" %
4879 (instance, pnode_name))
4881 if iobj.disk_template != constants.DT_DISKLESS:
4882 if self.op.mode == constants.INSTANCE_CREATE:
4883 feedback_fn("* running the instance OS create scripts...")
4884 result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4885 msg = result.RemoteFailMsg()
4886 if msg:
4887 raise errors.OpExecError("Could not add os for instance %s"
4888 " on node %s: %s" %
4889 (instance, pnode_name, msg))
4891 elif self.op.mode == constants.INSTANCE_IMPORT:
4892 feedback_fn("* running the instance OS import scripts...")
4893 src_node = self.op.src_node
4894 src_images = self.src_images
4895 cluster_name = self.cfg.GetClusterName()
4896 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4897 src_node, src_images,
4898 cluster_name)
4899 import_result.Raise()
4900 for idx, result in enumerate(import_result.data):
4901 if not result:
4902 self.LogWarning("Could not import the image %s for instance"
4903 " %s, disk %d, on node %s" %
4904 (src_images[idx], instance, idx, pnode_name))
4905 else:
4906 # also checked in the prereq part
4907 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4908 % self.op.mode)
4910 if self.op.start:
4911 iobj.admin_up = True
4912 self.cfg.Update(iobj)
4913 logging.info("Starting instance %s on node %s", instance, pnode_name)
4914 feedback_fn("* starting instance...")
4915 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4916 msg = result.RemoteFailMsg()
4917 if msg:
4918 raise errors.OpExecError("Could not start instance: %s" % msg)
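# Usage sketch (hypothetical opcode values) for the flow implemented above,
# creating a plain single-disk, single-NIC instance:
#
#   op = opcodes.OpCreateInstance(instance_name="inst1.example.com",
#                                 mode=constants.INSTANCE_CREATE,
#                                 disk_template=constants.DT_PLAIN,
#                                 disks=[{"size": 1024}], nics=[{}],
#                                 pnode="node1", os_type="debian-etch",
#                                 wait_for_sync=True, ip_check=True,
#                                 start=True, hvparams={}, beparams={})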
4921 class LUConnectConsole(NoHooksLU):
4922 """Connect to an instance's console.
4924 This is somewhat special in that it returns the command line that
4925 you need to run on the master node in order to connect to the
4926 console.
4928 """
4929 _OP_REQP = ["instance_name"]
4930 REQ_BGL = False
4932 def ExpandNames(self):
4933 self._ExpandAndLockInstance()
4935 def CheckPrereq(self):
4936 """Check prerequisites.
4938 This checks that the instance is in the cluster.
4940 """
4941 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4942 assert self.instance is not None, \
4943 "Cannot retrieve locked instance %s" % self.op.instance_name
4944 _CheckNodeOnline(self, self.instance.primary_node)
4946 def Exec(self, feedback_fn):
4947 """Connect to the console of an instance
4949 """
4950 instance = self.instance
4951 node = instance.primary_node
4953 node_insts = self.rpc.call_instance_list([node],
4954 [instance.hypervisor])[node]
4955 node_insts.Raise()
4957 if instance.name not in node_insts.data:
4958 raise errors.OpExecError("Instance %s is not running." % instance.name)
4960 logging.debug("Connecting to console of %s on %s", instance.name, node)
4962 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4963 cluster = self.cfg.GetClusterInfo()
4964 # beparams and hvparams are passed separately, to avoid editing the
4965 # instance and then saving the defaults in the instance itself.
4966 hvparams = cluster.FillHV(instance)
4967 beparams = cluster.FillBE(instance)
4968 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4970 # build ssh cmdline
4971 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4974 class LUReplaceDisks(LogicalUnit):
4975 """Replace the disks of an instance.
4977 """
4978 HPATH = "mirrors-replace"
4979 HTYPE = constants.HTYPE_INSTANCE
4980 _OP_REQP = ["instance_name", "mode", "disks"]
4981 REQ_BGL = False
4983 def CheckArguments(self):
4984 if not hasattr(self.op, "remote_node"):
4985 self.op.remote_node = None
4986 if not hasattr(self.op, "iallocator"):
4987 self.op.iallocator = None
4989 # check for valid parameter combination
4990 cnt = [self.op.remote_node, self.op.iallocator].count(None)
4991 if self.op.mode == constants.REPLACE_DISK_CHG:
4992 if cnt == 2:
4993 raise errors.OpPrereqError("When changing the secondary either an"
4994 " iallocator script must be used or the"
4995 " new node given")
4996 elif cnt == 0:
4997 raise errors.OpPrereqError("Give either the iallocator or the new"
4998 " secondary, not both")
4999 else: # not replacing the secondary
5000 if cnt != 2:
5001 raise errors.OpPrereqError("The iallocator and new node options can"
5002 " be used only when changing the"
5003 " secondary node")
5005 def ExpandNames(self):
5006 self._ExpandAndLockInstance()
5008 if self.op.iallocator is not None:
5009 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5010 elif self.op.remote_node is not None:
5011 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5012 if remote_node is None:
5013 raise errors.OpPrereqError("Node '%s' not known" %
5014 self.op.remote_node)
5015 self.op.remote_node = remote_node
5016 # Warning: do not remove the locking of the new secondary here
5017 # unless DRBD8.AddChildren is changed to work in parallel;
5018 # currently it doesn't since parallel invocations of
5019 # FindUnusedMinor will conflict
5020 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5021 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5023 self.needed_locks[locking.LEVEL_NODE] = []
5024 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5026 def DeclareLocks(self, level):
5027 # If we're not already locking all nodes in the set we have to declare the
5028 # instance's primary/secondary nodes.
5029 if (level == locking.LEVEL_NODE and
5030 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5031 self._LockInstancesNodes()
5033 def _RunAllocator(self):
5034 """Compute a new secondary node using an IAllocator.
5036 """
5037 ial = IAllocator(self,
5038 mode=constants.IALLOCATOR_MODE_RELOC,
5039 name=self.op.instance_name,
5040 relocate_from=[self.sec_node])
5042 ial.Run(self.op.iallocator)
5044 if not ial.success:
5045 raise errors.OpPrereqError("Can't compute nodes using"
5046 " iallocator '%s': %s" % (self.op.iallocator,
5047 ial.info))
5048 if len(ial.nodes) != ial.required_nodes:
5049 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5050 " of nodes (%s), required %s" %
5051 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
5052 self.op.remote_node = ial.nodes[0]
5053 self.LogInfo("Selected new secondary for the instance: %s",
5054 self.op.remote_node)
5056 def BuildHooksEnv(self):
5057 """Build hooks env.
5059 This runs on the master, the primary and all the secondaries.
5061 """
5062 env = {
5063 "MODE": self.op.mode,
5064 "NEW_SECONDARY": self.op.remote_node,
5065 "OLD_SECONDARY": self.instance.secondary_nodes[0],
5066 }
5067 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5068 nl = [
5069 self.cfg.GetMasterNode(),
5070 self.instance.primary_node,
5071 ]
5072 if self.op.remote_node is not None:
5073 nl.append(self.op.remote_node)
5074 return env, nl, nl
5076 def CheckPrereq(self):
5077 """Check prerequisites.
5079 This checks that the instance is in the cluster.
5081 """
5082 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5083 assert instance is not None, \
5084 "Cannot retrieve locked instance %s" % self.op.instance_name
5085 self.instance = instance
5087 if instance.disk_template != constants.DT_DRBD8:
5088 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5089 " instances")
5091 if len(instance.secondary_nodes) != 1:
5092 raise errors.OpPrereqError("The instance has a strange layout,"
5093 " expected one secondary but found %d" %
5094 len(instance.secondary_nodes))
5096 self.sec_node = instance.secondary_nodes[0]
5098 if self.op.iallocator is not None:
5099 self._RunAllocator()
5101 remote_node = self.op.remote_node
5102 if remote_node is not None:
5103 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5104 assert self.remote_node_info is not None, \
5105 "Cannot retrieve locked node %s" % remote_node
5106 else:
5107 self.remote_node_info = None
5108 if remote_node == instance.primary_node:
5109 raise errors.OpPrereqError("The specified node is the primary node of"
5110 " the instance.")
5111 elif remote_node == self.sec_node:
5112 raise errors.OpPrereqError("The specified node is already the"
5113 " secondary node of the instance.")
5115 if self.op.mode == constants.REPLACE_DISK_PRI:
5116 n1 = self.tgt_node = instance.primary_node
5117 n2 = self.oth_node = self.sec_node
5118 elif self.op.mode == constants.REPLACE_DISK_SEC:
5119 n1 = self.tgt_node = self.sec_node
5120 n2 = self.oth_node = instance.primary_node
5121 elif self.op.mode == constants.REPLACE_DISK_CHG:
5122 n1 = self.new_node = remote_node
5123 n2 = self.oth_node = instance.primary_node
5124 self.tgt_node = self.sec_node
5125 _CheckNodeNotDrained(self, remote_node)
5126 else:
5127 raise errors.ProgrammerError("Unhandled disk replace mode")
5129 _CheckNodeOnline(self, n1)
5130 _CheckNodeOnline(self, n2)
5132 if not self.op.disks:
5133 self.op.disks = range(len(instance.disks))
5135 for disk_idx in self.op.disks:
5136 instance.FindDisk(disk_idx)
5138 def _ExecD8DiskOnly(self, feedback_fn):
5139 """Replace a disk on the primary or secondary for drbd8.
5141 The algorithm for replace is quite complicated:
5143 1. for each disk to be replaced:
5145 1. create new LVs on the target node with unique names
5146 1. detach old LVs from the drbd device
5147 1. rename old LVs to name_replaced.<time_t>
5148 1. rename new LVs to old LVs
5149 1. attach the new LVs (with the old names now) to the drbd device
5151 1. wait for sync across all devices
5153 1. for each modified disk:
5155 1. remove old LVs (which have the name name_replaced.<time_t>)
5157 Failures are not very well handled.
5159 """
5160 steps_total = 6
5161 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5162 instance = self.instance
5163 iv_names = {}
5164 vgname = self.cfg.GetVGName()
5165 cfg = self.cfg
5167 tgt_node = self.tgt_node
5168 oth_node = self.oth_node
5170 # Step: check device activation
5171 self.proc.LogStep(1, steps_total, "check device existence")
5172 info("checking volume groups")
5173 my_vg = cfg.GetVGName()
5174 results = self.rpc.call_vg_list([oth_node, tgt_node])
5175 if not results:
5176 raise errors.OpExecError("Can't list volume groups on the nodes")
5177 for node in oth_node, tgt_node:
5178 res = results[node]
5179 if res.failed or not res.data or my_vg not in res.data:
5180 raise errors.OpExecError("Volume group '%s' not found on %s" %
5181 (my_vg, node))
5182 for idx, dev in enumerate(instance.disks):
5183 if idx not in self.op.disks:
5184 continue
5185 for node in tgt_node, oth_node:
5186 info("checking disk/%d on %s" % (idx, node))
5187 cfg.SetDiskID(dev, node)
5188 result = self.rpc.call_blockdev_find(node, dev)
5189 msg = result.RemoteFailMsg()
5190 if not msg and not result.payload:
5191 msg = "disk not found"
5192 if msg:
5193 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5194 (idx, node, msg))
5196 # Step: check other node consistency
5197 self.proc.LogStep(2, steps_total, "check peer consistency")
5198 for idx, dev in enumerate(instance.disks):
5199 if idx not in self.op.disks:
5200 continue
5201 info("checking disk/%d consistency on %s" % (idx, oth_node))
5202 if not _CheckDiskConsistency(self, dev, oth_node,
5203 oth_node==instance.primary_node):
5204 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5205 " to replace disks on this node (%s)" %
5206 (oth_node, tgt_node))
5208 # Step: create new storage
5209 self.proc.LogStep(3, steps_total, "allocate new storage")
5210 for idx, dev in enumerate(instance.disks):
5211 if idx not in self.op.disks:
5212 continue
5213 size = dev.size
5214 cfg.SetDiskID(dev, tgt_node)
5215 lv_names = [".disk%d_%s" % (idx, suf)
5216 for suf in ["data", "meta"]]
5217 names = _GenerateUniqueNames(self, lv_names)
5218 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5219 logical_id=(vgname, names[0]))
5220 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5221 logical_id=(vgname, names[1]))
5222 new_lvs = [lv_data, lv_meta]
5223 old_lvs = dev.children
5224 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5225 info("creating new local storage on %s for %s" %
5226 (tgt_node, dev.iv_name))
5227 # we pass force_create=True to force the LVM creation
5228 for new_lv in new_lvs:
5229 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5230 _GetInstanceInfoText(instance), False)
5232 # Step: for each lv, detach+rename*2+attach
5233 self.proc.LogStep(4, steps_total, "change drbd configuration")
5234 for dev, old_lvs, new_lvs in iv_names.itervalues():
5235 info("detaching %s drbd from local storage" % dev.iv_name)
5236 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5237 msg = result.RemoteFailMsg()
5238 if msg:
5239 raise errors.OpExecError("Can't detach drbd from local storage on node"
5240 " %s for device %s: %s" %
5241 (tgt_node, dev.iv_name, msg))
5243 #cfg.Update(instance)
5245 # ok, we created the new LVs, so now we know we have the needed
5246 # storage; as such, we proceed on the target node to rename
5247 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5248 # using the assumption that logical_id == physical_id (which in
5249 # turn is the unique_id on that node)
5251 # FIXME(iustin): use a better name for the replaced LVs
5252 temp_suffix = int(time.time())
5253 ren_fn = lambda d, suff: (d.physical_id[0],
5254 d.physical_id[1] + "_replaced-%s" % suff)
5255 # build the rename list based on what LVs exist on the node
5256 rlist = []
5257 for to_ren in old_lvs:
5258 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5259 if not result.RemoteFailMsg() and result.payload:
5261 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5263 info("renaming the old LVs on the target node")
5264 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5265 msg = result.RemoteFailMsg()
5266 if msg:
5267 raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5268 (tgt_node, msg))
5269 # now we rename the new LVs to the old LVs
5270 info("renaming the new LVs on the target node")
5271 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5272 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5273 msg = result.RemoteFailMsg()
5274 if msg:
5275 raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5276 (tgt_node, msg))
5278 for old, new in zip(old_lvs, new_lvs):
5279 new.logical_id = old.logical_id
5280 cfg.SetDiskID(new, tgt_node)
5282 for disk in old_lvs:
5283 disk.logical_id = ren_fn(disk, temp_suffix)
5284 cfg.SetDiskID(disk, tgt_node)
5286 # now that the new lvs have the old name, we can add them to the device
5287 info("adding new mirror component on %s" % tgt_node)
5288 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5289 msg = result.RemoteFailMsg()
5290 if msg:
5291 for new_lv in new_lvs:
5292 msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5293 if msg:
5294 warning("Can't rollback device %s: %s", dev, msg,
5295 hint="cleanup manually the unused logical volumes")
5296 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5298 dev.children = new_lvs
5299 cfg.Update(instance)
5301 # Step: wait for sync
5303 # this can fail as the old devices are degraded and _WaitForSync
5304 # does a combined result over all disks, so we don't check its
5305 # return value
5306 self.proc.LogStep(5, steps_total, "sync devices")
5307 _WaitForSync(self, instance, unlock=True)
5309 # so check manually all the devices
5310 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5311 cfg.SetDiskID(dev, instance.primary_node)
5312 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5313 msg = result.RemoteFailMsg()
5314 if not msg and not result.payload:
5315 msg = "disk not found"
5316 if msg:
5317 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5318 (name, msg))
5319 if result.payload[5]:
5320 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5322 # Step: remove old storage
5323 self.proc.LogStep(6, steps_total, "removing old storage")
5324 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5325 info("remove logical volumes for %s" % name)
5326 for lv in old_lvs:
5327 cfg.SetDiskID(lv, tgt_node)
5328 msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5329 if msg:
5330 warning("Can't remove old LV: %s" % msg,
5331 hint="manually remove unused LVs")
5334 def _ExecD8Secondary(self, feedback_fn):
5335 """Replace the secondary node for drbd8.
5337 The algorithm for replace is quite complicated:
5338 - for all disks of the instance:
5339 - create new LVs on the new node with same names
5340 - shutdown the drbd device on the old secondary
5341 - disconnect the drbd network on the primary
5342 - create the drbd device on the new secondary
5343 - network attach the drbd on the primary, using an artifice:
5344 the drbd code for Attach() will connect to the network if it
5345 finds a device which is connected to the good local disks but
5346 not network enabled
5347 - wait for sync across all devices
5348 - remove all disks from the old secondary
5350 Failures are not very well handled.
5352 """
5353 steps_total = 6
5354 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5355 instance = self.instance
5356 iv_names = {}
5357 cfg = self.cfg
5359 old_node = self.tgt_node
5360 new_node = self.new_node
5361 pri_node = instance.primary_node
5362 nodes_ip = {
5363 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5364 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5365 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5366 }
5368 # Step: check device activation
5369 self.proc.LogStep(1, steps_total, "check device existence")
5370 info("checking volume groups")
5371 my_vg = cfg.GetVGName()
5372 results = self.rpc.call_vg_list([pri_node, new_node])
5373 for node in pri_node, new_node:
5374 res = results[node]
5375 if res.failed or not res.data or my_vg not in res.data:
5376 raise errors.OpExecError("Volume group '%s' not found on %s" %
5377 (my_vg, node))
5378 for idx, dev in enumerate(instance.disks):
5379 if idx not in self.op.disks:
5380 continue
5381 info("checking disk/%d on %s" % (idx, pri_node))
5382 cfg.SetDiskID(dev, pri_node)
5383 result = self.rpc.call_blockdev_find(pri_node, dev)
5384 msg = result.RemoteFailMsg()
5385 if not msg and not result.payload:
5386 msg = "disk not found"
5387 if msg:
5388 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5389 (idx, pri_node, msg))
5391 # Step: check other node consistency
5392 self.proc.LogStep(2, steps_total, "check peer consistency")
5393 for idx, dev in enumerate(instance.disks):
5394 if idx not in self.op.disks:
5395 continue
5396 info("checking disk/%d consistency on %s" % (idx, pri_node))
5397 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5398 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5399 " unsafe to replace the secondary" %
5400 pri_node)
5402 # Step: create new storage
5403 self.proc.LogStep(3, steps_total, "allocate new storage")
5404 for idx, dev in enumerate(instance.disks):
5405 info("adding new local storage on %s for disk/%d" %
5406 (new_node, idx))
5407 # we pass force_create=True to force LVM creation
5408 for new_lv in dev.children:
5409 _CreateBlockDev(self, new_node, instance, new_lv, True,
5410 _GetInstanceInfoText(instance), False)
5412 # Step 4: drbd minors and drbd setups changes
5413 # after this, we must manually remove the drbd minors on both the
5414 # error and the success paths
5415 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5416 instance.name)
5417 logging.debug("Allocated minors %s" % (minors,))
5418 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5419 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5420 size = dev.size
5421 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5422 # create new devices on new_node; note that we create two IDs:
5423 # one without port, so the drbd will be activated without
5424 # networking information on the new node at this stage, and one
5425 # with network, for the latter activation in step 4
5426 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5427 if pri_node == o_node1:
5428 p_minor = o_minor1
5429 else:
5430 p_minor = o_minor2
5432 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5433 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5435 iv_names[idx] = (dev, dev.children, new_net_id)
5436 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5437 new_net_id)
5438 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5439 logical_id=new_alone_id,
5440 children=dev.children)
5441 try:
5442 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5443 _GetInstanceInfoText(instance), False)
5444 except errors.GenericError:
5445 self.cfg.ReleaseDRBDMinors(instance.name)
5446 raise
5448 for idx, dev in enumerate(instance.disks):
5449 # we have new devices, shutdown the drbd on the old secondary
5450 info("shutting down drbd for disk/%d on old node" % idx)
5451 cfg.SetDiskID(dev, old_node)
5452 msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5453 if msg:
5454 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5455 (idx, msg),
5456 hint="Please cleanup this device manually as soon as possible")
5458 info("detaching primary drbds from the network (=> standalone)")
5459 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5460 instance.disks)[pri_node]
5462 msg = result.RemoteFailMsg()
5463 if msg:
5464 # detaches didn't succeed (unlikely)
5465 self.cfg.ReleaseDRBDMinors(instance.name)
5466 raise errors.OpExecError("Can't detach the disks from the network on"
5467 " old node: %s" % (msg,))
5469 # if we managed to detach at least one, we update all the disks of
5470 # the instance to point to the new secondary
5471 info("updating instance configuration")
5472 for dev, _, new_logical_id in iv_names.itervalues():
5473 dev.logical_id = new_logical_id
5474 cfg.SetDiskID(dev, pri_node)
5475 cfg.Update(instance)
5477 # and now perform the drbd attach
5478 info("attaching primary drbds to new secondary (standalone => connected)")
5479 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5480 instance.disks, instance.name,
5481 False)
5482 for to_node, to_result in result.items():
5483 msg = to_result.RemoteFailMsg()
5484 if msg:
5485 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5486 hint="please do a gnt-instance info to see the"
5487 " status of disks")
5489 # this can fail as the old devices are degraded and _WaitForSync
5490 # does a combined result over all disks, so we don't check its
5491 # return value
5492 self.proc.LogStep(5, steps_total, "sync devices")
5493 _WaitForSync(self, instance, unlock=True)
5495 # so check manually all the devices
5496 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5497 cfg.SetDiskID(dev, pri_node)
5498 result = self.rpc.call_blockdev_find(pri_node, dev)
5499 msg = result.RemoteFailMsg()
5500 if not msg and not result.payload:
5501 msg = "disk not found"
5502 if msg:
5503 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5504 (idx, msg))
5505 if result.payload[5]:
5506 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5508 self.proc.LogStep(6, steps_total, "removing old storage")
5509 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5510 info("remove logical volumes for disk/%d" % idx)
5511 for lv in old_lvs:
5512 cfg.SetDiskID(lv, old_node)
5513 msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5514 if msg:
5515 warning("Can't remove LV on old secondary: %s", msg,
5516 hint="Cleanup stale volumes by hand")
5518 def Exec(self, feedback_fn):
5519 """Execute disk replacement.
5521 This dispatches the disk replacement to the appropriate handler.
5523 """
5524 instance = self.instance
5526 # Activate the instance disks if we're replacing them on a down instance
5527 if not instance.admin_up:
5528 _StartInstanceDisks(self, instance, True)
5530 if self.op.mode == constants.REPLACE_DISK_CHG:
5531 fn = self._ExecD8Secondary
5532 else:
5533 fn = self._ExecD8DiskOnly
5535 ret = fn(feedback_fn)
5537 # Deactivate the instance disks if we're replacing them on a down instance
5538 if not instance.admin_up:
5539 _SafeShutdownInstanceDisks(self, instance)
5541 return ret
5544 class LUGrowDisk(LogicalUnit):
5545 """Grow a disk of an instance.
5547 """
5548 HPATH = "disk-grow"
5549 HTYPE = constants.HTYPE_INSTANCE
5550 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5551 REQ_BGL = False
5553 def ExpandNames(self):
5554 self._ExpandAndLockInstance()
5555 self.needed_locks[locking.LEVEL_NODE] = []
5556 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5558 def DeclareLocks(self, level):
5559 if level == locking.LEVEL_NODE:
5560 self._LockInstancesNodes()
5562 def BuildHooksEnv(self):
5563 """Build hooks env.
5565 This runs on the master, the primary and all the secondaries.
5567 """
5568 env = {
5569 "DISK": self.op.disk,
5570 "AMOUNT": self.op.amount,
5571 }
5572 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5573 nl = [
5574 self.cfg.GetMasterNode(),
5575 self.instance.primary_node,
5576 ]
5577 return env, nl, nl
5579 def CheckPrereq(self):
5580 """Check prerequisites.
5582 This checks that the instance is in the cluster.
5584 """
5585 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5586 assert instance is not None, \
5587 "Cannot retrieve locked instance %s" % self.op.instance_name
5588 nodenames = list(instance.all_nodes)
5589 for node in nodenames:
5590 _CheckNodeOnline(self, node)
5593 self.instance = instance
5595 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5596 raise errors.OpPrereqError("Instance's disk layout does not support"
5597 " growing.")
5599 self.disk = instance.FindDisk(self.op.disk)
5601 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5602 instance.hypervisor)
5603 for node in nodenames:
5604 info = nodeinfo[node]
5605 if info.failed or not info.data:
5606 raise errors.OpPrereqError("Cannot get current information"
5607 " from node '%s'" % node)
5608 vg_free = info.data.get('vg_free', None)
5609 if not isinstance(vg_free, int):
5610 raise errors.OpPrereqError("Can't compute free disk space on"
5611 " node %s" % node)
5612 if self.op.amount > vg_free:
5613 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5614 " %d MiB available, %d MiB required" %
5615 (node, vg_free, self.op.amount))
5617 def Exec(self, feedback_fn):
5618 """Execute disk grow.
5620 """
5621 instance = self.instance
5622 disk = self.disk
5623 for node in instance.all_nodes:
5624 self.cfg.SetDiskID(disk, node)
5625 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5626 msg = result.RemoteFailMsg()
5627 if msg:
5628 raise errors.OpExecError("Grow request failed to node %s: %s" %
5629 (node, msg))
5630 disk.RecordGrow(self.op.amount)
5631 self.cfg.Update(instance)
5632 if self.op.wait_for_sync:
5633 disk_abort = not _WaitForSync(self, instance)
5634 if disk_abort:
5635 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5636 " status.\nPlease check the instance.")
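# Usage sketch (hypothetical values): growing disk 0 by 512 MB
#
#   op = opcodes.OpGrowDisk(instance_name="inst1.example.com",
#                           disk=0, amount=512, wait_for_sync=True)
#
# Every node holding the disk must have at least 512 MB free in its volume
# group, as verified in CheckPrereq above.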
5639 class LUQueryInstanceData(NoHooksLU):
5640 """Query runtime instance data.
5642 """
5643 _OP_REQP = ["instances", "static"]
5644 REQ_BGL = False
5646 def ExpandNames(self):
5647 self.needed_locks = {}
5648 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5650 if not isinstance(self.op.instances, list):
5651 raise errors.OpPrereqError("Invalid argument type 'instances'")
5653 if self.op.instances:
5654 self.wanted_names = []
5655 for name in self.op.instances:
5656 full_name = self.cfg.ExpandInstanceName(name)
5657 if full_name is None:
5658 raise errors.OpPrereqError("Instance '%s' not known" % name)
5659 self.wanted_names.append(full_name)
5660 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5661 else:
5662 self.wanted_names = None
5663 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5665 self.needed_locks[locking.LEVEL_NODE] = []
5666 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5668 def DeclareLocks(self, level):
5669 if level == locking.LEVEL_NODE:
5670 self._LockInstancesNodes()
5672 def CheckPrereq(self):
5673 """Check prerequisites.
5675 This only checks the optional instance list against the existing names.
5677 """
5678 if self.wanted_names is None:
5679 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5681 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5682 in self.wanted_names]
5685 def _ComputeDiskStatus(self, instance, snode, dev):
5686 """Compute block device status.
5688 """
5689 static = self.op.static
5690 if not static:
5691 self.cfg.SetDiskID(dev, instance.primary_node)
5692 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5693 if dev_pstatus.offline:
5694 dev_pstatus = None
5695 else:
5696 msg = dev_pstatus.RemoteFailMsg()
5697 if msg:
5698 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5699 (instance.name, msg))
5700 dev_pstatus = dev_pstatus.payload
5701 else:
5702 dev_pstatus = None
5704 if dev.dev_type in constants.LDS_DRBD:
5705 # we change the snode then (otherwise we use the one passed in)
5706 if dev.logical_id[0] == instance.primary_node:
5707 snode = dev.logical_id[1]
5709 snode = dev.logical_id[0]
5711 if snode and not static:
5712 self.cfg.SetDiskID(dev, snode)
5713 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5714 if dev_sstatus.offline:
5717 msg = dev_sstatus.RemoteFailMsg()
5719 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5720 (instance.name, msg))
5721 dev_sstatus = dev_sstatus.payload
5726 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5727 for child in dev.children]
5732 "iv_name": dev.iv_name,
5733 "dev_type": dev.dev_type,
5734 "logical_id": dev.logical_id,
5735 "physical_id": dev.physical_id,
5736 "pstatus": dev_pstatus,
5737 "sstatus": dev_sstatus,
5738 "children": dev_children,
  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise()
        remote_info = remote_info.data
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.admin_up:
        config_state = "up"
      else:
        config_state = "down"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result
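# For reference, a hypothetical result entry produced by Exec above (shape
# only, all values invented):
#
#   result["inst1.example.com"] = {
#     "name": "inst1.example.com",
#     "config_state": "up",          # from instance.admin_up
#     "run_state": "up",             # None when static=True
#     "pnode": "node1.example.com",
#     "snodes": ["node2.example.com"],
#     "os": "debian-etch",
#     "nics": [("aa:00:00:11:22:33", "192.0.2.10", "xen-br0")],
#     "disks": [...],                # per-disk dicts from _ComputeDiskStatus
#     ...
#   }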
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")

    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")
    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)

      if nic_op == constants.DDM_ADD:
        nic_bridge = nic_dict.get('bridge', None)
        if nic_bridge is None:
          nic_dict['bridge'] = self.cfg.GetDefBridge()
        nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic")

    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")
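  # The disks/nics lists validated above are (op, params) pairs, where op is
  # either a DDM_* constant or the index of an existing device. An
  # illustrative (hypothetical) combination accepted by CheckArguments:
  #
  #   self.op.disks = [(constants.DDM_ADD, {'size': 1024})]   # one add only
  #   self.op.nics = [(0, {'ip': '192.0.2.15'}),              # modify nic 0
  #                   (1, {'mac': '00:11:22:33:44:55'})]      # modify nic 1
  #
  # Removals are requested as (constants.DDM_REMOVE, {}) and always act on
  # the last device, as implemented in Exec below.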
  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'bridge' in this_nic_override:
          bridge = this_nic_override['bridge']
        else:
          bridge = nic.bridge
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        args['nics'].append((ip, bridge, mac))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        bridge = nic_override[constants.DDM_ADD]['bridge']
        mac = nic_override[constants.DDM_ADD]['mac']
        args['nics'].append((ip, bridge, mac))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl
  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    force = self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
      hv_new = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
      be_new = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if not instance_info.failed and instance_info.data:
          current_mem = int(instance_info.data['memory'])
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode].data['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.iteritems():
          if node not in instance.secondary_nodes:
            continue
          if nres.failed or not isinstance(nres.data, dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
      if 'bridge' in nic_dict:
        nic_bridge = nic_dict['bridge']
        if nic_bridge is None:
          raise errors.OpPrereqError('Cannot set the nic bridge to None')
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None')
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC()
        else:
          # or validate/reserve the current one
          if self.cfg.IsMacInUse(nic_mac):
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if ins_l.failed or not isinstance(ins_l.data, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l.data:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))
  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance

    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))

    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        bridge = nic_dict['bridge']
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
                              bridge=bridge)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,bridge=%s" %
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
      else:
        # change a given nic
        for key in 'mac', 'ip', 'bridge':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result
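# The list returned by Exec is a sequence of (parameter, new value) pairs
# that the caller (e.g. the CLI) prints back to the user, for instance
# (values invented):
#
#   [("disk/1", "add:size=1024,mode=rw"), ("be/memory", 512)]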
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].failed:
        result[node] = False
      else:
        result[node] = rpcresult[node].data

    return result
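# Sample return value (shape only, names invented): nodes that failed the
# RPC map to False rather than to a list, so callers must handle both:
#
#   {"node1.example.com": ["inst1.example.com", "inst2.example.com"],
#    "node2.example.com": False}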
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, src_node, msg))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    try:
      for disk in instance.disks:
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
        if new_dev_name.failed or not new_dev_name.data:
          self.LogWarning("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name.data),
                                 physical_id=(vgname, new_dev_name.data),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.admin_up:
        result = self.rpc.call_instance_start(src_node, instance, None, None)
        msg = result.RemoteFailMsg()
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        if result.failed or not result.data:
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
        if msg:
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s: %s", dev.logical_id[1], src_node, msg)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
          continue
        if instance.name in exportlist[node].data:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)
class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())
class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
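# Example (invented values): searching for the pattern "^env:" could return
#
#   [("/cluster", "env:production"),
#    ("/instances/inst1.example.com", "env:staging")]
#
# i.e. (path, tag) pairs, where the path encodes the type of the tag holder.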
class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")
class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")
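# Illustrative use of the tags LUs (not executed here): assuming the tag
# opcodes in opcodes.py carry the 'kind'/'name'/'tags' attributes required
# by _OP_REQP above, adding and removing an instance tag looks like:
#
#   op_add = opcodes.OpAddTags(kind=constants.TAG_INSTANCE,
#                              name="inst1.example.com", tags=["env:prod"])
#   op_del = opcodes.OpDelTags(kind=constants.TAG_INSTANCE,
#                              name="inst1.example.com", tags=["env:prod"])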
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has three sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
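  # A hypothetical allocation request, mirroring the way LUTestAllocator
  # below drives this class (all values invented, "hail" is just an example
  # allocator script name):
  #
  #   ial = IAllocator(lu,
  #                    mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com",
  #                    mem_size=512, disks=[{'size': 1024, 'mode': 'w'}],
  #                    disk_template=constants.DT_DRBD8,
  #                    os="debian-etch", tags=[], vcpus=1,
  #                    nics=[{'mac': 'auto', 'ip': None, 'bridge': None}],
  #                    hypervisor=constants.HT_XEN_PVM)
  #   ial.Run("hail")   # runs the external script and validates its output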
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor_name)
    node_iinfo = self.lu.rpc.call_all_instances_info(
      node_list, cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not ninfo.offline:
        nresult.Raise()
        if not isinstance(nresult.data, dict):
          raise errors.OpExecError("Can't get data for node %s" % nname)
        remote_info = nresult.data
        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          try:
            remote_info[attr] = int(remote_info[attr])
          except ValueError, err:
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" % (nname, attr, err))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].data:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data
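  # Shape of the generated input (trimmed sketch; the keys are the ones
  # built above, the values here are placeholders):
  #
  #   {
  #     "version": ...,
  #     "cluster_name": ...,
  #     "cluster_tags": [...],
  #     "enabled_hypervisors": [...],
  #     "nodes": {"node1": {"total_memory": ..., "free_disk": ..., ...}},
  #     "instances": {"inst1": {"memory": ..., "disk_template": ..., ...}},
  #   }
  #
  # The "request" key is filled in afterwards by _AddNewInstance or
  # _AddRelocateInstance, depending on the mode.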
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _AllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _IAllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " not a list")
    self.out_data = rdict
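  # A minimal well-formed allocator reply, as enforced by _ValidateResult
  # (serialized text produced by the external script; values invented):
  #
  #   {"success": true, "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node2.example.com"]}
  #
  # Anything else raises OpExecError, so a misbehaving external script
  # cannot silently corrupt the LU's state.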
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result