# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.
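
  A minimal subclass sketch; the opcode, hook path and behaviour shown
  here are purely illustrative, not an existing LU::

    class LUHypotheticalOp(LogicalUnit):
      HPATH = "hypothetical-op"
      HTYPE = constants.HTYPE_CLUSTER
      _OP_REQP = []

      def ExpandNames(self):
        self.needed_locks = {}

      def CheckPrereq(self):
        pass

      def BuildHooksEnv(self):
        return {}, [], []

      def Exec(self, feedback_fn):
        feedback_fn("Nothing to do")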

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

  def __GetSSH(self):
    """Returns the SshRunner object.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity of the opcode arguments.

    This method is for doing a simple syntactic check and ensuring
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods need no longer worry about missing parameters.
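
    A hedged sketch of a typical implementation; the ``force`` parameter
    is a hypothetical example, not part of this base class::

      def CheckArguments(self):
        if not hasattr(self.op, "force"):
          self.op.force = False
        if not isinstance(self.op.force, bool):
          raise errors.OpPrereqError("Parameter 'force' must be a boolean")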

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS
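
    A typical sketch, mirroring the example given in _LockInstancesNodes
    below::

      def DeclareLocks(self, level):
        if level == locking.LEVEL_NODE:
          self._LockInstancesNodes()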

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have the 'GANETI_' prefix, as this is
    handled by the hooks runner. Also note that additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes for a phase, an empty list (and not None)
    should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.
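
    A hedged sketch for a hypothetical LU that only involves the master
    node (the env keys are illustrative)::

      def BuildHooksEnv(self):
        env = {"OP_TARGET": self.cfg.GetClusterName()}
        mn = self.cfg.GetMasterNode()
        return env, [mn], [mn]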

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
    @return: the new Exec result, based on the previous result
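
    A hedged override sketch, following the pattern LUVerifyCluster uses
    further down in this module::

      def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
        if phase == constants.HOOKS_PHASE_POST:
          feedback_fn("* Hooks Results")
        return lu_result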

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.
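
    A hedged usage sketch, combined with the node-lock recalculation
    helpers used elsewhere in this module::

      def ExpandNames(self):
        self._ExpandAndLockInstance()
        self.needed_locks[locking.LEVEL_NODE] = []
        self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE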

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
                                 " non-empty list of nodes whose name is to be"
                                 " expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())

  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @param memory: the memory size of the instance
  @param vcpus: the count of VCPUs the instance has
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @param disks: the list of (size, mode) pairs
  @param bep: the backend parameters for the instance
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"

  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  nic_count = len(nics)
  for idx, (ip, bridge, mac) in enumerate(nics):
    if ip is None:
      ip = ""
    env["INSTANCE_NIC%d_IP" % idx] = ip
    env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
    env["INSTANCE_NIC%d_MAC" % idx] = mac

  env["INSTANCE_NIC_COUNT"] = nic_count

  disk_count = len(disks)
  for idx, (size, mode) in enumerate(disks):
    env["INSTANCE_DISK%d_SIZE" % idx] = size
    env["INSTANCE_DISK%d_MODE" % idx] = mode

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  if result.failed or not result.data:
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                    (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates (and the file is outdated)" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to other nodes
    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
           not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported to the caller.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True

    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False

    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True

    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary
      # has enough memory to host all the instances it is expected to take
      # over, should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True

    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
    }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
    }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          bad = True
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nhvinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nhvinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nhvinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondaries.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        if not lvs.offline:
          self.LogWarning("Connection to node %s failed: %s" %
                          (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
      }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
      }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsizes(node, newl)
      if result.failed:
        self.LogWarning("Failure in blockdev_getsizes call to node"
                        " %s, ignoring", node)
        continue
      if len(result.data) != len(dskl):
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.data):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance)
          changed.append((instance.name, idx, size))
    return changed


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  for chdisk in disk.children:
    if _RecursiveCheckIfLVMBased(chdisk):
      return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters.

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
    }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate beparams changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self)

    self.cfg.Update(self.cluster)


class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo())


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if rstats.failed or not rstats.data:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.data
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.RemoteFailMsg()
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      result = result and (not rstats.payload[idx])
  for child in dev.children:
    result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].failed]
    for node_name, nr in rlist.iteritems():
      if nr.failed or not nr.data:
        continue
      for os_obj in nr.data:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in good_nodes:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    if node_data is False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
    }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the node exists in the configuration
      - it does not have primary or secondary instances
      - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)
1952 class LUQueryNodes(NoHooksLU):
1953 """Logical unit for querying nodes.
1956 _OP_REQP = ["output_fields", "names", "use_locking"]
1958 _FIELDS_DYNAMIC = utils.FieldSet(
1960 "mtotal", "mnode", "mfree",
1962 "ctotal", "cnodes", "csockets",
1965 _FIELDS_STATIC = utils.FieldSet(
1966 "name", "pinst_cnt", "sinst_cnt",
1967 "pinst_list", "sinst_list",
1968 "pip", "sip", "tags",
1977 def ExpandNames(self):
1978 _CheckOutputFields(static=self._FIELDS_STATIC,
1979 dynamic=self._FIELDS_DYNAMIC,
1980 selected=self.op.output_fields)
1982 self.needed_locks = {}
1983 self.share_locks[locking.LEVEL_NODE] = 1
1986 self.wanted = _GetWantedNodes(self, self.op.names)
1988 self.wanted = locking.ALL_SET
1990 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1991 self.do_locking = self.do_node_query and self.op.use_locking
1993 # if we don't request only static fields, we need to lock the nodes
1994 self.needed_locks[locking.LEVEL_NODE] = self.wanted
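# Illustrative decision: output_fields = ["name", "pip"] are all static,
# so do_node_query is False, no node locks are taken and the answer comes
# straight from the config; adding "mfree" makes the query dynamic, which
# requires RPC and (with use_locking) a shared node lock.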
1997 def CheckPrereq(self):
1998 """Check prerequisites.
2001 # The node list is validated in _GetWantedNodes if it is non-empty;
2002 # if it is empty, there is nothing to validate
2005 def Exec(self, feedback_fn):
2006 """Computes the list of nodes and their attributes.
2009 all_info = self.cfg.GetAllNodesInfo()
2011 nodenames = self.acquired_locks[locking.LEVEL_NODE]
2012 elif self.wanted != locking.ALL_SET:
2013 nodenames = self.wanted
2014 missing = set(nodenames).difference(all_info.keys())
2016 raise errors.OpExecError(
2017 "Some nodes were removed before retrieving their data: %s" % missing)
2019 nodenames = all_info.keys()
2021 nodenames = utils.NiceSort(nodenames)
2022 nodelist = [all_info[name] for name in nodenames]
2024 # begin data gathering
2026 if self.do_node_query:
2028 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2029 self.cfg.GetHypervisorType())
2030 for name in nodenames:
2031 nodeinfo = node_data[name]
2032 if not nodeinfo.failed and nodeinfo.data:
2033 nodeinfo = nodeinfo.data
2034 fn = utils.TryConvert
2036 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2037 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2038 "mfree": fn(int, nodeinfo.get('memory_free', None)),
2039 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2040 "dfree": fn(int, nodeinfo.get('vg_free', None)),
2041 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2042 "bootid": nodeinfo.get('bootid', None),
2043 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2044 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2047 live_data[name] = {}
2049 live_data = dict([(name, {}) for name in nodenames])
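# utils.TryConvert(fn, val) returns fn(val) and falls back to the
# unconverted value on ValueError/TypeError, so for example
#   fn(int, "2048") -> 2048    while    fn(int, None) -> None
# and missing node data is passed through instead of raising.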
2051 node_to_primary = dict([(name, set()) for name in nodenames])
2052 node_to_secondary = dict([(name, set()) for name in nodenames])
2054 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2055 "sinst_cnt", "sinst_list"))
2056 if inst_fields & frozenset(self.op.output_fields):
2057 instancelist = self.cfg.GetInstanceList()
2059 for instance_name in instancelist:
2060 inst = self.cfg.GetInstanceInfo(instance_name)
2061 if inst.primary_node in node_to_primary:
2062 node_to_primary[inst.primary_node].add(inst.name)
2063 for secnode in inst.secondary_nodes:
2064 if secnode in node_to_secondary:
2065 node_to_secondary[secnode].add(inst.name)
2067 master_node = self.cfg.GetMasterNode()
2069 # end data gathering
2072 for node in nodelist:
2074 for field in self.op.output_fields:
2077 elif field == "pinst_list":
2078 val = list(node_to_primary[node.name])
2079 elif field == "sinst_list":
2080 val = list(node_to_secondary[node.name])
2081 elif field == "pinst_cnt":
2082 val = len(node_to_primary[node.name])
2083 elif field == "sinst_cnt":
2084 val = len(node_to_secondary[node.name])
2085 elif field == "pip":
2086 val = node.primary_ip
2087 elif field == "sip":
2088 val = node.secondary_ip
2089 elif field == "tags":
2090 val = list(node.GetTags())
2091 elif field == "serial_no":
2092 val = node.serial_no
2093 elif field == "master_candidate":
2094 val = node.master_candidate
2095 elif field == "master":
2096 val = node.name == master_node
2097 elif field == "offline":
2099 elif field == "drained":
2101 elif self._FIELDS_DYNAMIC.Matches(field):
2102 val = live_data[node.name].get(field, None)
2103 elif field == "role":
2104 if node.name == master_node:
2106 elif node.master_candidate:
2115 raise errors.ParameterError(field)
2116 node_output.append(val)
2117 output.append(node_output)
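# Example result row (values hypothetical) for
# output_fields = ["name", "pip", "mfree", "master"]:
#   ["node1.example.com", "192.0.2.11", 3480, True]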
2122 class LUQueryNodeVolumes(NoHooksLU):
2123 """Logical unit for getting volumes on node(s).
2126 _OP_REQP = ["nodes", "output_fields"]
2128 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2129 _FIELDS_STATIC = utils.FieldSet("node")
2131 def ExpandNames(self):
2132 _CheckOutputFields(static=self._FIELDS_STATIC,
2133 dynamic=self._FIELDS_DYNAMIC,
2134 selected=self.op.output_fields)
2136 self.needed_locks = {}
2137 self.share_locks[locking.LEVEL_NODE] = 1
2138 if not self.op.nodes:
2139 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2141 self.needed_locks[locking.LEVEL_NODE] = \
2142 _GetWantedNodes(self, self.op.nodes)
2144 def CheckPrereq(self):
2145 """Check prerequisites.
2147 This checks that the fields required are valid output fields.
2150 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2152 def Exec(self, feedback_fn):
2153 """Computes the list of nodes and their attributes.
2156 nodenames = self.nodes
2157 volumes = self.rpc.call_node_volumes(nodenames)
2159 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2160 in self.cfg.GetInstanceList()]
2162 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2165 for node in nodenames:
2166 if node not in volumes or volumes[node].failed or not volumes[node].data:
2169 node_vols = volumes[node].data[:]
2170 node_vols.sort(key=lambda vol: vol['dev'])
2172 for vol in node_vols:
2174 for field in self.op.output_fields:
2177 elif field == "phys":
2181 elif field == "name":
2183 elif field == "size":
2184 val = int(float(vol['size']))
2185 elif field == "instance":
2187 if node not in lv_by_node[inst]:
2189 if vol['name'] in lv_by_node[inst][node]:
2195 raise errors.ParameterError(field)
2196 node_output.append(str(val))
2198 output.append(node_output)
2203 class LUAddNode(LogicalUnit):
2204 """Logical unit for adding node to the cluster.
2208 HTYPE = constants.HTYPE_NODE
2209 _OP_REQP = ["node_name"]
2211 def BuildHooksEnv(self):
2214 This will run on all nodes before, and on all nodes + the new node after.
2218 "OP_TARGET": self.op.node_name,
2219 "NODE_NAME": self.op.node_name,
2220 "NODE_PIP": self.op.primary_ip,
2221 "NODE_SIP": self.op.secondary_ip,
2223 nodes_0 = self.cfg.GetNodeList()
2224 nodes_1 = nodes_0 + [self.op.node_name, ]
2225 return env, nodes_0, nodes_1
2227 def CheckPrereq(self):
2228 """Check prerequisites.
2231 - the new node is not already in the config
2233 - its parameters (single/dual homed) match the cluster
2235 Any errors are signalled by raising errors.OpPrereqError.
2238 node_name = self.op.node_name
2241 dns_data = utils.HostInfo(node_name)
2243 node = dns_data.name
2244 primary_ip = self.op.primary_ip = dns_data.ip
2245 secondary_ip = getattr(self.op, "secondary_ip", None)
2246 if secondary_ip is None:
2247 secondary_ip = primary_ip
2248 if not utils.IsValidIP(secondary_ip):
2249 raise errors.OpPrereqError("Invalid secondary IP given")
2250 self.op.secondary_ip = secondary_ip
2252 node_list = cfg.GetNodeList()
2253 if not self.op.readd and node in node_list:
2254 raise errors.OpPrereqError("Node %s is already in the configuration" %
2256 elif self.op.readd and node not in node_list:
2257 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2259 for existing_node_name in node_list:
2260 existing_node = cfg.GetNodeInfo(existing_node_name)
2262 if self.op.readd and node == existing_node_name:
2263 if (existing_node.primary_ip != primary_ip or
2264 existing_node.secondary_ip != secondary_ip):
2265 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2266 " address configuration as before")
2269 if (existing_node.primary_ip == primary_ip or
2270 existing_node.secondary_ip == primary_ip or
2271 existing_node.primary_ip == secondary_ip or
2272 existing_node.secondary_ip == secondary_ip):
2273 raise errors.OpPrereqError("New node ip address(es) conflict with"
2274 " existing node %s" % existing_node.name)
2276 # check that the type of the node (single versus dual homed) is the
2277 # same as for the master
2278 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2279 master_singlehomed = myself.secondary_ip == myself.primary_ip
2280 newbie_singlehomed = secondary_ip == primary_ip
2281 if master_singlehomed != newbie_singlehomed:
2282 if master_singlehomed:
2283 raise errors.OpPrereqError("The master has no private ip but the"
2284 " new node has one")
2286 raise errors.OpPrereqError("The master has a private ip but the"
2287 " new node doesn't have one")
2289 # check reachability
2290 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2291 raise errors.OpPrereqError("Node not reachable by ping")
2293 if not newbie_singlehomed:
2294 # check reachability from my secondary ip to newbie's secondary ip
2295 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2296 source=myself.secondary_ip):
2297 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2298 " based ping to noded port")
2300 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2305 mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2306 # the new node will increase mc_max by one, so:
2307 mc_max = min(mc_max + 1, cp_size)
2308 self.master_candidate = mc_now < mc_max
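# Worked example: with candidate_pool_size = 10, mc_now = 3 and
# mc_max = 3, the new node raises mc_max to min(3 + 1, 10) = 4;
# since 3 < 4, the node will join as a master candidate.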
2311 self.new_node = self.cfg.GetNodeInfo(node)
2312 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2314 self.new_node = objects.Node(name=node,
2315 primary_ip=primary_ip,
2316 secondary_ip=secondary_ip,
2317 master_candidate=self.master_candidate,
2318 offline=False, drained=False)
2320 def Exec(self, feedback_fn):
2321 """Adds the new node to the cluster.
2324 new_node = self.new_node
2325 node = new_node.name
2327 # for re-adds, reset the offline/drained/master-candidate flags;
2328 # we need to reset here, otherwise offline would prevent RPC calls
2329 # later in the procedure; this also means that if the re-add
2330 # fails, we are left with a non-offlined, broken node
2332 new_node.drained = new_node.offline = False
2333 self.LogInfo("Readding a node, the offline/drained flags were reset")
2334 # if we demote the node, we do cleanup later in the procedure
2335 new_node.master_candidate = self.master_candidate
2337 # notify the user about any possible mc promotion
2338 if new_node.master_candidate:
2339 self.LogInfo("Node will be a master candidate")
2341 # check connectivity
2342 result = self.rpc.call_version([node])[node]
2345 if constants.PROTOCOL_VERSION == result.data:
2346 logging.info("Communication to node %s fine, sw version %s match",
2349 raise errors.OpExecError("Version mismatch master version %s,"
2350 " node version %s" %
2351 (constants.PROTOCOL_VERSION, result.data))
2353 raise errors.OpExecError("Cannot get version from the new node")
2356 logging.info("Copy ssh key to node %s", node)
2357 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2359 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2360 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2366 keyarray.append(f.read())
2370 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2372 keyarray[3], keyarray[4], keyarray[5])
2374 msg = result.RemoteFailMsg()
2376 raise errors.OpExecError("Cannot transfer ssh keys to the"
2377 " new node: %s" % msg)
2379 # Add node to our /etc/hosts, and add key to known_hosts
2380 utils.AddHostToEtcHosts(new_node.name)
2382 if new_node.secondary_ip != new_node.primary_ip:
2383 result = self.rpc.call_node_has_ip_address(new_node.name,
2384 new_node.secondary_ip)
2385 if result.failed or not result.data:
2386 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2387 " you gave (%s). Please fix and re-run this"
2388 " command." % new_node.secondary_ip)
2390 node_verify_list = [self.cfg.GetMasterNode()]
2391 node_verify_param = {
2393 # TODO: do a node-net-test as well?
2396 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2397 self.cfg.GetClusterName())
2398 for verifier in node_verify_list:
2399 if result[verifier].failed or not result[verifier].data:
2400 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2401 " for remote verification" % verifier)
2402 if result[verifier].data['nodelist']:
2403 for failed in result[verifier].data['nodelist']:
2404 feedback_fn("ssh/hostname verification failed %s -> %s" %
2405 (verifier, result[verifier].data['nodelist'][failed]))
2406 raise errors.OpExecError("ssh/hostname verification failed.")
2408 # Distribute updated /etc/hosts and known_hosts to all nodes,
2409 # including the node just added
2410 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2411 dist_nodes = self.cfg.GetNodeList()
2412 if not self.op.readd:
2413 dist_nodes.append(node)
2414 if myself.name in dist_nodes:
2415 dist_nodes.remove(myself.name)
2417 logging.debug("Copying hosts and known_hosts to all nodes")
2418 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2419 result = self.rpc.call_upload_file(dist_nodes, fname)
2420 for to_node, to_result in result.iteritems():
2421 if to_result.failed or not to_result.data:
2422 logging.error("Copy of file %s to node %s failed", fname, to_node)
2425 enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2426 if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2427 to_copy.append(constants.VNC_PASSWORD_FILE)
2429 for fname in to_copy:
2430 result = self.rpc.call_upload_file([node], fname)
2431 if result[node].failed or not result[node].data:
2432 logging.error("Could not copy file %s to node %s", fname, node)
2435 self.context.ReaddNode(new_node)
2436 # make sure we redistribute the config
2437 self.cfg.Update(new_node)
2438 # and make sure the new node will not have old files around
2439 if not new_node.master_candidate:
2440 result = self.rpc.call_node_demote_from_mc(new_node.name)
2441 msg = result.RemoteFailMsg()
2443 self.LogWarning("Node failed to demote itself from master"
2444 " candidate status: %s" % msg)
2446 self.context.AddNode(new_node)
2449 class LUSetNodeParams(LogicalUnit):
2450 """Modifies the parameters of a node.
2453 HPATH = "node-modify"
2454 HTYPE = constants.HTYPE_NODE
2455 _OP_REQP = ["node_name"]
2458 def CheckArguments(self):
2459 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2460 if node_name is None:
2461 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2462 self.op.node_name = node_name
2463 _CheckBooleanOpField(self.op, 'master_candidate')
2464 _CheckBooleanOpField(self.op, 'offline')
2465 _CheckBooleanOpField(self.op, 'drained')
2466 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2467 if all_mods.count(None) == 3:
2468 raise errors.OpPrereqError("Please pass at least one modification")
2469 if all_mods.count(True) > 1:
2470 raise errors.OpPrereqError("Can't set the node into more than one"
2471 " state at the same time")
2473 def ExpandNames(self):
2474 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2476 def BuildHooksEnv(self):
2479 This runs on the master node.
2483 "OP_TARGET": self.op.node_name,
2484 "MASTER_CANDIDATE": str(self.op.master_candidate),
2485 "OFFLINE": str(self.op.offline),
2486 "DRAINED": str(self.op.drained),
2488 nl = [self.cfg.GetMasterNode(),
2492 def CheckPrereq(self):
2493 """Check prerequisites.
2495 This checks that the node exists and that the requested state changes are consistent.
2498 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2500 if ((self.op.master_candidate is False or self.op.offline is True or
2501 self.op.drained is True) and node.master_candidate):
2502 # we will demote the node from master_candidate
2503 if self.op.node_name == self.cfg.GetMasterNode():
2504 raise errors.OpPrereqError("The master node has to be a"
2505 " master candidate, online and not drained")
2506 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2507 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2508 if num_candidates <= cp_size:
2509 msg = ("Not enough master candidates (desired"
2510 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2512 self.LogWarning(msg)
2514 raise errors.OpPrereqError(msg)
2516 if (self.op.master_candidate == True and
2517 ((node.offline and self.op.offline is not False) or
2518 (node.drained and self.op.drained is not False))):
2519 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2520 " to master_candidate" % node.name)
2524 def Exec(self, feedback_fn):
2533 if self.op.offline is not None:
2534 node.offline = self.op.offline
2535 result.append(("offline", str(self.op.offline)))
2536 if self.op.offline is True:
2537 if node.master_candidate:
2538 node.master_candidate = False
2540 result.append(("master_candidate", "auto-demotion due to offline"))
2542 node.drained = False
2543 result.append(("drained", "clear drained status due to offline"))
2545 if self.op.master_candidate is not None:
2546 node.master_candidate = self.op.master_candidate
2548 result.append(("master_candidate", str(self.op.master_candidate)))
2549 if self.op.master_candidate is False:
2550 rrc = self.rpc.call_node_demote_from_mc(node.name)
2551 msg = rrc.RemoteFailMsg()
2553 self.LogWarning("Node failed to demote itself: %s" % msg)
2555 if self.op.drained is not None:
2556 node.drained = self.op.drained
2557 result.append(("drained", str(self.op.drained)))
2558 if self.op.drained is True:
2559 if node.master_candidate:
2560 node.master_candidate = False
2562 result.append(("master_candidate", "auto-demotion due to drain"))
2563 rrc = self.rpc.call_node_demote_from_mc(node.name)
2564 msg = rrc.RemoteFailMsg()
2566 self.LogWarning("Node failed to demote itself: %s" % msg)
2568 node.offline = False
2569 result.append(("offline", "clear offline status due to drain"))
2571 # this will trigger configuration file update, if needed
2572 self.cfg.Update(node)
2573 # this will trigger job queue propagation or cleanup
2575 self.context.ReaddNode(node)
2580 class LUQueryClusterInfo(NoHooksLU):
2581 """Query cluster configuration.
2587 def ExpandNames(self):
2588 self.needed_locks = {}
2590 def CheckPrereq(self):
2591 """No prerequsites needed for this LU.
2596 def Exec(self, feedback_fn):
2597 """Return cluster config.
2600 cluster = self.cfg.GetClusterInfo()
2602 "software_version": constants.RELEASE_VERSION,
2603 "protocol_version": constants.PROTOCOL_VERSION,
2604 "config_version": constants.CONFIG_VERSION,
2605 "os_api_version": constants.OS_API_VERSION,
2606 "export_version": constants.EXPORT_VERSION,
2607 "architecture": (platform.architecture()[0], platform.machine()),
2608 "name": cluster.cluster_name,
2609 "master": cluster.master_node,
2610 "default_hypervisor": cluster.default_hypervisor,
2611 "enabled_hypervisors": cluster.enabled_hypervisors,
2612 "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2613 for hypervisor in cluster.enabled_hypervisors]),
2614 "beparams": cluster.beparams,
2615 "candidate_pool_size": cluster.candidate_pool_size,
2616 "default_bridge": cluster.default_bridge,
2617 "master_netdev": cluster.master_netdev,
2618 "volume_group_name": cluster.volume_group_name,
2619 "file_storage_dir": cluster.file_storage_dir,
2625 class LUQueryConfigValues(NoHooksLU):
2626 """Return configuration values.
2631 _FIELDS_DYNAMIC = utils.FieldSet()
2632 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2634 def ExpandNames(self):
2635 self.needed_locks = {}
2637 _CheckOutputFields(static=self._FIELDS_STATIC,
2638 dynamic=self._FIELDS_DYNAMIC,
2639 selected=self.op.output_fields)
2641 def CheckPrereq(self):
2642 """No prerequisites.
2647 def Exec(self, feedback_fn):
2648 """Dump a representation of the cluster config to the standard output.
2652 for field in self.op.output_fields:
2653 if field == "cluster_name":
2654 entry = self.cfg.GetClusterName()
2655 elif field == "master_node":
2656 entry = self.cfg.GetMasterNode()
2657 elif field == "drain_flag":
2658 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2660 raise errors.ParameterError(field)
2661 values.append(entry)
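# Example (values hypothetical): output_fields =
# ["cluster_name", "drain_flag"] returns something like
#   ["cluster.example.com", False]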
2665 class LUActivateInstanceDisks(NoHooksLU):
2666 """Bring up an instance's disks.
2669 _OP_REQP = ["instance_name"]
2672 def ExpandNames(self):
2673 self._ExpandAndLockInstance()
2674 self.needed_locks[locking.LEVEL_NODE] = []
2675 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2677 def DeclareLocks(self, level):
2678 if level == locking.LEVEL_NODE:
2679 self._LockInstancesNodes()
2681 def CheckPrereq(self):
2682 """Check prerequisites.
2684 This checks that the instance is in the cluster.
2687 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2688 assert self.instance is not None, \
2689 "Cannot retrieve locked instance %s" % self.op.instance_name
2690 _CheckNodeOnline(self, self.instance.primary_node)
2691 if not hasattr(self.op, "ignore_size"):
2692 self.op.ignore_size = False
2694 def Exec(self, feedback_fn):
2695 """Activate the disks.
2698 disks_ok, disks_info = \
2699 _AssembleInstanceDisks(self, self.instance,
2700 ignore_size=self.op.ignore_size)
2702 raise errors.OpExecError("Cannot activate block devices")
2707 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
2709 """Prepare the block devices for an instance.
2711 This sets up the block devices on all nodes.
2713 @type lu: L{LogicalUnit}
2714 @param lu: the logical unit on whose behalf we execute
2715 @type instance: L{objects.Instance}
2716 @param instance: the instance for whose disks we assemble
2717 @type ignore_secondaries: boolean
2718 @param ignore_secondaries: if true, errors on secondary nodes
2719 won't result in an error return from the function
2720 @type ignore_size: boolean
2721 @param ignore_size: if true, the current known size of the disk
2722 will not be used during the disk activation, useful for cases
2723 when the size is wrong
2724 @return: a tuple of (disks_ok, device_info); disks_ok is False if the
2725 operation failed, and device_info is a list of
2726 (host, instance_visible_name, node_visible_name) tuples with the mapping from node devices to instance devices
2731 iname = instance.name
2732 # With the two passes mechanism we try to reduce the window of
2733 # opportunity for the race condition of switching DRBD to primary
2734 before handshaking occurred, but we do not eliminate it
2736 # The proper fix would be to wait (with some limits) until the
2737 # connection has been made and drbd transitions from WFConnection
2738 # into any other network-connected state (Connected, SyncTarget,
2741 # 1st pass, assemble on all nodes in secondary mode
2742 for inst_disk in instance.disks:
2743 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2745 node_disk = node_disk.Copy()
2746 node_disk.UnsetSize()
2747 lu.cfg.SetDiskID(node_disk, node)
2748 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2749 msg = result.RemoteFailMsg()
2751 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2752 " (is_primary=False, pass=1): %s",
2753 inst_disk.iv_name, node, msg)
2754 if not ignore_secondaries:
2757 # FIXME: race condition on drbd migration to primary
2759 # 2nd pass, do only the primary node
2760 for inst_disk in instance.disks:
2761 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2762 if node != instance.primary_node:
2765 node_disk = node_disk.Copy()
2766 node_disk.UnsetSize()
2767 lu.cfg.SetDiskID(node_disk, node)
2768 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2769 msg = result.RemoteFailMsg()
2771 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2772 " (is_primary=True, pass=2): %s",
2773 inst_disk.iv_name, node, msg)
2775 device_info.append((instance.primary_node, inst_disk.iv_name,
2778 # leave the disks configured for the primary node
2779 # this is a workaround that would be fixed better by
2780 # improving the logical/physical id handling
2781 for disk in instance.disks:
2782 lu.cfg.SetDiskID(disk, instance.primary_node)
2784 return disks_ok, device_info
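# Minimal usage sketch (mirrors LUActivateInstanceDisks above):
#   disks_ok, info = _AssembleInstanceDisks(lu, instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   # info is e.g. [("node1.example.com", "disk/0", "/dev/drbd0")]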
2787 def _StartInstanceDisks(lu, instance, force):
2788 """Start the disks of an instance.
2791 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2792 ignore_secondaries=force)
2794 _ShutdownInstanceDisks(lu, instance)
2795 if force is not None and not force:
2796 lu.proc.LogWarning("", hint="If the message above refers to a"
2798 " you can retry the operation using '--force'.")
2799 raise errors.OpExecError("Disk consistency error")
2802 class LUDeactivateInstanceDisks(NoHooksLU):
2803 """Shutdown an instance's disks.
2806 _OP_REQP = ["instance_name"]
2809 def ExpandNames(self):
2810 self._ExpandAndLockInstance()
2811 self.needed_locks[locking.LEVEL_NODE] = []
2812 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2814 def DeclareLocks(self, level):
2815 if level == locking.LEVEL_NODE:
2816 self._LockInstancesNodes()
2818 def CheckPrereq(self):
2819 """Check prerequisites.
2821 This checks that the instance is in the cluster.
2824 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2825 assert self.instance is not None, \
2826 "Cannot retrieve locked instance %s" % self.op.instance_name
2828 def Exec(self, feedback_fn):
2829 """Deactivate the disks
2832 instance = self.instance
2833 _SafeShutdownInstanceDisks(self, instance)
2836 def _SafeShutdownInstanceDisks(lu, instance):
2837 """Shutdown block devices of an instance.
2839 This function checks if an instance is running before calling
2840 _ShutdownInstanceDisks.
2843 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2844 [instance.hypervisor])
2845 ins_l = ins_l[instance.primary_node]
2846 if ins_l.failed or not isinstance(ins_l.data, list):
2847 raise errors.OpExecError("Can't contact node '%s'" %
2848 instance.primary_node)
2850 if instance.name in ins_l.data:
2851 raise errors.OpExecError("Instance is running, can't shutdown"
2854 _ShutdownInstanceDisks(lu, instance)
2857 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2858 """Shutdown block devices of an instance.
2860 This does the shutdown on all nodes of the instance.
2862 If ignore_primary is false, errors on the primary node are
2867 for disk in instance.disks:
2868 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2869 lu.cfg.SetDiskID(top_disk, node)
2870 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2871 msg = result.RemoteFailMsg()
2873 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2874 disk.iv_name, node, msg)
2875 if not ignore_primary or node != instance.primary_node:
2880 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2881 """Checks if a node has enough free memory.
2883 This function checks if a given node has the needed amount of free
2884 memory. In case the node has less memory or we cannot get the
2885 information from the node, this function raises an OpPrereqError
2888 @type lu: C{LogicalUnit}
2889 @param lu: a logical unit from which we get configuration data
2891 @param node: the node to check
2892 @type reason: C{str}
2893 @param reason: string to use in the error message
2894 @type requested: C{int}
2895 @param requested: the amount of memory in MiB to check for
2896 @type hypervisor_name: C{str}
2897 @param hypervisor_name: the hypervisor to ask for memory stats
2898 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2899 we cannot check the node
2902 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2903 nodeinfo[node].Raise()
2904 free_mem = nodeinfo[node].data.get('memory_free')
2905 if not isinstance(free_mem, int):
2906 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2907 " was '%s'" % (node, free_mem))
2908 if requested > free_mem:
2909 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2910 " needed %s MiB, available %s MiB" %
2911 (node, reason, requested, free_mem))
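# Typical invocation (as done by failover/startup below), with the
# requested amount taken from the instance's filled beparams:
#   _CheckNodeFreeMemory(self, target_node,
#                        "failing over instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)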
2914 class LUStartupInstance(LogicalUnit):
2915 """Starts an instance.
2918 HPATH = "instance-start"
2919 HTYPE = constants.HTYPE_INSTANCE
2920 _OP_REQP = ["instance_name", "force"]
2923 def ExpandNames(self):
2924 self._ExpandAndLockInstance()
2926 def BuildHooksEnv(self):
2929 This runs on master, primary and secondary nodes of the instance.
2933 "FORCE": self.op.force,
2935 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2936 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2939 def CheckPrereq(self):
2940 """Check prerequisites.
2942 This checks that the instance is in the cluster.
2945 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2946 assert self.instance is not None, \
2947 "Cannot retrieve locked instance %s" % self.op.instance_name
2950 self.beparams = getattr(self.op, "beparams", {})
2952 if not isinstance(self.beparams, dict):
2953 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2954 " dict" % (type(self.beparams), ))
2955 # fill the beparams dict
2956 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2957 self.op.beparams = self.beparams
2960 self.hvparams = getattr(self.op, "hvparams", {})
2962 if not isinstance(self.hvparams, dict):
2963 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2964 " dict" % (type(self.hvparams), ))
2966 # check hypervisor parameter syntax (locally)
2967 cluster = self.cfg.GetClusterInfo()
2968 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2969 filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2971 filled_hvp.update(self.hvparams)
2972 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2973 hv_type.CheckParameterSyntax(filled_hvp)
2974 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2975 self.op.hvparams = self.hvparams
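# Layering sketch (values hypothetical): cluster-level hvparams are the
# base and per-startup overrides win, so with
#   cluster.hvparams[hv] = {"kernel_path": "/boot/vmlinuz"}
#   self.hvparams        = {"kernel_path": "/boot/vmlinuz-2.6"}
# the instance is started with filled_hvp["kernel_path"] ==
# "/boot/vmlinuz-2.6".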
2977 _CheckNodeOnline(self, instance.primary_node)
2979 bep = self.cfg.GetClusterInfo().FillBE(instance)
2980 # check bridges existence
2981 _CheckInstanceBridgesExist(self, instance)
2983 remote_info = self.rpc.call_instance_info(instance.primary_node,
2985 instance.hypervisor)
2987 if not remote_info.data:
2988 _CheckNodeFreeMemory(self, instance.primary_node,
2989 "starting instance %s" % instance.name,
2990 bep[constants.BE_MEMORY], instance.hypervisor)
2992 def Exec(self, feedback_fn):
2993 """Start the instance.
2996 instance = self.instance
2997 force = self.op.force
2999 self.cfg.MarkInstanceUp(instance.name)
3001 node_current = instance.primary_node
3003 _StartInstanceDisks(self, instance, force)
3005 result = self.rpc.call_instance_start(node_current, instance,
3006 self.hvparams, self.beparams)
3007 msg = result.RemoteFailMsg()
3009 _ShutdownInstanceDisks(self, instance)
3010 raise errors.OpExecError("Could not start instance: %s" % msg)
3013 class LURebootInstance(LogicalUnit):
3014 """Reboot an instance.
3017 HPATH = "instance-reboot"
3018 HTYPE = constants.HTYPE_INSTANCE
3019 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3022 def ExpandNames(self):
3023 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3024 constants.INSTANCE_REBOOT_HARD,
3025 constants.INSTANCE_REBOOT_FULL]:
3026 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3027 (constants.INSTANCE_REBOOT_SOFT,
3028 constants.INSTANCE_REBOOT_HARD,
3029 constants.INSTANCE_REBOOT_FULL))
3030 self._ExpandAndLockInstance()
3032 def BuildHooksEnv(self):
3035 This runs on master, primary and secondary nodes of the instance.
3039 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3040 "REBOOT_TYPE": self.op.reboot_type,
3042 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3043 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3046 def CheckPrereq(self):
3047 """Check prerequisites.
3049 This checks that the instance is in the cluster.
3052 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3053 assert self.instance is not None, \
3054 "Cannot retrieve locked instance %s" % self.op.instance_name
3056 _CheckNodeOnline(self, instance.primary_node)
3058 # check bridges existence
3059 _CheckInstanceBridgesExist(self, instance)
3061 def Exec(self, feedback_fn):
3062 """Reboot the instance.
3065 instance = self.instance
3066 ignore_secondaries = self.op.ignore_secondaries
3067 reboot_type = self.op.reboot_type
3069 node_current = instance.primary_node
3071 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3072 constants.INSTANCE_REBOOT_HARD]:
3073 for disk in instance.disks:
3074 self.cfg.SetDiskID(disk, node_current)
3075 result = self.rpc.call_instance_reboot(node_current, instance,
3077 msg = result.RemoteFailMsg()
3079 raise errors.OpExecError("Could not reboot instance: %s" % msg)
3081 result = self.rpc.call_instance_shutdown(node_current, instance)
3082 msg = result.RemoteFailMsg()
3084 raise errors.OpExecError("Could not shutdown instance for"
3085 " full reboot: %s" % msg)
3086 _ShutdownInstanceDisks(self, instance)
3087 _StartInstanceDisks(self, instance, ignore_secondaries)
3088 result = self.rpc.call_instance_start(node_current, instance, None, None)
3089 msg = result.RemoteFailMsg()
3091 _ShutdownInstanceDisks(self, instance)
3092 raise errors.OpExecError("Could not start instance for"
3093 " full reboot: %s" % msg)
3095 self.cfg.MarkInstanceUp(instance.name)
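# Summary of the reboot types handled above:
#   SOFT/HARD -> a single call_instance_reboot RPC on the primary node
#   FULL      -> shutdown, disk deactivation + reactivation, fresh start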
3098 class LUShutdownInstance(LogicalUnit):
3099 """Shutdown an instance.
3102 HPATH = "instance-stop"
3103 HTYPE = constants.HTYPE_INSTANCE
3104 _OP_REQP = ["instance_name"]
3107 def ExpandNames(self):
3108 self._ExpandAndLockInstance()
3110 def BuildHooksEnv(self):
3113 This runs on master, primary and secondary nodes of the instance.
3116 env = _BuildInstanceHookEnvByObject(self, self.instance)
3117 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3120 def CheckPrereq(self):
3121 """Check prerequisites.
3123 This checks that the instance is in the cluster.
3126 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3127 assert self.instance is not None, \
3128 "Cannot retrieve locked instance %s" % self.op.instance_name
3129 _CheckNodeOnline(self, self.instance.primary_node)
3131 def Exec(self, feedback_fn):
3132 """Shutdown the instance.
3135 instance = self.instance
3136 node_current = instance.primary_node
3137 self.cfg.MarkInstanceDown(instance.name)
3138 result = self.rpc.call_instance_shutdown(node_current, instance)
3139 msg = result.RemoteFailMsg()
3141 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3143 _ShutdownInstanceDisks(self, instance)
3146 class LUReinstallInstance(LogicalUnit):
3147 """Reinstall an instance.
3150 HPATH = "instance-reinstall"
3151 HTYPE = constants.HTYPE_INSTANCE
3152 _OP_REQP = ["instance_name"]
3155 def ExpandNames(self):
3156 self._ExpandAndLockInstance()
3158 def BuildHooksEnv(self):
3161 This runs on master, primary and secondary nodes of the instance.
3164 env = _BuildInstanceHookEnvByObject(self, self.instance)
3165 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3168 def CheckPrereq(self):
3169 """Check prerequisites.
3171 This checks that the instance is in the cluster and is not running.
3174 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3175 assert instance is not None, \
3176 "Cannot retrieve locked instance %s" % self.op.instance_name
3177 _CheckNodeOnline(self, instance.primary_node)
3179 if instance.disk_template == constants.DT_DISKLESS:
3180 raise errors.OpPrereqError("Instance '%s' has no disks" %
3181 self.op.instance_name)
3182 if instance.admin_up:
3183 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3184 self.op.instance_name)
3185 remote_info = self.rpc.call_instance_info(instance.primary_node,
3187 instance.hypervisor)
3189 if remote_info.data:
3190 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3191 (self.op.instance_name,
3192 instance.primary_node))
3194 self.op.os_type = getattr(self.op, "os_type", None)
3195 if self.op.os_type is not None:
3197 pnode = self.cfg.GetNodeInfo(
3198 self.cfg.ExpandNodeName(instance.primary_node))
3200 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3202 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3204 if not isinstance(result.data, objects.OS):
3205 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3206 " primary node" % self.op.os_type)
3208 self.instance = instance
3210 def Exec(self, feedback_fn):
3211 """Reinstall the instance.
3214 inst = self.instance
3216 if self.op.os_type is not None:
3217 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3218 inst.os = self.op.os_type
3219 self.cfg.Update(inst)
3221 _StartInstanceDisks(self, inst, None)
3223 feedback_fn("Running the instance OS create scripts...")
3224 result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3225 msg = result.RemoteFailMsg()
3227 raise errors.OpExecError("Could not install OS for instance %s"
3229 (inst.name, inst.primary_node, msg))
3231 _ShutdownInstanceDisks(self, inst)
3234 class LURenameInstance(LogicalUnit):
3235 """Rename an instance.
3238 HPATH = "instance-rename"
3239 HTYPE = constants.HTYPE_INSTANCE
3240 _OP_REQP = ["instance_name", "new_name"]
3242 def BuildHooksEnv(self):
3245 This runs on master, primary and secondary nodes of the instance.
3248 env = _BuildInstanceHookEnvByObject(self, self.instance)
3249 env["INSTANCE_NEW_NAME"] = self.op.new_name
3250 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3253 def CheckPrereq(self):
3254 """Check prerequisites.
3256 This checks that the instance is in the cluster and is not running.
3259 instance = self.cfg.GetInstanceInfo(
3260 self.cfg.ExpandInstanceName(self.op.instance_name))
3261 if instance is None:
3262 raise errors.OpPrereqError("Instance '%s' not known" %
3263 self.op.instance_name)
3264 _CheckNodeOnline(self, instance.primary_node)
3266 if instance.admin_up:
3267 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3268 self.op.instance_name)
3269 remote_info = self.rpc.call_instance_info(instance.primary_node,
3271 instance.hypervisor)
3273 if remote_info.data:
3274 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3275 (self.op.instance_name,
3276 instance.primary_node))
3277 self.instance = instance
3279 # new name verification
3280 name_info = utils.HostInfo(self.op.new_name)
3282 self.op.new_name = new_name = name_info.name
3283 instance_list = self.cfg.GetInstanceList()
3284 if new_name in instance_list:
3285 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3288 if not getattr(self.op, "ignore_ip", False):
3289 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3290 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3291 (name_info.ip, new_name))
3294 def Exec(self, feedback_fn):
3295 """Reinstall the instance.
3298 inst = self.instance
3299 old_name = inst.name
3301 if inst.disk_template == constants.DT_FILE:
3302 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3304 self.cfg.RenameInstance(inst.name, self.op.new_name)
3305 # Change the instance lock. This is definitely safe while we hold the BGL
3306 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3307 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3309 # re-read the instance from the configuration after rename
3310 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3312 if inst.disk_template == constants.DT_FILE:
3313 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3314 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3315 old_file_storage_dir,
3316 new_file_storage_dir)
3319 raise errors.OpExecError("Could not connect to node '%s' to rename"
3320 " directory '%s' to '%s' (but the instance"
3321 " has been renamed in Ganeti)" % (
3322 inst.primary_node, old_file_storage_dir,
3323 new_file_storage_dir))
3325 if not result.data[0]:
3326 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3327 " (but the instance has been renamed in"
3328 " Ganeti)" % (old_file_storage_dir,
3329 new_file_storage_dir))
3331 _StartInstanceDisks(self, inst, None)
3333 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3335 msg = result.RemoteFailMsg()
3337 msg = ("Could not run OS rename script for instance %s on node %s"
3338 " (but the instance has been renamed in Ganeti): %s" %
3339 (inst.name, inst.primary_node, msg))
3340 self.proc.LogWarning(msg)
3342 _ShutdownInstanceDisks(self, inst)
3345 class LURemoveInstance(LogicalUnit):
3346 """Remove an instance.
3349 HPATH = "instance-remove"
3350 HTYPE = constants.HTYPE_INSTANCE
3351 _OP_REQP = ["instance_name", "ignore_failures"]
3354 def ExpandNames(self):
3355 self._ExpandAndLockInstance()
3356 self.needed_locks[locking.LEVEL_NODE] = []
3357 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3359 def DeclareLocks(self, level):
3360 if level == locking.LEVEL_NODE:
3361 self._LockInstancesNodes()
3363 def BuildHooksEnv(self):
3366 This runs on master, primary and secondary nodes of the instance.
3369 env = _BuildInstanceHookEnvByObject(self, self.instance)
3370 nl = [self.cfg.GetMasterNode()]
3373 def CheckPrereq(self):
3374 """Check prerequisites.
3376 This checks that the instance is in the cluster.
3379 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3380 assert self.instance is not None, \
3381 "Cannot retrieve locked instance %s" % self.op.instance_name
3383 def Exec(self, feedback_fn):
3384 """Remove the instance.
3387 instance = self.instance
3388 logging.info("Shutting down instance %s on node %s",
3389 instance.name, instance.primary_node)
3391 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3392 msg = result.RemoteFailMsg()
3394 if self.op.ignore_failures:
3395 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3397 raise errors.OpExecError("Could not shutdown instance %s on"
3399 (instance.name, instance.primary_node, msg))
3401 logging.info("Removing block devices for instance %s", instance.name)
3403 if not _RemoveDisks(self, instance):
3404 if self.op.ignore_failures:
3405 feedback_fn("Warning: can't remove instance's disks")
3407 raise errors.OpExecError("Can't remove instance's disks")
3409 logging.info("Removing instance %s out of cluster config", instance.name)
3411 self.cfg.RemoveInstance(instance.name)
3412 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3415 class LUQueryInstances(NoHooksLU):
3416 """Logical unit for querying instances.
3419 _OP_REQP = ["output_fields", "names", "use_locking"]
3421 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3423 "disk_template", "ip", "mac", "bridge",
3424 "sda_size", "sdb_size", "vcpus", "tags",
3425 "network_port", "beparams",
3426 r"(disk)\.(size)/([0-9]+)",
3427 r"(disk)\.(sizes)", "disk_usage",
3428 r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3429 r"(nic)\.(macs|ips|bridges)",
3430 r"(disk|nic)\.(count)",
3431 "serial_no", "hypervisor", "hvparams",] +
3433 for name in constants.HVS_PARAMETERS] +
3435 for name in constants.BES_PARAMETERS])
3436 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
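# Examples of composite fields matched by the patterns above:
#   "disk.size/0" -> size of the first disk
#   "nic.macs"    -> list of all NIC MAC addresses
#   "disk.count"  -> number of disks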
3439 def ExpandNames(self):
3440 _CheckOutputFields(static=self._FIELDS_STATIC,
3441 dynamic=self._FIELDS_DYNAMIC,
3442 selected=self.op.output_fields)
3444 self.needed_locks = {}
3445 self.share_locks[locking.LEVEL_INSTANCE] = 1
3446 self.share_locks[locking.LEVEL_NODE] = 1
3449 self.wanted = _GetWantedInstances(self, self.op.names)
3451 self.wanted = locking.ALL_SET
3453 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3454 self.do_locking = self.do_node_query and self.op.use_locking
3456 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3457 self.needed_locks[locking.LEVEL_NODE] = []
3458 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3460 def DeclareLocks(self, level):
3461 if level == locking.LEVEL_NODE and self.do_locking:
3462 self._LockInstancesNodes()
3464 def CheckPrereq(self):
3465 """Check prerequisites.
3470 def Exec(self, feedback_fn):
3471 """Computes the list of nodes and their attributes.
3474 all_info = self.cfg.GetAllInstancesInfo()
3475 if self.wanted == locking.ALL_SET:
3476 # caller didn't specify instance names, so ordering is not important
3478 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3480 instance_names = all_info.keys()
3481 instance_names = utils.NiceSort(instance_names)
3483 # caller did specify names, so we must keep the ordering
3485 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3487 tgt_set = all_info.keys()
3488 missing = set(self.wanted).difference(tgt_set)
3490 raise errors.OpExecError("Some instances were removed before"
3491 " retrieving their data: %s" % missing)
3492 instance_names = self.wanted
3494 instance_list = [all_info[iname] for iname in instance_names]
3496 # begin data gathering
3498 nodes = frozenset([inst.primary_node for inst in instance_list])
3499 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3503 if self.do_node_query:
3505 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3507 result = node_data[name]
3509 # offline nodes will be in both lists
3510 off_nodes.append(name)
3512 bad_nodes.append(name)
3515 live_data.update(result.data)
3516 # else no instance is alive
3518 live_data = dict([(name, {}) for name in instance_names])
3520 # end data gathering
3525 for instance in instance_list:
3527 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3528 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3529 for field in self.op.output_fields:
3530 st_match = self._FIELDS_STATIC.Matches(field)
3535 elif field == "pnode":
3536 val = instance.primary_node
3537 elif field == "snodes":
3538 val = list(instance.secondary_nodes)
3539 elif field == "admin_state":
3540 val = instance.admin_up
3541 elif field == "oper_state":
3542 if instance.primary_node in bad_nodes:
3545 val = bool(live_data.get(instance.name))
3546 elif field == "status":
3547 if instance.primary_node in off_nodes:
3548 val = "ERROR_nodeoffline"
3549 elif instance.primary_node in bad_nodes:
3550 val = "ERROR_nodedown"
3552 running = bool(live_data.get(instance.name))
3554 if instance.admin_up:
3559 if instance.admin_up:
3563 elif field == "oper_ram":
3564 if instance.primary_node in bad_nodes:
3566 elif instance.name in live_data:
3567 val = live_data[instance.name].get("memory", "?")
3570 elif field == "vcpus":
3571 val = i_be[constants.BE_VCPUS]
3572 elif field == "disk_template":
3573 val = instance.disk_template
3576 val = instance.nics[0].ip
3579 elif field == "bridge":
3581 val = instance.nics[0].bridge
3584 elif field == "mac":
3586 val = instance.nics[0].mac
3589 elif field == "sda_size" or field == "sdb_size":
3590 idx = ord(field[2]) - ord('a')
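# e.g. "sdb_size"[2] == 'b', so idx = ord('b') - ord('a') = 1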
3592 val = instance.FindDisk(idx).size
3593 except errors.OpPrereqError:
3595 elif field == "disk_usage": # total disk usage per node
3596 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3597 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3598 elif field == "tags":
3599 val = list(instance.GetTags())
3600 elif field == "serial_no":
3601 val = instance.serial_no
3602 elif field == "network_port":
3603 val = instance.network_port
3604 elif field == "hypervisor":
3605 val = instance.hypervisor
3606 elif field == "hvparams":
3608 elif (field.startswith(HVPREFIX) and
3609 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3610 val = i_hv.get(field[len(HVPREFIX):], None)
3611 elif field == "beparams":
3613 elif (field.startswith(BEPREFIX) and
3614 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3615 val = i_be.get(field[len(BEPREFIX):], None)
3616 elif st_match and st_match.groups():
3617 # matches a variable list
3618 st_groups = st_match.groups()
3619 if st_groups and st_groups[0] == "disk":
3620 if st_groups[1] == "count":
3621 val = len(instance.disks)
3622 elif st_groups[1] == "sizes":
3623 val = [disk.size for disk in instance.disks]
3624 elif st_groups[1] == "size":
3626 val = instance.FindDisk(st_groups[2]).size
3627 except errors.OpPrereqError:
3630 assert False, "Unhandled disk parameter"
3631 elif st_groups[0] == "nic":
3632 if st_groups[1] == "count":
3633 val = len(instance.nics)
3634 elif st_groups[1] == "macs":
3635 val = [nic.mac for nic in instance.nics]
3636 elif st_groups[1] == "ips":
3637 val = [nic.ip for nic in instance.nics]
3638 elif st_groups[1] == "bridges":
3639 val = [nic.bridge for nic in instance.nics]
3642 nic_idx = int(st_groups[2])
3643 if nic_idx >= len(instance.nics):
3646 if st_groups[1] == "mac":
3647 val = instance.nics[nic_idx].mac
3648 elif st_groups[1] == "ip":
3649 val = instance.nics[nic_idx].ip
3650 elif st_groups[1] == "bridge":
3651 val = instance.nics[nic_idx].bridge
3653 assert False, "Unhandled NIC parameter"
3655 assert False, ("Declared but unhandled variable parameter '%s'" %
3658 assert False, "Declared but unhandled parameter '%s'" % field
3665 class LUFailoverInstance(LogicalUnit):
3666 """Failover an instance.
3669 HPATH = "instance-failover"
3670 HTYPE = constants.HTYPE_INSTANCE
3671 _OP_REQP = ["instance_name", "ignore_consistency"]
3674 def ExpandNames(self):
3675 self._ExpandAndLockInstance()
3676 self.needed_locks[locking.LEVEL_NODE] = []
3677 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3679 def DeclareLocks(self, level):
3680 if level == locking.LEVEL_NODE:
3681 self._LockInstancesNodes()
3683 def BuildHooksEnv(self):
3686 This runs on master, primary and secondary nodes of the instance.
3690 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3692 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3693 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3696 def CheckPrereq(self):
3697 """Check prerequisites.
3699 This checks that the instance is in the cluster.
3702 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3703 assert self.instance is not None, \
3704 "Cannot retrieve locked instance %s" % self.op.instance_name
3706 bep = self.cfg.GetClusterInfo().FillBE(instance)
3707 if instance.disk_template not in constants.DTS_NET_MIRROR:
3708 raise errors.OpPrereqError("Instance's disk layout is not"
3709 " network mirrored, cannot failover.")
3711 secondary_nodes = instance.secondary_nodes
3712 if not secondary_nodes:
3713 raise errors.ProgrammerError("no secondary node but using "
3714 "a mirrored disk template")
3716 target_node = secondary_nodes[0]
3717 _CheckNodeOnline(self, target_node)
3718 _CheckNodeNotDrained(self, target_node)
3720 if instance.admin_up:
3721 # check memory requirements on the secondary node
3722 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3723 instance.name, bep[constants.BE_MEMORY],
3724 instance.hypervisor)
3726 self.LogInfo("Not checking memory on the secondary node as"
3727 " instance will not be started")
3729 # check bridge existence
3730 brlist = [nic.bridge for nic in instance.nics]
3731 result = self.rpc.call_bridges_exist(target_node, brlist)
3734 raise errors.OpPrereqError("One or more target bridges %s does not"
3735 " exist on destination node '%s'" %
3736 (brlist, target_node))
3738 def Exec(self, feedback_fn):
3739 """Failover an instance.
3741 The failover is done by shutting it down on its present node and
3742 starting it on the secondary.
3745 instance = self.instance
3747 source_node = instance.primary_node
3748 target_node = instance.secondary_nodes[0]
3750 feedback_fn("* checking disk consistency between source and target")
3751 for dev in instance.disks:
3752 # for drbd, these are drbd over lvm
3753 if not _CheckDiskConsistency(self, dev, target_node, False):
3754 if instance.admin_up and not self.op.ignore_consistency:
3755 raise errors.OpExecError("Disk %s is degraded on target node,"
3756 " aborting failover." % dev.iv_name)
3758 feedback_fn("* shutting down instance on source node")
3759 logging.info("Shutting down instance %s on node %s",
3760 instance.name, source_node)
3762 result = self.rpc.call_instance_shutdown(source_node, instance)
3763 msg = result.RemoteFailMsg()
3765 if self.op.ignore_consistency:
3766 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3767 " Proceeding anyway. Please make sure node"
3768 " %s is down. Error details: %s",
3769 instance.name, source_node, source_node, msg)
3771 raise errors.OpExecError("Could not shutdown instance %s on"
3773 (instance.name, source_node, msg))
3775 feedback_fn("* deactivating the instance's disks on source node")
3776 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3777 raise errors.OpExecError("Can't shut down the instance's disks.")
3779 instance.primary_node = target_node
3780 # distribute new instance config to the other nodes
3781 self.cfg.Update(instance)
3783 # Only start the instance if it's marked as up
3784 if instance.admin_up:
3785 feedback_fn("* activating the instance's disks on target node")
3786 logging.info("Starting instance %s on node %s",
3787 instance.name, target_node)
3789 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3790 ignore_secondaries=True)
3792 _ShutdownInstanceDisks(self, instance)
3793 raise errors.OpExecError("Can't activate the instance's disks")
3795 feedback_fn("* starting the instance on the target node")
3796 result = self.rpc.call_instance_start(target_node, instance, None, None)
3797 msg = result.RemoteFailMsg()
3799 _ShutdownInstanceDisks(self, instance)
3800 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3801 (instance.name, target_node, msg))
3804 class LUMigrateInstance(LogicalUnit):
3805 """Migrate an instance.
3807 This is migration without shutting down the instance, as opposed to
3808 failover, which requires a shutdown.
3811 HPATH = "instance-migrate"
3812 HTYPE = constants.HTYPE_INSTANCE
3813 _OP_REQP = ["instance_name", "live", "cleanup"]
3817 def ExpandNames(self):
3818 self._ExpandAndLockInstance()
3819 self.needed_locks[locking.LEVEL_NODE] = []
3820 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3822 def DeclareLocks(self, level):
3823 if level == locking.LEVEL_NODE:
3824 self._LockInstancesNodes()
3826 def BuildHooksEnv(self):
3829 This runs on master, primary and secondary nodes of the instance.
3832 env = _BuildInstanceHookEnvByObject(self, self.instance)
3833 env["MIGRATE_LIVE"] = self.op.live
3834 env["MIGRATE_CLEANUP"] = self.op.cleanup
3835 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3838 def CheckPrereq(self):
3839 """Check prerequisites.
3841 This checks that the instance is in the cluster.
3844 instance = self.cfg.GetInstanceInfo(
3845 self.cfg.ExpandInstanceName(self.op.instance_name))
3846 if instance is None:
3847 raise errors.OpPrereqError("Instance '%s' not known" %
3848 self.op.instance_name)
3850 if instance.disk_template != constants.DT_DRBD8:
3851 raise errors.OpPrereqError("Instance's disk layout is not"
3852 " drbd8, cannot migrate.")
3854 secondary_nodes = instance.secondary_nodes
3855 if not secondary_nodes:
3856 raise errors.ConfigurationError("No secondary node but using"
3857 " drbd8 disk template")
3859 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3861 target_node = secondary_nodes[0]
3862 # check memory requirements on the secondary node
3863 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3864 instance.name, i_be[constants.BE_MEMORY],
3865 instance.hypervisor)
3867 # check bridge existence
3868 brlist = [nic.bridge for nic in instance.nics]
3869 result = self.rpc.call_bridges_exist(target_node, brlist)
3870 if result.failed or not result.data:
3871 raise errors.OpPrereqError("One or more target bridges %s does not"
3872 " exist on destination node '%s'" %
3873 (brlist, target_node))
3875 if not self.op.cleanup:
3876 _CheckNodeNotDrained(self, target_node)
3877 result = self.rpc.call_instance_migratable(instance.primary_node,
3879 msg = result.RemoteFailMsg()
3881 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3884 self.instance = instance
3886 def _WaitUntilSync(self):
3887 """Poll with custom rpc for disk sync.
3889 This uses our own step-based rpc call.
3892 self.feedback_fn("* wait until resync is done")
3896 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3898 self.instance.disks)
3900 for node, nres in result.items():
3901 msg = nres.RemoteFailMsg()
3903 raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3905 node_done, node_percent = nres.payload
3906 all_done = all_done and node_done
3907 if node_percent is not None:
3908 min_percent = min(min_percent, node_percent)
3910 if min_percent < 100:
3911 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3914 def _EnsureSecondary(self, node):
3915 """Demote a node to secondary.
3918 self.feedback_fn("* switching node %s to secondary mode" % node)
3920 for dev in self.instance.disks:
3921 self.cfg.SetDiskID(dev, node)
3923 result = self.rpc.call_blockdev_close(node, self.instance.name,
3924 self.instance.disks)
3925 msg = result.RemoteFailMsg()
3927 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3928 " error %s" % (node, msg))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot disconnect disks on node %s,"
                                 " error %s" % (node, msg))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot change disks config on node %s,"
                                 " error: %s" % (node, msg))
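
  # The three helpers above form the DRBD reconfiguration cycle used by
  # both migration and cleanup: close the devices on the node being
  # demoted (_EnsureSecondary), drop the network configuration entirely
  # (_GoStandalone), then reattach in single- or dual-master mode
  # (_GoReconnect) and wait for the resync (_WaitUntilSync).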

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise()
      if not isinstance(result.data, list):
        raise errors.OpExecError("Can't contact node '%s'" % node)

    runningon_source = instance.name in ins_l[source_node].data
    runningon_target = instance.name in ins_l[target_node].data

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    self._EnsureSecondary(demoted_node)
    try:
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore here errors, since if the device is standalone, it
      # won't be able to sync
      pass
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.LogWarning("Migration failed and I can't reconnect the"
                      " drives: error '%s'\n"
                      "Please look and recover the instance status" %
                      str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_finalize_migration(target_node,
                                                    instance,
                                                    migration_info,
                                                    False)
    abort_msg = abort_result.RemoteFailMsg()
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s" %
                    (target_node, abort_msg))
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    self.feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      if not _CheckDiskConsistency(self, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migrate." % dev.iv_name)

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    # Then switch the disks to master/master mode
    self._EnsureSecondary(target_node)
    self._GoStandalone()
    self._GoReconnect(True)
    self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)

    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.op.live)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    result = self.rpc.call_finalize_migration(target_node,
                                              instance,
                                              migration_info,
                                              True)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration succeeded, but finalization failed:"
                    " %s" % msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    self._EnsureSecondary(source_node)
    self._WaitUntilSync()
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")
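
  # For reference, the disk mode transitions performed by _ExecMigration
  # are: single-master -> standalone -> dual-master (needed while the
  # hypervisor moves the instance) -> standalone -> single-master on the
  # new primary; failures in between are unwound via _AbortMigration()
  # followed by _RevertDiskStatus().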

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn

    self.source_node = self.instance.primary_node
    self.target_node = self.instance.secondary_nodes[0]
    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = {
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
      }
    if self.op.cleanup:
      return self._ExecCleanup()
    else:
      return self._ExecMigration()
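
  # Note that nodes_ip above maps each node to its secondary_ip: DRBD
  # replication (and therefore the disconnect/attach RPCs) runs over the
  # nodes' secondary network, not over their primary addresses.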


def _CreateBlockDev(lu, node, instance, device, force_create,
                    info, force_open):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  if device.CreateOnSecondary():
    force_create = True

  if device.children:
    for child in device.children:
      _CreateBlockDev(lu, node, instance, child, force_create,
                      info, force_open)

  if not force_create:
    return

  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
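
# Sketch of the recursion above, for a DRBD8 device with two LV children:
# the children are visited first, and any device type reporting
# CreateOnSecondary() (which the DRBD/LV layout is assumed to) flips
# force_create to True, so the LVs and then the DRBD device itself are
# created even on the secondary node.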


def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info)
  msg = result.RemoteFailMsg()
  if msg:
    raise errors.OpExecError("Can't create block device %s on"
                             " node %s for instance %s: %s" %
                             (device, node, instance.name, msg))
  if device.physical_id is None:
    device.physical_id = result.payload


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
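
# For reference, the drbd8 logical_id tuple assembled above is, in order:
# (primary node, secondary node, port, primary minor, secondary minor,
# shared secret); the same layout is unpacked from dev.logical_id in
# LUReplaceDisks._ExecD8Secondary below.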


def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index,
                              mode=disk["mode"])
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disk_dev.mode = disk["mode"]
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         disk_index)),
                              mode=disk["mode"])
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
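
# The disk_info argument is assumed to be the pre-built list of dicts
# that the create LU stores in self.disks, e.g.:
#   [{"size": 10240, "mode": "rw"}]
# with sizes in MiB and modes taken from constants.DISK_ACCESS_SET
# ("rw" here only illustrates constants.DISK_RDWR).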


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create

  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)
  pnode = instance.primary_node

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    if result.failed or not result.data:
      raise errors.OpExecError("Could not connect to node '%s'" % pnode)

    if not result.data[0]:
      raise errors.OpExecError("Failed to create directory '%s'" %
                               file_storage_dir)

  # Note: this needs to be kept in sync with adding of disks in
  # LUSetInstanceParams
  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)

    for node in instance.all_nodes:
      f_create = node == pnode
      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove

  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  all_result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
      if msg:
        lu.LogWarning("Could not remove block device %s on node %s,"
                      " continuing anyway: %s", device.iv_name, node, msg)
        all_result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                                 file_storage_dir)
    if result.failed or not result.data:
      logging.error("Could not remove directory '%s'", file_storage_dir)
      all_result = False

  return all_result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
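
# Worked example: for disks of 1024 MiB and 2048 MiB, DT_PLAIN requires
# 1024 + 2048 = 3072 MiB of free VG space, while DT_DRBD8 requires
# (1024 + 128) + (2048 + 128) = 3328 MiB due to the per-disk metadata
# volume; DT_DISKLESS and DT_FILE consume no VG space at all (None).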


def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo[node]
    if info.offline:
      continue
    msg = info.RemoteFailMsg()
    if msg:
      raise errors.OpPrereqError("Hypervisor parameter validation"
                                 " failed on node %s: %s" % (node, msg))


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to None if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                                    ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)
    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)
    self.hv_full = filled_hvp

    # fill and remember the beparams dict
    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    self.nics = []
    for nic in self.op.nics:
      # ip validity checks
      ip = nic.get("ip", None)
      if ip is None or ip.lower() == "none":
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", None)
      if bridge is None:
        bridge = self.cfg.GetDefBridge()
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))

    # disk checks/pre-build
    self.disks = []
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path must be"
                                 " relative")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_path is None:
        self.op.src_path = src_path = self.op.instance_name

      if src_node is None:
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        self.op.src_node = None
        if os.path.isabs(src_path):
          raise errors.OpPrereqError("Importing an instance from an absolute"
                                     " path requires a source node option.")
      else:
        self.op.src_node = src_node = self._ExpandNode(src_node)
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
        if not os.path.isabs(src_path):
          self.op.src_path = src_path = \
            os.path.join(constants.EXPORT_DIR, src_path)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")
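
  # Locking summary for the cases handled above: with an iallocator (or
  # an import whose source node is unknown) LEVEL_NODE is acquired as
  # ALL_SET, since the final node set is not known yet; otherwise only
  # the primary, the optional secondary and the import source node end
  # up in needed_locks.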

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["SRC_NODE"] = self.op.src_node
      env["SRC_PATH"] = self.op.src_path
      env["SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(
      name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.op.start,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
      disk_template=self.op.disk_template,
      disks=[(d["size"], d["mode"]) for d in self.disks],
      hypervisor=self.op.hypervisor,
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      if src_node is None:
        exp_list = self.rpc.call_export_list(
          self.acquired_locks[locking.LEVEL_NODE])
        found = False
        for node in exp_list:
          if not exp_list[node].failed and src_path in exp_list[node].data:
            found = True
            self.op.src_node = src_node = node
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
                                                       src_path)
            break
        if not found:
          raise errors.OpPrereqError("No export found for relative path %s" %
                                     src_path)

      _CheckNodeOnline(self, src_node)
      result = self.rpc.call_export_info(src_node, src_path)
      result.Raise()
      if not result.data:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      export_info = result.data
      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if int(ei_version) != constants.EXPORT_VERSION:
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count > idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### mac address generation
    # By generating here the mac address both the allocator and the hooks get
    # the real final mac address rather than the 'auto' or 'generate' value.
    # There is a race condition between the generation and the instance object
    # creation, which means that we know the mac is valid now, but we're not
    # sure it will be when we actually add the instance. If things go bad
    # adding the instance will abort because of a duplicate mac, and the
    # creation job will fail.
    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    if pnode.offline:
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
                                 pnode.name)
    if pnode.drained:
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
                                 pnode.name)

    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      _CheckNodeOnline(self, self.op.snode)
      _CheckNodeNotDrained(self, self.op.snode)
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo[node]
        info.Raise()
        info = info.data
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
    result.Raise()
    if not isinstance(result.data, objects.OS) or not result.data:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    # bridge check on primary node
    bridges = [n.bridge for n in self.nics]
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
                                 " exist on destination node '%s'" %
                                 (",".join(bridges), pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            admin_up=False,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    try:
      _CreateDisks(self, iobj)
    except errors.OpExecError:
      self.LogWarning("Device creation failed, reverting...")
      try:
        _RemoveDisks(self, iobj)
      finally:
        self.cfg.ReleaseDRBDMinors(instance)
        raise

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Unlock all the nodes
    if self.op.mode == constants.INSTANCE_IMPORT:
      nodes_keep = [self.op.src_node]
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
                       if node != self.op.src_node]
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
    else:
      self.context.glm.release(locking.LEVEL_NODE)
      del self.acquired_locks[locking.LEVEL_NODE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
        msg = result.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Could not add os for instance %s"
                                   " on node %s: %s" %
                                   (instance, pnode_name, msg))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        import_result.Raise()
        for idx, result in enumerate(import_result.data):
          if not result:
            self.LogWarning("Could not import the image %s for instance"
                            " %s, disk %d, on node %s" %
                            (src_images[idx], instance, idx, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      iobj.admin_up = True
      self.cfg.Update(iobj)
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not start instance: %s" % msg)


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    node_insts.Raise()

    if instance.name not in node_insts.data:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    cluster = self.cfg.GetClusterInfo()
    # beparams and hvparams are passed separately, to avoid editing the
    # instance and then saving the defaults in the instance itself.
    hvparams = cluster.FillHV(instance)
    beparams = cluster.FillBE(instance)
    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)

    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
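
# The returned value is a complete SSH command line; hypothetically, for
# a Xen-based instance, something along the lines of:
#   ssh -t root@node1.example.com 'xm console instance1.example.com'
# which the command-line client is then expected to execute itself.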


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None
    if not hasattr(self.op, "iallocator"):
      self.op.iallocator = None

    # check for valid parameter combination
    cnt = [self.op.remote_node, self.op.iallocator].count(None)
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if cnt == 2:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given")
      elif cnt == 0:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
    else: # not replacing the secondary
      if cnt != 2:
        raise errors.OpPrereqError("The iallocator and new node options can"
                                   " be used only when changing the"
                                   " secondary")

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if self.op.iallocator is not None:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()
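
  # Parameter matrix enforced by CheckArguments above, where cnt counts
  # how many of (remote_node, iallocator) are unset: REPLACE_DISK_CHG
  # needs exactly one of them (cnt == 1), while the primary/secondary
  # modes require both to be unset (cnt == 2).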

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    if self.op.iallocator is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance.")

    if self.op.mode == constants.REPLACE_DISK_PRI:
      n1 = self.tgt_node = instance.primary_node
      n2 = self.oth_node = self.sec_node
    elif self.op.mode == constants.REPLACE_DISK_SEC:
      n1 = self.tgt_node = self.sec_node
      n2 = self.oth_node = instance.primary_node
    elif self.op.mode == constants.REPLACE_DISK_CHG:
      n1 = self.new_node = remote_node
      n2 = self.oth_node = instance.primary_node
      self.tgt_node = self.sec_node
      _CheckNodeNotDrained(self, remote_node)
    else:
      raise errors.ProgrammerError("Unhandled disk replace mode")

    _CheckNodeOnline(self, n1)
    _CheckNodeOnline(self, n2)

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks:
      instance.FindDisk(disk_idx)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        2. detach old LVs from the drbd device
        3. rename old LVs to name_replaced.<time_t>
        4. rename new LVs to old LVs
        5. attach the new LVs (with the old names now) to the drbd device

      2. wait for sync across all devices

      3. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()

    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking disk/%d on %s" % (idx, node))
        cfg.SetDiskID(dev, node)
        result = self.rpc.call_blockdev_find(node, dev)
        msg = result.RemoteFailMsg()
        if not msg and not result.payload:
          msg = "disk not found"
        if msg:
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node == instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".disk%d_%s" % (idx, suf)
                  for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
                        _GetInstanceInfoText(instance), False)

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if not result.RemoteFailMsg() and result.payload:
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
      if result.failed or not result.data:
        for new_lv in new_lvs:
          msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
          if msg:
            warning("Can't rollback device %s: %s", dev, msg,
                    hint="cleanup manually the unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))
      if result.payload[5]:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
        if msg:
          warning("Can't remove old LV: %s" % msg,
                  hint="manually remove unused LVs")
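
    # Note: result.payload in the checks above is assumed to be the
    # status tuple returned by blockdev_find, whose sixth field (index
    # 5) is the DRBD "degraded" flag; any device still degraded after
    # the sync step aborts the replace.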

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    cfg = self.cfg

    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node
    nodes_ip = {
      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
      }

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    for node in pri_node, new_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d on %s" % (idx, pri_node))
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                 (idx, pri_node, msg))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      info("adding new local storage on %s for disk/%d" %
           (new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDev(self, new_node, instance, new_lv, True,
                        _GetInstanceInfoText(instance), False)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
      size = dev.size
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if pri_node == o_node1:
        p_minor = o_minor1
      else:
        p_minor = o_minor2

      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=size)
      try:
        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
                              _GetInstanceInfoText(instance), False)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise

    for idx, dev in enumerate(instance.disks):
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for disk/%d on old node" % idx)
      cfg.SetDiskID(dev, old_node)
      msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
      if msg:
        warning("Failed to shutdown drbd for disk/%d on old node: %s" %
                (idx, msg),
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
                                               instance.disks)[pri_node]

    msg = result.RemoteFailMsg()
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
                                           instance.disks, instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.RemoteFailMsg()
      if msg:
        warning("can't attach drbd disks on node %s: %s", to_node, msg,
                hint="please do a gnt-instance info to see the"
                " status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
                                 (idx, msg))
      if result.payload[5]:
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for disk/%d" % idx)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
        if msg:
          warning("Can't remove LV on old secondary: %s", msg,
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _StartInstanceDisks(self, instance, True)

    if self.op.mode == constants.REPLACE_DISK_CHG:
      fn = self._ExecD8Secondary
    else:
      fn = self._ExecD8DiskOnly

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _SafeShutdownInstanceDisks(self, instance)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      _CheckNodeOnline(self, node)

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    self.disk = instance.FindDisk(self.op.disk)

    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo[node]
      if info.failed or not info.data:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.data.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Grow request failed to node %s: %s" %
                                 (node, msg))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
      if dev_pstatus.offline:
        dev_pstatus = None
      else:
        msg = dev_pstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_pstatus = dev_pstatus.payload
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
      if dev_sstatus.offline:
        dev_sstatus = None
      else:
        msg = dev_sstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_sstatus = dev_sstatus.payload
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      "mode": dev.mode,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise()
        remote_info = remote_info.data
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.admin_up:
        config_state = "up"
      else:
        config_state = "down"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result
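

# Editor's sketch, not part of the original module: querying runtime data
# for two instances through the opcode layer; static=True would skip the
# per-node RPC calls and return only configuration data.  Names are
# placeholders.
def _ExampleQueryInstanceDataOp():
  """Return a sample OpQueryInstanceData opcode (illustrative only)."""
  return opcodes.OpQueryInstanceData(instances=["instance1.example.com",
                                                "instance2.example.com"],
                                     static=False)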


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")

    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)

      if nic_op == constants.DDM_ADD:
        nic_bridge = nic_dict.get('bridge', None)
        if nic_bridge is None:
          nic_dict['bridge'] = self.cfg.GetDefBridge()
        nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic")

    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'bridge' in this_nic_override:
          bridge = this_nic_override['bridge']
        else:
          bridge = nic.bridge
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        args['nics'].append((ip, bridge, mac))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        bridge = nic_override[constants.DDM_ADD]['bridge']
        mac = nic_override[constants.DDM_ADD]['mac']
        args['nics'].append((ip, bridge, mac))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    force = self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if not instance_info.failed and instance_info.data:
          current_mem = int(instance_info.data['memory'])
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode].data['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.iteritems():
          if node not in instance.secondary_nodes:
            continue
          if nres.failed or not isinstance(nres.data, dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
      if 'bridge' in nic_dict:
        nic_bridge = nic_dict['bridge']
        if nic_bridge is None:
          raise errors.OpPrereqError('Cannot set the nic bridge to None')
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None')
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC()
        else:
          # or validate/reserve the current one
          if self.cfg.IsMacInUse(nic_mac):
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if ins_l.failed or not isinstance(ins_l.data, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l.data:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance

    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        #HARDCODE
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))

    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        bridge = nic_dict['bridge']
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
                              bridge=bridge)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,bridge=%s" %
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
      else:
        # change a given nic
        for key in 'mac', 'ip', 'bridge':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result
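

# Editor's sketch, not part of the original module: a combined parameter
# change for the LU above, adding an auto-MAC NIC and raising the memory.
# Keys and value conventions follow CheckArguments/CheckPrereq; verify the
# opcode slots against the opcodes module in use.  Names are placeholders.
def _ExampleSetInstanceParamsOp():
  """Return a sample OpSetInstanceParams opcode (illustrative only)."""
  return opcodes.OpSetInstanceParams(
    instance_name="instance1.example.com",
    nics=[(constants.DDM_ADD, {'mac': constants.VALUE_AUTO})],
    disks=[],
    beparams={constants.BE_MEMORY: 512},
    hvparams={},
    force=False)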


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].failed:
        result[node] = False
      else:
        result[node] = rpcresult[node].data

    return result
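

# Editor's sketch, not part of the original module: the LU above maps each
# queried node either to a list of export names or to False when the node
# could not be contacted, e.g.:
#   {"node1.example.com": ["instance1.example.com"],
#    "node2.example.com": False}
def _ExampleQueryExportsOp():
  """Return a sample OpQueryExports opcode (illustrative only)."""
  return opcodes.OpQueryExports(nodes=["node1.example.com"])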


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is a wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, src_node, msg))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    # per-disk export results
    dresults = []
    try:
      for idx, disk in enumerate(instance.disks):
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
        if new_dev_name.failed or not new_dev_name.data:
          self.LogWarning("Could not snapshot disk/%d on node %s",
                          idx, src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name.data),
                                 physical_id=(vgname, new_dev_name.data),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.admin_up:
        result = self.rpc.call_instance_start(src_node, instance, None, None)
        msg = result.RemoteFailMsg()
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        if result.failed or not result.data:
          self.LogWarning("Could not export disk/%d from node %s to"
                          " node %s", idx, src_node, dst_node.name)
          dresults.append(False)
        else:
          dresults.append(True)
        msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
        if msg:
          self.LogWarning("Could not remove snapshot for disk/%d from node"
                          " %s: %s", idx, src_node, msg)
      else:
        dresults.append(False)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    fin_resu = True
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)
      fin_resu = False

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
          continue
        if instance.name in exportlist[node].data:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)
    return fin_resu, dresults
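

# Editor's sketch, not part of the original module: exporting an instance to
# a target node, shutting it down first for a consistent snapshot; mirrors
# _OP_REQP above.  Names are placeholders.
def _ExampleExportInstanceOp():
  """Return a sample OpExportInstance opcode (illustrative only)."""
  return opcodes.OpExportInstance(instance_name="instance1.example.com",
                                  target_node="node3.example.com",
                                  shutdown=True)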


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has three sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor_name)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not ninfo.offline:
        nresult.Raise()
        if not isinstance(nresult.data, dict):
          raise errors.OpExecError("Can't get data for node %s" % nname)
        remote_info = nresult.data
        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          try:
            remote_info[attr] = int(remote_info[attr])
          except ValueError, err:
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" % (nname, attr, err))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].data:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _AllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _IAllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
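

# Editor's sketch, not part of the original module: a minimal well-formed
# reply from an external allocator script, as enforced by _ValidateResult
# above: a serialized object with "success", "info" and "nodes" keys, where
# "nodes" is the list of chosen node names.  Node names are placeholders.
_EXAMPLE_IALLOCATOR_REPLY = serializer.Dump({
  "success": True,
  "info": "allocation successful",
  "nodes": ["node1.example.com", "node2.example.com"],
  })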


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result
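

# Editor's sketch, not part of the original module: driving the allocator
# test LU above in "input" direction, which only builds and returns the
# request text without calling an external script.  Every name and value is
# a placeholder; verify the opcode slots against the opcodes module in use.
def _ExampleTestAllocatorOp():
  """Return a sample OpTestAllocator opcode (illustrative only)."""
  return opcodes.OpTestAllocator(direction=constants.IALLOCATOR_DIR_IN,
                                 mode=constants.IALLOCATOR_MODE_ALLOC,
                                 name="instance9.example.com",
                                 mem_size=256,
                                 disks=[{"size": 1024, "mode": "w"}],
                                 disk_template=constants.DT_PLAIN,
                                 os="debian-etch", tags=[],
                                 nics=[{"mac": constants.VALUE_AUTO,
                                        "ip": None,
                                        "bridge": None}],
                                 vcpus=1)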