4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement ExpandNames
53 - implement CheckPrereq
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements:
58 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60 Note that all commands require root permissions.
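As an illustrative sketch only (the class and opcode names are hypothetical,
not part of this module), a minimal hook-less, concurrent LU could look like::
  class LUHypotheticalNoop(NoHooksLU):
    _OP_REQP = []
    def ExpandNames(self):
      self.needed_locks = {}
    def CheckPrereq(self):
      pass
    def Exec(self, feedback_fn):
      feedback_fn("Nothing to do")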
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overridden in derived classes in order to check op
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
93 for attr_name in self._OP_REQP:
94 attr_val = getattr(op, attr_name, None)
96 raise errors.OpPrereqError("Required parameter '%s' missing" %
101 """Returns the SshRunner object
105 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
108 ssh = property(fget=__GetSSH)
110 def CheckArguments(self):
111 """Check syntactic validity for the opcode arguments.
113 This method is for doing a simple syntactic check and ensuring the
114 validity of opcode parameters, without any cluster-related
115 checks. While the same can be accomplished in ExpandNames and/or
116 CheckPrereq, doing these separately is better because:
118 - ExpandNames is left as purely a lock-related function
119 - CheckPrereq is run after we have acquired locks (and possible
122 The function is allowed to change the self.op attribute so that
123 later methods need no longer worry about missing parameters.
128 def ExpandNames(self):
129 """Expand names for this LU.
131 This method is called before starting to execute the opcode, and it should
132 update all the parameters of the opcode to their canonical form (e.g. a
133 short node name must be fully expanded after this method has successfully
134 completed). This way locking, hooks, logging, etc. can work correctly.
136 LUs which implement this method must also populate the self.needed_locks
137 member, as a dict with lock levels as keys, and a list of needed lock names
140 - use an empty dict if you don't need any lock
141 - if you don't need any lock at a particular level omit that level
142 - don't put anything for the BGL level
143 - if you want all locks at a level use locking.ALL_SET as a value
145 If you need to share locks (rather than acquire them exclusively) at one
146 level you can modify self.share_locks, setting a true value (usually 1) for
147 that level. By default locks are not shared.
151 # Acquire all nodes and one instance
152 self.needed_locks = {
153 locking.LEVEL_NODE: locking.ALL_SET,
154 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156 # Acquire just two nodes
157 self.needed_locks = {
158 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
161 self.needed_locks = {} # No, you can't leave it to the default value None
164 # The implementation of this method is mandatory only if the new LU is
165 # concurrent, so that old LUs don't need to be changed all at the same
168 self.needed_locks = {} # Exclusive LUs don't need locks.
170 raise NotImplementedError
172 def DeclareLocks(self, level):
173 """Declare LU locking needs for a level
175 While most LUs can just declare their locking needs at ExpandNames time,
176 sometimes there's the need to calculate some locks after having acquired
177 the ones before. This function is called just before acquiring locks at a
178 particular level, but after acquiring the ones at lower levels, and permits
179 such calculations. It can be used to modify self.needed_locks, and by
180 default it does nothing.
182 This function is only called if you have something already set in
183 self.needed_locks for the level.
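An illustrative sketch (hypothetical LU; it mirrors the usage documented in
L{_LockInstancesNodes}) of a typical implementation would be::
  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()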
185 @param level: Locking level which is going to be locked
186 @type level: member of ganeti.locking.LEVELS
190 def CheckPrereq(self):
191 """Check prerequisites for this LU.
193 This method should check that the prerequisites for the execution
194 of this LU are fulfilled. It can do internode communication, but
195 it should be idempotent - no cluster or system changes are
198 The method should raise errors.OpPrereqError in case something is
199 not fulfilled. Its return value is ignored.
201 This method should also update all the parameters of the opcode to
202 their canonical form if it hasn't been done by ExpandNames before.
205 raise NotImplementedError
207 def Exec(self, feedback_fn):
210 This method should implement the actual work. It should raise
211 errors.OpExecError for failures that are somewhat dealt with in
215 raise NotImplementedError
217 def BuildHooksEnv(self):
218 """Build hooks environment for this LU.
220 This method should return a three-element tuple consisting of: a dict
221 containing the environment that will be used for running the
222 specific hook for this LU, a list of node names on which the hook
223 should run before the execution, and a list of node names on which
224 the hook should run after the execution.
226 The keys of the dict must not be prefixed with 'GANETI_', as this will
227 be handled by the hooks runner. Also note additional keys will be
228 added by the hooks runner. If the LU doesn't define any
229 environment, an empty dict (and not None) should be returned.
231 If there are no nodes for a phase, an empty list (and not None) should be returned.
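As an illustrative sketch, a cluster-wide LU that only runs hooks on the
master could return something along these lines (this mirrors what
L{LURenameCluster} does below)::
  env = {"OP_TARGET": self.cfg.GetClusterName()}
  mn = self.cfg.GetMasterNode()
  return env, [mn], [mn]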
233 Note that if the HPATH for a LU class is None, this function will
237 raise NotImplementedError
239 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240 """Notify the LU about the results of its hooks.
242 This method is called every time a hooks phase is executed, and notifies
243 the Logical Unit about the hooks' result. The LU can then use it to alter
244 its result based on the hooks. By default the method does nothing and the
245 previous result is passed back unchanged but any LU can define it if it
246 wants to use the local cluster hook-scripts somehow.
248 @param phase: one of L{constants.HOOKS_PHASE_POST} or
249 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250 @param hook_results: the results of the multi-node hooks rpc call
251 @param feedback_fn: function used to send feedback back to the caller
252 @param lu_result: the previous Exec result this LU had, or None
254 @return: the new Exec result, based on the previous result
260 def _ExpandAndLockInstance(self):
261 """Helper function to expand and lock an instance.
263 Many LUs that work on an instance take its name in self.op.instance_name
264 and need to expand it and then declare the expanded name for locking. This
265 function does it, and then updates self.op.instance_name to the expanded
266 name. It also initializes needed_locks as a dict, if this hasn't been done
270 if self.needed_locks is None:
271 self.needed_locks = {}
273 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274 "_ExpandAndLockInstance called with instance-level locks set"
275 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276 if expanded_name is None:
277 raise errors.OpPrereqError("Instance '%s' not known" %
278 self.op.instance_name)
279 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280 self.op.instance_name = expanded_name
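# Illustrative usage sketch (hypothetical instance LU, not part of this
# class): such an LU would typically call this helper from ExpandNames and
# mark node locks for recalculation so DeclareLocks can fill them in later:
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE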
282 def _LockInstancesNodes(self, primary_only=False):
283 """Helper function to declare instances' nodes for locking.
285 This function should be called after locking one or more instances to lock
286 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287 with all primary or secondary nodes for instances already locked and
288 present in self.needed_locks[locking.LEVEL_INSTANCE].
290 It should be called from DeclareLocks, and for safety only works if
291 self.recalculate_locks[locking.LEVEL_NODE] is set.
293 In the future it may grow parameters to just lock some instance's nodes, or
294 to just lock primaries or secondary nodes, if needed.
296 It should be called in DeclareLocks in a way similar to::
298 if level == locking.LEVEL_NODE:
299 self._LockInstancesNodes()
301 @type primary_only: boolean
302 @param primary_only: only lock primary nodes of locked instances
305 assert locking.LEVEL_NODE in self.recalculate_locks, \
306 "_LockInstancesNodes helper function called with no nodes to recalculate"
308 # TODO: check if we've really been called with the instance locks held
310 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311 # future we might want to have different behaviors depending on the value
312 # of self.recalculate_locks[locking.LEVEL_NODE]
314 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315 instance = self.context.cfg.GetInstanceInfo(instance_name)
316 wanted_nodes.append(instance.primary_node)
318 wanted_nodes.extend(instance.secondary_nodes)
320 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325 del self.recalculate_locks[locking.LEVEL_NODE]
328 class NoHooksLU(LogicalUnit):
329 """Simple LU which runs no hooks.
331 This LU is intended as a parent for other LogicalUnits which will
332 run no hooks, in order to reduce duplicate code.
339 def _GetWantedNodes(lu, nodes):
340 """Returns list of checked and expanded node names.
342 @type lu: L{LogicalUnit}
343 @param lu: the logical unit on whose behalf we execute
345 @param nodes: list of node names or None for all nodes
347 @return: the list of nodes, sorted
348 @raise errors.OpPrereqError: if the nodes parameter is wrong type
351 if not isinstance(nodes, list):
352 raise errors.OpPrereqError("Invalid argument type 'nodes'")
355 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356 " non-empty list of nodes whose name is to be expanded.")
360 node = lu.cfg.ExpandNodeName(name)
362 raise errors.OpPrereqError("No such node name '%s'" % name)
365 return utils.NiceSort(wanted)
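# Illustrative usage sketch (this mirrors e.g. LUQueryNodes.ExpandNames and
# LUQueryNodeVolumes.ExpandNames below):
#   self.needed_locks[locking.LEVEL_NODE] = _GetWantedNodes(self, self.op.nodes)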
368 def _GetWantedInstances(lu, instances):
369 """Returns list of checked and expanded instance names.
371 @type lu: L{LogicalUnit}
372 @param lu: the logical unit on whose behalf we execute
373 @type instances: list
374 @param instances: list of instance names or None for all instances
376 @return: the list of instances, sorted
377 @raise errors.OpPrereqError: if the instances parameter is wrong type
378 @raise errors.OpPrereqError: if any of the passed instances is not found
381 if not isinstance(instances, list):
382 raise errors.OpPrereqError("Invalid argument type 'instances'")
387 for name in instances:
388 instance = lu.cfg.ExpandInstanceName(name)
390 raise errors.OpPrereqError("No such instance name '%s'" % name)
391 wanted.append(instance)
394 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
398 def _CheckOutputFields(static, dynamic, selected):
399 """Checks whether all selected fields are valid.
401 @type static: L{utils.FieldSet}
402 @param static: static fields set
403 @type dynamic: L{utils.FieldSet}
404 @param dynamic: dynamic fields set
411 delta = f.NonMatching(selected)
413 raise errors.OpPrereqError("Unknown output fields selected: %s"
417 def _CheckBooleanOpField(op, name):
418 """Validates boolean opcode parameters.
420 This will ensure that an opcode parameter is either a boolean value,
421 or None (but that it always exists).
424 val = getattr(op, name, None)
425 if not (val is None or isinstance(val, bool)):
426 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428 setattr(op, name, val)
431 def _CheckNodeOnline(lu, node):
432 """Ensure that a given node is online.
434 @param lu: the LU on behalf of which we make the check
435 @param node: the node to check
436 @raise errors.OpPrereqError: if the node is offline
439 if lu.cfg.GetNodeInfo(node).offline:
440 raise errors.OpPrereqError("Can't use offline node %s" % node)
443 def _CheckNodeNotDrained(lu, node):
444 """Ensure that a given node is not drained.
446 @param lu: the LU on behalf of which we make the check
447 @param node: the node to check
448 @raise errors.OpPrereqError: if the node is drained
451 if lu.cfg.GetNodeInfo(node).drained:
452 raise errors.OpPrereqError("Can't use drained node %s" % node)
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456 memory, vcpus, nics, disk_template, disks,
457 bep, hvp, hypervisor):
458 """Builds instance related env variables for hooks
460 This builds the hook environment from individual variables.
463 @param name: the name of the instance
464 @type primary_node: string
465 @param primary_node: the name of the instance's primary node
466 @type secondary_nodes: list
467 @param secondary_nodes: list of secondary nodes as strings
468 @type os_type: string
469 @param os_type: the name of the instance's OS
470 @type status: boolean
471 @param status: the should_run status of the instance
473 @param memory: the memory size of the instance
475 @param vcpus: the count of VCPUs the instance has
477 @param nics: list of tuples (ip, bridge, mac) representing
478 the NICs the instance has
479 @type disk_template: string
480 @param disk_template: the disk template of the instance
482 @param disks: the list of (size, mode) pairs
484 @param bep: the backend parameters for the instance
486 @param hvp: the hypervisor parameters for the instance
487 @type hypervisor: string
488 @param hypervisor: the hypervisor for the instance
490 @return: the hook environment for this instance
499 "INSTANCE_NAME": name,
500 "INSTANCE_PRIMARY": primary_node,
501 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
502 "INSTANCE_OS_TYPE": os_type,
503 "INSTANCE_STATUS": str_status,
504 "INSTANCE_MEMORY": memory,
505 "INSTANCE_VCPUS": vcpus,
506 "INSTANCE_DISK_TEMPLATE": disk_template,
507 "INSTANCE_HYPERVISOR": hypervisor,
511 nic_count = len(nics)
512 for idx, (ip, bridge, mac) in enumerate(nics):
515 env["INSTANCE_NIC%d_IP" % idx] = ip
516 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
517 env["INSTANCE_NIC%d_MAC" % idx] = mac
521 env["INSTANCE_NIC_COUNT"] = nic_count
524 disk_count = len(disks)
525 for idx, (size, mode) in enumerate(disks):
526 env["INSTANCE_DISK%d_SIZE" % idx] = size
527 env["INSTANCE_DISK%d_MODE" % idx] = mode
531 env["INSTANCE_DISK_COUNT"] = disk_count
533 for source, kind in [(bep, "BE"), (hvp, "HV")]:
534 for key, value in source.items():
535 env["INSTANCE_%s_%s" % (kind, key)] = value
540 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
541 """Builds instance related env variables for hooks from an object.
543 @type lu: L{LogicalUnit}
544 @param lu: the logical unit on whose behalf we execute
545 @type instance: L{objects.Instance}
546 @param instance: the instance for which we should build the
549 @param override: dictionary with key/values that will override
552 @return: the hook environment dictionary
555 cluster = lu.cfg.GetClusterInfo()
556 bep = cluster.FillBE(instance)
557 hvp = cluster.FillHV(instance)
559 'name': instance.name,
560 'primary_node': instance.primary_node,
561 'secondary_nodes': instance.secondary_nodes,
562 'os_type': instance.os,
563 'status': instance.admin_up,
564 'memory': bep[constants.BE_MEMORY],
565 'vcpus': bep[constants.BE_VCPUS],
566 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
567 'disk_template': instance.disk_template,
568 'disks': [(disk.size, disk.mode) for disk in instance.disks],
571 'hypervisor': instance.hypervisor,
574 args.update(override)
575 return _BuildInstanceHookEnv(**args)
578 def _AdjustCandidatePool(lu):
579 """Adjust the candidate pool after node operations.
582 mod_list = lu.cfg.MaintainCandidatePool()
584 lu.LogInfo("Promoted nodes to master candidate role: %s",
585 ", ".join(node.name for node in mod_list))
586 for name in mod_list:
587 lu.context.ReaddNode(name)
588 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
590 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
594 def _CheckInstanceBridgesExist(lu, instance):
595 """Check that the brigdes needed by an instance exist.
598 # check bridges existance
599 brlist = [nic.bridge for nic in instance.nics]
600 result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
603 raise errors.OpPrereqError("One or more target bridges %s does not"
604 " exist on destination node '%s'" %
605 (brlist, instance.primary_node))
608 class LUDestroyCluster(NoHooksLU):
609 """Logical unit for destroying the cluster.
614 def CheckPrereq(self):
615 """Check prerequisites.
617 This checks whether the cluster is empty.
619 Any errors are signalled by raising errors.OpPrereqError.
622 master = self.cfg.GetMasterNode()
624 nodelist = self.cfg.GetNodeList()
625 if len(nodelist) != 1 or nodelist[0] != master:
626 raise errors.OpPrereqError("There are still %d node(s) in"
627 " this cluster." % (len(nodelist) - 1))
628 instancelist = self.cfg.GetInstanceList()
630 raise errors.OpPrereqError("There are still %d instance(s) in"
631 " this cluster." % len(instancelist))
633 def Exec(self, feedback_fn):
634 """Destroys the cluster.
637 master = self.cfg.GetMasterNode()
638 result = self.rpc.call_node_stop_master(master, False)
641 raise errors.OpExecError("Could not disable the master role")
642 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
643 utils.CreateBackup(priv_key)
644 utils.CreateBackup(pub_key)
648 class LUVerifyCluster(LogicalUnit):
649 """Verifies the cluster status.
652 HPATH = "cluster-verify"
653 HTYPE = constants.HTYPE_CLUSTER
654 _OP_REQP = ["skip_checks"]
657 def ExpandNames(self):
658 self.needed_locks = {
659 locking.LEVEL_NODE: locking.ALL_SET,
660 locking.LEVEL_INSTANCE: locking.ALL_SET,
662 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
664 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
665 node_result, feedback_fn, master_files,
667 """Run multiple tests against a node.
671 - compares ganeti version
672 - checks vg existence and size > 20G
673 - checks config file checksum
674 - checks ssh to other nodes
676 @type nodeinfo: L{objects.Node}
677 @param nodeinfo: the node to check
678 @param file_list: required list of files
679 @param local_cksum: dictionary of local files and their checksums
680 @param node_result: the results from the node
681 @param feedback_fn: function used to accumulate results
682 @param master_files: list of files that only masters should have
683 @param drbd_map: the used drbd minors for this node, in
684 form of minor: (instance, must_exist) which correspond to instances
685 and their running status
686 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
691 # main result, node_result should be a non-empty dict
692 if not node_result or not isinstance(node_result, dict):
693 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
696 # compares ganeti version
697 local_version = constants.PROTOCOL_VERSION
698 remote_version = node_result.get('version', None)
699 if not (remote_version and isinstance(remote_version, (list, tuple)) and
700 len(remote_version) == 2):
701 feedback_fn(" - ERROR: connection to %s failed" % (node))
704 if local_version != remote_version[0]:
705 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
706 " node %s %s" % (local_version, node, remote_version[0]))
709 # node seems compatible, we can actually try to look into its results
713 # full package version
714 if constants.RELEASE_VERSION != remote_version[1]:
715 feedback_fn(" - WARNING: software version mismatch: master %s,"
717 (constants.RELEASE_VERSION, node, remote_version[1]))
719 # checks vg existence and size > 20G
720 if vg_name is not None:
721 vglist = node_result.get(constants.NV_VGLIST, None)
723 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
727 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
728 constants.MIN_VG_SIZE)
730 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
733 # checks config file checksum
735 remote_cksum = node_result.get(constants.NV_FILELIST, None)
736 if not isinstance(remote_cksum, dict):
738 feedback_fn(" - ERROR: node hasn't returned file checksum data")
740 for file_name in file_list:
741 node_is_mc = nodeinfo.master_candidate
742 must_have_file = file_name not in master_files
743 if file_name not in remote_cksum:
744 if node_is_mc or must_have_file:
746 feedback_fn(" - ERROR: file '%s' missing" % file_name)
747 elif remote_cksum[file_name] != local_cksum[file_name]:
748 if node_is_mc or must_have_file:
750 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
752 # not candidate and this is not a must-have file
754 feedback_fn(" - ERROR: non master-candidate has old/wrong file"
757 # all good, except non-master/non-must have combination
758 if not node_is_mc and not must_have_file:
759 feedback_fn(" - ERROR: file '%s' should not exist on non master"
760 " candidates" % file_name)
764 if constants.NV_NODELIST not in node_result:
766 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
768 if node_result[constants.NV_NODELIST]:
770 for node in node_result[constants.NV_NODELIST]:
771 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
772 (node, node_result[constants.NV_NODELIST][node]))
774 if constants.NV_NODENETTEST not in node_result:
776 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
778 if node_result[constants.NV_NODENETTEST]:
780 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
782 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
783 (node, node_result[constants.NV_NODENETTEST][node]))
785 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
786 if isinstance(hyp_result, dict):
787 for hv_name, hv_result in hyp_result.iteritems():
788 if hv_result is not None:
789 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
790 (hv_name, hv_result))
792 # check used drbd list
793 if vg_name is not None:
794 used_minors = node_result.get(constants.NV_DRBDLIST, [])
795 if not isinstance(used_minors, (tuple, list)):
796 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
799 for minor, (iname, must_exist) in drbd_map.items():
800 if minor not in used_minors and must_exist:
801 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
802 " not active" % (minor, iname))
804 for minor in used_minors:
805 if minor not in drbd_map:
806 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
812 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
813 node_instance, feedback_fn, n_offline):
814 """Verify an instance.
816 This function checks to see if the required block devices are
817 available on the instance's node.
822 node_current = instanceconfig.primary_node
825 instanceconfig.MapLVsByNode(node_vol_should)
827 for node in node_vol_should:
828 if node in n_offline:
829 # ignore missing volumes on offline nodes
831 for volume in node_vol_should[node]:
832 if node not in node_vol_is or volume not in node_vol_is[node]:
833 feedback_fn(" - ERROR: volume %s missing on node %s" %
837 if instanceconfig.admin_up:
838 if ((node_current not in node_instance or
839 not instance in node_instance[node_current]) and
840 node_current not in n_offline):
841 feedback_fn(" - ERROR: instance %s not running on node %s" %
842 (instance, node_current))
845 for node in node_instance:
846 if (not node == node_current):
847 if instance in node_instance[node]:
848 feedback_fn(" - ERROR: instance %s should not run on node %s" %
854 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
855 """Verify if there are any unknown volumes in the cluster.
857 The .os, .swap and backup volumes are ignored. All other volumes are
863 for node in node_vol_is:
864 for volume in node_vol_is[node]:
865 if node not in node_vol_should or volume not in node_vol_should[node]:
866 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
871 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
872 """Verify the list of running instances.
874 This checks what instances are running but unknown to the cluster.
878 for node in node_instance:
879 for runninginstance in node_instance[node]:
880 if runninginstance not in instancelist:
881 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
882 (runninginstance, node))
886 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
887 """Verify N+1 Memory Resilience.
889 Check that if one single node dies we can still start all the instances it
895 for node, nodeinfo in node_info.iteritems():
896 # This code checks that every node which is now listed as secondary has
897 # enough memory to host all instances it is supposed to host, should a single
898 # other node in the cluster fail.
899 # FIXME: not ready for failover to an arbitrary node
900 # FIXME: does not support file-backed instances
901 # WARNING: we currently take into account down instances as well as up
902 # ones, considering that even if they're down someone might want to start
903 # them even in the event of a node failure.
904 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
906 for instance in instances:
907 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
908 if bep[constants.BE_AUTO_BALANCE]:
909 needed_mem += bep[constants.BE_MEMORY]
910 if nodeinfo['mfree'] < needed_mem:
911 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
912 " failovers should node %s fail" % (node, prinode))
916 def CheckPrereq(self):
917 """Check prerequisites.
919 Transform the list of checks we're going to skip into a set and check that
920 all its members are valid.
923 self.skip_set = frozenset(self.op.skip_checks)
924 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
925 raise errors.OpPrereqError("Invalid checks to be skipped specified")
927 def BuildHooksEnv(self):
930 Cluster-Verify hooks are run only in the post phase, and their failure causes
931 their output to be logged in the verify output and the verification to fail.
934 all_nodes = self.cfg.GetNodeList()
936 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
938 for node in self.cfg.GetAllNodesInfo().values():
939 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
941 return env, [], all_nodes
943 def Exec(self, feedback_fn):
944 """Verify integrity of cluster, performing various test on nodes.
948 feedback_fn("* Verifying global settings")
949 for msg in self.cfg.VerifyConfig():
950 feedback_fn(" - ERROR: %s" % msg)
952 vg_name = self.cfg.GetVGName()
953 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
954 nodelist = utils.NiceSort(self.cfg.GetNodeList())
955 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
956 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
957 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
958 for iname in instancelist)
959 i_non_redundant = [] # Non redundant instances
960 i_non_a_balanced = [] # Non auto-balanced instances
961 n_offline = [] # List of offline nodes
962 n_drained = [] # List of nodes being drained
968 # FIXME: verify OS list
970 master_files = [constants.CLUSTER_CONF_FILE]
972 file_names = ssconf.SimpleStore().GetFileList()
973 file_names.append(constants.SSL_CERT_FILE)
974 file_names.append(constants.RAPI_CERT_FILE)
975 file_names.extend(master_files)
977 local_checksums = utils.FingerprintFiles(file_names)
979 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
980 node_verify_param = {
981 constants.NV_FILELIST: file_names,
982 constants.NV_NODELIST: [node.name for node in nodeinfo
983 if not node.offline],
984 constants.NV_HYPERVISOR: hypervisors,
985 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
986 node.secondary_ip) for node in nodeinfo
987 if not node.offline],
988 constants.NV_INSTANCELIST: hypervisors,
989 constants.NV_VERSION: None,
990 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
992 if vg_name is not None:
993 node_verify_param[constants.NV_VGLIST] = None
994 node_verify_param[constants.NV_LVLIST] = vg_name
995 node_verify_param[constants.NV_DRBDLIST] = None
996 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
997 self.cfg.GetClusterName())
999 cluster = self.cfg.GetClusterInfo()
1000 master_node = self.cfg.GetMasterNode()
1001 all_drbd_map = self.cfg.ComputeDRBDMap()
1003 for node_i in nodeinfo:
1005 nresult = all_nvinfo[node].data
1008 feedback_fn("* Skipping offline node %s" % (node,))
1009 n_offline.append(node)
1012 if node == master_node:
1014 elif node_i.master_candidate:
1015 ntype = "master candidate"
1016 elif node_i.drained:
1018 n_drained.append(node)
1021 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1023 if all_nvinfo[node].failed or not isinstance(nresult, dict):
1024 feedback_fn(" - ERROR: connection to %s failed" % (node,))
1029 for minor, instance in all_drbd_map[node].items():
1030 if instance not in instanceinfo:
1031 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1033 # ghost instance should not be running, but otherwise we
1034 # don't give double warnings (both ghost instance and
1035 # unallocated minor in use)
1036 node_drbd[minor] = (instance, False)
1038 instance = instanceinfo[instance]
1039 node_drbd[minor] = (instance.name, instance.admin_up)
1040 result = self._VerifyNode(node_i, file_names, local_checksums,
1041 nresult, feedback_fn, master_files,
1045 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1047 node_volume[node] = {}
1048 elif isinstance(lvdata, basestring):
1049 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1050 (node, utils.SafeEncode(lvdata)))
1052 node_volume[node] = {}
1053 elif not isinstance(lvdata, dict):
1054 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1058 node_volume[node] = lvdata
1061 idata = nresult.get(constants.NV_INSTANCELIST, None)
1062 if not isinstance(idata, list):
1063 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1068 node_instance[node] = idata
1071 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1072 if not isinstance(nodeinfo, dict):
1073 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1079 "mfree": int(nodeinfo['memory_free']),
1082 # dictionary holding all instances this node is secondary for,
1083 # grouped by their primary node. Each key is a cluster node, and each
1084 # value is a list of instances which have the key as primary and the
1085 # current node as secondary. this is handy to calculate N+1 memory
1086 # availability if you can only failover from a primary to its
1088 "sinst-by-pnode": {},
1090 # FIXME: devise a free space model for file based instances as well
1091 if vg_name is not None:
1092 if (constants.NV_VGLIST not in nresult or
1093 vg_name not in nresult[constants.NV_VGLIST]):
1094 feedback_fn(" - ERROR: node %s didn't return data for the"
1095 " volume group '%s' - it is either missing or broken" %
1099 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1100 except (ValueError, KeyError):
1101 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1102 " from node %s" % (node,))
1106 node_vol_should = {}
1108 for instance in instancelist:
1109 feedback_fn("* Verifying instance %s" % instance)
1110 inst_config = instanceinfo[instance]
1111 result = self._VerifyInstance(instance, inst_config, node_volume,
1112 node_instance, feedback_fn, n_offline)
1114 inst_nodes_offline = []
1116 inst_config.MapLVsByNode(node_vol_should)
1118 instance_cfg[instance] = inst_config
1120 pnode = inst_config.primary_node
1121 if pnode in node_info:
1122 node_info[pnode]['pinst'].append(instance)
1123 elif pnode not in n_offline:
1124 feedback_fn(" - ERROR: instance %s, connection to primary node"
1125 " %s failed" % (instance, pnode))
1128 if pnode in n_offline:
1129 inst_nodes_offline.append(pnode)
1131 # If the instance is non-redundant we cannot survive losing its primary
1132 # node, so we are not N+1 compliant. On the other hand we have no disk
1133 # templates with more than one secondary so that situation is not well
1135 # FIXME: does not support file-backed instances
1136 if len(inst_config.secondary_nodes) == 0:
1137 i_non_redundant.append(instance)
1138 elif len(inst_config.secondary_nodes) > 1:
1139 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1142 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1143 i_non_a_balanced.append(instance)
1145 for snode in inst_config.secondary_nodes:
1146 if snode in node_info:
1147 node_info[snode]['sinst'].append(instance)
1148 if pnode not in node_info[snode]['sinst-by-pnode']:
1149 node_info[snode]['sinst-by-pnode'][pnode] = []
1150 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1151 elif snode not in n_offline:
1152 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1153 " %s failed" % (instance, snode))
1155 if snode in n_offline:
1156 inst_nodes_offline.append(snode)
1158 if inst_nodes_offline:
1159 # warn that the instance lives on offline nodes, and set bad=True
1160 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1161 ", ".join(inst_nodes_offline))
1164 feedback_fn("* Verifying orphan volumes")
1165 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1169 feedback_fn("* Verifying remaining instances")
1170 result = self._VerifyOrphanInstances(instancelist, node_instance,
1174 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1175 feedback_fn("* Verifying N+1 Memory redundancy")
1176 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1179 feedback_fn("* Other Notes")
1181 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1182 % len(i_non_redundant))
1184 if i_non_a_balanced:
1185 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1186 % len(i_non_a_balanced))
1189 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1192 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1196 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1197 """Analize the post-hooks' result
1199 This method analyses the hook result, handles it, and sends some
1200 nicely-formatted feedback back to the user.
1202 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1203 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1204 @param hooks_results: the results of the multi-node hooks rpc call
1205 @param feedback_fn: function used to send feedback back to the caller
1206 @param lu_result: previous Exec result
1207 @return: the new Exec result, based on the previous result
1211 # We only really run POST phase hooks, and are only interested in
1213 if phase == constants.HOOKS_PHASE_POST:
1214 # Used to change hooks' output to proper indentation
1215 indent_re = re.compile('^', re.M)
1216 feedback_fn("* Hooks Results")
1217 if not hooks_results:
1218 feedback_fn(" - ERROR: general communication failure")
1221 for node_name in hooks_results:
1222 show_node_header = True
1223 res = hooks_results[node_name]
1224 if res.failed or res.data is False or not isinstance(res.data, list):
1226 # no need to warn or set fail return value
1228 feedback_fn(" Communication failure in hooks execution")
1231 for script, hkr, output in res.data:
1232 if hkr == constants.HKR_FAIL:
1233 # The node header is only shown once, if there are
1234 # failing hooks on that node
1235 if show_node_header:
1236 feedback_fn(" Node %s:" % node_name)
1237 show_node_header = False
1238 feedback_fn(" ERROR: Script %s failed, output:" % script)
1239 output = indent_re.sub(' ', output)
1240 feedback_fn("%s" % output)
1246 class LUVerifyDisks(NoHooksLU):
1247 """Verifies the cluster disks status.
1253 def ExpandNames(self):
1254 self.needed_locks = {
1255 locking.LEVEL_NODE: locking.ALL_SET,
1256 locking.LEVEL_INSTANCE: locking.ALL_SET,
1258 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1260 def CheckPrereq(self):
1261 """Check prerequisites.
1263 This has no prerequisites.
1268 def Exec(self, feedback_fn):
1269 """Verify integrity of cluster disks.
1272 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1274 vg_name = self.cfg.GetVGName()
1275 nodes = utils.NiceSort(self.cfg.GetNodeList())
1276 instances = [self.cfg.GetInstanceInfo(name)
1277 for name in self.cfg.GetInstanceList()]
1280 for inst in instances:
1282 if (not inst.admin_up or
1283 inst.disk_template not in constants.DTS_NET_MIRROR):
1285 inst.MapLVsByNode(inst_lvs)
1286 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1287 for node, vol_list in inst_lvs.iteritems():
1288 for vol in vol_list:
1289 nv_dict[(node, vol)] = inst
1294 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1299 lvs = node_lvs[node]
1302 self.LogWarning("Connection to node %s failed: %s" %
1306 if isinstance(lvs, basestring):
1307 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1308 res_nlvm[node] = lvs
1310 elif not isinstance(lvs, dict):
1311 logging.warning("Connection to node %s failed or invalid data"
1313 res_nodes.append(node)
1316 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1317 inst = nv_dict.pop((node, lv_name), None)
1318 if (not lv_online and inst is not None
1319 and inst.name not in res_instances):
1320 res_instances.append(inst.name)
1322 # any leftover items in nv_dict are missing LVs, let's arrange the
1324 for key, inst in nv_dict.iteritems():
1325 if inst.name not in res_missing:
1326 res_missing[inst.name] = []
1327 res_missing[inst.name].append(key)
1332 class LURenameCluster(LogicalUnit):
1333 """Rename the cluster.
1336 HPATH = "cluster-rename"
1337 HTYPE = constants.HTYPE_CLUSTER
1340 def BuildHooksEnv(self):
1345 "OP_TARGET": self.cfg.GetClusterName(),
1346 "NEW_NAME": self.op.name,
1348 mn = self.cfg.GetMasterNode()
1349 return env, [mn], [mn]
1351 def CheckPrereq(self):
1352 """Verify that the passed name is a valid one.
1355 hostname = utils.HostInfo(self.op.name)
1357 new_name = hostname.name
1358 self.ip = new_ip = hostname.ip
1359 old_name = self.cfg.GetClusterName()
1360 old_ip = self.cfg.GetMasterIP()
1361 if new_name == old_name and new_ip == old_ip:
1362 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1363 " cluster has changed")
1364 if new_ip != old_ip:
1365 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1366 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1367 " reachable on the network. Aborting." %
1370 self.op.name = new_name
1372 def Exec(self, feedback_fn):
1373 """Rename the cluster.
1376 clustername = self.op.name
1379 # shutdown the master IP
1380 master = self.cfg.GetMasterNode()
1381 result = self.rpc.call_node_stop_master(master, False)
1382 if result.failed or not result.data:
1383 raise errors.OpExecError("Could not disable the master role")
1386 cluster = self.cfg.GetClusterInfo()
1387 cluster.cluster_name = clustername
1388 cluster.master_ip = ip
1389 self.cfg.Update(cluster)
1391 # update the known hosts file
1392 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1393 node_list = self.cfg.GetNodeList()
1395 node_list.remove(master)
1398 result = self.rpc.call_upload_file(node_list,
1399 constants.SSH_KNOWN_HOSTS_FILE)
1400 for to_node, to_result in result.iteritems():
1401 if to_result.failed or not to_result.data:
1402 logging.error("Copy of file %s to node %s failed",
1403 constants.SSH_KNOWN_HOSTS_FILE, to_node)
1406 result = self.rpc.call_node_start_master(master, False)
1407 if result.failed or not result.data:
1408 self.LogWarning("Could not re-enable the master role on"
1409 " the master, please restart manually.")
1412 def _RecursiveCheckIfLVMBased(disk):
1413 """Check if the given disk or its children are lvm-based.
1415 @type disk: L{objects.Disk}
1416 @param disk: the disk to check
1418 @return: boolean indicating whether a LD_LV dev_type was found or not
1422 for chdisk in disk.children:
1423 if _RecursiveCheckIfLVMBased(chdisk):
1425 return disk.dev_type == constants.LD_LV
1428 class LUSetClusterParams(LogicalUnit):
1429 """Change the parameters of the cluster.
1432 HPATH = "cluster-modify"
1433 HTYPE = constants.HTYPE_CLUSTER
1437 def CheckArguments(self):
1441 if not hasattr(self.op, "candidate_pool_size"):
1442 self.op.candidate_pool_size = None
1443 if self.op.candidate_pool_size is not None:
1445 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1446 except (ValueError, TypeError), err:
1447 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1449 if self.op.candidate_pool_size < 1:
1450 raise errors.OpPrereqError("At least one master candidate needed")
1452 def ExpandNames(self):
1453 # FIXME: in the future maybe other cluster params won't require checking on
1454 # all nodes to be modified.
1455 self.needed_locks = {
1456 locking.LEVEL_NODE: locking.ALL_SET,
1458 self.share_locks[locking.LEVEL_NODE] = 1
1460 def BuildHooksEnv(self):
1465 "OP_TARGET": self.cfg.GetClusterName(),
1466 "NEW_VG_NAME": self.op.vg_name,
1468 mn = self.cfg.GetMasterNode()
1469 return env, [mn], [mn]
1471 def CheckPrereq(self):
1472 """Check prerequisites.
1474 This checks that the given parameters don't conflict and
1475 that the given volume group is valid.
1478 if self.op.vg_name is not None and not self.op.vg_name:
1479 instances = self.cfg.GetAllInstancesInfo().values()
1480 for inst in instances:
1481 for disk in inst.disks:
1482 if _RecursiveCheckIfLVMBased(disk):
1483 raise errors.OpPrereqError("Cannot disable lvm storage while"
1484 " lvm-based instances exist")
1486 node_list = self.acquired_locks[locking.LEVEL_NODE]
1488 # if vg_name not None, checks given volume group on all nodes
1490 vglist = self.rpc.call_vg_list(node_list)
1491 for node in node_list:
1492 if vglist[node].failed:
1493 # ignoring down node
1494 self.LogWarning("Node %s unreachable/error, ignoring" % node)
1496 vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1498 constants.MIN_VG_SIZE)
1500 raise errors.OpPrereqError("Error on node '%s': %s" %
1503 self.cluster = cluster = self.cfg.GetClusterInfo()
1504 # validate beparams changes
1505 if self.op.beparams:
1506 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1507 self.new_beparams = cluster.FillDict(
1508 cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1510 # hypervisor list/parameters
1511 self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1512 if self.op.hvparams:
1513 if not isinstance(self.op.hvparams, dict):
1514 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1515 for hv_name, hv_dict in self.op.hvparams.items():
1516 if hv_name not in self.new_hvparams:
1517 self.new_hvparams[hv_name] = hv_dict
1519 self.new_hvparams[hv_name].update(hv_dict)
1521 if self.op.enabled_hypervisors is not None:
1522 self.hv_list = self.op.enabled_hypervisors
1524 self.hv_list = cluster.enabled_hypervisors
1526 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1527 # either the enabled list has changed, or the parameters have, validate
1528 for hv_name, hv_params in self.new_hvparams.items():
1529 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1530 (self.op.enabled_hypervisors and
1531 hv_name in self.op.enabled_hypervisors)):
1532 # either this is a new hypervisor, or its parameters have changed
1533 hv_class = hypervisor.GetHypervisor(hv_name)
1534 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1535 hv_class.CheckParameterSyntax(hv_params)
1536 _CheckHVParams(self, node_list, hv_name, hv_params)
1538 def Exec(self, feedback_fn):
1539 """Change the parameters of the cluster.
1542 if self.op.vg_name is not None:
1543 new_volume = self.op.vg_name
1546 if new_volume != self.cfg.GetVGName():
1547 self.cfg.SetVGName(new_volume)
1549 feedback_fn("Cluster LVM configuration already in desired"
1550 " state, not changing")
1551 if self.op.hvparams:
1552 self.cluster.hvparams = self.new_hvparams
1553 if self.op.enabled_hypervisors is not None:
1554 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1555 if self.op.beparams:
1556 self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1557 if self.op.candidate_pool_size is not None:
1558 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1560 self.cfg.Update(self.cluster)
1562 # we want to update nodes after the cluster so that if any errors
1563 # happen, we have recorded and saved the cluster info
1564 if self.op.candidate_pool_size is not None:
1565 _AdjustCandidatePool(self)
1568 class LURedistributeConfig(NoHooksLU):
1569 """Force the redistribution of cluster configuration.
1571 This is a very simple LU.
1577 def ExpandNames(self):
1578 self.needed_locks = {
1579 locking.LEVEL_NODE: locking.ALL_SET,
1581 self.share_locks[locking.LEVEL_NODE] = 1
1583 def CheckPrereq(self):
1584 """Check prerequisites.
1588 def Exec(self, feedback_fn):
1589 """Redistribute the configuration.
1592 self.cfg.Update(self.cfg.GetClusterInfo())
1595 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1596 """Sleep and poll for an instance's disk to sync.
1599 if not instance.disks:
1603 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1605 node = instance.primary_node
1607 for dev in instance.disks:
1608 lu.cfg.SetDiskID(dev, node)
1611 degr_retries = 10 # in seconds, as we sleep 1 second each time
1615 cumul_degraded = False
1616 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1617 if rstats.failed or not rstats.data:
1618 lu.LogWarning("Can't get any data from node %s", node)
1621 raise errors.RemoteError("Can't contact node %s for mirror data,"
1622 " aborting." % node)
1625 rstats = rstats.data
1627 for i, mstat in enumerate(rstats):
1629 lu.LogWarning("Can't compute data for node %s/%s",
1630 node, instance.disks[i].iv_name)
1632 # we ignore the ldisk parameter
1633 perc_done, est_time, is_degraded, _ = mstat
1634 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1635 if perc_done is not None:
1637 if est_time is not None:
1638 rem_time = "%d estimated seconds remaining" % est_time
1641 rem_time = "no time estimate"
1642 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1643 (instance.disks[i].iv_name, perc_done, rem_time))
1645 # if we're done but degraded, let's do a few small retries, to
1646 # make sure we see a stable and not transient situation; therefore
1647 # we force restart of the loop
1648 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1649 logging.info("Degraded disks found, %d retries left", degr_retries)
1657 time.sleep(min(60, max_time))
1660 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1661 return not cumul_degraded
1664 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1665 """Check that mirrors are not degraded.
1667 The ldisk parameter, if True, will change the test from the
1668 is_degraded attribute (which represents overall non-ok status for
1669 the device(s)) to the ldisk (representing the local storage status).
1672 lu.cfg.SetDiskID(dev, node)
1679 if on_primary or dev.AssembleOnSecondary():
1680 rstats = lu.rpc.call_blockdev_find(node, dev)
1681 msg = rstats.RemoteFailMsg()
1683 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1685 elif not rstats.payload:
1686 lu.LogWarning("Can't find disk on node %s", node)
1689 result = result and (not rstats.payload[idx])
1691 for child in dev.children:
1692 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1697 class LUDiagnoseOS(NoHooksLU):
1698 """Logical unit for OS diagnose/query.
1701 _OP_REQP = ["output_fields", "names"]
1703 _FIELDS_STATIC = utils.FieldSet()
1704 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1706 def ExpandNames(self):
1708 raise errors.OpPrereqError("Selective OS query not supported")
1710 _CheckOutputFields(static=self._FIELDS_STATIC,
1711 dynamic=self._FIELDS_DYNAMIC,
1712 selected=self.op.output_fields)
1714 # Lock all nodes, in shared mode
1715 # Temporary removal of locks, should be reverted later
1716 # TODO: reintroduce locks when they are lighter-weight
1717 self.needed_locks = {}
1718 #self.share_locks[locking.LEVEL_NODE] = 1
1719 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1721 def CheckPrereq(self):
1722 """Check prerequisites.
1727 def _DiagnoseByOS(node_list, rlist):
1728 """Remaps a per-node return list into an a per-os per-node dictionary
1730 @param node_list: a list with the names of all nodes
1731 @param rlist: a map with node names as keys and OS objects as values
1734 @return: a dictionary with osnames as keys and as value another map, with
1735 nodes as keys and list of OS objects as values, eg::
1737 {"debian-etch": {"node1": [<object>,...],
1738 "node2": [<object>,]}
1743 # we build here the list of nodes that didn't fail the RPC (at RPC
1744 # level), so that nodes with a non-responding node daemon don't
1745 # make all OSes invalid
1746 good_nodes = [node_name for node_name in rlist
1747 if not rlist[node_name].failed]
1748 for node_name, nr in rlist.iteritems():
1749 if nr.failed or not nr.data:
1751 for os_obj in nr.data:
1752 if os_obj.name not in all_os:
1753 # build a list of nodes for this os containing empty lists
1754 # for each node in node_list
1755 all_os[os_obj.name] = {}
1756 for nname in good_nodes:
1757 all_os[os_obj.name][nname] = []
1758 all_os[os_obj.name][node_name].append(os_obj)
1761 def Exec(self, feedback_fn):
1762 """Compute the list of OSes.
1765 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1766 node_data = self.rpc.call_os_diagnose(valid_nodes)
1767 if node_data == False:
1768 raise errors.OpExecError("Can't gather the list of OSes")
1769 pol = self._DiagnoseByOS(valid_nodes, node_data)
1771 for os_name, os_data in pol.iteritems():
1773 for field in self.op.output_fields:
1776 elif field == "valid":
1777 val = utils.all([osl and osl[0] for osl in os_data.values()])
1778 elif field == "node_status":
1780 for node_name, nos_list in os_data.iteritems():
1781 val[node_name] = [(v.status, v.path) for v in nos_list]
1783 raise errors.ParameterError(field)
1790 class LURemoveNode(LogicalUnit):
1791 """Logical unit for removing a node.
1794 HPATH = "node-remove"
1795 HTYPE = constants.HTYPE_NODE
1796 _OP_REQP = ["node_name"]
1798 def BuildHooksEnv(self):
1801 This doesn't run on the target node in the pre phase as a failed
1802 node would then be impossible to remove.
1806 "OP_TARGET": self.op.node_name,
1807 "NODE_NAME": self.op.node_name,
1809 all_nodes = self.cfg.GetNodeList()
1810 all_nodes.remove(self.op.node_name)
1811 return env, all_nodes, all_nodes
1813 def CheckPrereq(self):
1814 """Check prerequisites.
1817 - the node exists in the configuration
1818 - it does not have primary or secondary instances
1819 - it's not the master
1821 Any errors are signalled by raising errors.OpPrereqError.
1824 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1826 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1828 instance_list = self.cfg.GetInstanceList()
1830 masternode = self.cfg.GetMasterNode()
1831 if node.name == masternode:
1832 raise errors.OpPrereqError("Node is the master node,"
1833 " you need to failover first.")
1835 for instance_name in instance_list:
1836 instance = self.cfg.GetInstanceInfo(instance_name)
1837 if node.name in instance.all_nodes:
1838 raise errors.OpPrereqError("Instance %s is still running on the node,"
1839 " please remove first." % instance_name)
1840 self.op.node_name = node.name
1843 def Exec(self, feedback_fn):
1844 """Removes the node from the cluster.
1848 logging.info("Stopping the node daemon and removing configs from node %s",
1851 self.context.RemoveNode(node.name)
1853 self.rpc.call_node_leave_cluster(node.name)
1855 # Promote nodes to master candidate as needed
1856 _AdjustCandidatePool(self)
1859 class LUQueryNodes(NoHooksLU):
1860 """Logical unit for querying nodes.
1863 _OP_REQP = ["output_fields", "names", "use_locking"]
1865 _FIELDS_DYNAMIC = utils.FieldSet(
1867 "mtotal", "mnode", "mfree",
1869 "ctotal", "cnodes", "csockets",
1872 _FIELDS_STATIC = utils.FieldSet(
1873 "name", "pinst_cnt", "sinst_cnt",
1874 "pinst_list", "sinst_list",
1875 "pip", "sip", "tags",
1883 def ExpandNames(self):
1884 _CheckOutputFields(static=self._FIELDS_STATIC,
1885 dynamic=self._FIELDS_DYNAMIC,
1886 selected=self.op.output_fields)
1888 self.needed_locks = {}
1889 self.share_locks[locking.LEVEL_NODE] = 1
1892 self.wanted = _GetWantedNodes(self, self.op.names)
1894 self.wanted = locking.ALL_SET
1896 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1897 self.do_locking = self.do_node_query and self.op.use_locking
1899 # if we don't request only static fields, we need to lock the nodes
1900 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1903 def CheckPrereq(self):
1904 """Check prerequisites.
1907 # The validation of the node list is done in the _GetWantedNodes,
1908 # if non empty, and if empty, there's no validation to do
1911 def Exec(self, feedback_fn):
1912 """Computes the list of nodes and their attributes.
1915 all_info = self.cfg.GetAllNodesInfo()
1917 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1918 elif self.wanted != locking.ALL_SET:
1919 nodenames = self.wanted
1920 missing = set(nodenames).difference(all_info.keys())
1922 raise errors.OpExecError(
1923 "Some nodes were removed before retrieving their data: %s" % missing)
1925 nodenames = all_info.keys()
1927 nodenames = utils.NiceSort(nodenames)
1928 nodelist = [all_info[name] for name in nodenames]
1930 # begin data gathering
1932 if self.do_node_query:
1934 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1935 self.cfg.GetHypervisorType())
1936 for name in nodenames:
1937 nodeinfo = node_data[name]
1938 if not nodeinfo.failed and nodeinfo.data:
1939 nodeinfo = nodeinfo.data
1940 fn = utils.TryConvert
1942 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1943 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1944 "mfree": fn(int, nodeinfo.get('memory_free', None)),
1945 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1946 "dfree": fn(int, nodeinfo.get('vg_free', None)),
1947 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1948 "bootid": nodeinfo.get('bootid', None),
1949 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1950 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1953 live_data[name] = {}
1955 live_data = dict.fromkeys(nodenames, {})
1957 node_to_primary = dict([(name, set()) for name in nodenames])
1958 node_to_secondary = dict([(name, set()) for name in nodenames])
1960 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1961 "sinst_cnt", "sinst_list"))
1962 if inst_fields & frozenset(self.op.output_fields):
1963 instancelist = self.cfg.GetInstanceList()
1965 for instance_name in instancelist:
1966 inst = self.cfg.GetInstanceInfo(instance_name)
1967 if inst.primary_node in node_to_primary:
1968 node_to_primary[inst.primary_node].add(inst.name)
1969 for secnode in inst.secondary_nodes:
1970 if secnode in node_to_secondary:
1971 node_to_secondary[secnode].add(inst.name)
1973 master_node = self.cfg.GetMasterNode()
1975 # end data gathering
1978 for node in nodelist:
1980 for field in self.op.output_fields:
1983 elif field == "pinst_list":
1984 val = list(node_to_primary[node.name])
1985 elif field == "sinst_list":
1986 val = list(node_to_secondary[node.name])
1987 elif field == "pinst_cnt":
1988 val = len(node_to_primary[node.name])
1989 elif field == "sinst_cnt":
1990 val = len(node_to_secondary[node.name])
1991 elif field == "pip":
1992 val = node.primary_ip
1993 elif field == "sip":
1994 val = node.secondary_ip
1995 elif field == "tags":
1996 val = list(node.GetTags())
1997 elif field == "serial_no":
1998 val = node.serial_no
1999 elif field == "master_candidate":
2000 val = node.master_candidate
2001 elif field == "master":
2002 val = node.name == master_node
2003 elif field == "offline":
2005 elif field == "drained":
2007 elif self._FIELDS_DYNAMIC.Matches(field):
2008 val = live_data[node.name].get(field, None)
2010 raise errors.ParameterError(field)
2011 node_output.append(val)
2012 output.append(node_output)
2017 class LUQueryNodeVolumes(NoHooksLU):
2018 """Logical unit for getting volumes on node(s).
2021 _OP_REQP = ["nodes", "output_fields"]
2023 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2024 _FIELDS_STATIC = utils.FieldSet("node")
2026 def ExpandNames(self):
2027 _CheckOutputFields(static=self._FIELDS_STATIC,
2028 dynamic=self._FIELDS_DYNAMIC,
2029 selected=self.op.output_fields)
2031 self.needed_locks = {}
2032 self.share_locks[locking.LEVEL_NODE] = 1
2033 if not self.op.nodes:
2034 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2036 self.needed_locks[locking.LEVEL_NODE] = \
2037 _GetWantedNodes(self, self.op.nodes)
2039 def CheckPrereq(self):
2040 """Check prerequisites.
2042 This checks that the fields required are valid output fields.
2045 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2047 def Exec(self, feedback_fn):
2048 """Computes the list of nodes and their attributes.
2051 nodenames = self.nodes
2052 volumes = self.rpc.call_node_volumes(nodenames)
2054 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2055 in self.cfg.GetInstanceList()]
2057 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2060 for node in nodenames:
2061 if node not in volumes or volumes[node].failed or not volumes[node].data:
2064 node_vols = volumes[node].data[:]
2065 node_vols.sort(key=lambda vol: vol['dev'])
2067 for vol in node_vols:
2069 for field in self.op.output_fields:
2072 elif field == "phys":
2076 elif field == "name":
2078 elif field == "size":
2079 val = int(float(vol['size']))
2080 elif field == "instance":
2082 if node not in lv_by_node[inst]:
2084 if vol['name'] in lv_by_node[inst][node]:
2090 raise errors.ParameterError(field)
2091 node_output.append(str(val))
2093 output.append(node_output)
2098 class LUAddNode(LogicalUnit):
2099 """Logical unit for adding node to the cluster.
2103 HTYPE = constants.HTYPE_NODE
2104 _OP_REQP = ["node_name"]
2106 def BuildHooksEnv(self):
2109 This will run on all nodes before, and on all nodes + the new node after.
2113 "OP_TARGET": self.op.node_name,
2114 "NODE_NAME": self.op.node_name,
2115 "NODE_PIP": self.op.primary_ip,
2116 "NODE_SIP": self.op.secondary_ip,
2118 nodes_0 = self.cfg.GetNodeList()
2119 nodes_1 = nodes_0 + [self.op.node_name, ]
2120 return env, nodes_0, nodes_1
2122 def CheckPrereq(self):
2123 """Check prerequisites.
2126 - the new node is not already in the config
2128     - its parameters (single/dual-homed) match the cluster
2130 Any errors are signalled by raising errors.OpPrereqError.
2133 node_name = self.op.node_name
2136 dns_data = utils.HostInfo(node_name)
2138 node = dns_data.name
2139 primary_ip = self.op.primary_ip = dns_data.ip
2140 secondary_ip = getattr(self.op, "secondary_ip", None)
2141 if secondary_ip is None:
2142 secondary_ip = primary_ip
2143 if not utils.IsValidIP(secondary_ip):
2144 raise errors.OpPrereqError("Invalid secondary IP given")
2145 self.op.secondary_ip = secondary_ip
2147 node_list = cfg.GetNodeList()
2148 if not self.op.readd and node in node_list:
2149 raise errors.OpPrereqError("Node %s is already in the configuration" %
2151 elif self.op.readd and node not in node_list:
2152 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2154 for existing_node_name in node_list:
2155 existing_node = cfg.GetNodeInfo(existing_node_name)
2157 if self.op.readd and node == existing_node_name:
2158 if (existing_node.primary_ip != primary_ip or
2159 existing_node.secondary_ip != secondary_ip):
2160 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2161 " address configuration as before")
2164 if (existing_node.primary_ip == primary_ip or
2165 existing_node.secondary_ip == primary_ip or
2166 existing_node.primary_ip == secondary_ip or
2167 existing_node.secondary_ip == secondary_ip):
2168 raise errors.OpPrereqError("New node ip address(es) conflict with"
2169 " existing node %s" % existing_node.name)
2171 # check that the type of the node (single versus dual homed) is the
2172 # same as for the master
2173 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2174 master_singlehomed = myself.secondary_ip == myself.primary_ip
2175 newbie_singlehomed = secondary_ip == primary_ip
2176 if master_singlehomed != newbie_singlehomed:
2177 if master_singlehomed:
2178 raise errors.OpPrereqError("The master has no private ip but the"
2179 " new node has one")
2181 raise errors.OpPrereqError("The master has a private ip but the"
2182 " new node doesn't have one")
2184     # check reachability
2185 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2186 raise errors.OpPrereqError("Node not reachable by ping")
2188 if not newbie_singlehomed:
2189 # check reachability from my secondary ip to newbie's secondary ip
2190 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2191 source=myself.secondary_ip):
2192 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2193 " based ping to noded port")
2195 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2196 mc_now, _ = self.cfg.GetMasterCandidateStats()
2197 master_candidate = mc_now < cp_size
2199 self.new_node = objects.Node(name=node,
2200 primary_ip=primary_ip,
2201 secondary_ip=secondary_ip,
2202 master_candidate=master_candidate,
2203 offline=False, drained=False)
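    # Worked example (hypothetical numbers, for illustration only): with a
    # candidate_pool_size of 10 and 3 current master candidates, mc_now < cp_size
    # holds, so the node object created above gets master_candidate=True; once
    # the candidate pool is full, newly added nodes join as regular nodes instead.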
2205 def Exec(self, feedback_fn):
2206 """Adds the new node to the cluster.
2209 new_node = self.new_node
2210 node = new_node.name
2212 # check connectivity
2213 result = self.rpc.call_version([node])[node]
2216 if constants.PROTOCOL_VERSION == result.data:
2217 logging.info("Communication to node %s fine, sw version %s match",
2220 raise errors.OpExecError("Version mismatch master version %s,"
2221 " node version %s" %
2222 (constants.PROTOCOL_VERSION, result.data))
2224 raise errors.OpExecError("Cannot get version from the new node")
2227 logging.info("Copy ssh key to node %s", node)
2228 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2230 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2231 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2237 keyarray.append(f.read())
2241 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2243 keyarray[3], keyarray[4], keyarray[5])
2245 msg = result.RemoteFailMsg()
2247 raise errors.OpExecError("Cannot transfer ssh keys to the"
2248 " new node: %s" % msg)
2250 # Add node to our /etc/hosts, and add key to known_hosts
2251 utils.AddHostToEtcHosts(new_node.name)
2253 if new_node.secondary_ip != new_node.primary_ip:
2254 result = self.rpc.call_node_has_ip_address(new_node.name,
2255 new_node.secondary_ip)
2256 if result.failed or not result.data:
2257 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2258 " you gave (%s). Please fix and re-run this"
2259 " command." % new_node.secondary_ip)
2261 node_verify_list = [self.cfg.GetMasterNode()]
2262 node_verify_param = {
2264 # TODO: do a node-net-test as well?
2267 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2268 self.cfg.GetClusterName())
2269 for verifier in node_verify_list:
2270 if result[verifier].failed or not result[verifier].data:
2271 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2272 " for remote verification" % verifier)
2273 if result[verifier].data['nodelist']:
2274 for failed in result[verifier].data['nodelist']:
2275 feedback_fn("ssh/hostname verification failed %s -> %s" %
2276 (verifier, result[verifier].data['nodelist'][failed]))
2277 raise errors.OpExecError("ssh/hostname verification failed.")
2279 # Distribute updated /etc/hosts and known_hosts to all nodes,
2280 # including the node just added
2281 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2282 dist_nodes = self.cfg.GetNodeList()
2283 if not self.op.readd:
2284 dist_nodes.append(node)
2285 if myself.name in dist_nodes:
2286 dist_nodes.remove(myself.name)
2288 logging.debug("Copying hosts and known_hosts to all nodes")
2289 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2290 result = self.rpc.call_upload_file(dist_nodes, fname)
2291 for to_node, to_result in result.iteritems():
2292 if to_result.failed or not to_result.data:
2293 logging.error("Copy of file %s to node %s failed", fname, to_node)
2296 enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2297 if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2298 to_copy.append(constants.VNC_PASSWORD_FILE)
2300 for fname in to_copy:
2301 result = self.rpc.call_upload_file([node], fname)
2302 if result[node].failed or not result[node]:
2303 logging.error("Could not copy file %s to node %s", fname, node)
2306 self.context.ReaddNode(new_node)
2308 self.context.AddNode(new_node)
2311 class LUSetNodeParams(LogicalUnit):
2312 """Modifies the parameters of a node.
2315 HPATH = "node-modify"
2316 HTYPE = constants.HTYPE_NODE
2317 _OP_REQP = ["node_name"]
2320 def CheckArguments(self):
2321 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2322 if node_name is None:
2323 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2324 self.op.node_name = node_name
2325 _CheckBooleanOpField(self.op, 'master_candidate')
2326 _CheckBooleanOpField(self.op, 'offline')
2327 _CheckBooleanOpField(self.op, 'drained')
2328 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2329 if all_mods.count(None) == 3:
2330 raise errors.OpPrereqError("Please pass at least one modification")
2331 if all_mods.count(True) > 1:
2332 raise errors.OpPrereqError("Can't set the node into more than one"
2333 " state at the same time")
2335 def ExpandNames(self):
2336 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2338 def BuildHooksEnv(self):
2341 This runs on the master node.
2345 "OP_TARGET": self.op.node_name,
2346 "MASTER_CANDIDATE": str(self.op.master_candidate),
2347 "OFFLINE": str(self.op.offline),
2348 "DRAINED": str(self.op.drained),
2350 nl = [self.cfg.GetMasterNode(),
2354 def CheckPrereq(self):
2355 """Check prerequisites.
2357     This checks the node's current state against the requested changes.
2360 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2362 if ((self.op.master_candidate == False or self.op.offline == True or
2363 self.op.drained == True) and node.master_candidate):
2364 # we will demote the node from master_candidate
2365 if self.op.node_name == self.cfg.GetMasterNode():
2366 raise errors.OpPrereqError("The master node has to be a"
2367 " master candidate, online and not drained")
2368 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2369 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2370 if num_candidates <= cp_size:
2371 msg = ("Not enough master candidates (desired"
2372 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2374 self.LogWarning(msg)
2376 raise errors.OpPrereqError(msg)
2378 if (self.op.master_candidate == True and
2379 ((node.offline and not self.op.offline == False) or
2380 (node.drained and not self.op.drained == False))):
2381 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2382 " to master_candidate" % node.name)
2386 def Exec(self, feedback_fn):
2395 if self.op.offline is not None:
2396 node.offline = self.op.offline
2397 result.append(("offline", str(self.op.offline)))
2398 if self.op.offline == True:
2399 if node.master_candidate:
2400 node.master_candidate = False
2402 result.append(("master_candidate", "auto-demotion due to offline"))
2404 node.drained = False
2405 result.append(("drained", "clear drained status due to offline"))
2407 if self.op.master_candidate is not None:
2408 node.master_candidate = self.op.master_candidate
2410 result.append(("master_candidate", str(self.op.master_candidate)))
2411 if self.op.master_candidate == False:
2412 rrc = self.rpc.call_node_demote_from_mc(node.name)
2413 msg = rrc.RemoteFailMsg()
2415 self.LogWarning("Node failed to demote itself: %s" % msg)
2417 if self.op.drained is not None:
2418 node.drained = self.op.drained
2419 result.append(("drained", str(self.op.drained)))
2420 if self.op.drained == True:
2421 if node.master_candidate:
2422 node.master_candidate = False
2424 result.append(("master_candidate", "auto-demotion due to drain"))
2426 node.offline = False
2427 result.append(("offline", "clear offline status due to drain"))
2429 # this will trigger configuration file update, if needed
2430 self.cfg.Update(node)
2431 # this will trigger job queue propagation or cleanup
2433 self.context.ReaddNode(node)
2438 class LUQueryClusterInfo(NoHooksLU):
2439 """Query cluster configuration.
2445 def ExpandNames(self):
2446 self.needed_locks = {}
2448 def CheckPrereq(self):
2449 """No prerequsites needed for this LU.
2454 def Exec(self, feedback_fn):
2455 """Return cluster config.
2458 cluster = self.cfg.GetClusterInfo()
2460 "software_version": constants.RELEASE_VERSION,
2461 "protocol_version": constants.PROTOCOL_VERSION,
2462 "config_version": constants.CONFIG_VERSION,
2463 "os_api_version": constants.OS_API_VERSION,
2464 "export_version": constants.EXPORT_VERSION,
2465 "architecture": (platform.architecture()[0], platform.machine()),
2466 "name": cluster.cluster_name,
2467 "master": cluster.master_node,
2468 "default_hypervisor": cluster.default_hypervisor,
2469 "enabled_hypervisors": cluster.enabled_hypervisors,
2470 "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2471 for hypervisor in cluster.enabled_hypervisors]),
2472 "beparams": cluster.beparams,
2473 "candidate_pool_size": cluster.candidate_pool_size,
2474 "default_bridge": cluster.default_bridge,
2475 "master_netdev": cluster.master_netdev,
2476 "volume_group_name": cluster.volume_group_name,
2477 "file_storage_dir": cluster.file_storage_dir,
2483 class LUQueryConfigValues(NoHooksLU):
2484 """Return configuration values.
2489 _FIELDS_DYNAMIC = utils.FieldSet()
2490 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2492 def ExpandNames(self):
2493 self.needed_locks = {}
2495 _CheckOutputFields(static=self._FIELDS_STATIC,
2496 dynamic=self._FIELDS_DYNAMIC,
2497 selected=self.op.output_fields)
2499 def CheckPrereq(self):
2500 """No prerequisites.
2505 def Exec(self, feedback_fn):
2506 """Dump a representation of the cluster config to the standard output.
2510 for field in self.op.output_fields:
2511 if field == "cluster_name":
2512 entry = self.cfg.GetClusterName()
2513 elif field == "master_node":
2514 entry = self.cfg.GetMasterNode()
2515 elif field == "drain_flag":
2516 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2518 raise errors.ParameterError(field)
2519 values.append(entry)
2523 class LUActivateInstanceDisks(NoHooksLU):
2524 """Bring up an instance's disks.
2527 _OP_REQP = ["instance_name"]
2530 def ExpandNames(self):
2531 self._ExpandAndLockInstance()
2532 self.needed_locks[locking.LEVEL_NODE] = []
2533 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2535 def DeclareLocks(self, level):
2536 if level == locking.LEVEL_NODE:
2537 self._LockInstancesNodes()
2539 def CheckPrereq(self):
2540 """Check prerequisites.
2542 This checks that the instance is in the cluster.
2545 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2546 assert self.instance is not None, \
2547 "Cannot retrieve locked instance %s" % self.op.instance_name
2548 _CheckNodeOnline(self, self.instance.primary_node)
2550 def Exec(self, feedback_fn):
2551 """Activate the disks.
2554 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2556 raise errors.OpExecError("Cannot activate block devices")
2561 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2562 """Prepare the block devices for an instance.
2564 This sets up the block devices on all nodes.
2566 @type lu: L{LogicalUnit}
2567 @param lu: the logical unit on whose behalf we execute
2568 @type instance: L{objects.Instance}
2569 @param instance: the instance for whose disks we assemble
2570 @type ignore_secondaries: boolean
2571 @param ignore_secondaries: if true, errors on secondary nodes
2572 won't result in an error return from the function
2573   @return: a tuple (disks_ok, device_info), where disks_ok is a boolean
2574       and device_info a list of (host, instance_visible_name, node_visible_name)
2575       tuples with the mapping from node devices to instance devices
2580 iname = instance.name
2581   # With the two-pass mechanism we try to reduce the window of
2582   # opportunity for the race condition of switching DRBD to primary
2583   # before handshaking has occurred, but we do not eliminate it
2585 # The proper fix would be to wait (with some limits) until the
2586 # connection has been made and drbd transitions from WFConnection
2587 # into any other network-connected state (Connected, SyncTarget,
2590 # 1st pass, assemble on all nodes in secondary mode
2591 for inst_disk in instance.disks:
2592 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2593 lu.cfg.SetDiskID(node_disk, node)
2594 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2595 msg = result.RemoteFailMsg()
2597 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2598 " (is_primary=False, pass=1): %s",
2599 inst_disk.iv_name, node, msg)
2600 if not ignore_secondaries:
2603 # FIXME: race condition on drbd migration to primary
2605 # 2nd pass, do only the primary node
2606 for inst_disk in instance.disks:
2607 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2608 if node != instance.primary_node:
2610 lu.cfg.SetDiskID(node_disk, node)
2611 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2612 msg = result.RemoteFailMsg()
2614 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2615 " (is_primary=True, pass=2): %s",
2616 inst_disk.iv_name, node, msg)
2618 device_info.append((instance.primary_node, inst_disk.iv_name,
2621 # leave the disks configured for the primary node
2622 # this is a workaround that would be fixed better by
2623 # improving the logical/physical id handling
2624 for disk in instance.disks:
2625 lu.cfg.SetDiskID(disk, instance.primary_node)
2627 return disks_ok, device_info
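# Typical usage sketch, mirroring the callers elsewhere in this module (e.g.
# LUActivateInstanceDisks.Exec and _StartInstanceDisks); the "if not disks_ok"
# guard is assumed from those callers:
#
#   disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#
# disks_info carries the (host, instance_visible_name, node_visible_name)
# tuples described in the docstring above.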
2630 def _StartInstanceDisks(lu, instance, force):
2631 """Start the disks of an instance.
2634 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2635 ignore_secondaries=force)
2637 _ShutdownInstanceDisks(lu, instance)
2638 if force is not None and not force:
2639 lu.proc.LogWarning("", hint="If the message above refers to a"
2641 " you can retry the operation using '--force'.")
2642 raise errors.OpExecError("Disk consistency error")
2645 class LUDeactivateInstanceDisks(NoHooksLU):
2646 """Shutdown an instance's disks.
2649 _OP_REQP = ["instance_name"]
2652 def ExpandNames(self):
2653 self._ExpandAndLockInstance()
2654 self.needed_locks[locking.LEVEL_NODE] = []
2655 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2657 def DeclareLocks(self, level):
2658 if level == locking.LEVEL_NODE:
2659 self._LockInstancesNodes()
2661 def CheckPrereq(self):
2662 """Check prerequisites.
2664 This checks that the instance is in the cluster.
2667 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2668 assert self.instance is not None, \
2669 "Cannot retrieve locked instance %s" % self.op.instance_name
2671 def Exec(self, feedback_fn):
2672 """Deactivate the disks
2675 instance = self.instance
2676 _SafeShutdownInstanceDisks(self, instance)
2679 def _SafeShutdownInstanceDisks(lu, instance):
2680 """Shutdown block devices of an instance.
2682 This function checks if an instance is running, before calling
2683 _ShutdownInstanceDisks.
2686 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2687 [instance.hypervisor])
2688 ins_l = ins_l[instance.primary_node]
2689 if ins_l.failed or not isinstance(ins_l.data, list):
2690 raise errors.OpExecError("Can't contact node '%s'" %
2691 instance.primary_node)
2693 if instance.name in ins_l.data:
2694 raise errors.OpExecError("Instance is running, can't shutdown"
2697 _ShutdownInstanceDisks(lu, instance)
2700 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2701 """Shutdown block devices of an instance.
2703 This does the shutdown on all nodes of the instance.
2705   If ignore_primary is false, errors on the primary node are
2710 for disk in instance.disks:
2711 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2712 lu.cfg.SetDiskID(top_disk, node)
2713 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2714 msg = result.RemoteFailMsg()
2716 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2717 disk.iv_name, node, msg)
2718 if not ignore_primary or node != instance.primary_node:
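# Usage sketch: callers that expect the primary node to be unreachable pass
# ignore_primary=True, e.g. LUFailoverInstance.Exec calls
#   _ShutdownInstanceDisks(self, instance, ignore_primary=True)
# so that a shutdown error on the (possibly dead) primary node does not make
# the call fail, while errors on the remaining nodes still do.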
2723 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2724 """Checks if a node has enough free memory.
2726   This function checks if a given node has the needed amount of free
2727   memory. In case the node has less memory or we cannot get the
2728   information from the node, this function raises an OpPrereqError
2731 @type lu: C{LogicalUnit}
2732 @param lu: a logical unit from which we get configuration data
2734 @param node: the node to check
2735 @type reason: C{str}
2736 @param reason: string to use in the error message
2737 @type requested: C{int}
2738 @param requested: the amount of memory in MiB to check for
2739 @type hypervisor_name: C{str}
2740 @param hypervisor_name: the hypervisor to ask for memory stats
2741 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2742 we cannot check the node
2745 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2746 nodeinfo[node].Raise()
2747 free_mem = nodeinfo[node].data.get('memory_free')
2748 if not isinstance(free_mem, int):
2749 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2750 " was '%s'" % (node, free_mem))
2751 if requested > free_mem:
2752 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2753 " needed %s MiB, available %s MiB" %
2754 (node, reason, requested, free_mem))
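# Usage sketch (as in LUStartupInstance and LUFailoverInstance below): the
# requested amount usually comes from the instance's filled BE parameters,
# for example:
#
#   bep = self.cfg.GetClusterInfo().FillBE(instance)
#   _CheckNodeFreeMemory(self, target_node,
#                        "failing over instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)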
2757 class LUStartupInstance(LogicalUnit):
2758 """Starts an instance.
2761 HPATH = "instance-start"
2762 HTYPE = constants.HTYPE_INSTANCE
2763 _OP_REQP = ["instance_name", "force"]
2766 def ExpandNames(self):
2767 self._ExpandAndLockInstance()
2769 def BuildHooksEnv(self):
2772 This runs on master, primary and secondary nodes of the instance.
2776 "FORCE": self.op.force,
2778 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2779 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2782 def CheckPrereq(self):
2783 """Check prerequisites.
2785 This checks that the instance is in the cluster.
2788 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2789 assert self.instance is not None, \
2790 "Cannot retrieve locked instance %s" % self.op.instance_name
2793 self.beparams = getattr(self.op, "beparams", {})
2795 if not isinstance(self.beparams, dict):
2796 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2797 " dict" % (type(self.beparams), ))
2798 # fill the beparams dict
2799 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2800 self.op.beparams = self.beparams
2803 self.hvparams = getattr(self.op, "hvparams", {})
2805 if not isinstance(self.hvparams, dict):
2806 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2807 " dict" % (type(self.hvparams), ))
2809 # check hypervisor parameter syntax (locally)
2810 cluster = self.cfg.GetClusterInfo()
2811 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2812 filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2814 filled_hvp.update(self.hvparams)
2815 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2816 hv_type.CheckParameterSyntax(filled_hvp)
2817 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2818 self.op.hvparams = self.hvparams
2820 _CheckNodeOnline(self, instance.primary_node)
2822 bep = self.cfg.GetClusterInfo().FillBE(instance)
2823     # check bridge existence
2824 _CheckInstanceBridgesExist(self, instance)
2826 remote_info = self.rpc.call_instance_info(instance.primary_node,
2828 instance.hypervisor)
2830 if not remote_info.data:
2831 _CheckNodeFreeMemory(self, instance.primary_node,
2832 "starting instance %s" % instance.name,
2833 bep[constants.BE_MEMORY], instance.hypervisor)
2835 def Exec(self, feedback_fn):
2836 """Start the instance.
2839 instance = self.instance
2840 force = self.op.force
2842 self.cfg.MarkInstanceUp(instance.name)
2844 node_current = instance.primary_node
2846 _StartInstanceDisks(self, instance, force)
2848 result = self.rpc.call_instance_start(node_current, instance,
2849 self.hvparams, self.beparams)
2850 msg = result.RemoteFailMsg()
2852 _ShutdownInstanceDisks(self, instance)
2853 raise errors.OpExecError("Could not start instance: %s" % msg)
2856 class LURebootInstance(LogicalUnit):
2857 """Reboot an instance.
2860 HPATH = "instance-reboot"
2861 HTYPE = constants.HTYPE_INSTANCE
2862 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2865 def ExpandNames(self):
2866 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2867 constants.INSTANCE_REBOOT_HARD,
2868 constants.INSTANCE_REBOOT_FULL]:
2869 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2870 (constants.INSTANCE_REBOOT_SOFT,
2871 constants.INSTANCE_REBOOT_HARD,
2872 constants.INSTANCE_REBOOT_FULL))
2873 self._ExpandAndLockInstance()
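    # Note (derived from the Exec method below): INSTANCE_REBOOT_SOFT and
    # INSTANCE_REBOOT_HARD are handled with a single instance_reboot RPC on the
    # primary node, while INSTANCE_REBOOT_FULL shuts the instance down, cycles
    # its disks and starts it again.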
2875 def BuildHooksEnv(self):
2878 This runs on master, primary and secondary nodes of the instance.
2882 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2883 "REBOOT_TYPE": self.op.reboot_type,
2885 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2886 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2889 def CheckPrereq(self):
2890 """Check prerequisites.
2892 This checks that the instance is in the cluster.
2895 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2896 assert self.instance is not None, \
2897 "Cannot retrieve locked instance %s" % self.op.instance_name
2899 _CheckNodeOnline(self, instance.primary_node)
2901     # check bridge existence
2902 _CheckInstanceBridgesExist(self, instance)
2904 def Exec(self, feedback_fn):
2905 """Reboot the instance.
2908 instance = self.instance
2909 ignore_secondaries = self.op.ignore_secondaries
2910 reboot_type = self.op.reboot_type
2912 node_current = instance.primary_node
2914 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2915 constants.INSTANCE_REBOOT_HARD]:
2916 for disk in instance.disks:
2917 self.cfg.SetDiskID(disk, node_current)
2918 result = self.rpc.call_instance_reboot(node_current, instance,
2920 msg = result.RemoteFailMsg()
2922 raise errors.OpExecError("Could not reboot instance: %s" % msg)
2924 result = self.rpc.call_instance_shutdown(node_current, instance)
2925 msg = result.RemoteFailMsg()
2927 raise errors.OpExecError("Could not shutdown instance for"
2928 " full reboot: %s" % msg)
2929 _ShutdownInstanceDisks(self, instance)
2930 _StartInstanceDisks(self, instance, ignore_secondaries)
2931 result = self.rpc.call_instance_start(node_current, instance, None, None)
2932 msg = result.RemoteFailMsg()
2934 _ShutdownInstanceDisks(self, instance)
2935 raise errors.OpExecError("Could not start instance for"
2936 " full reboot: %s" % msg)
2938 self.cfg.MarkInstanceUp(instance.name)
2941 class LUShutdownInstance(LogicalUnit):
2942 """Shutdown an instance.
2945 HPATH = "instance-stop"
2946 HTYPE = constants.HTYPE_INSTANCE
2947 _OP_REQP = ["instance_name"]
2950 def ExpandNames(self):
2951 self._ExpandAndLockInstance()
2953 def BuildHooksEnv(self):
2956 This runs on master, primary and secondary nodes of the instance.
2959 env = _BuildInstanceHookEnvByObject(self, self.instance)
2960 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2963 def CheckPrereq(self):
2964 """Check prerequisites.
2966 This checks that the instance is in the cluster.
2969 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2970 assert self.instance is not None, \
2971 "Cannot retrieve locked instance %s" % self.op.instance_name
2972 _CheckNodeOnline(self, self.instance.primary_node)
2974 def Exec(self, feedback_fn):
2975 """Shutdown the instance.
2978 instance = self.instance
2979 node_current = instance.primary_node
2980 self.cfg.MarkInstanceDown(instance.name)
2981 result = self.rpc.call_instance_shutdown(node_current, instance)
2982 msg = result.RemoteFailMsg()
2984 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2986 _ShutdownInstanceDisks(self, instance)
2989 class LUReinstallInstance(LogicalUnit):
2990 """Reinstall an instance.
2993 HPATH = "instance-reinstall"
2994 HTYPE = constants.HTYPE_INSTANCE
2995 _OP_REQP = ["instance_name"]
2998 def ExpandNames(self):
2999 self._ExpandAndLockInstance()
3001 def BuildHooksEnv(self):
3004 This runs on master, primary and secondary nodes of the instance.
3007 env = _BuildInstanceHookEnvByObject(self, self.instance)
3008 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3011 def CheckPrereq(self):
3012 """Check prerequisites.
3014 This checks that the instance is in the cluster and is not running.
3017 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3018 assert instance is not None, \
3019 "Cannot retrieve locked instance %s" % self.op.instance_name
3020 _CheckNodeOnline(self, instance.primary_node)
3022 if instance.disk_template == constants.DT_DISKLESS:
3023 raise errors.OpPrereqError("Instance '%s' has no disks" %
3024 self.op.instance_name)
3025 if instance.admin_up:
3026 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3027 self.op.instance_name)
3028 remote_info = self.rpc.call_instance_info(instance.primary_node,
3030 instance.hypervisor)
3032 if remote_info.data:
3033 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3034 (self.op.instance_name,
3035 instance.primary_node))
3037 self.op.os_type = getattr(self.op, "os_type", None)
3038 if self.op.os_type is not None:
3040 pnode = self.cfg.GetNodeInfo(
3041 self.cfg.ExpandNodeName(instance.primary_node))
3043 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3045 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3047 if not isinstance(result.data, objects.OS):
3048 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3049 " primary node" % self.op.os_type)
3051 self.instance = instance
3053 def Exec(self, feedback_fn):
3054 """Reinstall the instance.
3057 inst = self.instance
3059 if self.op.os_type is not None:
3060 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3061 inst.os = self.op.os_type
3062 self.cfg.Update(inst)
3064 _StartInstanceDisks(self, inst, None)
3066 feedback_fn("Running the instance OS create scripts...")
3067 result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3068 msg = result.RemoteFailMsg()
3070 raise errors.OpExecError("Could not install OS for instance %s"
3072 (inst.name, inst.primary_node, msg))
3074 _ShutdownInstanceDisks(self, inst)
3077 class LURenameInstance(LogicalUnit):
3078 """Rename an instance.
3081 HPATH = "instance-rename"
3082 HTYPE = constants.HTYPE_INSTANCE
3083 _OP_REQP = ["instance_name", "new_name"]
3085 def BuildHooksEnv(self):
3088 This runs on master, primary and secondary nodes of the instance.
3091 env = _BuildInstanceHookEnvByObject(self, self.instance)
3092 env["INSTANCE_NEW_NAME"] = self.op.new_name
3093 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3096 def CheckPrereq(self):
3097 """Check prerequisites.
3099 This checks that the instance is in the cluster and is not running.
3102 instance = self.cfg.GetInstanceInfo(
3103 self.cfg.ExpandInstanceName(self.op.instance_name))
3104 if instance is None:
3105 raise errors.OpPrereqError("Instance '%s' not known" %
3106 self.op.instance_name)
3107 _CheckNodeOnline(self, instance.primary_node)
3109 if instance.admin_up:
3110 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3111 self.op.instance_name)
3112 remote_info = self.rpc.call_instance_info(instance.primary_node,
3114 instance.hypervisor)
3116 if remote_info.data:
3117 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3118 (self.op.instance_name,
3119 instance.primary_node))
3120 self.instance = instance
3122 # new name verification
3123 name_info = utils.HostInfo(self.op.new_name)
3125 self.op.new_name = new_name = name_info.name
3126 instance_list = self.cfg.GetInstanceList()
3127 if new_name in instance_list:
3128 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3131 if not getattr(self.op, "ignore_ip", False):
3132 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3133 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3134 (name_info.ip, new_name))
3137 def Exec(self, feedback_fn):
3138 """Reinstall the instance.
3141 inst = self.instance
3142 old_name = inst.name
3144 if inst.disk_template == constants.DT_FILE:
3145 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3147 self.cfg.RenameInstance(inst.name, self.op.new_name)
3148 # Change the instance lock. This is definitely safe while we hold the BGL
3149 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3150 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3152 # re-read the instance from the configuration after rename
3153 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3155 if inst.disk_template == constants.DT_FILE:
3156 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3157 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3158 old_file_storage_dir,
3159 new_file_storage_dir)
3162 raise errors.OpExecError("Could not connect to node '%s' to rename"
3163 " directory '%s' to '%s' (but the instance"
3164 " has been renamed in Ganeti)" % (
3165 inst.primary_node, old_file_storage_dir,
3166 new_file_storage_dir))
3168 if not result.data[0]:
3169 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3170 " (but the instance has been renamed in"
3171 " Ganeti)" % (old_file_storage_dir,
3172 new_file_storage_dir))
3174 _StartInstanceDisks(self, inst, None)
3176 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3178 msg = result.RemoteFailMsg()
3180 msg = ("Could not run OS rename script for instance %s on node %s"
3181 " (but the instance has been renamed in Ganeti): %s" %
3182 (inst.name, inst.primary_node, msg))
3183 self.proc.LogWarning(msg)
3185 _ShutdownInstanceDisks(self, inst)
3188 class LURemoveInstance(LogicalUnit):
3189 """Remove an instance.
3192 HPATH = "instance-remove"
3193 HTYPE = constants.HTYPE_INSTANCE
3194 _OP_REQP = ["instance_name", "ignore_failures"]
3197 def ExpandNames(self):
3198 self._ExpandAndLockInstance()
3199 self.needed_locks[locking.LEVEL_NODE] = []
3200 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3202 def DeclareLocks(self, level):
3203 if level == locking.LEVEL_NODE:
3204 self._LockInstancesNodes()
3206 def BuildHooksEnv(self):
3209 This runs on master, primary and secondary nodes of the instance.
3212 env = _BuildInstanceHookEnvByObject(self, self.instance)
3213 nl = [self.cfg.GetMasterNode()]
3216 def CheckPrereq(self):
3217 """Check prerequisites.
3219 This checks that the instance is in the cluster.
3222 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3223 assert self.instance is not None, \
3224 "Cannot retrieve locked instance %s" % self.op.instance_name
3226 def Exec(self, feedback_fn):
3227 """Remove the instance.
3230 instance = self.instance
3231 logging.info("Shutting down instance %s on node %s",
3232 instance.name, instance.primary_node)
3234 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3235 msg = result.RemoteFailMsg()
3237 if self.op.ignore_failures:
3238 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3240 raise errors.OpExecError("Could not shutdown instance %s on"
3242 (instance.name, instance.primary_node, msg))
3244 logging.info("Removing block devices for instance %s", instance.name)
3246 if not _RemoveDisks(self, instance):
3247 if self.op.ignore_failures:
3248 feedback_fn("Warning: can't remove instance's disks")
3250 raise errors.OpExecError("Can't remove instance's disks")
3252 logging.info("Removing instance %s out of cluster config", instance.name)
3254 self.cfg.RemoveInstance(instance.name)
3255 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3258 class LUQueryInstances(NoHooksLU):
3259 """Logical unit for querying instances.
3262 _OP_REQP = ["output_fields", "names", "use_locking"]
3264 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3266 "disk_template", "ip", "mac", "bridge",
3267 "sda_size", "sdb_size", "vcpus", "tags",
3268 "network_port", "beparams",
3269 r"(disk)\.(size)/([0-9]+)",
3270 r"(disk)\.(sizes)", "disk_usage",
3271 r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3272 r"(nic)\.(macs|ips|bridges)",
3273 r"(disk|nic)\.(count)",
3274 "serial_no", "hypervisor", "hvparams",] +
3276 for name in constants.HVS_PARAMETERS] +
3278 for name in constants.BES_PARAMETERS])
3279 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
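  # Note on the regex-based static fields above (illustrative examples only):
  # "disk.count" and "nic.count" return the number of disks/NICs, "disk.sizes",
  # "nic.macs", "nic.ips" and "nic.bridges" return lists, and indexed forms such
  # as "disk.size/0" or "nic.mac/1" select a single disk or NIC; these groups
  # are handled near the end of the Exec method below.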
3282 def ExpandNames(self):
3283 _CheckOutputFields(static=self._FIELDS_STATIC,
3284 dynamic=self._FIELDS_DYNAMIC,
3285 selected=self.op.output_fields)
3287 self.needed_locks = {}
3288 self.share_locks[locking.LEVEL_INSTANCE] = 1
3289 self.share_locks[locking.LEVEL_NODE] = 1
3292 self.wanted = _GetWantedInstances(self, self.op.names)
3294 self.wanted = locking.ALL_SET
3296 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3297 self.do_locking = self.do_node_query and self.op.use_locking
3299 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3300 self.needed_locks[locking.LEVEL_NODE] = []
3301 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3303 def DeclareLocks(self, level):
3304 if level == locking.LEVEL_NODE and self.do_locking:
3305 self._LockInstancesNodes()
3307 def CheckPrereq(self):
3308 """Check prerequisites.
3313 def Exec(self, feedback_fn):
3314 """Computes the list of nodes and their attributes.
3317 all_info = self.cfg.GetAllInstancesInfo()
3318 if self.wanted == locking.ALL_SET:
3319 # caller didn't specify instance names, so ordering is not important
3321 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3323 instance_names = all_info.keys()
3324 instance_names = utils.NiceSort(instance_names)
3326 # caller did specify names, so we must keep the ordering
3328 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3330 tgt_set = all_info.keys()
3331 missing = set(self.wanted).difference(tgt_set)
3333 raise errors.OpExecError("Some instances were removed before"
3334 " retrieving their data: %s" % missing)
3335 instance_names = self.wanted
3337 instance_list = [all_info[iname] for iname in instance_names]
3339 # begin data gathering
3341 nodes = frozenset([inst.primary_node for inst in instance_list])
3342 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3346 if self.do_node_query:
3348 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3350 result = node_data[name]
3352 # offline nodes will be in both lists
3353 off_nodes.append(name)
3355 bad_nodes.append(name)
3358 live_data.update(result.data)
3359 # else no instance is alive
3361 live_data = dict([(name, {}) for name in instance_names])
3363 # end data gathering
3368 for instance in instance_list:
3370 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3371 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3372 for field in self.op.output_fields:
3373 st_match = self._FIELDS_STATIC.Matches(field)
3378 elif field == "pnode":
3379 val = instance.primary_node
3380 elif field == "snodes":
3381 val = list(instance.secondary_nodes)
3382 elif field == "admin_state":
3383 val = instance.admin_up
3384 elif field == "oper_state":
3385 if instance.primary_node in bad_nodes:
3388 val = bool(live_data.get(instance.name))
3389 elif field == "status":
3390 if instance.primary_node in off_nodes:
3391 val = "ERROR_nodeoffline"
3392 elif instance.primary_node in bad_nodes:
3393 val = "ERROR_nodedown"
3395 running = bool(live_data.get(instance.name))
3397 if instance.admin_up:
3402 if instance.admin_up:
3406 elif field == "oper_ram":
3407 if instance.primary_node in bad_nodes:
3409 elif instance.name in live_data:
3410 val = live_data[instance.name].get("memory", "?")
3413 elif field == "disk_template":
3414 val = instance.disk_template
3417 val = instance.nics[0].ip
3420 elif field == "bridge":
3422 val = instance.nics[0].bridge
3425 elif field == "mac":
3427 val = instance.nics[0].mac
3430 elif field == "sda_size" or field == "sdb_size":
3431 idx = ord(field[2]) - ord('a')
3433 val = instance.FindDisk(idx).size
3434 except errors.OpPrereqError:
3436 elif field == "disk_usage": # total disk usage per node
3437 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3438 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3439 elif field == "tags":
3440 val = list(instance.GetTags())
3441 elif field == "serial_no":
3442 val = instance.serial_no
3443 elif field == "network_port":
3444 val = instance.network_port
3445 elif field == "hypervisor":
3446 val = instance.hypervisor
3447 elif field == "hvparams":
3449 elif (field.startswith(HVPREFIX) and
3450 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3451 val = i_hv.get(field[len(HVPREFIX):], None)
3452 elif field == "beparams":
3454 elif (field.startswith(BEPREFIX) and
3455 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3456 val = i_be.get(field[len(BEPREFIX):], None)
3457 elif st_match and st_match.groups():
3458 # matches a variable list
3459 st_groups = st_match.groups()
3460 if st_groups and st_groups[0] == "disk":
3461 if st_groups[1] == "count":
3462 val = len(instance.disks)
3463 elif st_groups[1] == "sizes":
3464 val = [disk.size for disk in instance.disks]
3465 elif st_groups[1] == "size":
3467 val = instance.FindDisk(st_groups[2]).size
3468 except errors.OpPrereqError:
3471 assert False, "Unhandled disk parameter"
3472 elif st_groups[0] == "nic":
3473 if st_groups[1] == "count":
3474 val = len(instance.nics)
3475 elif st_groups[1] == "macs":
3476 val = [nic.mac for nic in instance.nics]
3477 elif st_groups[1] == "ips":
3478 val = [nic.ip for nic in instance.nics]
3479 elif st_groups[1] == "bridges":
3480 val = [nic.bridge for nic in instance.nics]
3483 nic_idx = int(st_groups[2])
3484 if nic_idx >= len(instance.nics):
3487 if st_groups[1] == "mac":
3488 val = instance.nics[nic_idx].mac
3489 elif st_groups[1] == "ip":
3490 val = instance.nics[nic_idx].ip
3491 elif st_groups[1] == "bridge":
3492 val = instance.nics[nic_idx].bridge
3494 assert False, "Unhandled NIC parameter"
3496 assert False, "Unhandled variable parameter"
3498 raise errors.ParameterError(field)
3505 class LUFailoverInstance(LogicalUnit):
3506 """Failover an instance.
3509 HPATH = "instance-failover"
3510 HTYPE = constants.HTYPE_INSTANCE
3511 _OP_REQP = ["instance_name", "ignore_consistency"]
3514 def ExpandNames(self):
3515 self._ExpandAndLockInstance()
3516 self.needed_locks[locking.LEVEL_NODE] = []
3517 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3519 def DeclareLocks(self, level):
3520 if level == locking.LEVEL_NODE:
3521 self._LockInstancesNodes()
3523 def BuildHooksEnv(self):
3526 This runs on master, primary and secondary nodes of the instance.
3530 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3532 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3533 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3536 def CheckPrereq(self):
3537 """Check prerequisites.
3539 This checks that the instance is in the cluster.
3542 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3543 assert self.instance is not None, \
3544 "Cannot retrieve locked instance %s" % self.op.instance_name
3546 bep = self.cfg.GetClusterInfo().FillBE(instance)
3547 if instance.disk_template not in constants.DTS_NET_MIRROR:
3548 raise errors.OpPrereqError("Instance's disk layout is not"
3549 " network mirrored, cannot failover.")
3551 secondary_nodes = instance.secondary_nodes
3552 if not secondary_nodes:
3553 raise errors.ProgrammerError("no secondary node but using "
3554 "a mirrored disk template")
3556 target_node = secondary_nodes[0]
3557 _CheckNodeOnline(self, target_node)
3558 _CheckNodeNotDrained(self, target_node)
3560 if instance.admin_up:
3561 # check memory requirements on the secondary node
3562 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3563 instance.name, bep[constants.BE_MEMORY],
3564 instance.hypervisor)
3566 self.LogInfo("Not checking memory on the secondary node as"
3567 " instance will not be started")
3569     # check bridge existence
3570 brlist = [nic.bridge for nic in instance.nics]
3571 result = self.rpc.call_bridges_exist(target_node, brlist)
3574 raise errors.OpPrereqError("One or more target bridges %s does not"
3575 " exist on destination node '%s'" %
3576 (brlist, target_node))
3578 def Exec(self, feedback_fn):
3579 """Failover an instance.
3581 The failover is done by shutting it down on its present node and
3582 starting it on the secondary.
3585 instance = self.instance
3587 source_node = instance.primary_node
3588 target_node = instance.secondary_nodes[0]
3590 feedback_fn("* checking disk consistency between source and target")
3591 for dev in instance.disks:
3592 # for drbd, these are drbd over lvm
3593 if not _CheckDiskConsistency(self, dev, target_node, False):
3594 if instance.admin_up and not self.op.ignore_consistency:
3595 raise errors.OpExecError("Disk %s is degraded on target node,"
3596 " aborting failover." % dev.iv_name)
3598 feedback_fn("* shutting down instance on source node")
3599 logging.info("Shutting down instance %s on node %s",
3600 instance.name, source_node)
3602 result = self.rpc.call_instance_shutdown(source_node, instance)
3603 msg = result.RemoteFailMsg()
3605 if self.op.ignore_consistency:
3606 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3607 " Proceeding anyway. Please make sure node"
3608 " %s is down. Error details: %s",
3609 instance.name, source_node, source_node, msg)
3611 raise errors.OpExecError("Could not shutdown instance %s on"
3613 (instance.name, source_node, msg))
3615 feedback_fn("* deactivating the instance's disks on source node")
3616 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3617 raise errors.OpExecError("Can't shut down the instance's disks.")
3619 instance.primary_node = target_node
3620 # distribute new instance config to the other nodes
3621 self.cfg.Update(instance)
3623 # Only start the instance if it's marked as up
3624 if instance.admin_up:
3625 feedback_fn("* activating the instance's disks on target node")
3626 logging.info("Starting instance %s on node %s",
3627 instance.name, target_node)
3629 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3630 ignore_secondaries=True)
3632 _ShutdownInstanceDisks(self, instance)
3633 raise errors.OpExecError("Can't activate the instance's disks")
3635 feedback_fn("* starting the instance on the target node")
3636 result = self.rpc.call_instance_start(target_node, instance, None, None)
3637 msg = result.RemoteFailMsg()
3639 _ShutdownInstanceDisks(self, instance)
3640 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3641 (instance.name, target_node, msg))
3644 class LUMigrateInstance(LogicalUnit):
3645 """Migrate an instance.
3647   This is migration without shutting the instance down, as opposed to
3648   failover, which is done with a shutdown.
3651 HPATH = "instance-migrate"
3652 HTYPE = constants.HTYPE_INSTANCE
3653 _OP_REQP = ["instance_name", "live", "cleanup"]
3657 def ExpandNames(self):
3658 self._ExpandAndLockInstance()
3659 self.needed_locks[locking.LEVEL_NODE] = []
3660 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3662 def DeclareLocks(self, level):
3663 if level == locking.LEVEL_NODE:
3664 self._LockInstancesNodes()
3666 def BuildHooksEnv(self):
3669 This runs on master, primary and secondary nodes of the instance.
3672 env = _BuildInstanceHookEnvByObject(self, self.instance)
3673 env["MIGRATE_LIVE"] = self.op.live
3674 env["MIGRATE_CLEANUP"] = self.op.cleanup
3675 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3678 def CheckPrereq(self):
3679 """Check prerequisites.
3681 This checks that the instance is in the cluster.
3684 instance = self.cfg.GetInstanceInfo(
3685 self.cfg.ExpandInstanceName(self.op.instance_name))
3686 if instance is None:
3687 raise errors.OpPrereqError("Instance '%s' not known" %
3688 self.op.instance_name)
3690 if instance.disk_template != constants.DT_DRBD8:
3691 raise errors.OpPrereqError("Instance's disk layout is not"
3692 " drbd8, cannot migrate.")
3694 secondary_nodes = instance.secondary_nodes
3695 if not secondary_nodes:
3696 raise errors.ConfigurationError("No secondary node but using"
3697 " drbd8 disk template")
3699 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3701 target_node = secondary_nodes[0]
3702 # check memory requirements on the secondary node
3703 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3704 instance.name, i_be[constants.BE_MEMORY],
3705 instance.hypervisor)
3707     # check bridge existence
3708 brlist = [nic.bridge for nic in instance.nics]
3709 result = self.rpc.call_bridges_exist(target_node, brlist)
3710 if result.failed or not result.data:
3711       raise errors.OpPrereqError("One or more target bridges %s do not"
3712                                  " exist on destination node '%s'" %
3713 (brlist, target_node))
3715 if not self.op.cleanup:
3716 _CheckNodeNotDrained(self, target_node)
3717 result = self.rpc.call_instance_migratable(instance.primary_node,
3719 msg = result.RemoteFailMsg()
3721 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3724 self.instance = instance
3726 def _WaitUntilSync(self):
3727 """Poll with custom rpc for disk sync.
3729 This uses our own step-based rpc call.
3732 self.feedback_fn("* wait until resync is done")
3736 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3738 self.instance.disks)
3740 for node, nres in result.items():
3741 msg = nres.RemoteFailMsg()
3743 raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3745 node_done, node_percent = nres.payload
3746 all_done = all_done and node_done
3747 if node_percent is not None:
3748 min_percent = min(min_percent, node_percent)
3750 if min_percent < 100:
3751 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3754 def _EnsureSecondary(self, node):
3755 """Demote a node to secondary.
3758 self.feedback_fn("* switching node %s to secondary mode" % node)
3760 for dev in self.instance.disks:
3761 self.cfg.SetDiskID(dev, node)
3763 result = self.rpc.call_blockdev_close(node, self.instance.name,
3764 self.instance.disks)
3765 msg = result.RemoteFailMsg()
3767 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3768 " error %s" % (node, msg))
3770 def _GoStandalone(self):
3771 """Disconnect from the network.
3774 self.feedback_fn("* changing into standalone mode")
3775 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3776 self.instance.disks)
3777 for node, nres in result.items():
3778 msg = nres.RemoteFailMsg()
3780 raise errors.OpExecError("Cannot disconnect disks node %s,"
3781 " error %s" % (node, msg))
3783 def _GoReconnect(self, multimaster):
3784 """Reconnect to the network.
3790 msg = "single-master"
3791 self.feedback_fn("* changing disks into %s mode" % msg)
3792 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3793 self.instance.disks,
3794 self.instance.name, multimaster)
3795 for node, nres in result.items():
3796 msg = nres.RemoteFailMsg()
3798 raise errors.OpExecError("Cannot change disks config on node %s,"
3799 " error: %s" % (node, msg))
3801 def _ExecCleanup(self):
3802 """Try to cleanup after a failed migration.
3804 The cleanup is done by:
3805 - check that the instance is running only on one node
3806 (and update the config if needed)
3807 - change disks on its secondary node to secondary
3808 - wait until disks are fully synchronized
3809 - disconnect from the network
3810 - change disks into single-master mode
3811 - wait again until disks are fully synchronized
3814 instance = self.instance
3815 target_node = self.target_node
3816 source_node = self.source_node
3818 # check running on only one node
3819 self.feedback_fn("* checking where the instance actually runs"
3820 " (if this hangs, the hypervisor might be in"
3822 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3823 for node, result in ins_l.items():
3825 if not isinstance(result.data, list):
3826 raise errors.OpExecError("Can't contact node '%s'" % node)
3828 runningon_source = instance.name in ins_l[source_node].data
3829 runningon_target = instance.name in ins_l[target_node].data
3831 if runningon_source and runningon_target:
3832 raise errors.OpExecError("Instance seems to be running on two nodes,"
3833 " or the hypervisor is confused. You will have"
3834 " to ensure manually that it runs only on one"
3835 " and restart this operation.")
3837 if not (runningon_source or runningon_target):
3838 raise errors.OpExecError("Instance does not seem to be running at all."
3839 " In this case, it's safer to repair by"
3840 " running 'gnt-instance stop' to ensure disk"
3841 " shutdown, and then restarting it.")
3843 if runningon_target:
3844 # the migration has actually succeeded, we need to update the config
3845 self.feedback_fn("* instance running on secondary node (%s),"
3846 " updating config" % target_node)
3847 instance.primary_node = target_node
3848 self.cfg.Update(instance)
3849 demoted_node = source_node
3851 self.feedback_fn("* instance confirmed to be running on its"
3852 " primary node (%s)" % source_node)
3853 demoted_node = target_node
3855 self._EnsureSecondary(demoted_node)
3857 self._WaitUntilSync()
3858 except errors.OpExecError:
3859       # we ignore errors here, since if the device is standalone, it
3860 # won't be able to sync
3862 self._GoStandalone()
3863 self._GoReconnect(False)
3864 self._WaitUntilSync()
3866 self.feedback_fn("* done")
3868 def _RevertDiskStatus(self):
3869 """Try to revert the disk status after a failed migration.
3872 target_node = self.target_node
3874 self._EnsureSecondary(target_node)
3875 self._GoStandalone()
3876 self._GoReconnect(False)
3877 self._WaitUntilSync()
3878 except errors.OpExecError, err:
3879 self.LogWarning("Migration failed and I can't reconnect the"
3880 " drives: error '%s'\n"
3881 "Please look and recover the instance status" %
3884 def _AbortMigration(self):
3885 """Call the hypervisor code to abort a started migration.
3888 instance = self.instance
3889 target_node = self.target_node
3890 migration_info = self.migration_info
3892 abort_result = self.rpc.call_finalize_migration(target_node,
3896 abort_msg = abort_result.RemoteFailMsg()
3898 logging.error("Aborting migration failed on target node %s: %s" %
3899 (target_node, abort_msg))
3900       # Don't raise an exception here, as we still have to try to revert the
3901 # disk status, even if this step failed.
3903 def _ExecMigration(self):
3904 """Migrate an instance.
3906 The migrate is done by:
3907 - change the disks into dual-master mode
3908 - wait until disks are fully synchronized again
3909 - migrate the instance
3910 - change disks on the new secondary node (the old primary) to secondary
3911 - wait until disks are fully synchronized
3912 - change disks into single-master mode
3915 instance = self.instance
3916 target_node = self.target_node
3917 source_node = self.source_node
3919 self.feedback_fn("* checking disk consistency between source and target")
3920 for dev in instance.disks:
3921 if not _CheckDiskConsistency(self, dev, target_node, False):
3922 raise errors.OpExecError("Disk %s is degraded or not fully"
3923 " synchronized on target node,"
3924 " aborting migrate." % dev.iv_name)
3926 # First get the migration information from the remote node
3927 result = self.rpc.call_migration_info(source_node, instance)
3928 msg = result.RemoteFailMsg()
3930 log_err = ("Failed fetching source migration information from %s: %s" %
3932 logging.error(log_err)
3933 raise errors.OpExecError(log_err)
3935 self.migration_info = migration_info = result.payload
3937 # Then switch the disks to master/master mode
3938 self._EnsureSecondary(target_node)
3939 self._GoStandalone()
3940 self._GoReconnect(True)
3941 self._WaitUntilSync()
3943 self.feedback_fn("* preparing %s to accept the instance" % target_node)
3944 result = self.rpc.call_accept_instance(target_node,
3947 self.nodes_ip[target_node])
3949 msg = result.RemoteFailMsg()
3951 logging.error("Instance pre-migration failed, trying to revert"
3952 " disk status: %s", msg)
3953 self._AbortMigration()
3954 self._RevertDiskStatus()
3955 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3956 (instance.name, msg))
3958 self.feedback_fn("* migrating instance to %s" % target_node)
3960 result = self.rpc.call_instance_migrate(source_node, instance,
3961 self.nodes_ip[target_node],
3963 msg = result.RemoteFailMsg()
3965 logging.error("Instance migration failed, trying to revert"
3966 " disk status: %s", msg)
3967 self._AbortMigration()
3968 self._RevertDiskStatus()
3969 raise errors.OpExecError("Could not migrate instance %s: %s" %
3970 (instance.name, msg))
3973 instance.primary_node = target_node
3974 # distribute new instance config to the other nodes
3975 self.cfg.Update(instance)
3977 result = self.rpc.call_finalize_migration(target_node,
3981 msg = result.RemoteFailMsg()
3983 logging.error("Instance migration succeeded, but finalization failed:"
3985 raise errors.OpExecError("Could not finalize instance migration: %s" %
3988 self._EnsureSecondary(source_node)
3989 self._WaitUntilSync()
3990 self._GoStandalone()
3991 self._GoReconnect(False)
3992 self._WaitUntilSync()
3994 self.feedback_fn("* done")
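# Illustrative summary (a sketch, not part of the original module; helper
# names are those of this LU) of the DRBD disk-mode transitions performed
# above:
#   _EnsureSecondary(target) -> _GoStandalone() -> _GoReconnect(True) -> _WaitUntilSync()
#   ... live migration via call_instance_migrate ...
#   _EnsureSecondary(source) -> _WaitUntilSync() -> _GoStandalone() -> _GoReconnect(False) -> _WaitUntilSync()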
3996 def Exec(self, feedback_fn):
3997 """Perform the migration.
4000 self.feedback_fn = feedback_fn
4002 self.source_node = self.instance.primary_node
4003 self.target_node = self.instance.secondary_nodes[0]
4004 self.all_nodes = [self.source_node, self.target_node]
4006 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4007 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4010 return self._ExecCleanup()
4012 return self._ExecMigration()
4015 def _CreateBlockDev(lu, node, instance, device, force_create,
4017 """Create a tree of block devices on a given node.
4019 If this device type has to be created on secondaries, create it and
4020 all its children.
4022 If not, just recurse to children keeping the same 'force' value.
4024 @param lu: the lu on whose behalf we execute
4025 @param node: the node on which to create the device
4026 @type instance: L{objects.Instance}
4027 @param instance: the instance which owns the device
4028 @type device: L{objects.Disk}
4029 @param device: the device to create
4030 @type force_create: boolean
4031 @param force_create: whether to force creation of this device; this
4032 will be changed to True whenever we find a device which has the
4033 CreateOnSecondary() attribute
4034 @param info: the extra 'metadata' we should attach to the device
4035 (this will be represented as a LVM tag)
4036 @type force_open: boolean
4037 @param force_open: this parameter will be passed to the
4038 L{backend.BlockdevCreate} function where it specifies
4039 whether we run on primary or not, and it affects both
4040 the child assembly and the device's own Open() execution
4043 if device.CreateOnSecondary():
4047 for child in device.children:
4048 _CreateBlockDev(lu, node, instance, child, force_create,
4051 if not force_create:
4054 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
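# Illustrative sketch (not part of the original module): _CreateDisks below
# drives this helper once per (node, disk) pair, forcing creation only on the
# primary node; a DRBD8 device, whose CreateOnSecondary() is true, then forces
# creation of its LV children on the secondary as well:
#
#   for node in instance.all_nodes:
#     f_create = node == instance.primary_node
#     _CreateBlockDev(lu, node, instance, disk, f_create, info, f_create)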
4057 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4058 """Create a single block device on a given node.
4060 This will not recurse over children of the device, so they must be
4061 created in advance.
4063 @param lu: the lu on whose behalf we execute
4064 @param node: the node on which to create the device
4065 @type instance: L{objects.Instance}
4066 @param instance: the instance which owns the device
4067 @type device: L{objects.Disk}
4068 @param device: the device to create
4069 @param info: the extra 'metadata' we should attach to the device
4070 (this will be represented as a LVM tag)
4071 @type force_open: boolean
4072 @param force_open: this parameter will be passed to the
4073 L{backend.BlockdevCreate} function where it specifies
4074 whether we run on primary or not, and it affects both
4075 the child assembly and the device's own Open() execution
4078 lu.cfg.SetDiskID(device, node)
4079 result = lu.rpc.call_blockdev_create(node, device, device.size,
4080 instance.name, force_open, info)
4081 msg = result.RemoteFailMsg()
4083 raise errors.OpExecError("Can't create block device %s on"
4084 " node %s for instance %s: %s" %
4085 (device, node, instance.name, msg))
4086 if device.physical_id is None:
4087 device.physical_id = result.payload
4090 def _GenerateUniqueNames(lu, exts):
4091 """Generate a suitable LV name.
4093 This will generate a logical volume name for the given instance.
4098 new_id = lu.cfg.GenerateUniqueID()
4099 results.append("%s%s" % (new_id, val))
4103 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4105 """Generate a drbd8 device complete with its children.
4108 port = lu.cfg.AllocatePort()
4109 vgname = lu.cfg.GetVGName()
4110 shared_secret = lu.cfg.GenerateDRBDSecret()
4111 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4112 logical_id=(vgname, names[0]))
4113 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4114 logical_id=(vgname, names[1]))
4115 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4116 logical_id=(primary, secondary, port,
4119 children=[dev_data, dev_meta],
4124 def _GenerateDiskTemplate(lu, template_name,
4125 instance_name, primary_node,
4126 secondary_nodes, disk_info,
4127 file_storage_dir, file_driver,
4129 """Generate the entire disk layout for a given template type.
4132 #TODO: compute space requirements
4134 vgname = lu.cfg.GetVGName()
4135 disk_count = len(disk_info)
4137 if template_name == constants.DT_DISKLESS:
4139 elif template_name == constants.DT_PLAIN:
4140 if len(secondary_nodes) != 0:
4141 raise errors.ProgrammerError("Wrong template configuration")
4143 names = _GenerateUniqueNames(lu, [".disk%d" % i
4144 for i in range(disk_count)])
4145 for idx, disk in enumerate(disk_info):
4146 disk_index = idx + base_index
4147 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4148 logical_id=(vgname, names[idx]),
4149 iv_name="disk/%d" % disk_index,
4151 disks.append(disk_dev)
4152 elif template_name == constants.DT_DRBD8:
4153 if len(secondary_nodes) != 1:
4154 raise errors.ProgrammerError("Wrong template configuration")
4155 remote_node = secondary_nodes[0]
4156 minors = lu.cfg.AllocateDRBDMinor(
4157 [primary_node, remote_node] * len(disk_info), instance_name)
4160 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4161 for i in range(disk_count)]):
4162 names.append(lv_prefix + "_data")
4163 names.append(lv_prefix + "_meta")
4164 for idx, disk in enumerate(disk_info):
4165 disk_index = idx + base_index
4166 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4167 disk["size"], names[idx*2:idx*2+2],
4168 "disk/%d" % disk_index,
4169 minors[idx*2], minors[idx*2+1])
4170 disk_dev.mode = disk["mode"]
4171 disks.append(disk_dev)
4172 elif template_name == constants.DT_FILE:
4173 if len(secondary_nodes) != 0:
4174 raise errors.ProgrammerError("Wrong template configuration")
4176 for idx, disk in enumerate(disk_info):
4177 disk_index = idx + base_index
4178 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4179 iv_name="disk/%d" % disk_index,
4180 logical_id=(file_driver,
4181 "%s/disk%d" % (file_storage_dir,
4184 disks.append(disk_dev)
4186 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
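# Illustrative sketch (names are assumed, not from the original module): for a
# two-disk DT_DRBD8 instance this builds DRBD8 devices with iv_name "disk/0"
# and "disk/1", each backed by a data and a meta LV named like
#   "<unique-id>.disk0_data" and "<unique-id>.disk0_meta"
# where <unique-id> comes from cfg.GenerateUniqueID() via _GenerateUniqueNames.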
4190 def _GetInstanceInfoText(instance):
4191 """Compute that text that should be added to the disk's metadata.
4194 return "originstname+%s" % instance.name
4197 def _CreateDisks(lu, instance):
4198 """Create all disks for an instance.
4200 This abstracts away some work from AddInstance.
4202 @type lu: L{LogicalUnit}
4203 @param lu: the logical unit on whose behalf we execute
4204 @type instance: L{objects.Instance}
4205 @param instance: the instance whose disks we should create
4207 @return: the success of the creation
4210 info = _GetInstanceInfoText(instance)
4211 pnode = instance.primary_node
4213 if instance.disk_template == constants.DT_FILE:
4214 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4215 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4217 if result.failed or not result.data:
4218 raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4220 if not result.data[0]:
4221 raise errors.OpExecError("Failed to create directory '%s'" %
4224 # Note: this needs to be kept in sync with adding of disks in
4225 # LUSetInstanceParams
4226 for device in instance.disks:
4227 logging.info("Creating volume %s for instance %s",
4228 device.iv_name, instance.name)
4230 for node in instance.all_nodes:
4231 f_create = node == pnode
4232 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4235 def _RemoveDisks(lu, instance):
4236 """Remove all disks for an instance.
4238 This abstracts away some work from `AddInstance()` and
4239 `RemoveInstance()`. Note that in case some of the devices couldn't
4240 be removed, the removal will continue with the other ones (compare
4241 with `_CreateDisks()`).
4243 @type lu: L{LogicalUnit}
4244 @param lu: the logical unit on whose behalf we execute
4245 @type instance: L{objects.Instance}
4246 @param instance: the instance whose disks we should remove
4248 @return: the success of the removal
4251 logging.info("Removing block devices for instance %s", instance.name)
4254 for device in instance.disks:
4255 for node, disk in device.ComputeNodeTree(instance.primary_node):
4256 lu.cfg.SetDiskID(disk, node)
4257 msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4259 lu.LogWarning("Could not remove block device %s on node %s,"
4260 " continuing anyway: %s", device.iv_name, node, msg)
4263 if instance.disk_template == constants.DT_FILE:
4264 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4265 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4267 if result.failed or not result.data:
4268 logging.error("Could not remove directory '%s'", file_storage_dir)
4274 def _ComputeDiskSize(disk_template, disks):
4275 """Compute disk size requirements in the volume group
4278 # Required free disk space as a function of disk template and disk sizes
4280 constants.DT_DISKLESS: None,
4281 constants.DT_PLAIN: sum(d["size"] for d in disks),
4282 # 128 MB are added for drbd metadata for each disk
4283 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4284 constants.DT_FILE: None,
4287 if disk_template not in req_size_dict:
4288 raise errors.ProgrammerError("Disk template '%s' size requirement"
4289 " is unknown" % disk_template)
4291 return req_size_dict[disk_template]
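# Worked example (a sketch; sizes are illustrative): for
#   disks = [{"size": 10240}, {"size": 10240}]
# this yields
#   _ComputeDiskSize(constants.DT_PLAIN, disks) == 20480
#   _ComputeDiskSize(constants.DT_DRBD8, disks) == (10240 + 128) * 2 == 20736
# while DT_DISKLESS and DT_FILE need no space in the volume group (None).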
4294 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4295 """Hypervisor parameter validation.
4297 This function abstracts the hypervisor parameter validation to be
4298 used in both instance create and instance modify.
4300 @type lu: L{LogicalUnit}
4301 @param lu: the logical unit for which we check
4302 @type nodenames: list
4303 @param nodenames: the list of nodes on which we should check
4304 @type hvname: string
4305 @param hvname: the name of the hypervisor we should use
4306 @type hvparams: dict
4307 @param hvparams: the parameters which we need to check
4308 @raise errors.OpPrereqError: if the parameters are not valid
4311 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4314 for node in nodenames:
4318 msg = info.RemoteFailMsg()
4320 raise errors.OpPrereqError("Hypervisor parameter validation"
4321 " failed on node %s: %s" % (node, msg))
4324 class LUCreateInstance(LogicalUnit):
4325 """Create an instance.
4328 HPATH = "instance-add"
4329 HTYPE = constants.HTYPE_INSTANCE
4330 _OP_REQP = ["instance_name", "disks", "disk_template",
4332 "wait_for_sync", "ip_check", "nics",
4333 "hvparams", "beparams"]
4336 def _ExpandNode(self, node):
4337 """Expands and checks one node name.
4340 node_full = self.cfg.ExpandNodeName(node)
4341 if node_full is None:
4342 raise errors.OpPrereqError("Unknown node %s" % node)
4345 def ExpandNames(self):
4346 """ExpandNames for CreateInstance.
4348 Figure out the right locks for instance creation.
4351 self.needed_locks = {}
4353 # set optional parameters to None if they don't exist
4354 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4355 if not hasattr(self.op, attr):
4356 setattr(self.op, attr, None)
4358 # cheap checks, mostly valid constants given
4360 # verify creation mode
4361 if self.op.mode not in (constants.INSTANCE_CREATE,
4362 constants.INSTANCE_IMPORT):
4363 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4366 # disk template and mirror node verification
4367 if self.op.disk_template not in constants.DISK_TEMPLATES:
4368 raise errors.OpPrereqError("Invalid disk template name")
4370 if self.op.hypervisor is None:
4371 self.op.hypervisor = self.cfg.GetHypervisorType()
4373 cluster = self.cfg.GetClusterInfo()
4374 enabled_hvs = cluster.enabled_hypervisors
4375 if self.op.hypervisor not in enabled_hvs:
4376 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4377 " cluster (%s)" % (self.op.hypervisor,
4378 ",".join(enabled_hvs)))
4380 # check hypervisor parameter syntax (locally)
4381 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4382 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4384 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4385 hv_type.CheckParameterSyntax(filled_hvp)
4386 self.hv_full = filled_hvp
4388 # fill and remember the beparams dict
4389 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4390 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4393 #### instance parameters check
4395 # instance name verification
4396 hostname1 = utils.HostInfo(self.op.instance_name)
4397 self.op.instance_name = instance_name = hostname1.name
4399 # this is just a preventive check, but someone might still add this
4400 # instance in the meantime, and creation will fail at lock-add time
4401 if instance_name in self.cfg.GetInstanceList():
4402 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4405 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4409 for nic in self.op.nics:
4410 # ip validity checks
4411 ip = nic.get("ip", None)
4412 if ip is None or ip.lower() == "none":
4414 elif ip.lower() == constants.VALUE_AUTO:
4415 nic_ip = hostname1.ip
4417 if not utils.IsValidIP(ip):
4418 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4419 " like a valid IP" % ip)
4422 # MAC address verification
4423 mac = nic.get("mac", constants.VALUE_AUTO)
4424 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4425 if not utils.IsValidMac(mac.lower()):
4426 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4428 # bridge verification
4429 bridge = nic.get("bridge", None)
4431 bridge = self.cfg.GetDefBridge()
4432 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
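# Illustrative sketch (not in the original module; values are assumed): a NIC
# specification such as
#   self.op.nics = [{"ip": "auto", "mac": "auto"},
#                   {"ip": "none", "bridge": "xen-br1"}]
# gives the first NIC the instance's resolved IP (and a MAC generated later in
# CheckPrereq), while the second gets no IP and the explicitly named bridge.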
4434 # disk checks/pre-build
4436 for disk in self.op.disks:
4437 mode = disk.get("mode", constants.DISK_RDWR)
4438 if mode not in constants.DISK_ACCESS_SET:
4439 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4441 size = disk.get("size", None)
4443 raise errors.OpPrereqError("Missing disk size")
4447 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4448 self.disks.append({"size": size, "mode": mode})
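# Illustrative sketch (not in the original module; values are assumed): an
# input such as
#   self.op.disks = [{"size": "10240"},
#                    {"size": 2048, "mode": constants.DISK_RDONLY}]
# is normalized by the checks above into
#   self.disks = [{"size": 10240, "mode": constants.DISK_RDWR},
#                 {"size": 2048, "mode": constants.DISK_RDONLY}]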
4450 # used in CheckPrereq for ip ping check
4451 self.check_ip = hostname1.ip
4453 # file storage checks
4454 if (self.op.file_driver and
4455 not self.op.file_driver in constants.FILE_DRIVER):
4456 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4457 self.op.file_driver)
4459 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4460 raise errors.OpPrereqError("File storage directory path not absolute")
4462 ### Node/iallocator related checks
4463 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4464 raise errors.OpPrereqError("One and only one of iallocator and primary"
4465 " node must be given")
4467 if self.op.iallocator:
4468 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4470 self.op.pnode = self._ExpandNode(self.op.pnode)
4471 nodelist = [self.op.pnode]
4472 if self.op.snode is not None:
4473 self.op.snode = self._ExpandNode(self.op.snode)
4474 nodelist.append(self.op.snode)
4475 self.needed_locks[locking.LEVEL_NODE] = nodelist
4477 # in case of import lock the source node too
4478 if self.op.mode == constants.INSTANCE_IMPORT:
4479 src_node = getattr(self.op, "src_node", None)
4480 src_path = getattr(self.op, "src_path", None)
4482 if src_path is None:
4483 self.op.src_path = src_path = self.op.instance_name
4485 if src_node is None:
4486 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4487 self.op.src_node = None
4488 if os.path.isabs(src_path):
4489 raise errors.OpPrereqError("Importing an instance from an absolute"
4490 " path requires a source node option.")
4492 self.op.src_node = src_node = self._ExpandNode(src_node)
4493 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4494 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4495 if not os.path.isabs(src_path):
4496 self.op.src_path = src_path = \
4497 os.path.join(constants.EXPORT_DIR, src_path)
4499 else: # INSTANCE_CREATE
4500 if getattr(self.op, "os_type", None) is None:
4501 raise errors.OpPrereqError("No guest OS specified")
4503 def _RunAllocator(self):
4504 """Run the allocator based on input opcode.
4507 nics = [n.ToDict() for n in self.nics]
4508 ial = IAllocator(self,
4509 mode=constants.IALLOCATOR_MODE_ALLOC,
4510 name=self.op.instance_name,
4511 disk_template=self.op.disk_template,
4514 vcpus=self.be_full[constants.BE_VCPUS],
4515 mem_size=self.be_full[constants.BE_MEMORY],
4518 hypervisor=self.op.hypervisor,
4521 ial.Run(self.op.iallocator)
4524 raise errors.OpPrereqError("Can't compute nodes using"
4525 " iallocator '%s': %s" % (self.op.iallocator,
4527 if len(ial.nodes) != ial.required_nodes:
4528 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4529 " of nodes (%s), required %s" %
4530 (self.op.iallocator, len(ial.nodes),
4531 ial.required_nodes))
4532 self.op.pnode = ial.nodes[0]
4533 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4534 self.op.instance_name, self.op.iallocator,
4535 ", ".join(ial.nodes))
4536 if ial.required_nodes == 2:
4537 self.op.snode = ial.nodes[1]
4539 def BuildHooksEnv(self):
4542 This runs on master, primary and secondary nodes of the instance.
4546 "ADD_MODE": self.op.mode,
4548 if self.op.mode == constants.INSTANCE_IMPORT:
4549 env["SRC_NODE"] = self.op.src_node
4550 env["SRC_PATH"] = self.op.src_path
4551 env["SRC_IMAGES"] = self.src_images
4553 env.update(_BuildInstanceHookEnv(
4554 name=self.op.instance_name,
4555 primary_node=self.op.pnode,
4556 secondary_nodes=self.secondaries,
4557 status=self.op.start,
4558 os_type=self.op.os_type,
4559 memory=self.be_full[constants.BE_MEMORY],
4560 vcpus=self.be_full[constants.BE_VCPUS],
4561 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4562 disk_template=self.op.disk_template,
4563 disks=[(d["size"], d["mode"]) for d in self.disks],
4566 hypervisor=self.op.hypervisor,
4569 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4574 def CheckPrereq(self):
4575 """Check prerequisites.
4578 if (not self.cfg.GetVGName() and
4579 self.op.disk_template not in constants.DTS_NOT_LVM):
4580 raise errors.OpPrereqError("Cluster does not support lvm-based"
4583 if self.op.mode == constants.INSTANCE_IMPORT:
4584 src_node = self.op.src_node
4585 src_path = self.op.src_path
4587 if src_node is None:
4588 exp_list = self.rpc.call_export_list(
4589 self.acquired_locks[locking.LEVEL_NODE])
4591 for node in exp_list:
4592 if not exp_list[node].failed and src_path in exp_list[node].data:
4594 self.op.src_node = src_node = node
4595 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4599 raise errors.OpPrereqError("No export found for relative path %s" %
4602 _CheckNodeOnline(self, src_node)
4603 result = self.rpc.call_export_info(src_node, src_path)
4606 raise errors.OpPrereqError("No export found in dir %s" % src_path)
4608 export_info = result.data
4609 if not export_info.has_section(constants.INISECT_EXP):
4610 raise errors.ProgrammerError("Corrupted export config")
4612 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4613 if (int(ei_version) != constants.EXPORT_VERSION):
4614 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4615 (ei_version, constants.EXPORT_VERSION))
4617 # Check that the new instance doesn't have less disks than the export
4618 instance_disks = len(self.disks)
4619 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4620 if instance_disks < export_disks:
4621 raise errors.OpPrereqError("Not enough disks to import."
4622 " (instance: %d, export: %d)" %
4623 (instance_disks, export_disks))
4625 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4627 for idx in range(export_disks):
4628 option = 'disk%d_dump' % idx
4629 if export_info.has_option(constants.INISECT_INS, option):
4630 # FIXME: are the old os-es, disk sizes, etc. useful?
4631 export_name = export_info.get(constants.INISECT_INS, option)
4632 image = os.path.join(src_path, export_name)
4633 disk_images.append(image)
4635 disk_images.append(False)
4637 self.src_images = disk_images
4639 old_name = export_info.get(constants.INISECT_INS, 'name')
4640 # FIXME: int() here could throw a ValueError on broken exports
4641 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4642 if self.op.instance_name == old_name:
4643 for idx, nic in enumerate(self.nics):
4644 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4645 nic_mac_ini = 'nic%d_mac' % idx
4646 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4648 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4649 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4650 if self.op.start and not self.op.ip_check:
4651 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4652 " adding an instance in start mode")
4654 if self.op.ip_check:
4655 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4656 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4657 (self.check_ip, self.op.instance_name))
4659 #### mac address generation
4660 # By generating the mac address here both the allocator and the hooks get
4661 # the real final mac address rather than the 'auto' or 'generate' value.
4662 # There is a race condition between the generation and the instance object
4663 # creation, which means that we know the mac is valid now, but we're not
4664 # sure it will be when we actually add the instance. If things go bad
4665 # adding the instance will abort because of a duplicate mac, and the
4666 # creation job will fail.
4667 for nic in self.nics:
4668 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4669 nic.mac = self.cfg.GenerateMAC()
4673 if self.op.iallocator is not None:
4674 self._RunAllocator()
4676 #### node related checks
4678 # check primary node
4679 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4680 assert self.pnode is not None, \
4681 "Cannot retrieve locked node %s" % self.op.pnode
4683 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4686 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4689 self.secondaries = []
4691 # mirror node verification
4692 if self.op.disk_template in constants.DTS_NET_MIRROR:
4693 if self.op.snode is None:
4694 raise errors.OpPrereqError("The networked disk templates need"
4696 if self.op.snode == pnode.name:
4697 raise errors.OpPrereqError("The secondary node cannot be"
4698 " the primary node.")
4699 _CheckNodeOnline(self, self.op.snode)
4700 _CheckNodeNotDrained(self, self.op.snode)
4701 self.secondaries.append(self.op.snode)
4703 nodenames = [pnode.name] + self.secondaries
4705 req_size = _ComputeDiskSize(self.op.disk_template,
4708 # Check lv size requirements
4709 if req_size is not None:
4710 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4712 for node in nodenames:
4713 info = nodeinfo[node]
4717 raise errors.OpPrereqError("Cannot get current information"
4718 " from node '%s'" % node)
4719 vg_free = info.get('vg_free', None)
4720 if not isinstance(vg_free, int):
4721 raise errors.OpPrereqError("Can't compute free disk space on"
4723 if req_size > info['vg_free']:
4724 raise errors.OpPrereqError("Not enough disk space on target node %s."
4725 " %d MB available, %d MB required" %
4726 (node, info['vg_free'], req_size))
4728 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4731 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4733 if not isinstance(result.data, objects.OS):
4734 raise errors.OpPrereqError("OS '%s' not in supported os list for"
4735 " primary node" % self.op.os_type)
4737 # bridge check on primary node
4738 bridges = [n.bridge for n in self.nics]
4739 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4742 raise errors.OpPrereqError("One of the target bridges '%s' does not"
4743 " exist on destination node '%s'" %
4744 (",".join(bridges), pnode.name))
4746 # memory check on primary node
4748 _CheckNodeFreeMemory(self, self.pnode.name,
4749 "creating instance %s" % self.op.instance_name,
4750 self.be_full[constants.BE_MEMORY],
4753 def Exec(self, feedback_fn):
4754 """Create and add the instance to the cluster.
4757 instance = self.op.instance_name
4758 pnode_name = self.pnode.name
4760 ht_kind = self.op.hypervisor
4761 if ht_kind in constants.HTS_REQ_PORT:
4762 network_port = self.cfg.AllocatePort()
4766 ##if self.op.vnc_bind_address is None:
4767 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4769 # this is needed because os.path.join does not accept None arguments
4770 if self.op.file_storage_dir is None:
4771 string_file_storage_dir = ""
4773 string_file_storage_dir = self.op.file_storage_dir
4775 # build the full file storage dir path
4776 file_storage_dir = os.path.normpath(os.path.join(
4777 self.cfg.GetFileStorageDir(),
4778 string_file_storage_dir, instance))
4781 disks = _GenerateDiskTemplate(self,
4782 self.op.disk_template,
4783 instance, pnode_name,
4787 self.op.file_driver,
4790 iobj = objects.Instance(name=instance, os=self.op.os_type,
4791 primary_node=pnode_name,
4792 nics=self.nics, disks=disks,
4793 disk_template=self.op.disk_template,
4795 network_port=network_port,
4796 beparams=self.op.beparams,
4797 hvparams=self.op.hvparams,
4798 hypervisor=self.op.hypervisor,
4801 feedback_fn("* creating instance disks...")
4803 _CreateDisks(self, iobj)
4804 except errors.OpExecError:
4805 self.LogWarning("Device creation failed, reverting...")
4807 _RemoveDisks(self, iobj)
4809 self.cfg.ReleaseDRBDMinors(instance)
4812 feedback_fn("adding instance %s to cluster config" % instance)
4814 self.cfg.AddInstance(iobj)
4815 # Declare that we don't want to remove the instance lock anymore, as we've
4816 # added the instance to the config
4817 del self.remove_locks[locking.LEVEL_INSTANCE]
4818 # Unlock all the nodes
4819 if self.op.mode == constants.INSTANCE_IMPORT:
4820 nodes_keep = [self.op.src_node]
4821 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4822 if node != self.op.src_node]
4823 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4824 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4826 self.context.glm.release(locking.LEVEL_NODE)
4827 del self.acquired_locks[locking.LEVEL_NODE]
4829 if self.op.wait_for_sync:
4830 disk_abort = not _WaitForSync(self, iobj)
4831 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4832 # make sure the disks are not degraded (still sync-ing is ok)
4834 feedback_fn("* checking mirrors status")
4835 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4840 _RemoveDisks(self, iobj)
4841 self.cfg.RemoveInstance(iobj.name)
4842 # Make sure the instance lock gets removed
4843 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4844 raise errors.OpExecError("There are some degraded disks for"
4847 feedback_fn("creating os for instance %s on node %s" %
4848 (instance, pnode_name))
4850 if iobj.disk_template != constants.DT_DISKLESS:
4851 if self.op.mode == constants.INSTANCE_CREATE:
4852 feedback_fn("* running the instance OS create scripts...")
4853 result = self.rpc.call_instance_os_add(pnode_name, iobj)
4854 msg = result.RemoteFailMsg()
4856 raise errors.OpExecError("Could not add os for instance %s"
4858 (instance, pnode_name, msg))
4860 elif self.op.mode == constants.INSTANCE_IMPORT:
4861 feedback_fn("* running the instance OS import scripts...")
4862 src_node = self.op.src_node
4863 src_images = self.src_images
4864 cluster_name = self.cfg.GetClusterName()
4865 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4866 src_node, src_images,
4868 import_result.Raise()
4869 for idx, result in enumerate(import_result.data):
4871 self.LogWarning("Could not import the image %s for instance"
4872 " %s, disk %d, on node %s" %
4873 (src_images[idx], instance, idx, pnode_name))
4875 # also checked in the prereq part
4876 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4880 iobj.admin_up = True
4881 self.cfg.Update(iobj)
4882 logging.info("Starting instance %s on node %s", instance, pnode_name)
4883 feedback_fn("* starting instance...")
4884 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4885 msg = result.RemoteFailMsg()
4887 raise errors.OpExecError("Could not start instance: %s" % msg)
4890 class LUConnectConsole(NoHooksLU):
4891 """Connect to an instance's console.
4893 This is somewhat special in that it returns the command line that
4894 you need to run on the master node in order to connect to the
4895 console.
4898 _OP_REQP = ["instance_name"]
4901 def ExpandNames(self):
4902 self._ExpandAndLockInstance()
4904 def CheckPrereq(self):
4905 """Check prerequisites.
4907 This checks that the instance is in the cluster.
4910 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4911 assert self.instance is not None, \
4912 "Cannot retrieve locked instance %s" % self.op.instance_name
4913 _CheckNodeOnline(self, self.instance.primary_node)
4915 def Exec(self, feedback_fn):
4916 """Connect to the console of an instance
4919 instance = self.instance
4920 node = instance.primary_node
4922 node_insts = self.rpc.call_instance_list([node],
4923 [instance.hypervisor])[node]
4926 if instance.name not in node_insts.data:
4927 raise errors.OpExecError("Instance %s is not running." % instance.name)
4929 logging.debug("Connecting to console of %s on %s", instance.name, node)
4931 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4932 cluster = self.cfg.GetClusterInfo()
4933 # beparams and hvparams are passed separately, to avoid editing the
4934 # instance and then saving the defaults in the instance itself.
4935 hvparams = cluster.FillHV(instance)
4936 beparams = cluster.FillBE(instance)
4937 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4940 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4943 class LUReplaceDisks(LogicalUnit):
4944 """Replace the disks of an instance.
4947 HPATH = "mirrors-replace"
4948 HTYPE = constants.HTYPE_INSTANCE
4949 _OP_REQP = ["instance_name", "mode", "disks"]
4952 def CheckArguments(self):
4953 if not hasattr(self.op, "remote_node"):
4954 self.op.remote_node = None
4955 if not hasattr(self.op, "iallocator"):
4956 self.op.iallocator = None
4958 # check for valid parameter combination
4959 cnt = [self.op.remote_node, self.op.iallocator].count(None)
4960 if self.op.mode == constants.REPLACE_DISK_CHG:
4962 raise errors.OpPrereqError("When changing the secondary either an"
4963 " iallocator script must be used or the"
4966 raise errors.OpPrereqError("Give either the iallocator or the new"
4967 " secondary, not both")
4968 else: # not replacing the secondary
4970 raise errors.OpPrereqError("The iallocator and new node options can"
4971 " be used only when changing the"
4974 def ExpandNames(self):
4975 self._ExpandAndLockInstance()
4977 if self.op.iallocator is not None:
4978 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4979 elif self.op.remote_node is not None:
4980 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4981 if remote_node is None:
4982 raise errors.OpPrereqError("Node '%s' not known" %
4983 self.op.remote_node)
4984 self.op.remote_node = remote_node
4985 # Warning: do not remove the locking of the new secondary here
4986 # unless DRBD8.AddChildren is changed to work in parallel;
4987 # currently it doesn't since parallel invocations of
4988 # FindUnusedMinor will conflict
4989 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4990 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4992 self.needed_locks[locking.LEVEL_NODE] = []
4993 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4995 def DeclareLocks(self, level):
4996 # If we're not already locking all nodes in the set we have to declare the
4997 # instance's primary/secondary nodes.
4998 if (level == locking.LEVEL_NODE and
4999 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5000 self._LockInstancesNodes()
5002 def _RunAllocator(self):
5003 """Compute a new secondary node using an IAllocator.
5006 ial = IAllocator(self,
5007 mode=constants.IALLOCATOR_MODE_RELOC,
5008 name=self.op.instance_name,
5009 relocate_from=[self.sec_node])
5011 ial.Run(self.op.iallocator)
5014 raise errors.OpPrereqError("Can't compute nodes using"
5015 " iallocator '%s': %s" % (self.op.iallocator,
5017 if len(ial.nodes) != ial.required_nodes:
5018 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5019 " of nodes (%s), required %s" %
5020 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
5021 self.op.remote_node = ial.nodes[0]
5022 self.LogInfo("Selected new secondary for the instance: %s",
5023 self.op.remote_node)
5025 def BuildHooksEnv(self):
5028 This runs on the master, the primary and all the secondaries.
5032 "MODE": self.op.mode,
5033 "NEW_SECONDARY": self.op.remote_node,
5034 "OLD_SECONDARY": self.instance.secondary_nodes[0],
5036 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5038 self.cfg.GetMasterNode(),
5039 self.instance.primary_node,
5041 if self.op.remote_node is not None:
5042 nl.append(self.op.remote_node)
5045 def CheckPrereq(self):
5046 """Check prerequisites.
5048 This checks that the instance is in the cluster.
5051 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5052 assert instance is not None, \
5053 "Cannot retrieve locked instance %s" % self.op.instance_name
5054 self.instance = instance
5056 if instance.disk_template != constants.DT_DRBD8:
5057 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5060 if len(instance.secondary_nodes) != 1:
5061 raise errors.OpPrereqError("The instance has a strange layout,"
5062 " expected one secondary but found %d" %
5063 len(instance.secondary_nodes))
5065 self.sec_node = instance.secondary_nodes[0]
5067 if self.op.iallocator is not None:
5068 self._RunAllocator()
5070 remote_node = self.op.remote_node
5071 if remote_node is not None:
5072 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5073 assert self.remote_node_info is not None, \
5074 "Cannot retrieve locked node %s" % remote_node
5076 self.remote_node_info = None
5077 if remote_node == instance.primary_node:
5078 raise errors.OpPrereqError("The specified node is the primary node of"
5080 elif remote_node == self.sec_node:
5081 raise errors.OpPrereqError("The specified node is already the"
5082 " secondary node of the instance.")
5084 if self.op.mode == constants.REPLACE_DISK_PRI:
5085 n1 = self.tgt_node = instance.primary_node
5086 n2 = self.oth_node = self.sec_node
5087 elif self.op.mode == constants.REPLACE_DISK_SEC:
5088 n1 = self.tgt_node = self.sec_node
5089 n2 = self.oth_node = instance.primary_node
5090 elif self.op.mode == constants.REPLACE_DISK_CHG:
5091 n1 = self.new_node = remote_node
5092 n2 = self.oth_node = instance.primary_node
5093 self.tgt_node = self.sec_node
5094 _CheckNodeNotDrained(self, remote_node)
5096 raise errors.ProgrammerError("Unhandled disk replace mode")
5098 _CheckNodeOnline(self, n1)
5099 _CheckNodeOnline(self, n2)
5101 if not self.op.disks:
5102 self.op.disks = range(len(instance.disks))
5104 for disk_idx in self.op.disks:
5105 instance.FindDisk(disk_idx)
5107 def _ExecD8DiskOnly(self, feedback_fn):
5108 """Replace a disk on the primary or secondary for dbrd8.
5110 The algorithm for replace is quite complicated:
5112 1. for each disk to be replaced:
5114 1. create new LVs on the target node with unique names
5115 1. detach old LVs from the drbd device
5116 1. rename old LVs to name_replaced.<time_t>
5117 1. rename new LVs to old LVs
5118 1. attach the new LVs (with the old names now) to the drbd device
5120 1. wait for sync across all devices
5122 1. for each modified disk:
5124 1. remove old LVs (which have the name name_replaces.<time_t>)
5126 Failures are not very well handled.
5130 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5131 instance = self.instance
5133 vgname = self.cfg.GetVGName()
5136 tgt_node = self.tgt_node
5137 oth_node = self.oth_node
5139 # Step: check device activation
5140 self.proc.LogStep(1, steps_total, "check device existence")
5141 info("checking volume groups")
5142 my_vg = cfg.GetVGName()
5143 results = self.rpc.call_vg_list([oth_node, tgt_node])
5145 raise errors.OpExecError("Can't list volume groups on the nodes")
5146 for node in oth_node, tgt_node:
5148 if res.failed or not res.data or my_vg not in res.data:
5149 raise errors.OpExecError("Volume group '%s' not found on %s" %
5151 for idx, dev in enumerate(instance.disks):
5152 if idx not in self.op.disks:
5154 for node in tgt_node, oth_node:
5155 info("checking disk/%d on %s" % (idx, node))
5156 cfg.SetDiskID(dev, node)
5157 result = self.rpc.call_blockdev_find(node, dev)
5158 msg = result.RemoteFailMsg()
5159 if not msg and not result.payload:
5160 msg = "disk not found"
5162 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5165 # Step: check other node consistency
5166 self.proc.LogStep(2, steps_total, "check peer consistency")
5167 for idx, dev in enumerate(instance.disks):
5168 if idx not in self.op.disks:
5170 info("checking disk/%d consistency on %s" % (idx, oth_node))
5171 if not _CheckDiskConsistency(self, dev, oth_node,
5172 oth_node==instance.primary_node):
5173 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5174 " to replace disks on this node (%s)" %
5175 (oth_node, tgt_node))
5177 # Step: create new storage
5178 self.proc.LogStep(3, steps_total, "allocate new storage")
5179 for idx, dev in enumerate(instance.disks):
5180 if idx not in self.op.disks:
5183 cfg.SetDiskID(dev, tgt_node)
5184 lv_names = [".disk%d_%s" % (idx, suf)
5185 for suf in ["data", "meta"]]
5186 names = _GenerateUniqueNames(self, lv_names)
5187 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5188 logical_id=(vgname, names[0]))
5189 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5190 logical_id=(vgname, names[1]))
5191 new_lvs = [lv_data, lv_meta]
5192 old_lvs = dev.children
5193 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5194 info("creating new local storage on %s for %s" %
5195 (tgt_node, dev.iv_name))
5196 # we pass force_create=True to force the LVM creation
5197 for new_lv in new_lvs:
5198 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5199 _GetInstanceInfoText(instance), False)
5201 # Step: for each lv, detach+rename*2+attach
5202 self.proc.LogStep(4, steps_total, "change drbd configuration")
5203 for dev, old_lvs, new_lvs in iv_names.itervalues():
5204 info("detaching %s drbd from local storage" % dev.iv_name)
5205 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5208 raise errors.OpExecError("Can't detach drbd from local storage on node"
5209 " %s for device %s" % (tgt_node, dev.iv_name))
5211 #cfg.Update(instance)
5213 # ok, we created the new LVs, so now we know we have the needed
5214 # storage; as such, we proceed on the target node to rename
5215 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5216 # using the assumption that logical_id == physical_id (which in
5217 # turn is the unique_id on that node)
5219 # FIXME(iustin): use a better name for the replaced LVs
5220 temp_suffix = int(time.time())
5221 ren_fn = lambda d, suff: (d.physical_id[0],
5222 d.physical_id[1] + "_replaced-%s" % suff)
5223 # build the rename list based on what LVs exist on the node
5225 for to_ren in old_lvs:
5226 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5227 if not result.RemoteFailMsg() and result.payload:
5229 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5231 info("renaming the old LVs on the target node")
5232 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5235 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5236 # now we rename the new LVs to the old LVs
5237 info("renaming the new LVs on the target node")
5238 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5239 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5242 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5244 for old, new in zip(old_lvs, new_lvs):
5245 new.logical_id = old.logical_id
5246 cfg.SetDiskID(new, tgt_node)
5248 for disk in old_lvs:
5249 disk.logical_id = ren_fn(disk, temp_suffix)
5250 cfg.SetDiskID(disk, tgt_node)
5252 # now that the new lvs have the old name, we can add them to the device
5253 info("adding new mirror component on %s" % tgt_node)
5254 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5255 if result.failed or not result.data:
5256 for new_lv in new_lvs:
5257 msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5259 warning("Can't rollback device %s: %s", dev, msg,
5260 hint="cleanup manually the unused logical volumes")
5261 raise errors.OpExecError("Can't add local storage to drbd")
5263 dev.children = new_lvs
5264 cfg.Update(instance)
5266 # Step: wait for sync
5268 # this can fail as the old devices are degraded and _WaitForSync
5269 # does a combined result over all disks, so we don't check its
5270 # return value
5271 self.proc.LogStep(5, steps_total, "sync devices")
5272 _WaitForSync(self, instance, unlock=True)
5274 # so check manually all the devices
5275 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5276 cfg.SetDiskID(dev, instance.primary_node)
5277 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5278 msg = result.RemoteFailMsg()
5279 if not msg and not result.payload:
5280 msg = "disk not found"
5282 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5284 if result.payload[5]:
5285 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5287 # Step: remove old storage
5288 self.proc.LogStep(6, steps_total, "removing old storage")
5289 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5290 info("remove logical volumes for %s" % name)
5292 cfg.SetDiskID(lv, tgt_node)
5293 msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5295 warning("Can't remove old LV: %s" % msg,
5296 hint="manually remove unused LVs")
5299 def _ExecD8Secondary(self, feedback_fn):
5300 """Replace the secondary node for drbd8.
5302 The algorithm for replace is quite complicated:
5303 - for all disks of the instance:
5304 - create new LVs on the new node with same names
5305 - shutdown the drbd device on the old secondary
5306 - disconnect the drbd network on the primary
5307 - create the drbd device on the new secondary
5308 - network attach the drbd on the primary, using an artifice:
5309 the drbd code for Attach() will connect to the network if it
5310 finds a device which is connected to the good local disks but
5311 not network enabled
5312 - wait for sync across all devices
5313 - remove all disks from the old secondary
5315 Failures are not very well handled.
5319 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5320 instance = self.instance
5324 old_node = self.tgt_node
5325 new_node = self.new_node
5326 pri_node = instance.primary_node
5328 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5329 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5330 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5333 # Step: check device activation
5334 self.proc.LogStep(1, steps_total, "check device existence")
5335 info("checking volume groups")
5336 my_vg = cfg.GetVGName()
5337 results = self.rpc.call_vg_list([pri_node, new_node])
5338 for node in pri_node, new_node:
5340 if res.failed or not res.data or my_vg not in res.data:
5341 raise errors.OpExecError("Volume group '%s' not found on %s" %
5343 for idx, dev in enumerate(instance.disks):
5344 if idx not in self.op.disks:
5346 info("checking disk/%d on %s" % (idx, pri_node))
5347 cfg.SetDiskID(dev, pri_node)
5348 result = self.rpc.call_blockdev_find(pri_node, dev)
5349 msg = result.RemoteFailMsg()
5350 if not msg and not result.payload:
5351 msg = "disk not found"
5353 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5354 (idx, pri_node, msg))
5356 # Step: check other node consistency
5357 self.proc.LogStep(2, steps_total, "check peer consistency")
5358 for idx, dev in enumerate(instance.disks):
5359 if idx not in self.op.disks:
5361 info("checking disk/%d consistency on %s" % (idx, pri_node))
5362 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5363 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5364 " unsafe to replace the secondary" %
5367 # Step: create new storage
5368 self.proc.LogStep(3, steps_total, "allocate new storage")
5369 for idx, dev in enumerate(instance.disks):
5370 info("adding new local storage on %s for disk/%d" %
5372 # we pass force_create=True to force LVM creation
5373 for new_lv in dev.children:
5374 _CreateBlockDev(self, new_node, instance, new_lv, True,
5375 _GetInstanceInfoText(instance), False)
5377 # Step 4: drbd minors and drbd setup changes
5378 # after this, we must manually remove the drbd minors on both the
5379 # error and the success paths
5380 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5382 logging.debug("Allocated minors %s" % (minors,))
5383 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5384 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5386 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5387 # create new devices on new_node; note that we create two IDs:
5388 # one without port, so the drbd will be activated without
5389 # networking information on the new node at this stage, and one
5390 # with network, for the latter activation in step 4
5391 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5392 if pri_node == o_node1:
5397 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5398 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5400 iv_names[idx] = (dev, dev.children, new_net_id)
5401 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5403 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5404 logical_id=new_alone_id,
5405 children=dev.children,
5408 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5409 _GetInstanceInfoText(instance), False)
5410 except errors.GenericError:
5411 self.cfg.ReleaseDRBDMinors(instance.name)
5414 for idx, dev in enumerate(instance.disks):
5415 # we have new devices, shutdown the drbd on the old secondary
5416 info("shutting down drbd for disk/%d on old node" % idx)
5417 cfg.SetDiskID(dev, old_node)
5418 msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5420 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5422 hint="Please cleanup this device manually as soon as possible")
5424 info("detaching primary drbds from the network (=> standalone)")
5425 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5426 instance.disks)[pri_node]
5428 msg = result.RemoteFailMsg()
5430 # detaches didn't succeed (unlikely)
5431 self.cfg.ReleaseDRBDMinors(instance.name)
5432 raise errors.OpExecError("Can't detach the disks from the network on"
5433 " old node: %s" % (msg,))
5435 # if we managed to detach at least one, we update all the disks of
5436 # the instance to point to the new secondary
5437 info("updating instance configuration")
5438 for dev, _, new_logical_id in iv_names.itervalues():
5439 dev.logical_id = new_logical_id
5440 cfg.SetDiskID(dev, pri_node)
5441 cfg.Update(instance)
5443 # and now perform the drbd attach
5444 info("attaching primary drbds to new secondary (standalone => connected)")
5445 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5446 instance.disks, instance.name,
5448 for to_node, to_result in result.items():
5449 msg = to_result.RemoteFailMsg()
5451 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5452 hint="please do a gnt-instance info to see the"
5455 # this can fail as the old devices are degraded and _WaitForSync
5456 # does a combined result over all disks, so we don't check its
5457 # return value
5458 self.proc.LogStep(5, steps_total, "sync devices")
5459 _WaitForSync(self, instance, unlock=True)
5461 # so check manually all the devices
5462 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5463 cfg.SetDiskID(dev, pri_node)
5464 result = self.rpc.call_blockdev_find(pri_node, dev)
5465 msg = result.RemoteFailMsg()
5466 if not msg and not result.payload:
5467 msg = "disk not found"
5469 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5471 if result.payload[5]:
5472 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5474 self.proc.LogStep(6, steps_total, "removing old storage")
5475 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5476 info("remove logical volumes for disk/%d" % idx)
5478 cfg.SetDiskID(lv, old_node)
5479 msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5481 warning("Can't remove LV on old secondary: %s", msg,
5482 hint="Cleanup stale volumes by hand")
5484 def Exec(self, feedback_fn):
5485 """Execute disk replacement.
5487 This dispatches the disk replacement to the appropriate handler.
5490 instance = self.instance
5492 # Activate the instance disks if we're replacing them on a down instance
5493 if not instance.admin_up:
5494 _StartInstanceDisks(self, instance, True)
5496 if self.op.mode == constants.REPLACE_DISK_CHG:
5497 fn = self._ExecD8Secondary
5499 fn = self._ExecD8DiskOnly
5501 ret = fn(feedback_fn)
5503 # Deactivate the instance disks if we're replacing them on a down instance
5504 if not instance.admin_up:
5505 _SafeShutdownInstanceDisks(self, instance)
5510 class LUGrowDisk(LogicalUnit):
5511 """Grow a disk of an instance.
5515 HTYPE = constants.HTYPE_INSTANCE
5516 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5519 def ExpandNames(self):
5520 self._ExpandAndLockInstance()
5521 self.needed_locks[locking.LEVEL_NODE] = []
5522 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5524 def DeclareLocks(self, level):
5525 if level == locking.LEVEL_NODE:
5526 self._LockInstancesNodes()
5528 def BuildHooksEnv(self):
5531 This runs on the master, the primary and all the secondaries.
5535 "DISK": self.op.disk,
5536 "AMOUNT": self.op.amount,
5538 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5540 self.cfg.GetMasterNode(),
5541 self.instance.primary_node,
5545 def CheckPrereq(self):
5546 """Check prerequisites.
5548 This checks that the instance is in the cluster.
5551 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5552 assert instance is not None, \
5553 "Cannot retrieve locked instance %s" % self.op.instance_name
5554 nodenames = list(instance.all_nodes)
5555 for node in nodenames:
5556 _CheckNodeOnline(self, node)
5559 self.instance = instance
5561 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5562 raise errors.OpPrereqError("Instance's disk layout does not support"
5565 self.disk = instance.FindDisk(self.op.disk)
5567 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5568 instance.hypervisor)
5569 for node in nodenames:
5570 info = nodeinfo[node]
5571 if info.failed or not info.data:
5572 raise errors.OpPrereqError("Cannot get current information"
5573 " from node '%s'" % node)
5574 vg_free = info.data.get('vg_free', None)
5575 if not isinstance(vg_free, int):
5576 raise errors.OpPrereqError("Can't compute free disk space on"
5578 if self.op.amount > vg_free:
5579 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5580 " %d MiB available, %d MiB required" %
5581 (node, vg_free, self.op.amount))
5583 def Exec(self, feedback_fn):
5584 """Execute disk grow.
5587 instance = self.instance
5589 for node in instance.all_nodes:
5590 self.cfg.SetDiskID(disk, node)
5591 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5592 msg = result.RemoteFailMsg()
5594 raise errors.OpExecError("Grow request failed to node %s: %s" %
5596 disk.RecordGrow(self.op.amount)
5597 self.cfg.Update(instance)
5598 if self.op.wait_for_sync:
5599 disk_abort = not _WaitForSync(self, instance)
5601 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5602 " status.\nPlease check the instance.")
5605 class LUQueryInstanceData(NoHooksLU):
5606 """Query runtime instance data.
5609 _OP_REQP = ["instances", "static"]
5612 def ExpandNames(self):
5613 self.needed_locks = {}
5614 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5616 if not isinstance(self.op.instances, list):
5617 raise errors.OpPrereqError("Invalid argument type 'instances'")
5619 if self.op.instances:
5620 self.wanted_names = []
5621 for name in self.op.instances:
5622 full_name = self.cfg.ExpandInstanceName(name)
5623 if full_name is None:
5624 raise errors.OpPrereqError("Instance '%s' not known" % name)
5625 self.wanted_names.append(full_name)
5626 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5628 self.wanted_names = None
5629 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5631 self.needed_locks[locking.LEVEL_NODE] = []
5632 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5634 def DeclareLocks(self, level):
5635 if level == locking.LEVEL_NODE:
5636 self._LockInstancesNodes()
5638 def CheckPrereq(self):
5639 """Check prerequisites.
5641 This only checks the optional instance list against the existing names.
5644 if self.wanted_names is None:
5645 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5647 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5648 in self.wanted_names]
5651 def _ComputeDiskStatus(self, instance, snode, dev):
5652 """Compute block device status.
5655 static = self.op.static
5657 self.cfg.SetDiskID(dev, instance.primary_node)
5658 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5659 if dev_pstatus.offline:
5662 msg = dev_pstatus.RemoteFailMsg()
5664 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5665 (instance.name, msg))
5666 dev_pstatus = dev_pstatus.payload
5670 if dev.dev_type in constants.LDS_DRBD:
5671 # we change the snode then (otherwise we use the one passed in)
5672 if dev.logical_id[0] == instance.primary_node:
5673 snode = dev.logical_id[1]
5675 snode = dev.logical_id[0]
5677 if snode and not static:
5678 self.cfg.SetDiskID(dev, snode)
5679 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5680 if dev_sstatus.offline:
5683 msg = dev_sstatus.RemoteFailMsg()
5685 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5686 (instance.name, msg))
5687 dev_sstatus = dev_sstatus.payload
5692 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5693 for child in dev.children]
5698 "iv_name": dev.iv_name,
5699 "dev_type": dev.dev_type,
5700 "logical_id": dev.logical_id,
5701 "physical_id": dev.physical_id,
5702 "pstatus": dev_pstatus,
5703 "sstatus": dev_sstatus,
5704 "children": dev_children,
5710 def Exec(self, feedback_fn):
5711 """Gather and return data"""
5714 cluster = self.cfg.GetClusterInfo()
5716 for instance in self.wanted_instances:
5717 if not self.op.static:
5718 remote_info = self.rpc.call_instance_info(instance.primary_node,
5719 instance.name,
5720 instance.hypervisor)
5721 remote_info.Raise()
5722 remote_info = remote_info.data
5723 if remote_info and "state" in remote_info:
5724 remote_state = "up"
5725 else:
5726 remote_state = "down"
5727 else:
5728 remote_state = None
5729 if instance.admin_up:
5730 config_state = "up"
5731 else:
5732 config_state = "down"
5734 disks = [self._ComputeDiskStatus(instance, None, device)
5735 for device in instance.disks]
5737 idict = {
5738 "name": instance.name,
5739 "config_state": config_state,
5740 "run_state": remote_state,
5741 "pnode": instance.primary_node,
5742 "snodes": instance.secondary_nodes,
5743 "os": instance.os,
5744 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5745 "disks": disks,
5746 "hypervisor": instance.hypervisor,
5747 "network_port": instance.network_port,
5748 "hv_instance": instance.hvparams,
5749 "hv_actual": cluster.FillHV(instance),
5750 "be_instance": instance.beparams,
5751 "be_actual": cluster.FillBE(instance),
5752 }
5754 result[instance.name] = idict
5756 return result
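# Note: a rough sketch of one per-instance entry assembled above; only the
# keys are taken from the code, the values are illustrative:
#   result["inst1.example.com"] = {
#     "name": "inst1.example.com",
#     "config_state": "up", "run_state": "down",
#     "pnode": "node1.example.com", "snodes": ["node2.example.com"],
#     "nics": [("aa:00:00:11:22:33", None, "xen-br0")],
#     "hv_instance": {...}, "hv_actual": {...},
#     "be_instance": {...}, "be_actual": {...},
#     ...
#   }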
5759 class LUSetInstanceParams(LogicalUnit):
5760 """Modifies an instance's parameters.
5763 HPATH = "instance-modify"
5764 HTYPE = constants.HTYPE_INSTANCE
5765 _OP_REQP = ["instance_name"]
5768 def CheckArguments(self):
5769 if not hasattr(self.op, 'nics'):
5770 self.op.nics = []
5771 if not hasattr(self.op, 'disks'):
5772 self.op.disks = []
5773 if not hasattr(self.op, 'beparams'):
5774 self.op.beparams = {}
5775 if not hasattr(self.op, 'hvparams'):
5776 self.op.hvparams = {}
5777 self.op.force = getattr(self.op, "force", False)
5778 if not (self.op.nics or self.op.disks or
5779 self.op.hvparams or self.op.beparams):
5780 raise errors.OpPrereqError("No changes submitted")
5783 disk_addremove = 0
5784 for disk_op, disk_dict in self.op.disks:
5785 if disk_op == constants.DDM_REMOVE:
5786 disk_addremove += 1
5787 continue
5788 elif disk_op == constants.DDM_ADD:
5789 disk_addremove += 1
5790 else:
5791 if not isinstance(disk_op, int):
5792 raise errors.OpPrereqError("Invalid disk index")
5793 if disk_op == constants.DDM_ADD:
5794 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5795 if mode not in constants.DISK_ACCESS_SET:
5796 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5797 size = disk_dict.get('size', None)
5798 if size is None:
5799 raise errors.OpPrereqError("Required disk parameter size missing")
5800 try:
5801 size = int(size)
5802 except ValueError, err:
5803 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5804 str(err))
5805 disk_dict['size'] = size
5806 else:
5807 # modification of disk
5808 if 'size' in disk_dict:
5809 raise errors.OpPrereqError("Disk size change not possible, use"
5810 " grow-disk")
5812 if disk_addremove > 1:
5813 raise errors.OpPrereqError("Only one disk add or remove operation"
5814 " supported at a time")
5817 nic_addremove = 0
5818 for nic_op, nic_dict in self.op.nics:
5819 if nic_op == constants.DDM_REMOVE:
5820 nic_addremove += 1
5821 continue
5822 elif nic_op == constants.DDM_ADD:
5823 nic_addremove += 1
5824 else:
5825 if not isinstance(nic_op, int):
5826 raise errors.OpPrereqError("Invalid nic index")
5828 # nic_dict should be a dict
5829 nic_ip = nic_dict.get('ip', None)
5830 if nic_ip is not None:
5831 if nic_ip.lower() == constants.VALUE_NONE:
5832 nic_dict['ip'] = None
5833 else:
5834 if not utils.IsValidIP(nic_ip):
5835 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5837 if nic_op == constants.DDM_ADD:
5838 nic_bridge = nic_dict.get('bridge', None)
5839 if nic_bridge is None:
5840 nic_dict['bridge'] = self.cfg.GetDefBridge()
5841 nic_mac = nic_dict.get('mac', None)
5842 if nic_mac is None:
5843 nic_dict['mac'] = constants.VALUE_AUTO
5845 if 'mac' in nic_dict:
5846 nic_mac = nic_dict['mac']
5847 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5848 if not utils.IsValidMac(nic_mac):
5849 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5850 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5851 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5852 " modifying an existing nic")
5854 if nic_addremove > 1:
5855 raise errors.OpPrereqError("Only one NIC add or remove operation"
5856 " supported at a time")
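# Note: the checks above expect self.op.disks and self.op.nics as lists of
# (operation, parameters) pairs, where the operation is DDM_ADD, DDM_REMOVE
# or an integer index of an existing device. Hypothetical, illustrative
# values only:
#   op.disks = [(constants.DDM_ADD, {"size": 1024, "mode": "rw"})]
#   op.nics  = [(0, {"bridge": "xen-br1"}),
#               (constants.DDM_REMOVE, {})]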
5858 def ExpandNames(self):
5859 self._ExpandAndLockInstance()
5860 self.needed_locks[locking.LEVEL_NODE] = []
5861 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5863 def DeclareLocks(self, level):
5864 if level == locking.LEVEL_NODE:
5865 self._LockInstancesNodes()
5867 def BuildHooksEnv(self):
5870 This runs on the master, primary and secondaries.
5873 args = dict()
5874 if constants.BE_MEMORY in self.be_new:
5875 args['memory'] = self.be_new[constants.BE_MEMORY]
5876 if constants.BE_VCPUS in self.be_new:
5877 args['vcpus'] = self.be_new[constants.BE_VCPUS]
5878 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5879 # information at all.
5881 args['nics'] = []
5882 nic_override = dict(self.op.nics)
5883 for idx, nic in enumerate(self.instance.nics):
5884 if idx in nic_override:
5885 this_nic_override = nic_override[idx]
5886 else:
5887 this_nic_override = {}
5888 if 'ip' in this_nic_override:
5889 ip = this_nic_override['ip']
5890 else:
5891 ip = nic.ip
5892 if 'bridge' in this_nic_override:
5893 bridge = this_nic_override['bridge']
5894 else:
5895 bridge = nic.bridge
5896 if 'mac' in this_nic_override:
5897 mac = this_nic_override['mac']
5898 else:
5899 mac = nic.mac
5900 args['nics'].append((ip, bridge, mac))
5901 if constants.DDM_ADD in nic_override:
5902 ip = nic_override[constants.DDM_ADD].get('ip', None)
5903 bridge = nic_override[constants.DDM_ADD]['bridge']
5904 mac = nic_override[constants.DDM_ADD]['mac']
5905 args['nics'].append((ip, bridge, mac))
5906 elif constants.DDM_REMOVE in nic_override:
5907 del args['nics'][-1]
5909 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5910 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5911 return env, nl, nl
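# Note: like the other BuildHooksEnv methods in this module, the method above
# is expected to hand back an (env, pre_nodes, post_nodes) tuple; here both
# node lists are the master plus all instance nodes, and the environment
# carries the would-be new memory/vcpus/NIC settings for the hook scripts.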
5913 def CheckPrereq(self):
5914 """Check prerequisites.
5916 This only checks the instance list against the existing names.
5919 force = self.force = self.op.force
5921 # checking the new params on the primary/secondary nodes
5923 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5924 assert self.instance is not None, \
5925 "Cannot retrieve locked instance %s" % self.op.instance_name
5926 pnode = instance.primary_node
5927 nodelist = list(instance.all_nodes)
5929 # hvparams processing
5930 if self.op.hvparams:
5931 i_hvdict = copy.deepcopy(instance.hvparams)
5932 for key, val in self.op.hvparams.iteritems():
5933 if val == constants.VALUE_DEFAULT:
5934 try:
5935 del i_hvdict[key]
5936 except KeyError:
5937 pass
5938 else:
5939 i_hvdict[key] = val
5940 cluster = self.cfg.GetClusterInfo()
5941 utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5942 hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5943 i_hvdict)
5945 hypervisor.GetHypervisor(
5946 instance.hypervisor).CheckParameterSyntax(hv_new)
5947 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5948 self.hv_new = hv_new # the new actual values
5949 self.hv_inst = i_hvdict # the new dict (without defaults)
5950 else:
5951 self.hv_new = self.hv_inst = {}
5953 # beparams processing
5954 if self.op.beparams:
5955 i_bedict = copy.deepcopy(instance.beparams)
5956 for key, val in self.op.beparams.iteritems():
5957 if val == constants.VALUE_DEFAULT:
5958 try:
5959 del i_bedict[key]
5960 except KeyError:
5961 pass
5962 else:
5963 i_bedict[key] = val
5964 cluster = self.cfg.GetClusterInfo()
5965 utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5966 be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5967 i_bedict)
5968 self.be_new = be_new # the new actual values
5969 self.be_inst = i_bedict # the new dict (without defaults)
5970 else:
5971 self.be_new = self.be_inst = {}
5973 self.warn = []
5975 if constants.BE_MEMORY in self.op.beparams and not self.force:
5976 mem_check_list = [pnode]
5977 if be_new[constants.BE_AUTO_BALANCE]:
5978 # either we changed auto_balance to yes or it was from before
5979 mem_check_list.extend(instance.secondary_nodes)
5980 instance_info = self.rpc.call_instance_info(pnode, instance.name,
5981 instance.hypervisor)
5982 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5983 instance.hypervisor)
5984 if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5985 # Assume the primary node is unreachable and go ahead
5986 self.warn.append("Can't get info from primary node %s" % pnode)
5987 else:
5988 if not instance_info.failed and instance_info.data:
5989 current_mem = int(instance_info.data['memory'])
5990 else:
5991 # Assume instance not running
5992 # (there is a slight race condition here, but it's not very probable,
5993 # and we have no other way to check)
5994 current_mem = 0
5995 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5996 nodeinfo[pnode].data['memory_free'])
5997 if miss_mem > 0:
5998 raise errors.OpPrereqError("This change will prevent the instance"
5999 " from starting, due to %d MB of memory"
6000 " missing on its primary node" % miss_mem)
6002 if be_new[constants.BE_AUTO_BALANCE]:
6003 for node, nres in nodeinfo.iteritems():
6004 if node not in instance.secondary_nodes:
6005 continue
6006 if nres.failed or not isinstance(nres.data, dict):
6007 self.warn.append("Can't get info from secondary node %s" % node)
6008 elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6009 self.warn.append("Not enough memory to failover instance to"
6010 " secondary node %s" % node)
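# Note: the memory check above boils down to
#   miss_mem = new_memory - current_mem - memory_free(primary)
# With illustrative numbers: raising an instance to 2048 MB while it
# currently uses 512 MB on a primary node with 1024 MB free gives
# 2048 - 512 - 1024 = 512 > 0, so the change is rejected unless the force
# flag is given (the whole check is skipped with force). Secondaries are
# only examined when auto_balance is set, and problems there are reported
# as warnings rather than hard errors.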
6013 for nic_op, nic_dict in self.op.nics:
6014 if nic_op == constants.DDM_REMOVE:
6015 if not instance.nics:
6016 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6018 if nic_op != constants.DDM_ADD:
6020 if nic_op < 0 or nic_op >= len(instance.nics):
6021 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6022 " are 0 to %d" %
6023 (nic_op, len(instance.nics)))
6024 if 'bridge' in nic_dict:
6025 nic_bridge = nic_dict['bridge']
6026 if nic_bridge is None:
6027 raise errors.OpPrereqError('Cannot set the nic bridge to None')
6028 if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6029 msg = ("Bridge '%s' doesn't exist on one of"
6030 " the instance nodes" % nic_bridge)
6031 if self.force:
6032 self.warn.append(msg)
6033 else:
6034 raise errors.OpPrereqError(msg)
6035 if 'mac' in nic_dict:
6036 nic_mac = nic_dict['mac']
6037 if nic_mac is None:
6038 raise errors.OpPrereqError('Cannot set the nic mac to None')
6039 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6040 # otherwise generate the mac
6041 nic_dict['mac'] = self.cfg.GenerateMAC()
6042 else:
6043 # or validate/reserve the current one
6044 if self.cfg.IsMacInUse(nic_mac):
6045 raise errors.OpPrereqError("MAC address %s already in use"
6046 " in cluster" % nic_mac)
6049 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6050 raise errors.OpPrereqError("Disk operations not supported for"
6051 " diskless instances")
6052 for disk_op, disk_dict in self.op.disks:
6053 if disk_op == constants.DDM_REMOVE:
6054 if len(instance.disks) == 1:
6055 raise errors.OpPrereqError("Cannot remove the last disk of"
6056 " an instance")
6057 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6058 ins_l = ins_l[pnode]
6059 if ins_l.failed or not isinstance(ins_l.data, list):
6060 raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6061 if instance.name in ins_l.data:
6062 raise errors.OpPrereqError("Instance is running, can't remove"
6063 " disks.")
6065 if (disk_op == constants.DDM_ADD and
6066 len(instance.disks) >= constants.MAX_DISKS):
6067 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6068 " add more" % constants.MAX_DISKS)
6069 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6071 if disk_op < 0 or disk_op >= len(instance.disks):
6072 raise errors.OpPrereqError("Invalid disk index %s, valid values"
6073 " are 0 to %d" %
6074 (disk_op, len(instance.disks)))
6078 def Exec(self, feedback_fn):
6079 """Modifies an instance.
6081 All parameters take effect only at the next restart of the instance.
6084 # Process here the warnings from CheckPrereq, as we don't have a
6085 # feedback_fn there.
6086 for warn in self.warn:
6087 feedback_fn("WARNING: %s" % warn)
6089 result = []
6090 instance = self.instance
6092 for disk_op, disk_dict in self.op.disks:
6093 if disk_op == constants.DDM_REMOVE:
6094 # remove the last disk
6095 device = instance.disks.pop()
6096 device_idx = len(instance.disks)
6097 for node, disk in device.ComputeNodeTree(instance.primary_node):
6098 self.cfg.SetDiskID(disk, node)
6099 msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6100 if msg:
6101 self.LogWarning("Could not remove disk/%d on node %s: %s,"
6102 " continuing anyway", device_idx, node, msg)
6103 result.append(("disk/%d" % device_idx, "remove"))
6104 elif disk_op == constants.DDM_ADD:
6106 if instance.disk_template == constants.DT_FILE:
6107 file_driver, file_path = instance.disks[0].logical_id
6108 file_path = os.path.dirname(file_path)
6109 else:
6110 file_driver = file_path = None
6111 disk_idx_base = len(instance.disks)
6112 new_disk = _GenerateDiskTemplate(self,
6113 instance.disk_template,
6114 instance.name, instance.primary_node,
6115 instance.secondary_nodes,
6116 [disk_dict],
6117 file_path,
6118 file_driver,
6119 disk_idx_base)[0]
6120 instance.disks.append(new_disk)
6121 info = _GetInstanceInfoText(instance)
6123 logging.info("Creating volume %s for instance %s",
6124 new_disk.iv_name, instance.name)
6125 # Note: this needs to be kept in sync with _CreateDisks
6127 for node in instance.all_nodes:
6128 f_create = node == instance.primary_node
6129 try:
6130 _CreateBlockDev(self, node, instance, new_disk,
6131 f_create, info, f_create)
6132 except errors.OpExecError, err:
6133 self.LogWarning("Failed to create volume %s (%s) on"
6134 " node %s: %s",
6135 new_disk.iv_name, new_disk, node, err)
6136 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6137 (new_disk.size, new_disk.mode)))
6138 else:
6139 # change a given disk
6140 instance.disks[disk_op].mode = disk_dict['mode']
6141 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6143 for nic_op, nic_dict in self.op.nics:
6144 if nic_op == constants.DDM_REMOVE:
6145 # remove the last nic
6146 del instance.nics[-1]
6147 result.append(("nic.%d" % len(instance.nics), "remove"))
6148 elif nic_op == constants.DDM_ADD:
6149 # mac and bridge should be set, by now
6150 mac = nic_dict['mac']
6151 bridge = nic_dict['bridge']
6152 new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6153 bridge=bridge)
6154 instance.nics.append(new_nic)
6155 result.append(("nic.%d" % (len(instance.nics) - 1),
6156 "add:mac=%s,ip=%s,bridge=%s" %
6157 (new_nic.mac, new_nic.ip, new_nic.bridge)))
6158 else:
6159 # change a given nic
6160 for key in 'mac', 'ip', 'bridge':
6161 if key in nic_dict:
6162 setattr(instance.nics[nic_op], key, nic_dict[key])
6163 result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6166 if self.op.hvparams:
6167 instance.hvparams = self.hv_inst
6168 for key, val in self.op.hvparams.iteritems():
6169 result.append(("hv/%s" % key, val))
6172 if self.op.beparams:
6173 instance.beparams = self.be_inst
6174 for key, val in self.op.beparams.iteritems():
6175 result.append(("be/%s" % key, val))
6177 self.cfg.Update(instance)
6179 return result
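# Note: Exec above hands back a list of (parameter, new value) pairs which
# the caller can use for feedback; an illustrative, made-up result:
#   [("disk/1", "add:size=1024,mode=rw"),
#    ("nic.bridge/0", "xen-br1"),
#    ("be/memory", 512)]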
6182 class LUQueryExports(NoHooksLU):
6183 """Query the exports list
6186 _OP_REQP = ['nodes']
6189 def ExpandNames(self):
6190 self.needed_locks = {}
6191 self.share_locks[locking.LEVEL_NODE] = 1
6192 if not self.op.nodes:
6193 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6195 self.needed_locks[locking.LEVEL_NODE] = \
6196 _GetWantedNodes(self, self.op.nodes)
6198 def CheckPrereq(self):
6199 """Check prerequisites.
6202 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6204 def Exec(self, feedback_fn):
6205 """Compute the list of all the exported system images.
6208 @return: a dictionary with the structure node->(export-list)
6209 where export-list is a list of the instances exported on
6213 rpcresult = self.rpc.call_export_list(self.nodes)
6214 result = {}
6215 for node in rpcresult:
6216 if rpcresult[node].failed:
6217 result[node] = False
6218 else:
6219 result[node] = rpcresult[node].data
6221 return result
6224 class LUExportInstance(LogicalUnit):
6225 """Export an instance to an image in the cluster.
6228 HPATH = "instance-export"
6229 HTYPE = constants.HTYPE_INSTANCE
6230 _OP_REQP = ["instance_name", "target_node", "shutdown"]
6233 def ExpandNames(self):
6234 self._ExpandAndLockInstance()
6235 # FIXME: lock only instance primary and destination node
6237 # Sad but true, for now we have to lock all nodes, as we don't know where
6238 # the previous export might be, and in this LU we search for it and
6239 # remove it from its current node. In the future we could fix this by:
6240 # - making a tasklet to search (share-lock all), then create the new one,
6241 # then one to remove, after
6242 # - removing the removal operation altogether
6243 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6245 def DeclareLocks(self, level):
6246 """Last minute lock declaration."""
6247 # All nodes are locked anyway, so nothing to do here.
6249 def BuildHooksEnv(self):
6252 This will run on the master, primary node and target node.
6255 env = {
6256 "EXPORT_NODE": self.op.target_node,
6257 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6258 }
6259 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6260 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6261 self.op.target_node]
6262 return env, nl, nl
6264 def CheckPrereq(self):
6265 """Check prerequisites.
6267 This checks that the instance and node names are valid.
6270 instance_name = self.op.instance_name
6271 self.instance = self.cfg.GetInstanceInfo(instance_name)
6272 assert self.instance is not None, \
6273 "Cannot retrieve locked instance %s" % self.op.instance_name
6274 _CheckNodeOnline(self, self.instance.primary_node)
6276 self.dst_node = self.cfg.GetNodeInfo(
6277 self.cfg.ExpandNodeName(self.op.target_node))
6279 if self.dst_node is None:
6280 # This means the node name is wrong, not that the node is merely unlocked
6281 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6282 _CheckNodeOnline(self, self.dst_node.name)
6283 _CheckNodeNotDrained(self, self.dst_node.name)
6285 # instance disk type verification
6286 for disk in self.instance.disks:
6287 if disk.dev_type == constants.LD_FILE:
6288 raise errors.OpPrereqError("Export not supported for instances with"
6289 " file-based disks")
6291 def Exec(self, feedback_fn):
6292 """Export an instance to an image in the cluster.
6295 instance = self.instance
6296 dst_node = self.dst_node
6297 src_node = instance.primary_node
6298 if self.op.shutdown:
6299 # shutdown the instance, but not the disks
6300 result = self.rpc.call_instance_shutdown(src_node, instance)
6301 msg = result.RemoteFailMsg()
6302 if msg:
6303 raise errors.OpExecError("Could not shutdown instance %s on"
6304 " node %s: %s" %
6305 (instance.name, src_node, msg))
6307 vgname = self.cfg.GetVGName()
6309 snap_disks = []
6311 # set the disks ID correctly since call_instance_start needs the
6312 # correct drbd minor to create the symlinks
6313 for disk in instance.disks:
6314 self.cfg.SetDiskID(disk, src_node)
6316 try:
6317 for idx, disk in enumerate(instance.disks):
6318 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6319 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6320 if new_dev_name.failed or not new_dev_name.data:
6321 self.LogWarning("Could not snapshot disk/%d on node %s",
6322 idx, src_node)
6323 snap_disks.append(False)
6324 else:
6325 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6326 logical_id=(vgname, new_dev_name.data),
6327 physical_id=(vgname, new_dev_name.data),
6328 iv_name=disk.iv_name)
6329 snap_disks.append(new_dev)
6331 finally:
6332 if self.op.shutdown and instance.admin_up:
6333 result = self.rpc.call_instance_start(src_node, instance, None, None)
6334 msg = result.RemoteFailMsg()
6335 if msg:
6336 _ShutdownInstanceDisks(self, instance)
6337 raise errors.OpExecError("Could not start instance: %s" % msg)
6339 # TODO: check for size
6341 cluster_name = self.cfg.GetClusterName()
6342 for idx, dev in enumerate(snap_disks):
6343 if dev:
6344 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6345 instance, cluster_name, idx)
6346 if result.failed or not result.data:
6347 self.LogWarning("Could not export disk/%d from node %s to"
6348 " node %s", idx, src_node, dst_node.name)
6349 msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6350 if msg:
6351 self.LogWarning("Could not remove snapshot for disk/%d from node"
6352 " %s: %s", idx, src_node, msg)
6354 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6355 if result.failed or not result.data:
6356 self.LogWarning("Could not finalize export for instance %s on node %s",
6357 instance.name, dst_node.name)
6359 nodelist = self.cfg.GetNodeList()
6360 nodelist.remove(dst_node.name)
6362 # on one-node clusters nodelist will be empty after the removal
6363 # if we proceed the backup would be removed because OpQueryExports
6364 # substitutes an empty list with the full cluster node list.
6365 if nodelist:
6366 exportlist = self.rpc.call_export_list(nodelist)
6367 for node in exportlist:
6368 if exportlist[node].failed:
6369 continue
6370 if instance.name in exportlist[node].data:
6371 if not self.rpc.call_export_remove(node, instance.name):
6372 self.LogWarning("Could not remove older export for instance %s"
6373 " on node %s", instance.name, node)
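# Note: the export above is, roughly, a five step pipeline: optionally shut
# the instance down, snapshot every disk into a temporary LV, restart the
# instance if it was running, copy each snapshot to the target node and drop
# it from the source, then finalize the export and prune any older export of
# the same instance from the remaining nodes. Failures on individual disks
# only produce warnings, so a partial export is possible.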
6376 class LURemoveExport(NoHooksLU):
6377 """Remove exports related to the named instance.
6380 _OP_REQP = ["instance_name"]
6383 def ExpandNames(self):
6384 self.needed_locks = {}
6385 # We need all nodes to be locked in order for RemoveExport to work, but we
6386 # don't need to lock the instance itself, as nothing will happen to it (and
6387 # we can remove exports also for a removed instance)
6388 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6390 def CheckPrereq(self):
6391 """Check prerequisites.
6395 def Exec(self, feedback_fn):
6396 """Remove any export.
6399 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6400 # If the instance was not found we'll try with the name that was passed in.
6401 # This will only work if it was an FQDN, though.
6402 fqdn_warn = False
6403 if not instance_name:
6404 fqdn_warn = True
6405 instance_name = self.op.instance_name
6407 exportlist = self.rpc.call_export_list(self.acquired_locks[
6408 locking.LEVEL_NODE])
6409 found = False
6410 for node in exportlist:
6411 if exportlist[node].failed:
6412 self.LogWarning("Failed to query node %s, continuing" % node)
6413 continue
6414 if instance_name in exportlist[node].data:
6415 found = True
6416 result = self.rpc.call_export_remove(node, instance_name)
6417 if result.failed or not result.data:
6418 logging.error("Could not remove export for instance %s"
6419 " on node %s", instance_name, node)
6421 if fqdn_warn and not found:
6422 feedback_fn("Export not found. If trying to remove an export belonging"
6423 " to a deleted instance please use its Fully Qualified"
6424 " Domain Name.")
6427 class TagsLU(NoHooksLU):
6430 This is an abstract class which is the parent of all the other tags LUs.
6434 def ExpandNames(self):
6435 self.needed_locks = {}
6436 if self.op.kind == constants.TAG_NODE:
6437 name = self.cfg.ExpandNodeName(self.op.name)
6438 if name is None:
6439 raise errors.OpPrereqError("Invalid node name (%s)" %
6440 (self.op.name,))
6441 self.op.name = name
6442 self.needed_locks[locking.LEVEL_NODE] = name
6443 elif self.op.kind == constants.TAG_INSTANCE:
6444 name = self.cfg.ExpandInstanceName(self.op.name)
6445 if name is None:
6446 raise errors.OpPrereqError("Invalid instance name (%s)" %
6447 (self.op.name,))
6448 self.op.name = name
6449 self.needed_locks[locking.LEVEL_INSTANCE] = name
6451 def CheckPrereq(self):
6452 """Check prerequisites.
6455 if self.op.kind == constants.TAG_CLUSTER:
6456 self.target = self.cfg.GetClusterInfo()
6457 elif self.op.kind == constants.TAG_NODE:
6458 self.target = self.cfg.GetNodeInfo(self.op.name)
6459 elif self.op.kind == constants.TAG_INSTANCE:
6460 self.target = self.cfg.GetInstanceInfo(self.op.name)
6461 else:
6462 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6463 str(self.op.kind))
6466 class LUGetTags(TagsLU):
6467 """Returns the tags of a given object.
6470 _OP_REQP = ["kind", "name"]
6473 def Exec(self, feedback_fn):
6474 """Returns the tag list.
6477 return list(self.target.GetTags())
6480 class LUSearchTags(NoHooksLU):
6481 """Searches the tags for a given pattern.
6484 _OP_REQP = ["pattern"]
6487 def ExpandNames(self):
6488 self.needed_locks = {}
6490 def CheckPrereq(self):
6491 """Check prerequisites.
6493 This checks the pattern passed for validity by compiling it.
6496 try:
6497 self.re = re.compile(self.op.pattern)
6498 except re.error, err:
6499 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6500 (self.op.pattern, err))
6502 def Exec(self, feedback_fn):
6503 """Returns the tag list.
6506 cfg = self.cfg
6507 tgts = [("/cluster", cfg.GetClusterInfo())]
6508 ilist = cfg.GetAllInstancesInfo().values()
6509 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6510 nlist = cfg.GetAllNodesInfo().values()
6511 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6512 results = []
6513 for path, target in tgts:
6514 for tag in target.GetTags():
6515 if self.re.search(tag):
6516 results.append((path, tag))
6517 return results
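# Note: the search result is a list of (path, tag) pairs covering the
# cluster, all instances and all nodes; e.g. (illustrative only)
#   [("/cluster", "prod"), ("/nodes/node1.example.com", "prod")]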
6520 class LUAddTags(TagsLU):
6521 """Sets a tag on a given object.
6524 _OP_REQP = ["kind", "name", "tags"]
6527 def CheckPrereq(self):
6528 """Check prerequisites.
6530 This checks the type and length of the tag name and value.
6533 TagsLU.CheckPrereq(self)
6534 for tag in self.op.tags:
6535 objects.TaggableObject.ValidateTag(tag)
6537 def Exec(self, feedback_fn):
6541 try:
6542 for tag in self.op.tags:
6543 self.target.AddTag(tag)
6544 except errors.TagError, err:
6545 raise errors.OpExecError("Error while setting tag: %s" % str(err))
6546 try:
6547 self.cfg.Update(self.target)
6548 except errors.ConfigurationError:
6549 raise errors.OpRetryError("There has been a modification to the"
6550 " config file and the operation has been"
6551 " aborted. Please retry.")
6554 class LUDelTags(TagsLU):
6555 """Delete a list of tags from a given object.
6558 _OP_REQP = ["kind", "name", "tags"]
6561 def CheckPrereq(self):
6562 """Check prerequisites.
6564 This checks that we have the given tag.
6567 TagsLU.CheckPrereq(self)
6568 for tag in self.op.tags:
6569 objects.TaggableObject.ValidateTag(tag)
6570 del_tags = frozenset(self.op.tags)
6571 cur_tags = self.target.GetTags()
6572 if not del_tags <= cur_tags:
6573 diff_tags = del_tags - cur_tags
6574 diff_names = ["'%s'" % tag for tag in diff_tags]
6576 raise errors.OpPrereqError("Tag(s) %s not found" %
6577 (",".join(diff_names)))
6579 def Exec(self, feedback_fn):
6580 """Remove the tag from the object.
6583 for tag in self.op.tags:
6584 self.target.RemoveTag(tag)
6585 try:
6586 self.cfg.Update(self.target)
6587 except errors.ConfigurationError:
6588 raise errors.OpRetryError("There has been a modification to the"
6589 " config file and the operation has been"
6590 " aborted. Please retry.")
6593 class LUTestDelay(NoHooksLU):
6594 """Sleep for a specified amount of time.
6596 This LU sleeps on the master and/or nodes for a specified amount of
6597 time.
6600 _OP_REQP = ["duration", "on_master", "on_nodes"]
6603 def ExpandNames(self):
6604 """Expand names and set required locks.
6606 This expands the node list, if any.
6609 self.needed_locks = {}
6610 if self.op.on_nodes:
6611 # _GetWantedNodes can be used here, but is not always appropriate to use
6612 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6614 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6615 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6617 def CheckPrereq(self):
6618 """Check prerequisites.
6622 def Exec(self, feedback_fn):
6623 """Do the actual sleep.
6626 if self.op.on_master:
6627 if not utils.TestDelay(self.op.duration):
6628 raise errors.OpExecError("Error during master delay test")
6629 if self.op.on_nodes:
6630 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6631 if not result:
6632 raise errors.OpExecError("Complete failure from rpc call")
6633 for node, node_result in result.items():
6634 node_result.Raise()
6635 if not node_result.data:
6636 raise errors.OpExecError("Failure during rpc call to node %s,"
6637 " result: %s" % (node, node_result.data))
6640 class IAllocator(object):
6641 """IAllocator framework.
6643 An IAllocator instance has three sets of attributes:
6644 - cfg that is needed to query the cluster
6645 - input data (all members of the _KEYS class attribute are required)
6646 - four buffer attributes (in|out_data|text), that represent the
6647 input (to the external script) in text and data structure format,
6648 and the output from it, again in two formats
6649 - the result variables from the script (success, info, nodes) for
6653 _ALLO_KEYS = [
6654 "mem_size", "disks", "disk_template",
6655 "os", "tags", "nics", "vcpus", "hypervisor",
6656 ]
6657 _RELO_KEYS = [
6658 "relocate_from",
6659 ]
6661 def __init__(self, lu, mode, name, **kwargs):
6662 self.lu = lu
6663 # init buffer variables
6664 self.in_text = self.out_text = self.in_data = self.out_data = None
6665 # init all input fields so that pylint is happy
6666 self.mode = mode
6667 self.name = name
6668 self.mem_size = self.disks = self.disk_template = None
6669 self.os = self.tags = self.nics = self.vcpus = None
6670 self.hypervisor = None
6671 self.relocate_from = None
6673 self.required_nodes = None
6674 # init result fields
6675 self.success = self.info = self.nodes = None
6676 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6677 keyset = self._ALLO_KEYS
6678 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6679 keyset = self._RELO_KEYS
6681 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6682 " IAllocator" % self.mode)
6683 for key in kwargs:
6684 if key not in keyset:
6685 raise errors.ProgrammerError("Invalid input parameter '%s' to"
6686 " IAllocator" % key)
6687 setattr(self, key, kwargs[key])
6688 for key in keyset:
6689 if key not in kwargs:
6690 raise errors.ProgrammerError("Missing input parameter '%s' to"
6691 " IAllocator" % key)
6692 self._BuildInputData()
6694 def _ComputeClusterData(self):
6695 """Compute the generic allocator input data.
6697 This is the data that is independent of the actual operation.
6700 cfg = self.lu.cfg
6701 cluster_info = cfg.GetClusterInfo()
6702 # cluster data
6703 data = {
6704 "version": constants.IALLOCATOR_VERSION,
6705 "cluster_name": cfg.GetClusterName(),
6706 "cluster_tags": list(cluster_info.GetTags()),
6707 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6708 # we don't have job IDs
6709 }
6710 iinfo = cfg.GetAllInstancesInfo().values()
6711 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6713 # node data
6714 node_results = {}
6715 node_list = cfg.GetNodeList()
6717 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6718 hypervisor_name = self.hypervisor
6719 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6720 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6722 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6723 hypervisor_name)
6724 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6725 cluster_info.enabled_hypervisors)
6726 for nname, nresult in node_data.items():
6727 # first fill in static (config-based) values
6728 ninfo = cfg.GetNodeInfo(nname)
6729 pnr = {
6730 "tags": list(ninfo.GetTags()),
6731 "primary_ip": ninfo.primary_ip,
6732 "secondary_ip": ninfo.secondary_ip,
6733 "offline": ninfo.offline,
6734 "drained": ninfo.drained,
6735 "master_candidate": ninfo.master_candidate,
6736 }
6738 if not ninfo.offline:
6739 nresult.Raise()
6740 if not isinstance(nresult.data, dict):
6741 raise errors.OpExecError("Can't get data for node %s" % nname)
6742 remote_info = nresult.data
6743 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6744 'vg_size', 'vg_free', 'cpu_total']:
6745 if attr not in remote_info:
6746 raise errors.OpExecError("Node '%s' didn't return attribute"
6747 " '%s'" % (nname, attr))
6748 try:
6749 remote_info[attr] = int(remote_info[attr])
6750 except ValueError, err:
6751 raise errors.OpExecError("Node '%s' returned invalid value"
6752 " for '%s': %s" % (nname, attr, err))
6753 # compute memory used by primary instances
6754 i_p_mem = i_p_up_mem = 0
6755 for iinfo, beinfo in i_list:
6756 if iinfo.primary_node == nname:
6757 i_p_mem += beinfo[constants.BE_MEMORY]
6758 if iinfo.name not in node_iinfo[nname].data:
6759 i_used_mem = 0
6760 else:
6761 i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6762 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6763 remote_info['memory_free'] -= max(0, i_mem_diff)
6765 if iinfo.admin_up:
6766 i_p_up_mem += beinfo[constants.BE_MEMORY]
6768 # compute memory used by instances
6769 pnr.update({
6770 "total_memory": remote_info['memory_total'],
6771 "reserved_memory": remote_info['memory_dom0'],
6772 "free_memory": remote_info['memory_free'],
6773 "total_disk": remote_info['vg_size'],
6774 "free_disk": remote_info['vg_free'],
6775 "total_cpus": remote_info['cpu_total'],
6776 "i_pri_memory": i_p_mem,
6777 "i_pri_up_memory": i_p_up_mem,
6778 })
6781 node_results[nname] = pnr
6782 data["nodes"] = node_results
6784 # instance data
6785 instance_data = {}
6786 for iinfo, beinfo in i_list:
6787 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6788 for n in iinfo.nics]
6789 pir = {
6790 "tags": list(iinfo.GetTags()),
6791 "admin_up": iinfo.admin_up,
6792 "vcpus": beinfo[constants.BE_VCPUS],
6793 "memory": beinfo[constants.BE_MEMORY],
6794 "os": iinfo.os,
6795 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6796 "nics": nic_data,
6797 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6798 "disk_template": iinfo.disk_template,
6799 "hypervisor": iinfo.hypervisor,
6800 }
6801 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6802 pir["disks"])
6803 instance_data[iinfo.name] = pir
6805 data["instances"] = instance_data
6807 self.in_data = data
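# Note: a condensed sketch of the in_data structure built above; the keys
# come from the code, the values are illustrative only:
#   {
#     "version": constants.IALLOCATOR_VERSION,
#     "cluster_name": "cluster.example.com",
#     "cluster_tags": [...],
#     "enabled_hypervisors": ["xen-pvm"],
#     "nodes": {"node1.example.com": {"total_memory": ..., "free_disk": ...,
#                                     "i_pri_memory": ..., ...}},
#     "instances": {"inst1.example.com": {"memory": ..., "disks": [...],
#                                         "disk_space_total": ..., ...}},
#   }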
6809 def _AddNewInstance(self):
6810 """Add new instance data to allocator structure.
6812 This in combination with _AllocatorGetClusterData will create the
6813 correct structure needed as input for the allocator.
6815 The checks for the completeness of the opcode must have already been
6819 data = self.in_data
6821 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6823 if self.disk_template in constants.DTS_NET_MIRROR:
6824 self.required_nodes = 2
6825 else:
6826 self.required_nodes = 1
6828 request = {
6829 "name": self.name,
6830 "disk_template": self.disk_template,
6833 "vcpus": self.vcpus,
6834 "memory": self.mem_size,
6835 "disks": self.disks,
6836 "disk_space_total": disk_space,
6837 "nics": self.nics,
6838 "required_nodes": self.required_nodes,
6839 }
6840 data["request"] = request
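# Note: for an allocation the "request" sub-dict carries at least the keys
# set above (name, disk_template, vcpus, memory, disks, disk_space_total,
# nics, required_nodes); the external allocator script then receives the
# whole structure serialized as text (see _BuildInputData and Run below).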
6842 def _AddRelocateInstance(self):
6843 """Add relocate instance data to allocator structure.
6845 This in combination with _IAllocatorGetClusterData will create the
6846 correct structure needed as input for the allocator.
6848 The checks for the completeness of the opcode must have already been
6852 instance = self.lu.cfg.GetInstanceInfo(self.name)
6853 if instance is None:
6854 raise errors.ProgrammerError("Unknown instance '%s' passed to"
6855 " IAllocator" % self.name)
6857 if instance.disk_template not in constants.DTS_NET_MIRROR:
6858 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6860 if len(instance.secondary_nodes) != 1:
6861 raise errors.OpPrereqError("Instance has not exactly one secondary node")
6863 self.required_nodes = 1
6864 disk_sizes = [{'size': disk.size} for disk in instance.disks]
6865 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6867 request = {
6869 "name": self.name,
6870 "disk_space_total": disk_space,
6871 "required_nodes": self.required_nodes,
6872 "relocate_from": self.relocate_from,
6873 }
6874 self.in_data["request"] = request
6876 def _BuildInputData(self):
6877 """Build input data structures.
6880 self._ComputeClusterData()
6882 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6883 self._AddNewInstance()
6884 else:
6885 self._AddRelocateInstance()
6887 self.in_text = serializer.Dump(self.in_data)
6889 def Run(self, name, validate=True, call_fn=None):
6890 """Run an instance allocator and return the results.
6893 if call_fn is None:
6894 call_fn = self.lu.rpc.call_iallocator_runner
6897 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6898 result.Raise()
6900 if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6901 raise errors.OpExecError("Invalid result from master iallocator runner")
6903 rcode, stdout, stderr, fail = result.data
6905 if rcode == constants.IARUN_NOTFOUND:
6906 raise errors.OpExecError("Can't find allocator '%s'" % name)
6907 elif rcode == constants.IARUN_FAILURE:
6908 raise errors.OpExecError("Instance allocator call failed: %s,"
6909 " output: %s" % (fail, stdout+stderr))
6910 self.out_text = stdout
6911 if validate:
6912 self._ValidateResult()
6914 def _ValidateResult(self):
6915 """Process the allocator results.
6917 This will process and if successful save the result in
6918 self.out_data and the other parameters.
6921 try:
6922 rdict = serializer.Load(self.out_text)
6923 except Exception, err:
6924 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6926 if not isinstance(rdict, dict):
6927 raise errors.OpExecError("Can't parse iallocator results: not a dict")
6929 for key in "success", "info", "nodes":
6930 if key not in rdict:
6931 raise errors.OpExecError("Can't parse iallocator results:"
6932 " missing key '%s'" % key)
6933 setattr(self, key, rdict[key])
6935 if not isinstance(rdict["nodes"], list):
6936 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6937 " is not a list")
6938 self.out_data = rdict
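# Note: a minimal example of a well-formed allocator reply after
# _ValidateResult (values illustrative; only the success/info/nodes keys
# are required by the checks above):
#   {"success": True, "info": "allocation successful",
#    "nodes": ["node1.example.com", "node2.example.com"]}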
6941 class LUTestAllocator(NoHooksLU):
6942 """Run allocator tests.
6944 This LU runs the allocator tests
6947 _OP_REQP = ["direction", "mode", "name"]
6949 def CheckPrereq(self):
6950 """Check prerequisites.
6952 This checks the opcode parameters depending on the test direction and mode.
6955 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6956 for attr in ["name", "mem_size", "disks", "disk_template",
6957 "os", "tags", "nics", "vcpus"]:
6958 if not hasattr(self.op, attr):
6959 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6960 attr)
6961 iname = self.cfg.ExpandInstanceName(self.op.name)
6962 if iname is not None:
6963 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6964 iname)
6965 if not isinstance(self.op.nics, list):
6966 raise errors.OpPrereqError("Invalid parameter 'nics'")
6967 for row in self.op.nics:
6968 if (not isinstance(row, dict) or
6969 "mac" not in row or
6970 "ip" not in row or
6971 "bridge" not in row):
6972 raise errors.OpPrereqError("Invalid contents of the"
6973 " 'nics' parameter")
6974 if not isinstance(self.op.disks, list):
6975 raise errors.OpPrereqError("Invalid parameter 'disks'")
6976 for row in self.op.disks:
6977 if (not isinstance(row, dict) or
6978 "size" not in row or
6979 not isinstance(row["size"], int) or
6980 "mode" not in row or
6981 row["mode"] not in ['r', 'w']):
6982 raise errors.OpPrereqError("Invalid contents of the"
6983 " 'disks' parameter")
6984 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6985 self.op.hypervisor = self.cfg.GetHypervisorType()
6986 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6987 if not hasattr(self.op, "name"):
6988 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6989 fname = self.cfg.ExpandInstanceName(self.op.name)
6990 if fname is None:
6991 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6992 self.op.name)
6993 self.op.name = fname
6994 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6995 else:
6996 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6997 self.op.mode)
6999 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7000 if not hasattr(self.op, "allocator") or self.op.allocator is None:
7001 raise errors.OpPrereqError("Missing allocator name")
7002 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7003 raise errors.OpPrereqError("Wrong allocator test '%s'" %
7004 self.op.direction)
7006 def Exec(self, feedback_fn):
7007 """Run the allocator test.
7010 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7011 ial = IAllocator(self,
7012 mode=self.op.mode,
7013 name=self.op.name,
7014 mem_size=self.op.mem_size,
7015 disks=self.op.disks,
7016 disk_template=self.op.disk_template,
7017 os=self.op.os,
7018 tags=self.op.tags,
7019 nics=self.op.nics,
7020 vcpus=self.op.vcpus,
7021 hypervisor=self.op.hypervisor,
7022 )
7023 else:
7024 ial = IAllocator(self,
7025 mode=self.op.mode,
7026 name=self.op.name,
7027 relocate_from=list(self.relocate_from),
7028 )
7030 if self.op.direction == constants.IALLOCATOR_DIR_IN:
7031 result = ial.in_text
7032 else:
7033 ial.Run(self.op.allocator, validate=False)
7034 result = ial.out_text
7036 return result