4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
# NOTE(review): this file is a numbered listing whose embedded original line
# numbers are non-contiguous — interior lines have been dropped and original
# indentation collapsed. Code lines below are kept byte-identical; comments
# describe only what the visible lines show, hedged where lines are missing.
# TODO: restore the dropped lines from the upstream source before running.
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement ExpandNames
53 - implement CheckPrereq
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements:
58 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60 Note that all commands require root permissions.
# Constructor: stores the config/context handles and initializes the
# per-LU locking bookkeeping dictionaries consumed by the processor (mcpu).
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
71 This needs to be overriden in derived classes in order to check op
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
# share_locks defaults every lock level to 0 (exclusive, not shared).
83 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
# Validates that every opcode attribute named in the class's _OP_REQP list
# is present. NOTE(review): the guard between the getattr and the raise
# (presumably "if attr_val is None:") is among the lines dropped from this
# listing — confirm against upstream.
93 for attr_name in self._OP_REQP:
94 attr_val = getattr(op, attr_name, None)
96 raise errors.OpPrereqError("Required parameter '%s' missing" %
# Lazily-built SshRunner for this cluster, exposed read-only through the
# 'ssh' property below. (The surrounding def/caching lines are dropped here.)
101 """Returns the SshRunner object
105 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
108 ssh = property(fget=__GetSSH)
110 def CheckArguments(self):
111 """Check syntactic validity for the opcode arguments.
113 This method is for doing a simple syntactic check and ensure
114 validity of opcode parameters, without any cluster-related
115 checks. While the same can be accomplished in ExpandNames and/or
116 CheckPrereq, doing these separate is better because:
118 - ExpandNames is left as as purely a lock-related function
119 - CheckPrereq is run after we have aquired locks (and possible
122 The function is allowed to change the self.op attribute so that
123 later methods can no longer worry about missing parameters.
128 def ExpandNames(self):
129 """Expand names for this LU.
131 This method is called before starting to execute the opcode, and it should
132 update all the parameters of the opcode to their canonical form (e.g. a
133 short node name must be fully expanded after this method has successfully
134 completed). This way locking, hooks, logging, ecc. can work correctly.
136 LUs which implement this method must also populate the self.needed_locks
137 member, as a dict with lock levels as keys, and a list of needed lock names
140 - use an empty dict if you don't need any lock
141 - if you don't need any lock at a particular level omit that level
142 - don't put anything for the BGL level
143 - if you want all locks at a level use locking.ALL_SET as a value
145 If you need to share locks (rather than acquire them exclusively) at one
146 level you can modify self.share_locks, setting a true value (usually 1) for
147 that level. By default locks are not shared.
# The lines below are docstring usage examples, not executable code.
151 # Acquire all nodes and one instance
152 self.needed_locks = {
153 locking.LEVEL_NODE: locking.ALL_SET,
154 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156 # Acquire just two nodes
157 self.needed_locks = {
158 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
161 self.needed_locks = {} # No, you can't leave it to the default value None
164 # The implementation of this method is mandatory only if the new LU is
165 # concurrent, so that old LUs don't need to be changed all at the same
168 self.needed_locks = {} # Exclusive LUs don't need locks.
170 raise NotImplementedError
172 def DeclareLocks(self, level):
173 """Declare LU locking needs for a level
175 While most LUs can just declare their locking needs at ExpandNames time,
176 sometimes there's the need to calculate some locks after having acquired
177 the ones before. This function is called just before acquiring locks at a
178 particular level, but after acquiring the ones at lower levels, and permits
179 such calculations. It can be used to modify self.needed_locks, and by
180 default it does nothing.
182 This function is only called if you have something already set in
183 self.needed_locks for the level.
185 @param level: Locking level which is going to be locked
186 @type level: member of ganeti.locking.LEVELS
190 def CheckPrereq(self):
191 """Check prerequisites for this LU.
193 This method should check that the prerequisites for the execution
194 of this LU are fulfilled. It can do internode communication, but
195 it should be idempotent - no cluster or system changes are
198 The method should raise errors.OpPrereqError in case something is
199 not fulfilled. Its return value is ignored.
201 This method should also update all the parameters of the opcode to
202 their canonical form if it hasn't been done by ExpandNames before.
205 raise NotImplementedError
207 def Exec(self, feedback_fn):
210 This method should implement the actual work. It should raise
211 errors.OpExecError for failures that are somewhat dealt with in
215 raise NotImplementedError
217 def BuildHooksEnv(self):
218 """Build hooks environment for this LU.
220 This method should return a three-node tuple consisting of: a dict
221 containing the environment that will be used for running the
222 specific hook for this LU, a list of node names on which the hook
223 should run before the execution, and a list of node names on which
224 the hook should run after the execution.
226 The keys of the dict must not have 'GANETI_' prefixed as this will
227 be handled in the hooks runner. Also note additional keys will be
228 added by the hooks runner. If the LU doesn't define any
229 environment, an empty dict (and not None) should be returned.
231 No nodes should be returned as an empty list (and not None).
233 Note that if the HPATH for a LU class is None, this function will
237 raise NotImplementedError
239 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240 """Notify the LU about the results of its hooks.
242 This method is called every time a hooks phase is executed, and notifies
243 the Logical Unit about the hooks' result. The LU can then use it to alter
244 its result based on the hooks. By default the method does nothing and the
245 previous result is passed back unchanged but any LU can define it if it
246 wants to use the local cluster hook-scripts somehow.
248 @param phase: one of L{constants.HOOKS_PHASE_POST} or
249 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250 @param hook_results: the results of the multi-node hooks rpc call
251 @param feedback_fn: function used send feedback back to the caller
252 @param lu_result: the previous Exec result this LU had, or None
254 @return: the new Exec result, based on the previous result
260 def _ExpandAndLockInstance(self):
261 """Helper function to expand and lock an instance.
263 Many LUs that work on an instance take its name in self.op.instance_name
264 and need to expand it and then declare the expanded name for locking. This
265 function does it, and then updates self.op.instance_name to the expanded
266 name. It also initializes needed_locks as a dict, if this hasn't been done
270 if self.needed_locks is None:
271 self.needed_locks = {}
# Calling this twice (or after the caller set instance locks itself) is a
# programming error, hence the assert rather than OpPrereqError.
273 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274 "_ExpandAndLockInstance called with instance-level locks set"
275 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276 if expanded_name is None:
277 raise errors.OpPrereqError("Instance '%s' not known" %
278 self.op.instance_name)
279 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280 self.op.instance_name = expanded_name
282 def _LockInstancesNodes(self, primary_only=False):
283 """Helper function to declare instances' nodes for locking.
285 This function should be called after locking one or more instances to lock
286 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287 with all primary or secondary nodes for instances already locked and
288 present in self.needed_locks[locking.LEVEL_INSTANCE].
290 It should be called from DeclareLocks, and for safety only works if
291 self.recalculate_locks[locking.LEVEL_NODE] is set.
293 In the future it may grow parameters to just lock some instance's nodes, or
294 to just lock primaries or secondary nodes, if needed.
296 If should be called in DeclareLocks in a way similar to::
298 if level == locking.LEVEL_NODE:
299 self._LockInstancesNodes()
301 @type primary_only: boolean
302 @param primary_only: only lock primary nodes of locked instances
305 assert locking.LEVEL_NODE in self.recalculate_locks, \
306 "_LockInstancesNodes helper function called with no nodes to recalculate"
308 # TODO: check if we're really been called with the instance locks held
310 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311 # future we might want to have different behaviors depending on the value
312 # of self.recalculate_locks[locking.LEVEL_NODE]
# NOTE(review): the initialization of wanted_nodes (presumably
# "wanted_nodes = []") and the "if not primary_only:" guard before the
# extend() appear to have been dropped from this listing — confirm upstream.
314 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315 instance = self.context.cfg.GetInstanceInfo(instance_name)
316 wanted_nodes.append(instance.primary_node)
318 wanted_nodes.extend(instance.secondary_nodes)
# LOCKS_REPLACE overwrites the node-level needs; LOCKS_APPEND adds to them.
320 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
# Consumed exactly once: drop the flag so a second call hits the assert above.
325 del self.recalculate_locks[locking.LEVEL_NODE]
328 class NoHooksLU(LogicalUnit):
329 """Simple LU which runs no hooks.
331 This LU is intended as a parent for other LogicalUnits which will
332 run no hooks, in order to reduce duplicate code.
# Expands a caller-supplied list of node names via the cluster config and
# returns them nicely sorted. NOTE(review): this listing has dropped the loop
# header ("for name in nodes:"), the wanted-list initialization, and the
# None-check after ExpandNodeName — confirm against upstream before running.
339 def _GetWantedNodes(lu, nodes):
340 """Returns list of checked and expanded node names.
342 @type lu: L{LogicalUnit}
343 @param lu: the logical unit on whose behalf we execute
345 @param nodes: list of node names or None for all nodes
347 @return: the list of nodes, sorted
348 @raise errors.OpProgrammerError: if the nodes parameter is wrong type
351 if not isinstance(nodes, list):
352 raise errors.OpPrereqError("Invalid argument type 'nodes'")
# An empty node list is a caller bug, not a user error.
355 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356 " non-empty list of nodes whose name is to be expanded.")
357 node = lu.cfg.ExpandNodeName(name)
362 raise errors.OpPrereqError("No such node name '%s'" % name)
365 return utils.NiceSort(wanted)
# Expands a caller-supplied list of instance names (or, when the list is
# falsy, takes all cluster instances). NOTE(review): the None-check after
# ExpandInstanceName, the wanted-list initialization, the else-branch header
# and the final return statement are missing from this listing — confirm
# against upstream.
368 def _GetWantedInstances(lu, instances):
369 """Returns list of checked and expanded instance names.
371 @type lu: L{LogicalUnit}
372 @param lu: the logical unit on whose behalf we execute
373 @type instances: list
374 @param instances: list of instance names or None for all instances
376 @return: the list of instances, sorted
377 @raise errors.OpPrereqError: if the instances parameter is wrong type
378 @raise errors.OpPrereqError: if any of the passed instances is not found
381 if not isinstance(instances, list):
382 raise errors.OpPrereqError("Invalid argument type 'instances'")
387 for name in instances:
388 instance = lu.cfg.ExpandInstanceName(name)
390 raise errors.OpPrereqError("No such instance name '%s'" % name)
391 wanted.append(instance)
# Fallback path: no explicit list given, use every configured instance.
394 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
# Validates a query's selected output fields against the union of static and
# dynamic field sets. NOTE(review): the line building the combined FieldSet
# 'f' and the "if delta:" guard before the raise are missing from this
# listing — confirm against upstream.
398 def _CheckOutputFields(static, dynamic, selected):
399 """Checks whether all selected fields are valid.
401 @type static: L{utils.FieldSet}
402 @param static: static fields set
403 @type dynamic: L{utils.FieldSet}
404 @param dynamic: dynamic fields set
411 delta = f.NonMatching(selected)
413 raise errors.OpPrereqError("Unknown output fields selected: %s"
# Normalizes an optional boolean opcode attribute: must be bool or None, and
# is always (re)assigned onto the opcode so later code can rely on it existing.
417 def _CheckBooleanOpField(op, name):
418 """Validates boolean opcode parameters.
420 This will ensure that an opcode parameter is either a boolean value,
421 or None (but that it always exists).
424 val = getattr(op, name, None)
425 if not (val is None or isinstance(val, bool)):
# NOTE(review): the continuation line supplying the (name, val) format
# arguments is missing from this listing.
426 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428 setattr(op, name, val)
# Prerequisite helper: reject operations targeting a node flagged offline
# in the cluster configuration.
431 def _CheckNodeOnline(lu, node):
432 """Ensure that a given node is online.
434 @param lu: the LU on behalf of which we make the check
435 @param node: the node to check
436 @raise errors.OpPrereqError: if the node is offline
439 if lu.cfg.GetNodeInfo(node).offline:
440 raise errors.OpPrereqError("Can't use offline node %s" % node)
# Prerequisite helper: reject operations targeting a node flagged drained
# in the cluster configuration (mirror of _CheckNodeOnline above).
443 def _CheckNodeNotDrained(lu, node):
444 """Ensure that a given node is not drained.
446 @param lu: the LU on behalf of which we make the check
447 @param node: the node to check
448 @raise errors.OpPrereqError: if the node is drained
451 if lu.cfg.GetNodeInfo(node).drained:
452 raise errors.OpPrereqError("Can't use drained node %s" % node)
# Builds the INSTANCE_* environment dict passed to hook scripts from plain
# values (no objects). NOTE(review): this listing is missing the "env = {"
# opener, the str_status computation, the empty-NIC/disk fallbacks and the
# final "return env" — confirm against upstream.
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456 memory, vcpus, nics, disk_template, disks,
457 bep, hvp, hypervisor):
458 """Builds instance related env variables for hooks
460 This builds the hook environment from individual variables.
463 @param name: the name of the instance
464 @type primary_node: string
465 @param primary_node: the name of the instance's primary node
466 @type secondary_nodes: list
467 @param secondary_nodes: list of secondary nodes as strings
468 @type os_type: string
469 @param os_type: the name of the instance's OS
470 @type status: boolean
471 @param status: the should_run status of the instance
473 @param memory: the memory size of the instance
475 @param vcpus: the count of VCPUs the instance has
477 @param nics: list of tuples (ip, bridge, mac) representing
478 the NICs the instance has
479 @type disk_template: string
480 @param disk_template: the distk template of the instance
482 @param disks: the list of (size, mode) pairs
484 @param bep: the backend parameters for the instance
486 @param hvp: the hypervisor parameters for the instance
487 @type hypervisor: string
488 @param hypervisor: the hypervisor for the instance
490 @return: the hook environment for this instance
499 "INSTANCE_NAME": name,
500 "INSTANCE_PRIMARY": primary_node,
501 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
502 "INSTANCE_OS_TYPE": os_type,
503 "INSTANCE_STATUS": str_status,
504 "INSTANCE_MEMORY": memory,
505 "INSTANCE_VCPUS": vcpus,
506 "INSTANCE_DISK_TEMPLATE": disk_template,
507 "INSTANCE_HYPERVISOR": hypervisor,
# One numbered env entry per NIC (IP, bridge, MAC), plus a total count.
511 nic_count = len(nics)
512 for idx, (ip, bridge, mac) in enumerate(nics):
515 env["INSTANCE_NIC%d_IP" % idx] = ip
516 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
517 env["INSTANCE_NIC%d_MAC" % idx] = mac
521 env["INSTANCE_NIC_COUNT"] = nic_count
# One numbered env entry per disk (size, mode), plus a total count.
524 disk_count = len(disks)
525 for idx, (size, mode) in enumerate(disks):
526 env["INSTANCE_DISK%d_SIZE" % idx] = size
527 env["INSTANCE_DISK%d_MODE" % idx] = mode
531 env["INSTANCE_DISK_COUNT"] = disk_count
# Flatten the backend (BE) and hypervisor (HV) parameter dicts into the env.
533 for source, kind in [(bep, "BE"), (hvp, "HV")]:
534 for key, value in source.items():
535 env["INSTANCE_%s_%s" % (kind, key)] = value
# Convenience wrapper: extracts the per-instance values from an
# objects.Instance and delegates to _BuildInstanceHookEnv. NOTE(review): the
# "args = {" opener and the "if override:" guard before args.update() are
# missing from this listing — confirm against upstream.
540 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
541 """Builds instance related env variables for hooks from an object.
543 @type lu: L{LogicalUnit}
544 @param lu: the logical unit on whose behalf we execute
545 @type instance: L{objects.Instance}
546 @param instance: the instance for which we should build the
549 @param override: dictionary with key/values that will override
552 @return: the hook environment dictionary
# Fill in cluster-level defaults for the backend/hypervisor parameters.
555 cluster = lu.cfg.GetClusterInfo()
556 bep = cluster.FillBE(instance)
557 hvp = cluster.FillHV(instance)
# Keyword arguments matching _BuildInstanceHookEnv's signature.
559 'name': instance.name,
560 'primary_node': instance.primary_node,
561 'secondary_nodes': instance.secondary_nodes,
562 'os_type': instance.os,
563 'status': instance.admin_up,
564 'memory': bep[constants.BE_MEMORY],
565 'vcpus': bep[constants.BE_VCPUS],
566 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
567 'disk_template': instance.disk_template,
568 'disks': [(disk.size, disk.mode) for disk in instance.disks],
571 'hypervisor': instance.hypervisor,
574 args.update(override)
575 return _BuildInstanceHookEnv(**args)
# After node add/remove/modify, asks the config to (re)promote nodes into the
# master-candidate pool, re-adds promoted nodes to the context, and warns if
# there are more candidates than desired. NOTE(review): the "if mod_list:"
# guard, the "if mc_now > mc_max:" check and the final format-args line are
# missing from this listing — confirm against upstream.
578 def _AdjustCandidatePool(lu):
579 """Adjust the candidate pool after node operations.
582 mod_list = lu.cfg.MaintainCandidatePool()
584 lu.LogInfo("Promoted nodes to master candidate role: %s",
585 ", ".join(node.name for node in mod_list))
586 for name in mod_list:
587 lu.context.ReaddNode(name)
588 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
590 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
# Prerequisite helper: RPC to the instance's primary node to confirm every
# bridge its NICs use actually exists there. NOTE(review): the lines checking
# the RPC result (presumably result.failed / RemoteFailMsg) before the raise
# are missing from this listing — confirm against upstream.
594 def _CheckInstanceBridgesExist(lu, instance):
595 """Check that the brigdes needed by an instance exist.
598 # check bridges existance
599 brlist = [nic.bridge for nic in instance.nics]
600 result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
603 raise errors.OpPrereqError("One or more target bridges %s does not"
604 " exist on destination node '%s'" %
605 (brlist, instance.primary_node))
# LU that tears down the whole cluster; runs no hooks (NoHooksLU parent).
# NOTE(review): class attributes (e.g. _OP_REQP) and several guard lines are
# missing from this listing — confirm against upstream.
608 class LUDestroyCluster(NoHooksLU):
609 """Logical unit for destroying the cluster.
# Refuses to destroy unless the master is the only node left and no
# instances remain.
614 def CheckPrereq(self):
615 """Check prerequisites.
617 This checks whether the cluster is empty.
619 Any errors are signalled by raising errors.OpPrereqError.
622 master = self.cfg.GetMasterNode()
624 nodelist = self.cfg.GetNodeList()
625 if len(nodelist) != 1 or nodelist[0] != master:
626 raise errors.OpPrereqError("There are still %d node(s) in"
627 " this cluster." % (len(nodelist) - 1))
628 instancelist = self.cfg.GetInstanceList()
# NOTE(review): the "if instancelist:" guard before this raise is missing.
630 raise errors.OpPrereqError("There are still %d instance(s) in"
631 " this cluster." % len(instancelist))
# Stops the master role via RPC and backs up the ganeti SSH keypair.
633 def Exec(self, feedback_fn):
634 """Destroys the cluster.
637 master = self.cfg.GetMasterNode()
638 result = self.rpc.call_node_stop_master(master, False)
# NOTE(review): the result-check guard before this raise is missing.
641 raise errors.OpExecError("Could not disable the master role")
642 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
643 utils.CreateBackup(priv_key)
644 utils.CreateBackup(pub_key)
# Cluster-wide verification LU; runs the "cluster-verify" hook. The class
# continues beyond this span (methods follow below).
648 class LUVerifyCluster(LogicalUnit):
649 """Verifies the cluster status.
652 HPATH = "cluster-verify"
653 HTYPE = constants.HTYPE_CLUSTER
654 _OP_REQP = ["skip_checks"]
# Verification is read-only: take ALL node and instance locks, shared (1)
# at every level so normal jobs are not blocked.
657 def ExpandNames(self):
658 self.needed_locks = {
659 locking.LEVEL_NODE: locking.ALL_SET,
660 locking.LEVEL_INSTANCE: locking.ALL_SET,
662 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
# Per-node verification: version compatibility, volume group, file
# checksums, ssh/tcp connectivity, hypervisor status and DRBD minors. Emits
# findings through feedback_fn. NOTE(review): this listing has dropped many
# lines (the signature's trailing parameters, "bad" accumulator updates,
# 'node =' assignment, 'continue'/'return' statements) — confirm upstream.
664 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
665 node_result, feedback_fn, master_files,
667 """Run multiple tests against a node.
671 - compares ganeti version
672 - checks vg existance and size > 20G
673 - checks config file checksum
674 - checks ssh to other nodes
676 @type nodeinfo: L{objects.Node}
677 @param nodeinfo: the node to check
678 @param file_list: required list of files
679 @param local_cksum: dictionary of local files and their checksums
680 @param node_result: the results from the node
681 @param feedback_fn: function used to accumulate results
682 @param master_files: list of files that only masters should have
683 @param drbd_map: the useddrbd minors for this node, in
684 form of minor: (instance, must_exist) which correspond to instances
685 and their running status
686 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
691 # main result, node_result should be a non-empty dict
692 if not node_result or not isinstance(node_result, dict):
693 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
696 # compares ganeti version
697 local_version = constants.PROTOCOL_VERSION
698 remote_version = node_result.get('version', None)
699 if not (remote_version and isinstance(remote_version, (list, tuple)) and
700 len(remote_version) == 2):
701 feedback_fn(" - ERROR: connection to %s failed" % (node))
704 if local_version != remote_version[0]:
705 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
706 " node %s %s" % (local_version, node, remote_version[0]))
709 # node seems compatible, we can actually try to look into its results
713 # full package version
714 if constants.RELEASE_VERSION != remote_version[1]:
# Release mismatch is only a warning; protocol mismatch above is an error.
715 feedback_fn(" - WARNING: software version mismatch: master %s,"
717 (constants.RELEASE_VERSION, node, remote_version[1]))
719 # checks vg existence and size > 20G
720 if vg_name is not None:
721 vglist = node_result.get(constants.NV_VGLIST, None)
723 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
727 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
728 constants.MIN_VG_SIZE)
730 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
733 # checks config file checksum
735 remote_cksum = node_result.get(constants.NV_FILELIST, None)
736 if not isinstance(remote_cksum, dict):
738 feedback_fn(" - ERROR: node hasn't returned file checksum data")
740 for file_name in file_list:
# Master-only files must exist on master candidates; ordinary files on all.
741 node_is_mc = nodeinfo.master_candidate
742 must_have_file = file_name not in master_files
743 if file_name not in remote_cksum:
744 if node_is_mc or must_have_file:
746 feedback_fn(" - ERROR: file '%s' missing" % file_name)
747 elif remote_cksum[file_name] != local_cksum[file_name]:
748 if node_is_mc or must_have_file:
750 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
752 # not candidate and this is not a must-have file
754 feedback_fn(" - ERROR: non master-candidate has old/wrong file"
757 # all good, except non-master/non-must have combination
758 if not node_is_mc and not must_have_file:
759 feedback_fn(" - ERROR: file '%s' should not exist on non master"
760 " candidates" % file_name)
# ssh reachability results from the node (per-target error strings).
764 if constants.NV_NODELIST not in node_result:
766 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
768 if node_result[constants.NV_NODELIST]:
770 for node in node_result[constants.NV_NODELIST]:
771 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
772 (node, node_result[constants.NV_NODELIST][node]))
# tcp (primary/secondary IP) reachability results.
774 if constants.NV_NODENETTEST not in node_result:
776 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
778 if node_result[constants.NV_NODENETTEST]:
780 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
782 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
783 (node, node_result[constants.NV_NODENETTEST][node]))
# Per-hypervisor verify results: non-None means that hypervisor failed.
785 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
786 if isinstance(hyp_result, dict):
787 for hv_name, hv_result in hyp_result.iteritems():
788 if hv_result is not None:
789 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
790 (hv_name, hv_result))
792 # check used drbd list
793 if vg_name is not None:
794 used_minors = node_result.get(constants.NV_DRBDLIST, [])
795 if not isinstance(used_minors, (tuple, list)):
796 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
# Cross-check configured DRBD minors against the ones actually in use.
799 for minor, (iname, must_exist) in drbd_map.items():
800 if minor not in used_minors and must_exist:
801 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
802 " not active" % (minor, iname))
804 for minor in used_minors:
805 if minor not in drbd_map:
806 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
# Per-instance verification: all expected LVs present on (online) nodes,
# instance running on its primary, and nowhere else. NOTE(review): the
# node_vol_should initialization, 'continue' statements and 'bad' accumulator
# lines are missing from this listing — confirm upstream.
812 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
813 node_instance, feedback_fn, n_offline):
814 """Verify an instance.
816 This function checks to see if the required block devices are
817 available on the instance's node.
822 node_current = instanceconfig.primary_node
825 instanceconfig.MapLVsByNode(node_vol_should)
827 for node in node_vol_should:
828 if node in n_offline:
829 # ignore missing volumes on offline nodes
831 for volume in node_vol_should[node]:
832 if node not in node_vol_is or volume not in node_vol_is[node]:
833 feedback_fn(" - ERROR: volume %s missing on node %s" %
# If the instance should be up, its primary must report it running
# (unless the primary itself is offline).
837 if instanceconfig.admin_up:
838 if ((node_current not in node_instance or
839 not instance in node_instance[node_current]) and
840 node_current not in n_offline):
841 feedback_fn(" - ERROR: instance %s not running on node %s" %
842 (instance, node_current))
# No node other than the primary may report the instance as running.
845 for node in node_instance:
846 if (not node == node_current):
847 if instance in node_instance[node]:
848 feedback_fn(" - ERROR: instance %s should not run on node %s" %
# Reports volumes that exist on nodes (node_vol_is) but are not expected by
# any instance config (node_vol_should).
854 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
855 """Verify if there are any unknown volumes in the cluster.
857 The .os, .swap and backup volumes are ignored. All other volumes are
863 for node in node_vol_is:
864 for volume in node_vol_is[node]:
865 if node not in node_vol_should or volume not in node_vol_should[node]:
866 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
# Reports instances reported running on some node but absent from the
# cluster configuration (instancelist).
871 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
872 """Verify the list of running instances.
874 This checks what instances are running but unknown to the cluster.
878 for node in node_instance:
879 for runninginstance in node_instance[node]:
880 if runninginstance not in instancelist:
881 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
882 (runninginstance, node))
# N+1 check: for each node, verify it has enough free memory to absorb the
# auto-balanced instances that would fail over to it from any single primary.
# NOTE(review): the "needed_mem = 0" reset inside the prinode loop appears to
# have been dropped from this listing — confirm upstream.
886 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
887 """Verify N+1 Memory Resilience.
889 Check that if one single node dies we can still start all the instances it
895 for node, nodeinfo in node_info.iteritems():
896 # This code checks that every node which is now listed as secondary has
897 # enough memory to host all instances it is supposed to should a single
898 # other node in the cluster fail.
899 # FIXME: not ready for failover to an arbitrary node
900 # FIXME: does not support file-backed instances
901 # WARNING: we currently take into account down instances as well as up
902 # ones, considering that even if they're down someone might want to start
903 # them even in the event of a node failure.
904 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
906 for instance in instances:
# Only auto-balanced instances count toward the N+1 memory requirement.
907 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
908 if bep[constants.BE_AUTO_BALANCE]:
909 needed_mem += bep[constants.BE_MEMORY]
910 if nodeinfo['mfree'] < needed_mem:
911 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
912 " failovers should node %s fail" % (node, prinode))
# Validates op.skip_checks: every requested skip must be one of the
# optional checks declared in constants.VERIFY_OPTIONAL_CHECKS.
916 def CheckPrereq(self):
917 """Check prerequisites.
919 Transform the list of checks we're going to skip into a set and check that
920 all its members are valid.
923 self.skip_set = frozenset(self.op.skip_checks)
924 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
925 raise errors.OpPrereqError("Invalid checks to be skipped specified")
# Hook env for cluster-verify: cluster tags plus per-node tags; hooks run
# post-phase only (empty pre-nodes list), on all nodes. NOTE(review): the
# docstring delimiters and the "env = {" opener are missing from this
# listing; 'rone' in the original docstring presumably means 'run'.
927 def BuildHooksEnv(self):
930 Cluster-Verify hooks just rone in the post phase and their failure makes
931 the output be logged in the verify output and the verification to fail.
934 all_nodes = self.cfg.GetNodeList()
936 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
938 for node in self.cfg.GetAllNodesInfo().values():
939 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
941 return env, [], all_nodes
# Main verification driver: gathers per-node data via one node_verify RPC,
# then verifies each node (_VerifyNode), builds volume/instance/node-info
# maps, and verifies each instance (_VerifyInstance). This method is
# TRUNCATED at the end of this listing (the orphan/N+1/summary phases are
# not visible) and, like the rest of the file, is missing interior lines —
# confirm against upstream before relying on any of it.
943 def Exec(self, feedback_fn):
944 """Verify integrity of cluster, performing various test on nodes.
948 feedback_fn("* Verifying global settings")
949 for msg in self.cfg.VerifyConfig():
950 feedback_fn(" - ERROR: %s" % msg)
# Snapshot of config state used throughout the run.
952 vg_name = self.cfg.GetVGName()
953 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
954 nodelist = utils.NiceSort(self.cfg.GetNodeList())
955 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
956 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
957 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
958 for iname in instancelist)
959 i_non_redundant = [] # Non redundant instances
960 i_non_a_balanced = [] # Non auto-balanced instances
961 n_offline = [] # List of offline nodes
962 n_drained = [] # List of nodes being drained
968 # FIXME: verify OS list
# Files whose checksums every (relevant) node must match.
970 master_files = [constants.CLUSTER_CONF_FILE]
972 file_names = ssconf.SimpleStore().GetFileList()
973 file_names.append(constants.SSL_CERT_FILE)
974 file_names.append(constants.RAPI_CERT_FILE)
975 file_names.extend(master_files)
977 local_checksums = utils.FingerprintFiles(file_names)
# Single multi-node RPC collecting everything _VerifyNode needs.
979 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
980 node_verify_param = {
981 constants.NV_FILELIST: file_names,
982 constants.NV_NODELIST: [node.name for node in nodeinfo
983 if not node.offline],
984 constants.NV_HYPERVISOR: hypervisors,
985 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
986 node.secondary_ip) for node in nodeinfo
987 if not node.offline],
988 constants.NV_INSTANCELIST: hypervisors,
989 constants.NV_VERSION: None,
990 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
# LVM/DRBD data only requested when the cluster has a volume group.
992 if vg_name is not None:
993 node_verify_param[constants.NV_VGLIST] = None
994 node_verify_param[constants.NV_LVLIST] = vg_name
995 node_verify_param[constants.NV_DRBDLIST] = None
996 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
997 self.cfg.GetClusterName())
999 cluster = self.cfg.GetClusterInfo()
1000 master_node = self.cfg.GetMasterNode()
1001 all_drbd_map = self.cfg.ComputeDRBDMap()
# Per-node phase: classify node (master / candidate / drained / regular),
# skip offline nodes, then run _VerifyNode and collect its LV/instance/
# hvinfo results. NOTE(review): 'node =' assignment, 'continue' lines and
# several dict initializations are missing from this listing.
1003 for node_i in nodeinfo:
1005 nresult = all_nvinfo[node].data
1008 feedback_fn("* Skipping offline node %s" % (node,))
1009 n_offline.append(node)
1012 if node == master_node:
1014 elif node_i.master_candidate:
1015 ntype = "master candidate"
1016 elif node_i.drained:
1018 n_drained.append(node)
1021 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1023 if all_nvinfo[node].failed or not isinstance(nresult, dict):
1024 feedback_fn(" - ERROR: connection to %s failed" % (node,))
# Build the expected-DRBD-minor map for this node; ghost instances are
# recorded with must_exist=False to avoid double warnings.
1029 for minor, instance in all_drbd_map[node].items():
1030 if instance not in instanceinfo:
1031 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1033 # ghost instance should not be running, but otherwise we
1034 # don't give double warnings (both ghost instance and
1035 # unallocated minor in use)
1036 node_drbd[minor] = (instance, False)
1038 instance = instanceinfo[instance]
1039 node_drbd[minor] = (instance.name, instance.admin_up)
1040 result = self._VerifyNode(node_i, file_names, local_checksums,
1041 nresult, feedback_fn, master_files,
# LV list: a string result means an LVM error, a non-dict means a
# broken connection; either way record an empty volume map.
1045 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1047 node_volume[node] = {}
1048 elif isinstance(lvdata, basestring):
1049 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1050 (node, utils.SafeEncode(lvdata)))
1052 node_volume[node] = {}
1053 elif not isinstance(lvdata, dict):
1054 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1058 node_volume[node] = lvdata
# Running-instance list reported by the node.
1061 idata = nresult.get(constants.NV_INSTANCELIST, None)
1062 if not isinstance(idata, list):
1063 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1068 node_instance[node] = idata
# Node resource info (free memory, etc.) from the hypervisor.
1071 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1072 if not isinstance(nodeinfo, dict):
1073 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1079 "mfree": int(nodeinfo['memory_free']),
1082 # dictionary holding all instances this node is secondary for,
1083 # grouped by their primary node. Each key is a cluster node, and each
1084 # value is a list of instances which have the key as primary and the
1085 # current node as secondary. this is handy to calculate N+1 memory
1086 # availability if you can only failover from a primary to its
1088 "sinst-by-pnode": {},
1090 # FIXME: devise a free space model for file based instances as well
1091 if vg_name is not None:
1092 if (constants.NV_VGLIST not in nresult or
1093 vg_name not in nresult[constants.NV_VGLIST]):
1094 feedback_fn(" - ERROR: node %s didn't return data for the"
1095 " volume group '%s' - it is either missing or broken" %
1099 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1100 except (ValueError, KeyError):
1101 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1102 " from node %s" % (node,))
# Per-instance phase: verify each instance and fill the bookkeeping used
# by the later orphan/N+1 checks (not visible in this truncated listing).
1106 node_vol_should = {}
1108 for instance in instancelist:
1109 feedback_fn("* Verifying instance %s" % instance)
1110 inst_config = instanceinfo[instance]
1111 result = self._VerifyInstance(instance, inst_config, node_volume,
1112 node_instance, feedback_fn, n_offline)
1114 inst_nodes_offline = []
1116 inst_config.MapLVsByNode(node_vol_should)
1118 instance_cfg[instance] = inst_config
1120 pnode = inst_config.primary_node
1121 if pnode in node_info:
1122 node_info[pnode]['pinst'].append(instance)
1123 elif pnode not in n_offline:
1124 feedback_fn(" - ERROR: instance %s, connection to primary node"
1125 " %s failed" % (instance, pnode))
1128 if pnode in n_offline:
1129 inst_nodes_offline.append(pnode)
1131 # If the instance is non-redundant we cannot survive losing its primary
1132 # node, so we are not N+1 compliant. On the other hand we have no disk
1133 # templates with more than one secondary so that situation is not well
1135 # FIXME: does not support file-backed instances
1136 if len(inst_config.secondary_nodes) == 0:
1137 i_non_redundant.append(instance)
1138 elif len(inst_config.secondary_nodes) > 1:
1139 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1142 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1143 i_non_a_balanced.append(instance)
1145 for snode in inst_config.secondary_nodes:
1146 if snode in node_info:
1147 node_info[snode]['sinst'].append(instance)
1148 if pnode not in node_info[snode]['sinst-by-pnode']:
1149 node_info[snode]['sinst-by-pnode'][pnode] = []
1150 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1151 elif snode not in n_offline:
1152 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1153 " %s failed" % (instance, snode))
1155 if snode in n_offline:
1156 inst_nodes_offline.append(snode)
1158 if inst_nodes_offline:
1159 # warn that the instance lives on offline nodes, and set bad=True
1160 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1161 ", ".join(inst_nodes_offline))
1164 feedback_fn("* Verifying orphan volumes")
1165 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1169 feedback_fn("* Verifying remaining instances")
1170 result = self._VerifyOrphanInstances(instancelist, node_instance,
1174 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1175 feedback_fn("* Verifying N+1 Memory redundancy")
1176 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1179 feedback_fn("* Other Notes")
1181 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1182 % len(i_non_redundant))
1184 if i_non_a_balanced:
1185 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1186 % len(i_non_a_balanced))
1189 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1192 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        # an empty result set means the RPC layer itself failed
        feedback_fn("  - ERROR: general communication failure")
      for node_name in hooks_results:
        show_node_header = True
        res = hooks_results[node_name]
        # per-node communication failures are reported but do not abort
        # the whole verification
        if res.failed or res.data is False or not isinstance(res.data, list):
          # no need to warn or set fail return value
          feedback_fn("    Communication failure in hooks execution")
        for script, hkr, output in res.data:
          if hkr == constants.HKR_FAIL:
            # The node header is only shown once, if there are
            # failing hooks on that node
            if show_node_header:
              feedback_fn("  Node %s:" % node_name)
              show_node_header = False
            feedback_fn("    ERROR: Script %s failed, output:" % script)
            # re-indent the hook output so it nests under the header
            output = indent_re.sub('      ', output)
            feedback_fn("%s" % output)
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """

  def ExpandNames(self):
    # cluster-wide, read-only check: acquire all node and instance
    # locks, in shared mode
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    # "result" aliases the tuple of the four per-category accumulators,
    # which are also kept under individual names for easier updating
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    for inst in instances:
      # only running, network-mirrored (DRBD) instances are checked
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

      lvs = node_lvs[node]
        self.LogWarning("Connection to node %s failed: %s" %
      if isinstance(lvs, basestring):
        # the node returned an error string instead of the LV map
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
        res_nodes.append(node)

      # an offline ("not lv_online") volume belonging to a known
      # instance marks that instance as having degraded disks
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
    # the rename hooks run only on the master node
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    # self.ip is saved for Exec (the new master IP)
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      # refuse an IP that already answers on the noded port, as it
      # would conflict with an existing host
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")
    # NOTE(review): "ip" presumably comes from self.ip set in
    # CheckPrereq; the assignment line is elided in this excerpt
    cluster = self.cfg.GetClusterInfo()
    cluster.cluster_name = clustername
    cluster.master_ip = ip
    self.cfg.Update(cluster)

    # update the known hosts file
    ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
    node_list = self.cfg.GetNodeList()
    node_list.remove(master)
    result = self.rpc.call_upload_file(node_list,
                                       constants.SSH_KNOWN_HOSTS_FILE)
    # distribution failures are logged but not fatal
    for to_node, to_result in result.iteritems():
      if to_result.failed or not to_result.data:
        logging.error("Copy of file %s to node %s failed",
                      constants.SSH_KNOWN_HOSTS_FILE, to_node)

    # restart the master IP under the new name/address
    result = self.rpc.call_node_start_master(master, False)
    if result.failed or not result.data:
      self.LogWarning("Could not re-enable the master role on"
                      " the master, please restart manually.")
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Recurses depth-first into the disk's children before testing the
  disk's own device type.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  for chdisk in disk.children:
    if _RecursiveCheckIfLVMBased(chdisk):
  return disk.dev_type == constants.LD_LV
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER

  def CheckArguments(self):
    """Check parameters.

    """
    # candidate_pool_size is optional; normalise a missing attribute
    # to None so the rest of the LU can test it uniformly
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
    # cluster-modify hooks run only on the master node
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # an empty-but-not-None vg_name means "disable lvm storage", which
    # is only allowed when no lvm-based instances exist
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              constants.MIN_VG_SIZE)
          raise errors.OpPrereqError("Error on node '%s': %s" %

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate beparams changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size

    self.cfg.Update(self.cluster)

    # we want to update nodes after the cluster so that if any errors
    # happen, we have recorded and saved the cluster info
    if self.op.candidate_pool_size is not None:
      _AdjustCandidatePool(self)
class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """

  def ExpandNames(self):
    # touches all nodes, but only reads: shared node locks
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    # a no-op Update() triggers the config push to all nodes
    self.cfg.Update(self.cfg.GetClusterInfo())
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  @param lu: the LU on behalf of which we poll (used for logging/RPC)
  @param instance: the instance whose disks are being synced
  @param oneshot: if True, only report status once instead of waiting
  @param unlock: see caller; not used in the visible code
  @return: True if the disks ended up healthy (not cumulatively
      degraded), False otherwise

  """
  if not instance.disks:

  lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  # all mirror status queries go to the primary node
  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  degr_retries = 10 # in seconds, as we sleep 1 second each time

    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if rstats.failed or not rstats.data:
      lu.LogWarning("Can't get any data from node %s", node)
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
    rstats = rstats.data
    for i, mstat in enumerate(rstats):
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      # degraded with no progress percentage counts as "still broken"
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)

    # never sleep longer than a minute between polls
    time.sleep(min(60, max_time))

  lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  Recurses into the device's children, so the result covers the whole
  device tree.

  """
  lu.cfg.SetDiskID(dev, node)

  # only query devices that are assembled on this node
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.RemoteFailMsg()
      # a missing/unreachable disk counts as inconsistent
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      # NOTE(review): "idx" presumably selects the is_degraded vs ldisk
      # field of the payload based on the ldisk argument; its
      # assignment is elided in this excerpt
      result = result and (not rstats.payload[idx])

  for child in dev.children:
    result = result and _CheckDiskConsistency(lu, child, node, on_primary)
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into an a per-os per-node dictionary

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}

    """
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].failed]
    for node_name, nr in rlist.iteritems():
      if nr.failed or not nr.data:
      for os_obj in nr.data:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in good_nodes:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    # NOTE(review): "node_data == False" compares against a literal;
    # "node_data is False" (or a truthiness check) would be the
    # idiomatic form — confirm what the RPC returns on failure
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(valid_nodes, node_data)

    for os_name, os_data in pol.iteritems():
      for field in self.op.output_fields:
        elif field == "valid":
          # an OS is valid only if its first entry on every node is valid
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
          raise errors.ParameterError(field)
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    # run on every node except the one being removed
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # FIX: was old-style "raise errors.OpPrereqError, (...)" comma
      # syntax; use the call form, consistent with the rest of the file
      # and valid under Python 3
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    # refuse removal while any instance still uses the node as primary
    # or secondary
    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name)
    # store the canonical (expanded) name and node object for Exec
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    # remove the node from the cluster configuration/context first,
    # then tell the node itself to leave
    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]

  # fields that require a live RPC query to the nodes
  _FIELDS_DYNAMIC = utils.FieldSet(
    "mtotal", "mnode", "mfree",
    "ctotal", "cnodes", "csockets",

  # fields answered purely from the configuration
  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

      self.wanted = _GetWantedNodes(self, self.op.names)
      self.wanted = locking.ALL_SET

    # locking is only needed when dynamic (live) fields were requested
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      # unlocked query: detect nodes removed since the name expansion
      missing = set(nodenames).difference(all_info.keys())
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      # live queries: ask every node for its resource information
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.failed and nodeinfo.data:
          nodeinfo = nodeinfo.data
          fn = utils.TryConvert
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
          # failed nodes get an empty live-data dict
          live_data[name] = {}
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    # build the instance maps only if instance fields were requested
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    for node in nodelist:
      for field in self.op.output_fields:
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          # boolean: is this node the cluster master?
          val = node.name == master_node
        elif field == "offline":
        elif field == "drained":
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      # empty node list means "all nodes"
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    # map: instance -> {node: [lv names]} for the "instance" field lookup
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    for node in nodenames:
      # skip nodes whose volume RPC failed or returned nothing
      if node not in volumes or volumes[node].failed or not volumes[node].data:

      node_vols = volumes[node].data[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        for field in self.op.output_fields:
          elif field == "phys":
          elif field == "name":
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
              # scan the instances for one owning this LV on this node
              if node not in lv_by_node[inst]:
              if vol['name'] in lv_by_node[inst][node]:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)
2098 class LUAddNode(LogicalUnit):
2099 """Logical unit for adding node to the cluster.
2103 HTYPE = constants.HTYPE_NODE
2104 _OP_REQP = ["node_name"]
2106 def BuildHooksEnv(self):
2109 This will run on all nodes before, and on all nodes + the new node after.
2113 "OP_TARGET": self.op.node_name,
2114 "NODE_NAME": self.op.node_name,
2115 "NODE_PIP": self.op.primary_ip,
2116 "NODE_SIP": self.op.secondary_ip,
2118 nodes_0 = self.cfg.GetNodeList()
2119 nodes_1 = nodes_0 + [self.op.node_name, ]
2120 return env, nodes_0, nodes_1
2122 def CheckPrereq(self):
2123 """Check prerequisites.
2126 - the new node is not already in the config
2128 - its parameters (single/dual homed) matches the cluster
2130 Any errors are signalled by raising errors.OpPrereqError.
2133 node_name = self.op.node_name
2136 dns_data = utils.HostInfo(node_name)
2138 node = dns_data.name
2139 primary_ip = self.op.primary_ip = dns_data.ip
2140 secondary_ip = getattr(self.op, "secondary_ip", None)
2141 if secondary_ip is None:
2142 secondary_ip = primary_ip
2143 if not utils.IsValidIP(secondary_ip):
2144 raise errors.OpPrereqError("Invalid secondary IP given")
2145 self.op.secondary_ip = secondary_ip
2147 node_list = cfg.GetNodeList()
2148 if not self.op.readd and node in node_list:
2149 raise errors.OpPrereqError("Node %s is already in the configuration" %
2151 elif self.op.readd and node not in node_list:
2152 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2154 for existing_node_name in node_list:
2155 existing_node = cfg.GetNodeInfo(existing_node_name)
2157 if self.op.readd and node == existing_node_name:
2158 if (existing_node.primary_ip != primary_ip or
2159 existing_node.secondary_ip != secondary_ip):
2160 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2161 " address configuration as before")
2164 if (existing_node.primary_ip == primary_ip or
2165 existing_node.secondary_ip == primary_ip or
2166 existing_node.primary_ip == secondary_ip or
2167 existing_node.secondary_ip == secondary_ip):
2168 raise errors.OpPrereqError("New node ip address(es) conflict with"
2169 " existing node %s" % existing_node.name)
2171 # check that the type of the node (single versus dual homed) is the
2172 # same as for the master
2173 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2174 master_singlehomed = myself.secondary_ip == myself.primary_ip
2175 newbie_singlehomed = secondary_ip == primary_ip
2176 if master_singlehomed != newbie_singlehomed:
2177 if master_singlehomed:
2178 raise errors.OpPrereqError("The master has no private ip but the"
2179 " new node has one")
2181 raise errors.OpPrereqError("The master has a private ip but the"
2182 " new node doesn't have one")
2184 # checks reachablity
2185 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2186 raise errors.OpPrereqError("Node not reachable by ping")
2188 if not newbie_singlehomed:
2189 # check reachability from my secondary ip to newbie's secondary ip
2190 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2191 source=myself.secondary_ip):
2192 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2193 " based ping to noded port")
2195 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2196 mc_now, _ = self.cfg.GetMasterCandidateStats()
2197 master_candidate = mc_now < cp_size
2199 self.new_node = objects.Node(name=node,
2200 primary_ip=primary_ip,
2201 secondary_ip=secondary_ip,
2202 master_candidate=master_candidate,
2203 offline=False, drained=False)
  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    Checks protocol compatibility with the new node, copies the ssh
    host keys to it, updates /etc/hosts, cross-checks ssh/hostname
    resolution from the master, distributes the updated cluster files
    and finally registers the node in the cluster context.

    NOTE(review): several lines appear to be missing from this copy of
    the source (conditional guards, list initialisers); code kept as-is.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity: the new node must run a compatible noded
    result = self.rpc.call_version([node])[node]
    if constants.PROTOCOL_VERSION == result.data:
      logging.info("Communication to node %s fine, sw version %s match",
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.data))
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node: copy the cluster-wide ssh host keys so every
    # node presents the same host identity
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
      keyarray.append(f.read())

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[3], keyarray[4], keyarray[5])
    msg = result.RemoteFailMsg()
      raise errors.OpExecError("Cannot transfer ssh keys to the"
                               " new node: %s" % msg)

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      # multi-homed node: it must actually own the secondary ip we got
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      if result.failed or not result.data:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    # ask the master node to verify ssh/hostname resolution to the new node
    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      # TODO: do a node-net-test as well?

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if result[verifier].failed or not result[verifier].data:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier].data['nodelist']:
        for failed in result[verifier].data['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier].data['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      # no need to copy files to ourselves
      dist_nodes.remove(myself.name)

    logging.debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          # distribution failures are logged but deliberately not fatal
          logging.error("Copy of file %s to node %s failed", fname, to_node)

    # the VNC password file is only needed by some hypervisors
    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
      to_copy.append(constants.VNC_PASSWORD_FILE)

    for fname in to_copy:
      result = self.rpc.call_upload_file([node], fname)
      if result[node].failed or not result[node]:
        logging.error("Could not copy file %s to node %s", fname, node)

    # register the node in the cluster context; re-add vs. fresh add
      self.context.ReaddNode(new_node)
      self.context.AddNode(new_node)
class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  Supported modifications are the master_candidate, offline and
  drained flags; at most one of them can be set to True at a time.

  NOTE(review): some lines appear missing from this copy of the source
  (class constants, conditional guards, return statements); code kept
  as-is.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def CheckArguments(self):
    # the node name must resolve to a node known to the cluster
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    # the three flags are tri-state: True, False or None (= no change)
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification")
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time")

  def ExpandNames(self):
    # lock only the node being modified
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
    nl = [self.cfg.GetMasterNode(),

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if ((self.op.master_candidate == False or self.op.offline == True or
         self.op.drained == True) and node.master_candidate):
      # we will demote the node from master_candidate
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master node has to be a"
                                   " master candidate, online and not drained")
      # demotion must not shrink the candidate pool below its target size
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
      if num_candidates <= cp_size:
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
          self.LogWarning(msg)
          raise errors.OpPrereqError(msg)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      # an offline or drained node cannot be promoted to candidate
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          # going offline implies losing master-candidate status
          node.master_candidate = False
          result.append(("master_candidate", "auto-demotion due to offline"))
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        # ask the node itself to drop its master-candidate duties
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.RemoteFailMsg()
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          # draining implies losing master-candidate status
          node.master_candidate = False
          result.append(("master_candidate", "auto-demotion due to drain"))
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node)
    # this will trigger job queue propagation or cleanup
      self.context.ReaddNode(node)
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  Returns a dictionary of static cluster data (versions, master node,
  enabled hypervisors and their parameters, storage settings).

  """

  def ExpandNames(self):
    # static configuration data only, no locks needed
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.default_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      # only expose parameters of hypervisors that are actually enabled
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
                        for hypervisor in cluster.enabled_hypervisors]),
      "beparams": cluster.beparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "default_bridge": cluster.default_bridge,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  Supports only the static fields declared in _FIELDS_STATIC.

  """
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")

  def ExpandNames(self):
    self.needed_locks = {}

    # validate the requested fields against the declared field sets
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        # the job queue is drained iff the drain file exists
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
        raise errors.ParameterError(field)
      values.append(entry)
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are filled in later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
      raise errors.OpExecError("Cannot activate block devices")
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.RemoteFailMsg()
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      # secondaries were already handled in the first pass
      if node != instance.primary_node:
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.RemoteFailMsg()
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
    device_info.append((instance.primary_node, inst_disk.iv_name,

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  On assembly failure the partially-assembled disks are shut down
  again and an OpExecError is raised.

  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
    # failure: tear down whatever was partially assembled
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      # hint the user that --force may get past secondary-node errors
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are filled in later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    # refuses to shut down the disks while the instance is running
    _SafeShutdownInstanceDisks(self, instance)
def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  # ask the primary node which instances it currently runs
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
                                    [instance.hypervisor])
  ins_l = ins_l[instance.primary_node]
  if ins_l.failed or not isinstance(ins_l.data, list):
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l.data:
    raise errors.OpExecError("Instance is running, can't shutdown"

  _ShutdownInstanceDisks(lu, instance)
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If the ignore_primary is false, errors on the primary node are
  counted as failures; with ignore_primary true only secondary-node
  errors count.

  """
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.RemoteFailMsg()
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        # primary-node errors may be tolerated depending on ignore_primary
        if not ignore_primary or node != instance.primary_node:
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Verify that a node reports enough free memory.

  Queries the given node for its memory statistics under the given
  hypervisor and raises an OpPrereqError when the information cannot
  be obtained or the amount of free memory is below the requested one.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  node_results = lu.rpc.call_node_info([node], lu.cfg.GetVGName(),
                                       hypervisor_name)
  node_results[node].Raise()
  avail_mem = node_results[node].data.get('memory_free')
  if not isinstance(avail_mem, int):
    # the node answered, but not with a usable number
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, avail_mem))
  if requested > avail_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, avail_mem))
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  NOTE(review): some lines appear missing from this copy of the source
  (env dict delimiters, `if msg:` guards, return statements); code kept
  as-is.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
      "FORCE": self.op.force,
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams passed on the opcode
    self.beparams = getattr(self.op, "beparams", {})
    if not isinstance(self.beparams, dict):
      raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                 " dict" % (type(self.beparams), ))
    # fill the beparams dict
    utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
    self.op.beparams = self.beparams

    # extra hvparams passed on the opcode
    self.hvparams = getattr(self.op, "hvparams", {})
    if not isinstance(self.hvparams, dict):
      raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                 " dict" % (type(self.hvparams), ))

    # check hypervisor parameter syntax (locally)
    cluster = self.cfg.GetClusterInfo()
    utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
    filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
    filled_hvp.update(self.hvparams)
    hv_type = hypervisor.GetHypervisor(instance.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)
    _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
    self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.hypervisor)
    if not remote_info.data:
      # instance is not already running: check free memory on the node
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.RemoteFailMsg()
      # startup failed: don't leave the disks assembled
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  Supports soft, hard and full reboots (the latter being a complete
  shutdown/startup cycle, including the disks).

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def ExpandNames(self):
    # validate the reboot type before taking any locks
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      # soft/hard reboot is delegated to the hypervisor on the node
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
      msg = result.RemoteFailMsg()
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
      # full reboot: complete shutdown (including disks) and fresh start
      result = self.rpc.call_instance_shutdown(node_current, instance)
      msg = result.RemoteFailMsg()
        raise errors.OpExecError("Could not shutdown instance for"
                                 " full reboot: %s" % msg)
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.RemoteFailMsg()
        # startup failed: don't leave the disks assembled
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    # mark the instance down in the config before the actual shutdown
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.RemoteFailMsg()
      # a failed OS-level shutdown is only a warning; disks still go down
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  Re-runs the OS create scripts on the (stopped) instance, optionally
  changing its OS type first.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # double-check with the node that the instance is really down
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.hypervisor)
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification: the primary node must support the requested OS
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    feedback_fn("Running the instance OS create scripts...")
    result = self.rpc.call_instance_os_add(inst.primary_node, inst)
    msg = result.RemoteFailMsg()
      raise errors.OpExecError("Could not install OS for instance %s"
                               (inst.name, inst.primary_node, msg))
    _ShutdownInstanceDisks(self, inst)
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # double-check with the node that the instance is really down
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.hypervisor)
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %

    if not getattr(self.op, "ignore_ip", False):
      # make sure the new name's IP is not already live on the network
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      # remember the old file-storage dir before the config rename
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      # file storage: the on-disk directory must be renamed as well
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)

    result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
    msg = result.RemoteFailMsg()
      # a failing rename script is only warned about; the config rename stays
      msg = ("Could not run OS rename script for instance %s on node %s"
             " (but the instance has been renamed in Ganeti): %s" %
             (inst.name, inst.primary_node, msg))
      self.proc.LogWarning(msg)

    _ShutdownInstanceDisks(self, inst)
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  Shuts the instance down, removes its disks and finally deletes it
  from the cluster configuration.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are filled in later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.RemoteFailMsg()
      # ignore_failures turns a failed shutdown into a warning
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    # drop the instance lock together with the instance itself
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  Returns, for each requested instance, the values of the requested
  output fields.  Static fields come from the configuration; dynamic
  fields require a live RPC to the instances' primary nodes.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  # Fields answerable from the configuration alone.  The r"(disk|nic)..."
  # entries are regexes matching indexed/aggregate sub-fields.
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  for name in constants.HVS_PARAMETERS] +
                                  for name in constants.BES_PARAMETERS])
  # Fields that need live data from the primary nodes.
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")

  def ExpandNames(self):
    # Reject requested fields that are neither static nor dynamic.
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    # Read-only query: shared locks are sufficient at both levels.
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

      self.wanted = _GetWantedInstances(self, self.op.names)
      self.wanted = locking.ALL_SET

    # Node queries are needed only when a non-static (dynamic) field was
    # requested; locking additionally requires the caller's opt-in.
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking

      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # Node locks are computed from the instance locks already held.
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    No prerequisites beyond the field/lock validation done in
    ExpandNames.

    """

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
      # caller did specify names, so we must keep the ordering
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
        tgt_set = all_info.keys()
      # Instances removed between name expansion and now are an error.
      missing = set(self.wanted).difference(tgt_set)
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    if self.do_node_query:
      # One RPC to all involved primary nodes for live instance data.
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
        result = node_data[name]
          # offline nodes will be in both lists
          off_nodes.append(name)
          bad_nodes.append(name)
            live_data.update(result.data)
            # else no instance is alive
      # No dynamic fields requested: pretend every instance has no
      # live data.
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    for instance in instance_list:
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          # Operational state is unknown if the primary node failed.
          if instance.primary_node in bad_nodes:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
            # Combine live state with the configured (admin) state.
            running = bool(live_data.get(instance.name))
              if instance.admin_up:
              if instance.admin_up:
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
        elif field == "disk_template":
          val = instance.disk_template
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          # Legacy fields: map 'a'/'b' to disk index 0/1.
          idx = ord(field[2]) - ord('a')
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
              # Index-based NIC field; out-of-range indices are tolerated.
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                  assert False, "Unhandled NIC parameter"
            assert False, "Unhandled variable parameter"
          raise errors.ParameterError(field)
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  Shuts the instance down on its primary node and restarts it on its
  (mirrored) secondary node.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # Node locks are filled in DeclareLocks once the instance is known.
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # Failover only makes sense for network-mirrored disk templates.
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)

    if instance.admin_up:
      # check memory requirements on the secondary node
      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                           instance.name, bep[constants.BE_MEMORY],
                           instance.hypervisor)
      self.LogInfo("Not checking memory on the secondary node as"
                   " instance will not be started")

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        # Degraded disks abort the failover unless the instance is down
        # or the caller explicitly asked to ignore consistency.
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.RemoteFailMsg()
      # With ignore_consistency a failed shutdown is only a warning; the
      # operator is responsible for ensuring the source node is down.
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
        # Roll the disks back down before failing.
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None, None)
      msg = result.RemoteFailMsg()
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))
class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # Node locks are filled in DeclareLocks once the instance is known.
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # Live migration here relies on DRBD8's dual-primary mode.
    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    if result.failed or not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    if not self.op.cleanup:
      _CheckNodeNotDrained(self, target_node)
      # Ask the source hypervisor whether the instance can be migrated.
      result = self.rpc.call_instance_migratable(instance.primary_node,
      msg = result.RemoteFailMsg()
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %

    self.instance = instance

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.instance.disks)
      for node, nres in result.items():
        msg = nres.RemoteFailMsg()
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          # Track the slowest node for progress reporting.
          min_percent = min(min_percent, node_percent)
        if min_percent < 100:
          self.feedback_fn(" - progress: %.1f%%" % min_percent)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    msg = result.RemoteFailMsg()
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
                               " error %s" % (node, msg))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
        raise errors.OpExecError("Cannot disconnect disks node %s,"
                                 " error %s" % (node, msg))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    @type multimaster: boolean
    @param multimaster: whether to reconnect in dual-master (True) or
        single-master (False) mode

    """
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
        raise errors.OpExecError("Cannot change disks config on node %s,"
                                 " error: %s" % (node, msg))

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      if not isinstance(result.data, list):
        raise errors.OpExecError("Can't contact node '%s'" % node)

    runningon_source = instance.name in ins_l[source_node].data
    runningon_target = instance.name in ins_l[target_node].data

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    self._EnsureSecondary(demoted_node)
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore here errors, since if the device is standalone, it
      # won't be able to sync
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    Best-effort: failure to revert is logged, not raised.

    """
    target_node = self.target_node
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.LogWarning("Migration failed and I can't reconnect the"
                      " drives: error '%s'\n"
                      "Please look and recover the instance status" %

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_finalize_migration(target_node,
    abort_msg = abort_result.RemoteFailMsg()
      logging.error("Aborting migration failed on target node %s: %s" %
                    (target_node, abort_msg))
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

  def _ExecMigration(self):
    """Migrate an instance.

    The migrate is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    self.feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      if not _CheckDiskConsistency(self, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migrate." % dev.iv_name)

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.RemoteFailMsg()
      log_err = ("Failed fetching source migration information from %s: %s" %
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    # Then switch the disks to master/master mode
    self._EnsureSecondary(target_node)
    self._GoStandalone()
    self._GoReconnect(True)
    self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           self.nodes_ip[target_node])
    msg = result.RemoteFailMsg()
      # Pre-migration failed: abort and put the disks back into
      # single-master mode before giving up.
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
    msg = result.RemoteFailMsg()
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    result = self.rpc.call_finalize_migration(target_node,
    msg = result.RemoteFailMsg()
      logging.error("Instance migration succeeded, but finalization failed:"
      raise errors.OpExecError("Could not finalize instance migration: %s" %

    # Demote the old primary and return the disks to single-master mode.
    self._EnsureSecondary(source_node)
    self._WaitUntilSync()
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def Exec(self, feedback_fn):
    """Perform the migration.

    Dispatches to either the cleanup path or the actual migration.

    """
    self.feedback_fn = feedback_fn

    self.source_node = self.instance.primary_node
    self.target_node = self.instance.secondary_nodes[0]
    self.all_nodes = [self.source_node, self.target_node]
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
      return self._ExecCleanup()
      return self._ExecMigration()
def _CreateBlockDev(lu, node, instance, device, force_create,
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be change to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passes to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device own Open() execution

  """
  if device.CreateOnSecondary():
    # Children are created first (bottom-up), propagating force_create.
    for child in device.children:
      _CreateBlockDev(lu, node, instance, child, force_create,
  if not force_create:

  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passes to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device own Open() execution

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info)
  msg = result.RemoteFailMsg()
    raise errors.OpExecError("Can't create block device %s on"
                             " node %s for instance %s: %s" %
                             (device, node, instance.name, msg))
  # Record the physical id returned by the node, if not yet known.
  if device.physical_id is None:
    device.physical_id = result.payload
def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
    # Each name is a cluster-unique id followed by the given suffix.
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
  """Generate a drbd8 device complete with its children.

  Builds the data and metadata LVs and wraps them in a DRBD8 disk
  object connecting the primary and secondary nodes.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  # DRBD8 metadata is a fixed-size (128 MiB) LV.
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                          children=[dev_data, dev_meta],
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
  """Generate the entire disk layout for a given template type.

  Returns the list of L{objects.Disk} objects for the instance's disks
  according to the requested template.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  if template_name == constants.DT_DISKLESS:
  elif template_name == constants.DT_PLAIN:
    # Plain LVs cannot have secondaries.
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index,
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    # DRBD8 requires exactly one secondary node.
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    # Two minors per disk: one on the primary, one on the secondary.
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    # Each disk needs a data and a metadata LV name.
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disk_dev.mode = disk["mode"]
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
      disks.append(disk_dev)
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4181 def _GetInstanceInfoText(instance):
4182 """Compute that text that should be added to the disk's metadata.
4185 return "originstname+%s" % instance.name
def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)
  pnode = instance.primary_node

  # File-based instances need their storage directory created first.
  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    if result.failed or not result.data:
      raise errors.OpExecError("Could not connect to node '%s'" % pnode)

    if not result.data[0]:
      raise errors.OpExecError("Failed to create directory '%s'" %

  # Note: this needs to be kept in sync with adding of disks in
  # LUSetInstanceParams
  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    # Force creation/opening only on the primary node.
    for node in instance.all_nodes:
      f_create = node == pnode
      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  for device in instance.disks:
    # Remove the device on every node where (part of) it lives.
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
        # Best-effort: warn and keep removing the remaining devices.
        lu.LogWarning("Could not remove block device %s on node %s,"
                      " continuing anyway: %s", device.iv_name, node, msg)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
    if result.failed or not result.data:
      logging.error("Could not remove directory '%s'", file_storage_dir)
def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group.

  @param disk_template: the disk template of the instance
  @param disks: list of dicts with a 'size' key per disk
  @return: required space in MiB, or None for templates which do not
      use the volume group (diskless, file-based)

  """
  # Required free disk space as a function of disk and swap space
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  # Validate remotely on every node, so node-specific constraints are
  # also checked.
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
  for node in nodenames:
    msg = info.RemoteFailMsg()
      raise errors.OpPrereqError("Hypervisor parameter validation"
                                 " failed on node %s: %s" % (node, msg))
4315 class LUCreateInstance(LogicalUnit):
4316 """Create an instance.
4319 HPATH = "instance-add"
4320 HTYPE = constants.HTYPE_INSTANCE
4321 _OP_REQP = ["instance_name", "disks", "disk_template",
4323 "wait_for_sync", "ip_check", "nics",
4324 "hvparams", "beparams"]
4327 def _ExpandNode(self, node):
4328 """Expands and checks one node name.
4331 node_full = self.cfg.ExpandNodeName(node)
4332 if node_full is None:
4333 raise errors.OpPrereqError("Unknown node %s" % node)
4336 def ExpandNames(self):
4337 """ExpandNames for CreateInstance.
4339 Figure out the right locks for instance creation.
4342 self.needed_locks = {}
4344 # set optional parameters to none if they don't exist
4345 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4346 if not hasattr(self.op, attr):
4347 setattr(self.op, attr, None)
4349 # cheap checks, mostly valid constants given
4351 # verify creation mode
4352 if self.op.mode not in (constants.INSTANCE_CREATE,
4353 constants.INSTANCE_IMPORT):
4354 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4357 # disk template and mirror node verification
4358 if self.op.disk_template not in constants.DISK_TEMPLATES:
4359 raise errors.OpPrereqError("Invalid disk template name")
4361 if self.op.hypervisor is None:
4362 self.op.hypervisor = self.cfg.GetHypervisorType()
4364 cluster = self.cfg.GetClusterInfo()
4365 enabled_hvs = cluster.enabled_hypervisors
4366 if self.op.hypervisor not in enabled_hvs:
4367 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4368 " cluster (%s)" % (self.op.hypervisor,
4369 ",".join(enabled_hvs)))
4371 # check hypervisor parameter syntax (locally)
4372 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
# The FillDict call merges the user-supplied hvparams over the cluster
# defaults for this hypervisor; the merged dict is syntax-checked below
# and cached on self.hv_full for later use (allocator, hooks).
4373 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4375 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4376 hv_type.CheckParameterSyntax(filled_hvp)
4377 self.hv_full = filled_hvp
4379 # fill and remember the beparams dict
4380 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4381 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4384 #### instance parameters check
4386 # instance name verification
# HostInfo resolves the name; both the canonical name and the resolved
# IP (used later for the ip_check ping) come from this single lookup.
4387 hostname1 = utils.HostInfo(self.op.instance_name)
4388 self.op.instance_name = instance_name = hostname1.name
4390 # this is just a preventive check, but someone might still add this
4391 # instance in the meantime, and creation will fail at lock-add time
4392 if instance_name in self.cfg.GetInstanceList():
4393 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4396 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
# NIC pre-build: normalize each requested NIC's ip/mac/bridge into
# objects.NIC instances collected on self.nics.
4400 for nic in self.op.nics:
4401 # ip validity checks
4402 ip = nic.get("ip", None)
4403 if ip is None or ip.lower() == "none":
4405 elif ip.lower() == constants.VALUE_AUTO:
4406 nic_ip = hostname1.ip
4408 if not utils.IsValidIP(ip):
4409 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4410 " like a valid IP" % ip)
4413 # MAC address verification
4414 mac = nic.get("mac", constants.VALUE_AUTO)
4415 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4416 if not utils.IsValidMac(mac.lower()):
4417 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4419 # bridge verification
4420 bridge = nic.get("bridge", None)
4422 bridge = self.cfg.GetDefBridge()
4423 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4425 # disk checks/pre-build
4427 for disk in self.op.disks:
4428 mode = disk.get("mode", constants.DISK_RDWR)
4429 if mode not in constants.DISK_ACCESS_SET:
4430 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4432 size = disk.get("size", None)
4434 raise errors.OpPrereqError("Missing disk size")
4438 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4439 self.disks.append({"size": size, "mode": mode})
4441 # used in CheckPrereq for ip ping check
4442 self.check_ip = hostname1.ip
4444 # file storage checks
4445 if (self.op.file_driver and
4446 not self.op.file_driver in constants.FILE_DRIVER):
4447 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4448 self.op.file_driver)
# NOTE(review): this rejects *absolute* file storage dirs; the error
# message ("path not absolute") reads backwards relative to the check --
# presumably only relative paths under the cluster dir are allowed; the
# message wording should be confirmed/fixed upstream.
4450 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4451 raise errors.OpPrereqError("File storage directory path not absolute")
4453 ### Node/iallocator related checks
4454 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4455 raise errors.OpPrereqError("One and only one of iallocator and primary"
4456 " node must be given")
4458 if self.op.iallocator:
# With an allocator we don't know the target nodes yet, so lock them all.
4459 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4461 self.op.pnode = self._ExpandNode(self.op.pnode)
4462 nodelist = [self.op.pnode]
4463 if self.op.snode is not None:
4464 self.op.snode = self._ExpandNode(self.op.snode)
4465 nodelist.append(self.op.snode)
4466 self.needed_locks[locking.LEVEL_NODE] = nodelist
4468 # in case of import lock the source node too
4469 if self.op.mode == constants.INSTANCE_IMPORT:
4470 src_node = getattr(self.op, "src_node", None)
4471 src_path = getattr(self.op, "src_path", None)
4473 if src_path is None:
4474 self.op.src_path = src_path = self.op.instance_name
4476 if src_node is None:
# Unknown source node: lock everything so CheckPrereq can search all
# nodes' export lists for the relative path.
4477 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4478 self.op.src_node = None
4479 if os.path.isabs(src_path):
4480 raise errors.OpPrereqError("Importing an instance from an absolute"
4481 " path requires a source node option.")
4483 self.op.src_node = src_node = self._ExpandNode(src_node)
4484 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4485 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4486 if not os.path.isabs(src_path):
4487 self.op.src_path = src_path = \
4488 os.path.join(constants.EXPORT_DIR, src_path)
4490 else: # INSTANCE_CREATE
4491 if getattr(self.op, "os_type", None) is None:
4492 raise errors.OpPrereqError("No guest OS specified")
4494 def _RunAllocator(self):
4495 """Run the allocator based on input opcode.
# Build the allocation request from the already-normalized NIC/disk/
# beparams data and ask the configured iallocator for node placement.
4498 nics = [n.ToDict() for n in self.nics]
4499 ial = IAllocator(self,
4500 mode=constants.IALLOCATOR_MODE_ALLOC,
4501 name=self.op.instance_name,
4502 disk_template=self.op.disk_template,
4505 vcpus=self.be_full[constants.BE_VCPUS],
4506 mem_size=self.be_full[constants.BE_MEMORY],
4509 hypervisor=self.op.hypervisor,
4512 ial.Run(self.op.iallocator)
# (The allocator-success guard is elided in this listing; the raise
# below fires only when the allocator reports failure.)
4515 raise errors.OpPrereqError("Can't compute nodes using"
4516 " iallocator '%s': %s" % (self.op.iallocator,
4518 if len(ial.nodes) != ial.required_nodes:
4519 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4520 " of nodes (%s), required %s" %
4521 (self.op.iallocator, len(ial.nodes),
4522 ial.required_nodes))
# First returned node becomes the primary; a second one (for networked
# disk templates) becomes the secondary.
4523 self.op.pnode = ial.nodes[0]
4524 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4525 self.op.instance_name, self.op.iallocator,
4526 ", ".join(ial.nodes))
4527 if ial.required_nodes == 2:
4528 self.op.snode = ial.nodes[1]
4530 def BuildHooksEnv(self):
4533 This runs on master, primary and secondary nodes of the instance.
4537 "ADD_MODE": self.op.mode,
4539 if self.op.mode == constants.INSTANCE_IMPORT:
# For imports, expose the export source to the hooks as well.
4540 env["SRC_NODE"] = self.op.src_node
4541 env["SRC_PATH"] = self.op.src_path
4542 env["SRC_IMAGES"] = self.src_images
# The common instance environment (name, nodes, NICs, disks, ...) is
# built by the shared helper so all instance LUs export the same keys.
4544 env.update(_BuildInstanceHookEnv(
4545 name=self.op.instance_name,
4546 primary_node=self.op.pnode,
4547 secondary_nodes=self.secondaries,
4548 status=self.op.start,
4549 os_type=self.op.os_type,
4550 memory=self.be_full[constants.BE_MEMORY],
4551 vcpus=self.be_full[constants.BE_VCPUS],
4552 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4553 disk_template=self.op.disk_template,
4554 disks=[(d["size"], d["mode"]) for d in self.disks],
4557 hypervisor=self.op.hypervisor,
# Node list for running the hooks: master + primary (+ secondaries).
4560 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4565 def CheckPrereq(self):
4566 """Check prerequisites.
4569 if (not self.cfg.GetVGName() and
4570 self.op.disk_template not in constants.DTS_NOT_LVM):
4571 raise errors.OpPrereqError("Cluster does not support lvm-based"
# Import-specific checks: locate the export (searching all locked nodes
# when no source node was given) and validate its metadata.
4574 if self.op.mode == constants.INSTANCE_IMPORT:
4575 src_node = self.op.src_node
4576 src_path = self.op.src_path
4578 if src_node is None:
4579 exp_list = self.rpc.call_export_list(
4580 self.acquired_locks[locking.LEVEL_NODE])
4582 for node in exp_list:
4583 if not exp_list[node].failed and src_path in exp_list[node].data:
4585 self.op.src_node = src_node = node
4586 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4590 raise errors.OpPrereqError("No export found for relative path %s" %
4593 _CheckNodeOnline(self, src_node)
4594 result = self.rpc.call_export_info(src_node, src_path)
4597 raise errors.OpPrereqError("No export found in dir %s" % src_path)
# export_info is a ConfigParser-style object (has_section/get/getint).
4599 export_info = result.data
4600 if not export_info.has_section(constants.INISECT_EXP):
4601 raise errors.ProgrammerError("Corrupted export config")
4603 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4604 if (int(ei_version) != constants.EXPORT_VERSION):
4605 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4606 (ei_version, constants.EXPORT_VERSION))
4608 # Check that the new instance doesn't have less disks than the export
4609 instance_disks = len(self.disks)
4610 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4611 if instance_disks < export_disks:
4612 raise errors.OpPrereqError("Not enough disks to import."
4613 " (instance: %d, export: %d)" %
4614 (instance_disks, export_disks))
4616 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
# Build the per-disk image list; False marks a disk with no dump in
# the export (it will simply be created empty).
4618 for idx in range(export_disks):
4619 option = 'disk%d_dump' % idx
4620 if export_info.has_option(constants.INISECT_INS, option):
4621 # FIXME: are the old os-es, disk sizes, etc. useful?
4622 export_name = export_info.get(constants.INISECT_INS, option)
4623 image = os.path.join(src_path, export_name)
4624 disk_images.append(image)
4626 disk_images.append(False)
4628 self.src_images = disk_images
4630 old_name = export_info.get(constants.INISECT_INS, 'name')
4631 # FIXME: int() here could throw a ValueError on broken exports
4632 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4633 if self.op.instance_name == old_name:
4634 for idx, nic in enumerate(self.nics):
# NOTE(review): suspected off-by-one -- export NIC options are
# nic0..nic{count-1}, so for idx == exp_nic_count the get() below
# would ask for a non-existent option; the condition should
# presumably be "exp_nic_count > idx" (or ">= idx + 1"). Confirm
# against the export format before changing.
4635 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4636 nic_mac_ini = 'nic%d_mac' % idx
4637 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4639 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4640 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4641 if self.op.start and not self.op.ip_check:
4642 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4643 " adding an instance in start mode")
4645 if self.op.ip_check:
4646 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4647 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4648 (self.check_ip, self.op.instance_name))
4650 #### mac address generation
4651 # By generating here the mac address both the allocator and the hooks get
4652 # the real final mac address rather than the 'auto' or 'generate' value.
4653 # There is a race condition between the generation and the instance object
4654 # creation, which means that we know the mac is valid now, but we're not
4655 # sure it will be when we actually add the instance. If things go bad
4656 # adding the instance will abort because of a duplicate mac, and the
4657 # creation job will fail.
4658 for nic in self.nics:
4659 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4660 nic.mac = self.cfg.GenerateMAC()
4664 if self.op.iallocator is not None:
4665 self._RunAllocator()
4667 #### node related checks
4669 # check primary node
4670 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4671 assert self.pnode is not None, \
4672 "Cannot retrieve locked node %s" % self.op.pnode
4674 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4677 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4680 self.secondaries = []
4682 # mirror node verification
4683 if self.op.disk_template in constants.DTS_NET_MIRROR:
4684 if self.op.snode is None:
4685 raise errors.OpPrereqError("The networked disk templates need"
4687 if self.op.snode == pnode.name:
4688 raise errors.OpPrereqError("The secondary node cannot be"
4689 " the primary node.")
4690 _CheckNodeOnline(self, self.op.snode)
4691 _CheckNodeNotDrained(self, self.op.snode)
4692 self.secondaries.append(self.op.snode)
4694 nodenames = [pnode.name] + self.secondaries
4696 req_size = _ComputeDiskSize(self.op.disk_template,
4699 # Check lv size requirements
4700 if req_size is not None:
4701 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4703 for node in nodenames:
4704 info = nodeinfo[node]
4708 raise errors.OpPrereqError("Cannot get current information"
4709 " from node '%s'" % node)
4710 vg_free = info.get('vg_free', None)
# NOTE(review): an RPC-deserialized size could conceivably arrive as a
# long on Python 2, which this isinstance(int) check would reject --
# verify what the node info RPC actually returns.
4711 if not isinstance(vg_free, int):
4712 raise errors.OpPrereqError("Can't compute free disk space on"
4714 if req_size > info['vg_free']:
4715 raise errors.OpPrereqError("Not enough disk space on target node %s."
4716 " %d MB available, %d MB required" %
4717 (node, info['vg_free'], req_size))
4719 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
# OS availability check on the primary node.
4722 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4724 if not isinstance(result.data, objects.OS):
4725 raise errors.OpPrereqError("OS '%s' not in supported os list for"
4726 " primary node" % self.op.os_type)
4728 # bridge check on primary node
4729 bridges = [n.bridge for n in self.nics]
4730 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4733 raise errors.OpPrereqError("One of the target bridges '%s' does not"
4734 " exist on destination node '%s'" %
4735 (",".join(bridges), pnode.name))
4737 # memory check on primary node
4739 _CheckNodeFreeMemory(self, self.pnode.name,
4740 "creating instance %s" % self.op.instance_name,
4741 self.be_full[constants.BE_MEMORY],
4744 def Exec(self, feedback_fn):
4745 """Create and add the instance to the cluster.
4748 instance = self.op.instance_name
4749 pnode_name = self.pnode.name
# A network port is allocated only for hypervisors that need one
# (e.g. for VNC consoles).
4751 ht_kind = self.op.hypervisor
4752 if ht_kind in constants.HTS_REQ_PORT:
4753 network_port = self.cfg.AllocatePort()
4757 ##if self.op.vnc_bind_address is None:
4758 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4760 # this is needed because os.path.join does not accept None arguments
4761 if self.op.file_storage_dir is None:
4762 string_file_storage_dir = ""
4764 string_file_storage_dir = self.op.file_storage_dir
4766 # build the full file storage dir path
4767 file_storage_dir = os.path.normpath(os.path.join(
4768 self.cfg.GetFileStorageDir(),
4769 string_file_storage_dir, instance))
# Build the disk objects and the in-memory instance object; nothing is
# committed to the configuration until AddInstance below.
4772 disks = _GenerateDiskTemplate(self,
4773 self.op.disk_template,
4774 instance, pnode_name,
4778 self.op.file_driver,
4781 iobj = objects.Instance(name=instance, os=self.op.os_type,
4782 primary_node=pnode_name,
4783 nics=self.nics, disks=disks,
4784 disk_template=self.op.disk_template,
4786 network_port=network_port,
4787 beparams=self.op.beparams,
4788 hvparams=self.op.hvparams,
4789 hypervisor=self.op.hypervisor,
4792 feedback_fn("* creating instance disks...")
4794 _CreateDisks(self, iobj)
# On disk-creation failure, roll back: remove any created disks and
# give back the reserved DRBD minors before re-raising.
4795 except errors.OpExecError:
4796 self.LogWarning("Device creation failed, reverting...")
4798 _RemoveDisks(self, iobj)
4800 self.cfg.ReleaseDRBDMinors(instance)
4803 feedback_fn("adding instance %s to cluster config" % instance)
4805 self.cfg.AddInstance(iobj)
4806 # Declare that we don't want to remove the instance lock anymore, as we've
4807 # added the instance to the config
4808 del self.remove_locks[locking.LEVEL_INSTANCE]
4809 # Unlock all the nodes
4810 if self.op.mode == constants.INSTANCE_IMPORT:
# Keep the source node locked (it's still needed for the OS import
# below), release everything else.
4811 nodes_keep = [self.op.src_node]
4812 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4813 if node != self.op.src_node]
4814 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4815 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4817 self.context.glm.release(locking.LEVEL_NODE)
4818 del self.acquired_locks[locking.LEVEL_NODE]
4820 if self.op.wait_for_sync:
4821 disk_abort = not _WaitForSync(self, iobj)
4822 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4823 # make sure the disks are not degraded (still sync-ing is ok)
4825 feedback_fn("* checking mirrors status")
4826 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
# If the disks never came up, undo everything (disks, config entry)
# and re-arm removal of the instance lock before failing.
4831 _RemoveDisks(self, iobj)
4832 self.cfg.RemoveInstance(iobj.name)
4833 # Make sure the instance lock gets removed
4834 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4835 raise errors.OpExecError("There are some degraded disks for"
4838 feedback_fn("creating os for instance %s on node %s" %
4839 (instance, pnode_name))
4841 if iobj.disk_template != constants.DT_DISKLESS:
4842 if self.op.mode == constants.INSTANCE_CREATE:
4843 feedback_fn("* running the instance OS create scripts...")
4844 result = self.rpc.call_instance_os_add(pnode_name, iobj)
4845 msg = result.RemoteFailMsg()
4847 raise errors.OpExecError("Could not add os for instance %s"
4849 (instance, pnode_name, msg))
4851 elif self.op.mode == constants.INSTANCE_IMPORT:
4852 feedback_fn("* running the instance OS import scripts...")
4853 src_node = self.op.src_node
4854 src_images = self.src_images
4855 cluster_name = self.cfg.GetClusterName()
4856 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4857 src_node, src_images,
4859 import_result.Raise()
# Per-disk import failures are only warned about, not fatal.
4860 for idx, result in enumerate(import_result.data):
4862 self.LogWarning("Could not import the image %s for instance"
4863 " %s, disk %d, on node %s" %
4864 (src_images[idx], instance, idx, pnode_name))
4866 # also checked in the prereq part
4867 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
# Optional start of the new instance (guarded by self.op.start in the
# elided condition above).
4871 iobj.admin_up = True
4872 self.cfg.Update(iobj)
4873 logging.info("Starting instance %s on node %s", instance, pnode_name)
4874 feedback_fn("* starting instance...")
4875 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4876 msg = result.RemoteFailMsg()
4878 raise errors.OpExecError("Could not start instance: %s" % msg)
4881 class LUConnectConsole(NoHooksLU):
4882 """Connect to an instance's console.
4884 This is somewhat special in that it returns the command line that
4885 you need to run on the master node in order to connect to the
4889 _OP_REQP = ["instance_name"]
# Only the instance lock is needed; the console command is built on the
# master and executed by the caller.
4892 def ExpandNames(self):
4893 self._ExpandAndLockInstance()
4895 def CheckPrereq(self):
4896 """Check prerequisites.
4898 This checks that the instance is in the cluster.
4901 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4902 assert self.instance is not None, \
4903 "Cannot retrieve locked instance %s" % self.op.instance_name
4904 _CheckNodeOnline(self, self.instance.primary_node)
4906 def Exec(self, feedback_fn):
4907 """Connect to the console of an instance
4910 instance = self.instance
4911 node = instance.primary_node
# Verify the instance is actually running on its primary node before
# handing out a console command.
4913 node_insts = self.rpc.call_instance_list([node],
4914 [instance.hypervisor])[node]
4917 if instance.name not in node_insts.data:
4918 raise errors.OpExecError("Instance %s is not running." % instance.name)
4920 logging.debug("Connecting to console of %s on %s", instance.name, node)
4922 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4923 cluster = self.cfg.GetClusterInfo()
4924 # beparams and hvparams are passed separately, to avoid editing the
4925 # instance and then saving the defaults in the instance itself.
4926 hvparams = cluster.FillHV(instance)
4927 beparams = cluster.FillBE(instance)
4928 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
# Return the full ssh command line the caller must run on the master.
4931 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4934 class LUReplaceDisks(LogicalUnit):
4935 """Replace the disks of an instance.
4938 HPATH = "mirrors-replace"
4939 HTYPE = constants.HTYPE_INSTANCE
4940 _OP_REQP = ["instance_name", "mode", "disks"]
4943 def CheckArguments(self):
# Default the optional opcode attributes so later code can test them
# uniformly against None.
4944 if not hasattr(self.op, "remote_node"):
4945 self.op.remote_node = None
4946 if not hasattr(self.op, "iallocator"):
4947 self.op.iallocator = None
4949 # check for valid parameter combination
# cnt counts how many of (remote_node, iallocator) are unset: 2 = none
# given, 1 = exactly one given, 0 = both given. The guards selecting
# each raise below are elided in this listing.
4950 cnt = [self.op.remote_node, self.op.iallocator].count(None)
4951 if self.op.mode == constants.REPLACE_DISK_CHG:
4953 raise errors.OpPrereqError("When changing the secondary either an"
4954 " iallocator script must be used or the"
4957 raise errors.OpPrereqError("Give either the iallocator or the new"
4958 " secondary, not both")
4959 else: # not replacing the secondary
4961 raise errors.OpPrereqError("The iallocator and new node options can"
4962 " be used only when changing the"
4965 def ExpandNames(self):
4966 self._ExpandAndLockInstance()
# Node locking strategy: with an iallocator the target is unknown, so
# lock all nodes; with an explicit remote node, lock it in addition to
# the instance's own nodes; otherwise recalculate from the instance.
4968 if self.op.iallocator is not None:
4969 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4970 elif self.op.remote_node is not None:
4971 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4972 if remote_node is None:
4973 raise errors.OpPrereqError("Node '%s' not known" %
4974 self.op.remote_node)
4975 self.op.remote_node = remote_node
4976 # Warning: do not remove the locking of the new secondary here
4977 # unless DRBD8.AddChildren is changed to work in parallel;
4978 # currently it doesn't since parallel invocations of
4979 # FindUnusedMinor will conflict
4980 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4981 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4983 self.needed_locks[locking.LEVEL_NODE] = []
4984 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
  """Declare additional locking needs when a lock level is reached.

  Once the node level is being acquired and we are not already asking
  for the global node set, the instance's primary/secondary nodes must
  be added to the needed locks.

  """
  if level != locking.LEVEL_NODE:
    return
  if self.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET:
    # Already locking every node; nothing more to declare.
    return
  self._LockInstancesNodes()
def _RunAllocator(self):
  """Compute a new secondary node using an IAllocator.

  Runs the configured iallocator in relocation mode, starting from the
  current secondary, and stores the chosen node in self.op.remote_node.

  @raise errors.OpPrereqError: if the allocator fails or returns an
      unexpected number of nodes

  """
  ial = IAllocator(self,
                   mode=constants.IALLOCATOR_MODE_RELOC,
                   name=self.op.instance_name,
                   relocate_from=[self.sec_node])

  ial.Run(self.op.iallocator)

  if not ial.success:
    raise errors.OpPrereqError("Can't compute nodes using"
                               " iallocator '%s': %s" % (self.op.iallocator,
                                                         ial.info))
  if len(ial.nodes) != ial.required_nodes:
    # Bug fix: the format string has three conversions but the argument
    # tuple was missing self.op.iallocator, so taking this error path
    # raised a TypeError instead of the intended OpPrereqError (compare
    # the parallel check in LUCreateInstance._RunAllocator).
    raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                               " of nodes (%s), required %s" %
                               (self.op.iallocator, len(ial.nodes),
                                ial.required_nodes))
  self.op.remote_node = ial.nodes[0]
  self.LogInfo("Selected new secondary for the instance: %s",
               self.op.remote_node)
5016 def BuildHooksEnv(self):
5019 This runs on the master, the primary and all the secondaries.
# Replace-disks specific keys: the mode plus old/new secondary node
# names (NEW_SECONDARY is None unless a remote node was requested).
5023 "MODE": self.op.mode,
5024 "NEW_SECONDARY": self.op.remote_node,
5025 "OLD_SECONDARY": self.instance.secondary_nodes[0],
5027 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
# Hook node list: master + primary (+ the new remote node if any).
5029 self.cfg.GetMasterNode(),
5030 self.instance.primary_node,
5032 if self.op.remote_node is not None:
5033 nl.append(self.op.remote_node)
5036 def CheckPrereq(self):
5037 """Check prerequisites.
5039 This checks that the instance is in the cluster.
5042 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5043 assert instance is not None, \
5044 "Cannot retrieve locked instance %s" % self.op.instance_name
5045 self.instance = instance
# Replace-disks only works for DRBD8 instances with exactly one
# secondary node.
5047 if instance.disk_template != constants.DT_DRBD8:
5048 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5051 if len(instance.secondary_nodes) != 1:
5052 raise errors.OpPrereqError("The instance has a strange layout,"
5053 " expected one secondary but found %d" %
5054 len(instance.secondary_nodes))
5056 self.sec_node = instance.secondary_nodes[0]
# When an iallocator is used, it fills self.op.remote_node before the
# checks below run.
5058 if self.op.iallocator is not None:
5059 self._RunAllocator()
5061 remote_node = self.op.remote_node
5062 if remote_node is not None:
5063 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5064 assert self.remote_node_info is not None, \
5065 "Cannot retrieve locked node %s" % remote_node
5067 self.remote_node_info = None
5068 if remote_node == instance.primary_node:
5069 raise errors.OpPrereqError("The specified node is the primary node of"
5071 elif remote_node == self.sec_node:
5072 raise errors.OpPrereqError("The specified node is already the"
5073 " secondary node of the instance.")
# Classify the nodes by role: tgt_node is where storage is replaced,
# oth_node is the healthy peer, new_node (CHG mode only) is the
# incoming secondary.
5075 if self.op.mode == constants.REPLACE_DISK_PRI:
5076 n1 = self.tgt_node = instance.primary_node
5077 n2 = self.oth_node = self.sec_node
5078 elif self.op.mode == constants.REPLACE_DISK_SEC:
5079 n1 = self.tgt_node = self.sec_node
5080 n2 = self.oth_node = instance.primary_node
5081 elif self.op.mode == constants.REPLACE_DISK_CHG:
5082 n1 = self.new_node = remote_node
5083 n2 = self.oth_node = instance.primary_node
5084 self.tgt_node = self.sec_node
5085 _CheckNodeNotDrained(self, remote_node)
5087 raise errors.ProgrammerError("Unhandled disk replace mode")
5089 _CheckNodeOnline(self, n1)
5090 _CheckNodeOnline(self, n2)
# An empty disk list means "replace all disks".
5092 if not self.op.disks:
5093 self.op.disks = range(len(instance.disks))
# FindDisk raises if any requested index is out of range.
5095 for disk_idx in self.op.disks:
5096 instance.FindDisk(disk_idx)
5098 def _ExecD8DiskOnly(self, feedback_fn):
5099 """Replace a disk on the primary or secondary for dbrd8.
5101 The algorithm for replace is quite complicated:
5103 1. for each disk to be replaced:
5105 1. create new LVs on the target node with unique names
5106 1. detach old LVs from the drbd device
5107 1. rename old LVs to name_replaced.<time_t>
5108 1. rename new LVs to old LVs
5109 1. attach the new LVs (with the old names now) to the drbd device
5111 1. wait for sync across all devices
5113 1. for each modified disk:
5115 1. remove old LVs (which have the name name_replaces.<time_t>)
5117 Failures are not very well handled.
5121 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5122 instance = self.instance
5124 vgname = self.cfg.GetVGName()
5127 tgt_node = self.tgt_node
5128 oth_node = self.oth_node
5130 # Step: check device activation
5131 self.proc.LogStep(1, steps_total, "check device existence")
5132 info("checking volume groups")
5133 my_vg = cfg.GetVGName()
5134 results = self.rpc.call_vg_list([oth_node, tgt_node])
5136 raise errors.OpExecError("Can't list volume groups on the nodes")
5137 for node in oth_node, tgt_node:
5139 if res.failed or not res.data or my_vg not in res.data:
5140 raise errors.OpExecError("Volume group '%s' not found on %s" %
5142 for idx, dev in enumerate(instance.disks):
5143 if idx not in self.op.disks:
5145 for node in tgt_node, oth_node:
5146 info("checking disk/%d on %s" % (idx, node))
5147 cfg.SetDiskID(dev, node)
5148 result = self.rpc.call_blockdev_find(node, dev)
5149 msg = result.RemoteFailMsg()
# An empty payload with no error message also means the disk is gone.
5150 if not msg and not result.payload:
5151 msg = "disk not found"
5153 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5156 # Step: check other node consistency
5157 self.proc.LogStep(2, steps_total, "check peer consistency")
5158 for idx, dev in enumerate(instance.disks):
5159 if idx not in self.op.disks:
5161 info("checking disk/%d consistency on %s" % (idx, oth_node))
5162 if not _CheckDiskConsistency(self, dev, oth_node,
5163 oth_node==instance.primary_node):
5164 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5165 " to replace disks on this node (%s)" %
5166 (oth_node, tgt_node))
5168 # Step: create new storage
5169 self.proc.LogStep(3, steps_total, "allocate new storage")
5170 for idx, dev in enumerate(instance.disks):
5171 if idx not in self.op.disks:
5174 cfg.SetDiskID(dev, tgt_node)
# Two LVs per disk: the data volume (instance-sized) and a fixed
# 128 MB DRBD metadata volume.
5175 lv_names = [".disk%d_%s" % (idx, suf)
5176 for suf in ["data", "meta"]]
5177 names = _GenerateUniqueNames(self, lv_names)
5178 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5179 logical_id=(vgname, names[0]))
5180 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5181 logical_id=(vgname, names[1]))
5182 new_lvs = [lv_data, lv_meta]
5183 old_lvs = dev.children
5184 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5185 info("creating new local storage on %s for %s" %
5186 (tgt_node, dev.iv_name))
5187 # we pass force_create=True to force the LVM creation
5188 for new_lv in new_lvs:
5189 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5190 _GetInstanceInfoText(instance), False)
5192 # Step: for each lv, detach+rename*2+attach
5193 self.proc.LogStep(4, steps_total, "change drbd configuration")
5194 for dev, old_lvs, new_lvs in iv_names.itervalues():
5195 info("detaching %s drbd from local storage" % dev.iv_name)
5196 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5199 raise errors.OpExecError("Can't detach drbd from local storage on node"
5200 " %s for device %s" % (tgt_node, dev.iv_name))
5202 #cfg.Update(instance)
5204 # ok, we created the new LVs, so now we know we have the needed
5205 # storage; as such, we proceed on the target node to rename
5206 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5207 # using the assumption that logical_id == physical_id (which in
5208 # turn is the unique_id on that node)
5210 # FIXME(iustin): use a better name for the replaced LVs
5211 temp_suffix = int(time.time())
5212 ren_fn = lambda d, suff: (d.physical_id[0],
5213 d.physical_id[1] + "_replaced-%s" % suff)
5214 # build the rename list based on what LVs exist on the node
5216 for to_ren in old_lvs:
5217 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5218 if not result.RemoteFailMsg() and result.payload:
5220 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5222 info("renaming the old LVs on the target node")
5223 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5226 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5227 # now we rename the new LVs to the old LVs
5228 info("renaming the new LVs on the target node")
5229 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5230 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5233 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
# Point the config objects at the renamed volumes on both sides.
5235 for old, new in zip(old_lvs, new_lvs):
5236 new.logical_id = old.logical_id
5237 cfg.SetDiskID(new, tgt_node)
5239 for disk in old_lvs:
5240 disk.logical_id = ren_fn(disk, temp_suffix)
5241 cfg.SetDiskID(disk, tgt_node)
5243 # now that the new lvs have the old name, we can add them to the device
5244 info("adding new mirror component on %s" % tgt_node)
5245 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
# NOTE(review): this uses the older result.failed/result.data error
# check while neighbouring calls use RemoteFailMsg(); the styles look
# inconsistent -- confirm which protocol addchildren actually follows.
5246 if result.failed or not result.data:
5247 for new_lv in new_lvs:
5248 msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5250 warning("Can't rollback device %s: %s", dev, msg,
5251 hint="cleanup manually the unused logical volumes")
5252 raise errors.OpExecError("Can't add local storage to drbd")
5254 dev.children = new_lvs
5255 cfg.Update(instance)
5257 # Step: wait for sync
5259 # this can fail as the old devices are degraded and _WaitForSync
5260 # does a combined result over all disks, so we don't check its
5262 self.proc.LogStep(5, steps_total, "sync devices")
5263 _WaitForSync(self, instance, unlock=True)
5265 # so check manually all the devices
5266 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5267 cfg.SetDiskID(dev, instance.primary_node)
5268 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5269 msg = result.RemoteFailMsg()
5270 if not msg and not result.payload:
5271 msg = "disk not found"
5273 raise errors.OpExecError("Can't find DRBD device %s: %s" %
# payload[5] is treated as the device's degraded flag here -- the
# payload layout comes from the blockdev_find RPC; confirm the index
# against the RPC definition before touching this.
5275 if result.payload[5]:
5276 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5278 # Step: remove old storage
5279 self.proc.LogStep(6, steps_total, "removing old storage")
5280 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5281 info("remove logical volumes for %s" % name)
5283 cfg.SetDiskID(lv, tgt_node)
5284 msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
# Removal failures are non-fatal: the instance is already healthy on
# the new LVs, so we only warn and leave cleanup to the admin.
5286 warning("Can't remove old LV: %s" % msg,
5287 hint="manually remove unused LVs")
5290 def _ExecD8Secondary(self, feedback_fn):
5291 """Replace the secondary node for drbd8.
5293 The algorithm for replace is quite complicated:
5294 - for all disks of the instance:
5295 - create new LVs on the new node with same names
5296 - shutdown the drbd device on the old secondary
5297 - disconnect the drbd network on the primary
5298 - create the drbd device on the new secondary
5299 - network attach the drbd on the primary, using an artifice:
5300 the drbd code for Attach() will connect to the network if it
5301 finds a device which is connected to the good local disks but
5303 - wait for sync across all devices
5304 - remove all disks from the old secondary
5306 Failures are not very well handled.
5310 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5311 instance = self.instance
5315 old_node = self.tgt_node
5316 new_node = self.new_node
5317 pri_node = instance.primary_node
5319 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5320 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5321 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5324 # Step: check device activation
5325 self.proc.LogStep(1, steps_total, "check device existence")
5326 info("checking volume groups")
5327 my_vg = cfg.GetVGName()
5328 results = self.rpc.call_vg_list([pri_node, new_node])
5329 for node in pri_node, new_node:
5331 if res.failed or not res.data or my_vg not in res.data:
5332 raise errors.OpExecError("Volume group '%s' not found on %s" %
5334 for idx, dev in enumerate(instance.disks):
5335 if idx not in self.op.disks:
5337 info("checking disk/%d on %s" % (idx, pri_node))
5338 cfg.SetDiskID(dev, pri_node)
5339 result = self.rpc.call_blockdev_find(pri_node, dev)
5340 msg = result.RemoteFailMsg()
5341 if not msg and not result.payload:
5342 msg = "disk not found"
5344 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5345 (idx, pri_node, msg))
5347 # Step: check other node consistency
5348 self.proc.LogStep(2, steps_total, "check peer consistency")
5349 for idx, dev in enumerate(instance.disks):
5350 if idx not in self.op.disks:
5352 info("checking disk/%d consistency on %s" % (idx, pri_node))
5353 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5354 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5355 " unsafe to replace the secondary" %
5358 # Step: create new storage
5359 self.proc.LogStep(3, steps_total, "allocate new storage")
5360 for idx, dev in enumerate(instance.disks):
5361 info("adding new local storage on %s for disk/%d" %
5363 # we pass force_create=True to force LVM creation
5364 for new_lv in dev.children:
5365 _CreateBlockDev(self, new_node, instance, new_lv, True,
5366 _GetInstanceInfoText(instance), False)
5368 # Step 4: dbrd minors and drbd setups changes
5369 # after this, we must manually remove the drbd minors on both the
5370 # error and the success paths
5371 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5373 logging.debug("Allocated minors %s" % (minors,))
5374 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5375 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5377 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5378 # create new devices on new_node; note that we create two IDs:
5379 # one without port, so the drbd will be activated without
5380 # networking information on the new node at this stage, and one
5381 # with network, for the latter activation in step 4
5382 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5383 if pri_node == o_node1:
5388 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5389 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5391 iv_names[idx] = (dev, dev.children, new_net_id)
5392 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5394 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5395 logical_id=new_alone_id,
5396 children=dev.children)
5398 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5399 _GetInstanceInfoText(instance), False)
5400 except errors.GenericError:
5401 self.cfg.ReleaseDRBDMinors(instance.name)
5404 for idx, dev in enumerate(instance.disks):
5405 # we have new devices, shutdown the drbd on the old secondary
5406 info("shutting down drbd for disk/%d on old node" % idx)
5407 cfg.SetDiskID(dev, old_node)
5408 msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5410 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5412 hint="Please cleanup this device manually as soon as possible")
5414 info("detaching primary drbds from the network (=> standalone)")
5415 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5416 instance.disks)[pri_node]
5418 msg = result.RemoteFailMsg()
5420 # detaches didn't succeed (unlikely)
5421 self.cfg.ReleaseDRBDMinors(instance.name)
5422 raise errors.OpExecError("Can't detach the disks from the network on"
5423 " old node: %s" % (msg,))
5425 # if we managed to detach at least one, we update all the disks of
5426 # the instance to point to the new secondary
5427 info("updating instance configuration")
5428 for dev, _, new_logical_id in iv_names.itervalues():
5429 dev.logical_id = new_logical_id
5430 cfg.SetDiskID(dev, pri_node)
5431 cfg.Update(instance)
5433 # and now perform the drbd attach
5434 info("attaching primary drbds to new secondary (standalone => connected)")
5435 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5436 instance.disks, instance.name,
5438 for to_node, to_result in result.items():
5439 msg = to_result.RemoteFailMsg()
5441 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5442 hint="please do a gnt-instance info to see the"
5445 # this can fail as the old devices are degraded and _WaitForSync
5446 # does a combined result over all disks, so we don't check its
5448 self.proc.LogStep(5, steps_total, "sync devices")
5449 _WaitForSync(self, instance, unlock=True)
5451 # so check manually all the devices
5452 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5453 cfg.SetDiskID(dev, pri_node)
5454 result = self.rpc.call_blockdev_find(pri_node, dev)
5455 msg = result.RemoteFailMsg()
5456 if not msg and not result.payload:
5457 msg = "disk not found"
5459 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5461 if result.payload[5]:
5462 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5464 self.proc.LogStep(6, steps_total, "removing old storage")
5465 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5466 info("remove logical volumes for disk/%d" % idx)
5468 cfg.SetDiskID(lv, old_node)
5469 msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5471 warning("Can't remove LV on old secondary: %s", msg,
5472 hint="Cleanup stale volumes by hand")
def Exec(self, feedback_fn):
  """Execute disk replacement.

  This dispatches the disk replacement to the appropriate handler
  (secondary-node change vs. in-place disk replacement).

  """
  instance = self.instance

  # Activate the instance disks if we're replacing them on a down instance
  if not instance.admin_up:
    _StartInstanceDisks(self, instance, True)

  # REPLACE_DISK_CHG replaces the secondary node; any other mode
  # replaces the disks in place on the current nodes.
  if self.op.mode == constants.REPLACE_DISK_CHG:
    fn = self._ExecD8Secondary
    # NOTE(review): this extract appears to be missing the "else:" header
    # here -- as written the next line would unconditionally override fn;
    # confirm against the full source.
    fn = self._ExecD8DiskOnly

  ret = fn(feedback_fn)

  # Deactivate the instance disks if we're replacing them on a down instance
  if not instance.admin_up:
    _SafeShutdownInstanceDisks(self, instance)
  # NOTE(review): the "return ret" that presumably ends this method is
  # missing from this extract.
class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  # NOTE(review): this class comes from a lossy extract; several lines
  # (else: headers, guards, message continuations) are missing below and
  # are flagged where the gap is confusing.
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]

  def ExpandNames(self):
    # Lock the instance; the node-level locks are recomputed later from
    # the instance's node list (LOCKS_REPLACE discards the placeholder).
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # Acquire locks on all of the instance's nodes at the node level.
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    # NOTE(review): the "env = {" wrapper, the node-list literal brackets
    # and the final "return env, nl, nl" are missing from this extract.
    "DISK": self.op.disk,
    "AMOUNT": self.op.amount,
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    self.cfg.GetMasterNode(),
    self.instance.primary_node,

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      _CheckNodeOnline(self, node)

    self.instance = instance

    # only plain-LVM and DRBD8 based disks can be grown
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      # NOTE(review): the error-message continuation is missing from this
      # extract
      raise errors.OpPrereqError("Instance's disk layout does not support"

    self.disk = instance.FindDisk(self.op.disk)

    # verify there is enough free space in the volume group on every node
    # that holds this disk (primary and, for DRBD, the secondary too)
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo[node]
      if info.failed or not info.data:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      # vg_free may be absent or non-numeric if the VG is broken on the node
      vg_free = info.data.get('vg_free', None)
      if not isinstance(vg_free, int):
        # NOTE(review): message continuation missing in this extract
        raise errors.OpPrereqError("Can't compute free disk space on"
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    # NOTE(review): a "disk = self.disk" binding appears to be missing
    # from this extract; "disk" below is the object found in CheckPrereq.
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      msg = result.RemoteFailMsg()
      # NOTE(review): the "if msg:" guard seems to be missing here -- as
      # written the raise would be unconditional; confirm.
      raise errors.OpExecError("Grow request failed to node %s: %s" %
    # record the new size in the configuration only after every node has
    # accepted the grow request
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      # NOTE(review): the "if disk_abort:" guard is missing in this extract
      self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                           " status.\nPlease check the instance.")
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  # NOTE(review): lossy extract -- some else-headers, literal wrappers
  # and guards are missing in the bodies below; flagged inline.
  _OP_REQP = ["instances", "static"]

  def ExpandNames(self):
    self.needed_locks = {}
    # read-only query: share all lock levels
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      # explicit list: expand the names and lock just those instances
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
      # NOTE(review): the "else:" header for the all-instances case is
      # missing in this extract
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    # no explicit list: operate on every instance we hold a lock for
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status (recursively over dev's children).

    """
    static = self.op.static
    # NOTE(review): the static/offline branching around the two
    # call_blockdev_find blocks is partially missing from this extract.
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
    if dev_pstatus.offline:
      msg = dev_pstatus.RemoteFailMsg()
      raise errors.OpExecError("Can't compute disk status for %s: %s" %
                               (instance.name, msg))
      dev_pstatus = dev_pstatus.payload
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
        # NOTE(review): "else:" header missing in this extract
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
      if dev_sstatus.offline:
        msg = dev_sstatus.RemoteFailMsg()
        raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                 (instance.name, msg))
        dev_sstatus = dev_sstatus.payload

    # recurse into the children of this device
    dev_children = [self._ComputeDiskStatus(instance, snode, child)
                    for child in dev.children]

    # NOTE(review): the "data = {" wrapper and the final "return data"
    # are missing from this extract.
    "iv_name": dev.iv_name,
    "dev_type": dev.dev_type,
    "logical_id": dev.logical_id,
    "physical_id": dev.physical_id,
    "pstatus": dev_pstatus,
    "sstatus": dev_sstatus,
    "children": dev_children,

  def Exec(self, feedback_fn):
    """Gather and return data"""
    # NOTE(review): "result = {}" initialization missing in this extract
    cluster = self.cfg.GetClusterInfo()
    for instance in self.wanted_instances:
      if not self.op.static:
        # live query of the hypervisor for the run state
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.hypervisor)
        remote_info = remote_info.data
        if remote_info and "state" in remote_info:
          # NOTE(review): the "up" branch and "else:" header are missing
          remote_state = "down"
      if instance.admin_up:
        # NOTE(review): the "up" assignment and "else:" header are missing
        config_state = "down"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      # NOTE(review): the "idict = {" wrapper is missing in this extract
      "name": instance.name,
      "config_state": config_state,
      "run_state": remote_state,
      "pnode": instance.primary_node,
      "snodes": instance.secondary_nodes,
      "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
      "hypervisor": instance.hypervisor,
      "network_port": instance.network_port,
      "hv_instance": instance.hvparams,
      "hv_actual": cluster.FillHV(instance),
      "be_instance": instance.beparams,
      "be_actual": cluster.FillBE(instance),

      result[instance.name] = idict
    # NOTE(review): the final "return result" is missing in this extract
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instances's parameters.

  """
  # NOTE(review): this class comes from a lossy extract; counters
  # (disk_addremove/nic_addremove), several "else:"/"if:" headers,
  # try-blocks and message continuations are missing below. Only the
  # gaps that change the reading materially are flagged inline.
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def CheckArguments(self):
    """Syntactic validation of the nic/disk/hv/be change lists.

    Normalizes missing optional opcode fields to empty defaults and
    checks each (op, params) pair without touching cluster state.

    """
    if not hasattr(self.op, 'nics'):
      # NOTE(review): "self.op.nics = []" missing in this extract
    if not hasattr(self.op, 'disks'):
      # NOTE(review): "self.op.disks = []" missing in this extract
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    # at least one kind of change must have been requested
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")

    # Disk change validation; only one add/remove allowed per opcode
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # NOTE(review): addremove counter increment missing here
      elif disk_op == constants.DDM_ADD:
        # NOTE(review): counter increment and "else:" header missing here
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
      if disk_op == constants.DDM_ADD:
        # a new disk needs a valid access mode and an integer size
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        # NOTE(review): "if size is None:" guard missing here
          raise errors.OpPrereqError("Required disk parameter size missing")
        # NOTE(review): "try: size = int(size)" missing here
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
        disk_dict['size'] = size
        # NOTE(review): "else:" header missing here
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC change validation; only one add/remove allowed per opcode
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # NOTE(review): counter increment / continue missing here
      elif nic_op == constants.DDM_ADD:
        # NOTE(review): counter increment and "else:" header missing here
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        # the literal string "none" clears the IP address
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
          # NOTE(review): "else:" header missing here
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)

      if nic_op == constants.DDM_ADD:
        # new nics default to the cluster bridge and an auto-generated MAC
        nic_bridge = nic_dict.get('bridge', None)
        if nic_bridge is None:
          nic_dict['bridge'] = self.cfg.GetDefBridge()
        nic_mac = nic_dict.get('mac', None)
        # NOTE(review): "if nic_mac is None:" guard missing here
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic")

    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    # NOTE(review): "args = dict()" and the "if self.op.nics:" wrapper
    # (with "args['nics'] = []") are missing from this extract.
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    nic_override = dict(self.op.nics)
    # rebuild the nic list with per-index overrides applied
    for idx, nic in enumerate(self.instance.nics):
      if idx in nic_override:
        this_nic_override = nic_override[idx]
        # NOTE(review): "else:" header missing here
        this_nic_override = {}
      if 'ip' in this_nic_override:
        ip = this_nic_override['ip']
        # NOTE(review): "else: ip = nic.ip" partially missing here
      if 'bridge' in this_nic_override:
        bridge = this_nic_override['bridge']
        # NOTE(review): "else: bridge = nic.bridge" partially missing here
      if 'mac' in this_nic_override:
        mac = this_nic_override['mac']
        # NOTE(review): "else: mac = nic.mac" partially missing here
      args['nics'].append((ip, bridge, mac))
    # an added nic is appended, a removed nic drops the last entry
    if constants.DDM_ADD in nic_override:
      ip = nic_override[constants.DDM_ADD].get('ip', None)
      bridge = nic_override[constants.DDM_ADD]['bridge']
      mac = nic_override[constants.DDM_ADD]['mac']
      args['nics'].append((ip, bridge, mac))
    elif constants.DDM_REMOVE in nic_override:
      del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    # NOTE(review): "return env, nl, nl" missing in this extract

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    force = self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      # work on a copy so the config object is untouched until Exec
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          # NOTE(review): the del/else handling for defaults is missing
          # in this extract
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
      # local syntax check first, then on all involved nodes
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
      # NOTE(review): "else:" header missing here
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          # NOTE(review): the del/else handling for defaults is missing
          # in this extract
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
      # NOTE(review): "else:" header missing here
      self.be_new = self.be_inst = {}

    # NOTE(review): "self.warn = []" initialization missing in this extract
    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
        # NOTE(review): "else:" header missing here
        if not instance_info.failed and instance_info.data:
          current_mem = int(instance_info.data['memory'])
          # NOTE(review): "else:" header and "current_mem = 0" missing here
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode].data['memory_free'])
        # NOTE(review): "if miss_mem > 0:" guard missing here
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        # warn (but do not fail) when a secondary cannot hold the instance
        for node, nres in nodeinfo.iteritems():
          if node not in instance.secondary_nodes:
            # NOTE(review): "continue" missing here
          if nres.failed or not isinstance(nres.data, dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
      if nic_op != constants.DDM_ADD:
        # an existing nic: validate the index
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     (nic_op, len(instance.nics)))
      if 'bridge' in nic_dict:
        nic_bridge = nic_dict['bridge']
        if nic_bridge is None:
          raise errors.OpPrereqError('Cannot set the nic bridge to None')
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
          # NOTE(review): the "if self.force:"/"else:" pair is missing here
          self.warn.append(msg)
          raise errors.OpPrereqError(msg)
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        # NOTE(review): "if nic_mac is None:" guard missing here
          raise errors.OpPrereqError('Cannot set the nic mac to None')
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC()
          # NOTE(review): "else:" header missing here
          # or validate/reserve the current one
          if self.cfg.IsMacInUse(nic_mac):
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
        # removing a disk requires the instance to be down
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if ins_l.failed or not isinstance(ins_l.data, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l.data:
          raise errors.OpPrereqError("Instance is running, can't remove"

      # FIXME(review): this condition tests len(instance.nics) while the
      # message (and intent) is about the number of *disks* -- it should
      # almost certainly be len(instance.disks); confirm and fix upstream.
      if (disk_op == constants.DDM_ADD and
          len(instance.nics) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk: validate the index
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     (disk_op, len(instance.disks)))

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    # NOTE(review): "result = []" initialization missing in this extract
    instance = self.instance
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
          # NOTE(review): "if msg:" guard missing here
          self.LogWarning("Could not remove disk/%d on node %s: %s,"
                          " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk, reusing the first disk's directory for file-based
        # instances
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
          # NOTE(review): "else:" header missing here
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        # NOTE(review): the remaining _GenerateDiskTemplate arguments
        # ([disk_dict], file_path, file_driver, base index) are missing
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          # NOTE(review): "try:" header missing here
          _CreateBlockDev(self, node, instance, new_disk,
                          f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
        # NOTE(review): "else:" header missing here
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        bridge = nic_dict['bridge']
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,bridge=%s" %
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
        # NOTE(review): "else:" header missing here
        # change a given nic
        for key in 'mac', 'ip', 'bridge':
          # NOTE(review): "if key in nic_dict:" guard missing here
          setattr(instance.nics[nic_op], key, nic_dict[key])
          result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)
    # NOTE(review): "return result" missing in this extract
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']

  def ExpandNames(self):
    self.needed_locks = {}
    # read-only query, share the node locks
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      # empty node list means all nodes
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
      # NOTE(review): "else:" header missing in this extract
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    # NOTE(review): "result = {}" initialization missing in this extract
    for node in rpcresult:
      if rpcresult[node].failed:
        # a failed node maps to False instead of a list
        result[node] = False
        # NOTE(review): "else:" header missing here
        result[node] = rpcresult[node].data
    # NOTE(review): "return result" missing in this extract
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  # NOTE(review): lossy extract -- some "if msg:"/"if dev:" guards, the
  # try/finally around snapshotting, and a few else-headers are missing
  # in the bodies below; flagged inline.
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have do lock all nodes, as we don't know where
    # the previous export might be, and and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altoghether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    # NOTE(review): the "env = {" wrapper and "return env, nl, nl" are
    # missing from this extract.
    "EXPORT_NODE": self.op.target_node,
    "EXPORT_DO_SHUTDOWN": self.op.shutdown,
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      msg = result.RemoteFailMsg()
      # NOTE(review): "if msg:" guard missing in this extract
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 (instance.name, src_node, msg))

    vgname = self.cfg.GetVGName()
    # NOTE(review): "snap_disks = []" and the surrounding "try:" block
    # are missing in this extract

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    for idx, disk in enumerate(instance.disks):
      # new_dev_name will be a snapshot of an lvm leaf of the one we passed
      new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
      if new_dev_name.failed or not new_dev_name.data:
        self.LogWarning("Could not snapshot disk/%d on node %s",
        # record a failed snapshot as False so that indexes stay aligned
        snap_disks.append(False)
        # NOTE(review): "else:" header missing here
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=(vgname, new_dev_name.data),
                               physical_id=(vgname, new_dev_name.data),
                               iv_name=disk.iv_name)
        snap_disks.append(new_dev)

    # NOTE(review): the "finally:" restarting the instance is missing here
    if self.op.shutdown and instance.admin_up:
      result = self.rpc.call_instance_start(src_node, instance, None, None)
      msg = result.RemoteFailMsg()
      # NOTE(review): "if msg:" guard missing here
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      # NOTE(review): "if dev:" guard missing here -- snap_disks may
      # contain False entries for failed snapshots (see above)
      result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                             instance, cluster_name, idx)
      if result.failed or not result.data:
        self.LogWarning("Could not export disk/%d from node %s to"
                        " node %s", idx, src_node, dst_node.name)
      msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
      # NOTE(review): "if msg:" guard missing here
        self.LogWarning("Could not remove snapshot for disk/%d from node"
                        " %s: %s", idx, src_node, msg)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    # NOTE(review): the "if nodelist:" guard that this comment refers to
    # is missing from this extract
    exportlist = self.rpc.call_export_list(nodelist)
    for node in exportlist:
      if exportlist[node].failed:
        # NOTE(review): "continue" missing here
      if instance.name in exportlist[node].data:
        # NOTE(review): call_export_remove returns an RPC result object,
        # which is always truthy; truth-testing it directly (instead of
        # the .failed/.data pattern used above) likely means this warning
        # can never fire -- confirm and fix upstream.
        if not self.rpc.call_export_remove(node, instance.name):
          self.LogWarning("Could not remove older export for instance %s"
                          " on node %s", instance.name, node)
class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # nothing to check: the export may belong to an already-removed instance

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    # NOTE(review): "fqdn_warn = False" initialization missing in extract
    if not instance_name:
      # NOTE(review): "fqdn_warn = True" missing here
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    # NOTE(review): "found = False" initialization missing in extract
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        # NOTE(review): "continue" missing here
      if instance_name in exportlist[node].data:
        # NOTE(review): "found = True" missing here
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  # NOTE(review): message continuation missing in extract
# NOTE(review): numbered listing with elided lines (gaps at 6418-6419,
# 6428, 6430-6431, 6435, 6437-6438, 6451, 6453+): the `if name is None:`
# guards before each raise, the format arguments of the error messages,
# and the trailing `else:` branch are not visible here.
6417 class TagsLU(NoHooksLU):
# Shared base for the tag LUs: translates op.kind/op.name into a lock
# declaration (ExpandNames) and a target object (CheckPrereq).
6420 This is an abstract class which is the parent of all the other tags LUs.
6424 def ExpandNames(self):
6425 self.needed_locks = {}
6426 if self.op.kind == constants.TAG_NODE:
# Expand to the canonical node name; elided guard raises on failure.
6427 name = self.cfg.ExpandNodeName(self.op.name)
6429 raise errors.OpPrereqError("Invalid node name (%s)" %
# Lock only the single affected node.
6432 self.needed_locks[locking.LEVEL_NODE] = name
6433 elif self.op.kind == constants.TAG_INSTANCE:
6434 name = self.cfg.ExpandInstanceName(self.op.name)
6436 raise errors.OpPrereqError("Invalid instance name (%s)" %
6439 self.needed_locks[locking.LEVEL_INSTANCE] = name
# Cluster-level tags need no locks, hence no branch for TAG_CLUSTER.
6441 def CheckPrereq(self):
6442 """Check prerequisites.
# Resolve self.target to the taggable object matching op.kind.
6445 if self.op.kind == constants.TAG_CLUSTER:
6446 self.target = self.cfg.GetClusterInfo()
6447 elif self.op.kind == constants.TAG_NODE:
6448 self.target = self.cfg.GetNodeInfo(self.op.name)
6449 elif self.op.kind == constants.TAG_INSTANCE:
6450 self.target = self.cfg.GetInstanceInfo(self.op.name)
# elided `else:` rejects any other tag kind — TODO confirm
6452 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
# NOTE(review): listing elides lines 6458-6459, 6461-6462, 6465-6466
# (docstring closers and blank lines).
6456 class LUGetTags(TagsLU):
6457 """Returns the tags of a given object.
# Requires the tag kind and the object name; TagsLU resolves self.target.
6460 _OP_REQP = ["kind", "name"]
6463 def Exec(self, feedback_fn):
6464 """Returns the tag list.
# self.target was set by TagsLU.CheckPrereq; return its tags as a list.
6467 return list(self.target.GetTags())
# NOTE(review): listing elides lines including 6486 (the `try:` around
# re.compile), 6496 (presumably `cfg = self.cfg`), 6502 (presumably
# `results = []`) and the trailing sort/return — TODO confirm.
6470 class LUSearchTags(NoHooksLU):
6471 """Searches the tags for a given pattern.
# Opcode carries a regular-expression pattern to match against all tags.
6474 _OP_REQP = ["pattern"]
6477 def ExpandNames(self):
# Searching is read-only over config data; no locks are needed.
6478 self.needed_locks = {}
6480 def CheckPrereq(self):
6481 """Check prerequisites.
6483 This checks the pattern passed for validity by compiling it.
# compiled pattern is cached on self for use in Exec (elided try: above)
6487 self.re = re.compile(self.op.pattern)
6488 except re.error, err:
6489 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6490 (self.op.pattern, err))
6492 def Exec(self, feedback_fn):
6493 """Returns the tag list.
# Build (path, object) pairs for the cluster, all instances and all nodes.
6497 tgts = [("/cluster", cfg.GetClusterInfo())]
6498 ilist = cfg.GetAllInstancesInfo().values()
6499 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6500 nlist = cfg.GetAllNodesInfo().values()
6501 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6503 for path, target in tgts:
6504 for tag in target.GetTags():
# collect every (path, tag) whose tag matches the compiled pattern
6505 if self.re.search(tag):
6506 results.append((path, tag))
# NOTE(review): listing elides lines including 6528-6531 (Exec docstring
# and the `try:` before the AddTag loop) and 6536 (the `try:` before
# cfg.Update) — TODO confirm against the full file.
6510 class LUAddTags(TagsLU):
6511 """Sets a tag on a given object.
# Requires the tag kind, the object name and the list of tags to add.
6514 _OP_REQP = ["kind", "name", "tags"]
6517 def CheckPrereq(self):
6518 """Check prerequisites.
6520 This checks the type and length of the tag name and value.
# First resolve self.target via the base class, then validate each tag.
6523 TagsLU.CheckPrereq(self)
6524 for tag in self.op.tags:
6525 objects.TaggableObject.ValidateTag(tag)
6527 def Exec(self, feedback_fn):
# Add every requested tag to the target object (elided try: above).
6532 for tag in self.op.tags:
6533 self.target.AddTag(tag)
6534 except errors.TagError, err:
# re-raise tag-level errors as execution errors for the caller
6535 raise errors.OpExecError("Error while setting tag: %s" % str(err))
# Persist the modified object (elided try: above); a concurrent config
# change surfaces as ConfigurationError and is converted to a retry.
6537 self.cfg.Update(self.target)
6538 except errors.ConfigurationError:
6539 raise errors.OpRetryError("There has been a modification to the"
6540 " config file and the operation has been"
6541 " aborted. Please retry.")
# NOTE(review): listing elides lines including 6565 (sort of diff_names
# before the error, presumably) and 6575 (the `try:` before cfg.Update)
# — TODO confirm against the full file.
6544 class LUDelTags(TagsLU):
6545 """Delete a list of tags from a given object.
# Requires the tag kind, the object name and the list of tags to remove.
6548 _OP_REQP = ["kind", "name", "tags"]
6551 def CheckPrereq(self):
6552 """Check prerequisites.
6554 This checks that we have the given tag.
# Resolve self.target via the base class, validate the tag syntax, then
# require every requested tag to actually be present on the target.
6557 TagsLU.CheckPrereq(self)
6558 for tag in self.op.tags:
6559 objects.TaggableObject.ValidateTag(tag)
6560 del_tags = frozenset(self.op.tags)
6561 cur_tags = self.target.GetTags()
6562 if not del_tags <= cur_tags:
# report exactly which tags are missing
6563 diff_tags = del_tags - cur_tags
6564 diff_names = ["'%s'" % tag for tag in diff_tags]
6566 raise errors.OpPrereqError("Tag(s) %s not found" %
6567 (",".join(diff_names)))
6569 def Exec(self, feedback_fn):
6570 """Remove the tag from the object.
6573 for tag in self.op.tags:
6574 self.target.RemoveTag(tag)
# Persist the change (elided try: above); concurrent config modification
# is converted into a retryable error.
6576 self.cfg.Update(self.target)
6577 except errors.ConfigurationError:
6578 raise errors.OpRetryError("There has been a modification to the"
6579 " config file and the operation has been"
6580 " aborted. Please retry.")
# NOTE(review): listing elides lines including 6621 (presumably a check
# that `result` is non-empty before the "Complete failure" raise) and
# 6624 (presumably `node_result.Raise()` or similar) — TODO confirm.
6583 class LUTestDelay(NoHooksLU):
6584 """Sleep for a specified amount of time.
6586 This LU sleeps on the master and/or nodes for a specified amount of
# Opcode carries the duration plus flags/lists selecting where to sleep.
6590 _OP_REQP = ["duration", "on_master", "on_nodes"]
6593 def ExpandNames(self):
6594 """Expand names and set required locks.
6596 This expands the node list, if any.
6599 self.needed_locks = {}
6600 if self.op.on_nodes:
6601 # _GetWantedNodes can be used here, but is not always appropriate to use
6602 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6604 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
# lock exactly the nodes that will be asked to sleep
6605 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6607 def CheckPrereq(self):
6608 """Check prerequisites.
# body elided in this listing; no visible prerequisite checks
6612 def Exec(self, feedback_fn):
6613 """Do the actual sleep.
# Optionally sleep on the master itself...
6616 if self.op.on_master:
6617 if not utils.TestDelay(self.op.duration):
6618 raise errors.OpExecError("Error during master delay test")
# ...and/or fan the delay out to the selected nodes via RPC.
6619 if self.op.on_nodes:
6620 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6622 raise errors.OpExecError("Complete failure from rpc call")
6623 for node, node_result in result.items():
6625 if not node_result.data:
6626 raise errors.OpExecError("Failure during rpc call to node %s,"
6627 " result: %s" % (node, node_result.data))
# NOTE(review): numbered listing with many elided lines (gaps throughout
# the embedded numbering). Missing pieces include the _ALLO_KEYS/_RELO_KEYS
# assignments' headers, several `for key in ...:` loop headers in __init__,
# dict-literal openers (`data = {`, `pnr = {`, `request = {`, `pir = {`),
# `else:` branches, `try:` headers and `return` statements. Comments below
# describe only what is visible; confirm details against the full file.
6630 class IAllocator(object):
6631 """IAllocator framework.
6633 An IAllocator instance has three sets of attributes:
6634 - cfg that is needed to query the cluster
6635 - input data (all members of the _KEYS class attribute are required)
6636 - four buffer attributes (in|out_data|text), that represent the
6637 input (to the external script) in text and data structure format,
6638 and the output from it, again in two formats
6639 - the result variables from the script (success, info, nodes) for
# Visible fragment of the allocation-mode key set (_ALLO_KEYS, presumably):
6644 "mem_size", "disks", "disk_template",
6645 "os", "tags", "nics", "vcpus", "hypervisor",
6651 def __init__(self, lu, mode, name, **kwargs):
6653 # init buffer variables
6654 self.in_text = self.out_text = self.in_data = self.out_data = None
6655 # init all input fields so that pylint is happy
6658 self.mem_size = self.disks = self.disk_template = None
6659 self.os = self.tags = self.nics = self.vcpus = None
6660 self.hypervisor = None
6661 self.relocate_from = None
6663 self.required_nodes = None
6664 # init result fields
6665 self.success = self.info = self.nodes = None
# Select the set of required keyword arguments based on the mode.
6666 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6667 keyset = self._ALLO_KEYS
6668 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6669 keyset = self._RELO_KEYS
# elided `else:` rejects any other mode
6671 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6672 " IAllocator" % self.mode)
# elided loop header (presumably `for key in kwargs:`): reject unknown
# parameters and copy accepted ones onto self.
6674 if key not in keyset:
6675 raise errors.ProgrammerError("Invalid input parameter '%s' to"
6676 " IAllocator" % key)
6677 setattr(self, key, kwargs[key])
# elided loop header (presumably `for key in keyset:`): require that
# every key of the chosen set was actually supplied.
6679 if key not in kwargs:
6680 raise errors.ProgrammerError("Missing input parameter '%s' to"
6681 " IAllocator" % key)
# Eagerly build the serialized input for the external allocator script.
6682 self._BuildInputData()
6684 def _ComputeClusterData(self):
6685 """Compute the generic allocator input data.
6687 This is the data that is independent of the actual operation.
6691 cluster_info = cfg.GetClusterInfo()
# Visible fragment of the top-level `data` dict for the allocator input:
6694 "version": constants.IALLOCATOR_VERSION,
6695 "cluster_name": cfg.GetClusterName(),
6696 "cluster_tags": list(cluster_info.GetTags()),
6697 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6698 # we don't have job IDs
# Pair every instance with its filled backend parameters.
6700 iinfo = cfg.GetAllInstancesInfo().values()
6701 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6705 node_list = cfg.GetNodeList()
# Pick the hypervisor whose resources matter for this request.
6707 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6708 hypervisor_name = self.hypervisor
6709 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6710 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
# Gather live node and instance data from all nodes via RPC.
6712 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6714 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6715 cluster_info.enabled_hypervisors)
6716 for nname, nresult in node_data.items():
6717 # first fill in static (config-based) values
6718 ninfo = cfg.GetNodeInfo(nname)
# Visible fragment of the per-node result dict (pnr, presumably):
6720 "tags": list(ninfo.GetTags()),
6721 "primary_ip": ninfo.primary_ip,
6722 "secondary_ip": ninfo.secondary_ip,
6723 "offline": ninfo.offline,
6724 "drained": ninfo.drained,
6725 "master_candidate": ninfo.master_candidate,
# Live (RPC-derived) data is only added for online nodes.
6728 if not ninfo.offline:
6730 if not isinstance(nresult.data, dict):
6731 raise errors.OpExecError("Can't get data for node %s" % nname)
6732 remote_info = nresult.data
# Require and integer-convert the resource attributes we depend on.
6733 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6734 'vg_size', 'vg_free', 'cpu_total']:
6735 if attr not in remote_info:
6736 raise errors.OpExecError("Node '%s' didn't return attribute"
6737 " '%s'" % (nname, attr))
6739 remote_info[attr] = int(remote_info[attr])
6740 except ValueError, err:
6741 raise errors.OpExecError("Node '%s' returned invalid value"
6742 " for '%s': %s" % (nname, attr, err))
6743 # compute memory used by primary instances
6744 i_p_mem = i_p_up_mem = 0
6745 for iinfo, beinfo in i_list:
6746 if iinfo.primary_node == nname:
6747 i_p_mem += beinfo[constants.BE_MEMORY]
6748 if iinfo.name not in node_iinfo[nname].data:
# elided branch body; otherwise reconcile configured vs. actually
# used memory and charge the difference against free memory
6751 i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6752 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6753 remote_info['memory_free'] -= max(0, i_mem_diff)
# elided condition; presumably counts memory of running instances only
6756 i_p_up_mem += beinfo[constants.BE_MEMORY]
6758 # compute memory used by instances
# Visible fragment of the dynamic part of the per-node dict:
6760 "total_memory": remote_info['memory_total'],
6761 "reserved_memory": remote_info['memory_dom0'],
6762 "free_memory": remote_info['memory_free'],
6763 "total_disk": remote_info['vg_size'],
6764 "free_disk": remote_info['vg_free'],
6765 "total_cpus": remote_info['cpu_total'],
6766 "i_pri_memory": i_p_mem,
6767 "i_pri_up_memory": i_p_up_mem,
6771 node_results[nname] = pnr
6772 data["nodes"] = node_results
# Build the per-instance section of the allocator input.
6776 for iinfo, beinfo in i_list:
6777 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6778 for n in iinfo.nics]
# Visible fragment of the per-instance dict (pir, presumably):
6780 "tags": list(iinfo.GetTags()),
6781 "admin_up": iinfo.admin_up,
6782 "vcpus": beinfo[constants.BE_VCPUS],
6783 "memory": beinfo[constants.BE_MEMORY],
6785 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6787 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6788 "disk_template": iinfo.disk_template,
6789 "hypervisor": iinfo.hypervisor,
6791 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6793 instance_data[iinfo.name] = pir
6795 data["instances"] = instance_data
6799 def _AddNewInstance(self):
6800 """Add new instance data to allocator structure.
6802 This in combination with _AllocatorGetClusterData will create the
6803 correct structure needed as input for the allocator.
6805 The checks for the completeness of the opcode must have already been
6811 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
# Mirrored templates need a primary and a secondary node; others one.
6813 if self.disk_template in constants.DTS_NET_MIRROR:
6814 self.required_nodes = 2
6816 self.required_nodes = 1
# Visible fragment of the allocation `request` dict:
6820 "disk_template": self.disk_template,
6823 "vcpus": self.vcpus,
6824 "memory": self.mem_size,
6825 "disks": self.disks,
6826 "disk_space_total": disk_space,
6828 "required_nodes": self.required_nodes,
6830 data["request"] = request
6832 def _AddRelocateInstance(self):
6833 """Add relocate instance data to allocator structure.
6835 This in combination with _IAllocatorGetClusterData will create the
6836 correct structure needed as input for the allocator.
6838 The checks for the completeness of the opcode must have already been
# Look up the instance to be relocated; a programmer error if unknown.
6842 instance = self.lu.cfg.GetInstanceInfo(self.name)
6843 if instance is None:
6844 raise errors.ProgrammerError("Unknown instance '%s' passed to"
6845 " IAllocator" % self.name)
# Relocation only makes sense for network-mirrored disk templates...
6847 if instance.disk_template not in constants.DTS_NET_MIRROR:
6848 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
# ...with exactly one secondary node to replace.
6850 if len(instance.secondary_nodes) != 1:
6851 raise errors.OpPrereqError("Instance has not exactly one secondary node")
6853 self.required_nodes = 1
6854 disk_sizes = [{'size': disk.size} for disk in instance.disks]
6855 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
# Visible fragment of the relocation `request` dict:
6860 "disk_space_total": disk_space,
6861 "required_nodes": self.required_nodes,
6862 "relocate_from": self.relocate_from,
6864 self.in_data["request"] = request
6866 def _BuildInputData(self):
6867 """Build input data structures.
# Generic cluster data first, then the mode-specific request section.
6870 self._ComputeClusterData()
6872 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6873 self._AddNewInstance()
# elided `else:`; the only other accepted mode is RELOC (see __init__)
6875 self._AddRelocateInstance()
6877 self.in_text = serializer.Dump(self.in_data)
6879 def Run(self, name, validate=True, call_fn=None):
6880 """Run an instance allocator and return the results.
# default to running the allocator script on the master node via RPC
6884 call_fn = self.lu.rpc.call_iallocator_runner
6887 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
# The runner is expected to return (rcode, stdout, stderr, fail).
6890 if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6891 raise errors.OpExecError("Invalid result from master iallocator runner")
6893 rcode, stdout, stderr, fail = result.data
6895 if rcode == constants.IARUN_NOTFOUND:
6896 raise errors.OpExecError("Can't find allocator '%s'" % name)
6897 elif rcode == constants.IARUN_FAILURE:
6898 raise errors.OpExecError("Instance allocator call failed: %s,"
6899 " output: %s" % (fail, stdout+stderr))
6900 self.out_text = stdout
# elided condition; presumably only validated when `validate` is True
6902 self._ValidateResult()
6904 def _ValidateResult(self):
6905 """Process the allocator results.
6907 This will process and if successful save the result in
6908 self.out_data and the other parameters.
# Parse the script's output (elided try: above); any parse failure
# becomes an OpExecError.
6912 rdict = serializer.Load(self.out_text)
6913 except Exception, err:
6914 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6916 if not isinstance(rdict, dict):
6917 raise errors.OpExecError("Can't parse iallocator results: not a dict")
# Require and copy the three mandatory result keys onto self.
6919 for key in "success", "info", "nodes":
6920 if key not in rdict:
6921 raise errors.OpExecError("Can't parse iallocator results:"
6922 " missing key '%s'" % key)
6923 setattr(self, key, rdict[key])
6925 if not isinstance(rdict["nodes"], list):
6926 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6928 self.out_data = rdict
6931 class LUTestAllocator(NoHooksLU):
6932 """Run allocator tests.
6934 This LU runs the allocator tests
6937 _OP_REQP = ["direction", "mode", "name"]
6939 def CheckPrereq(self):
6940 """Check prerequisites.
6942 This checks the opcode parameters depending on the director and mode test.
6945 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6946 for attr in ["name", "mem_size", "disks", "disk_template",
6947 "os", "tags", "nics", "vcpus"]:
6948 if not hasattr(self.op, attr):
6949 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6951 iname = self.cfg.ExpandInstanceName(self.op.name)
6952 if iname is not None:
6953 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6955 if not isinstance(self.op.nics, list):
6956 raise errors.OpPrereqError("Invalid parameter 'nics'")
6957 for row in self.op.nics:
6958 if (not isinstance(row, dict) or
6961 "bridge" not in row):
6962 raise errors.OpPrereqError("Invalid contents of the"
6963 " 'nics' parameter")
6964 if not isinstance(self.op.disks, list):
6965 raise errors.OpPrereqError("Invalid parameter 'disks'")
6966 for row in self.op.disks:
6967 if (not isinstance(row, dict) or
6968 "size" not in row or
6969 not isinstance(row["size"], int) or
6970 "mode" not in row or
6971 row["mode"] not in ['r', 'w']):
6972 raise errors.OpPrereqError("Invalid contents of the"
6973 " 'disks' parameter")
6974 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6975 self.op.hypervisor = self.cfg.GetHypervisorType()
6976 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6977 if not hasattr(self.op, "name"):
6978 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6979 fname = self.cfg.ExpandInstanceName(self.op.name)
6981 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6983 self.op.name = fname
6984 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6986 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6989 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6990 if not hasattr(self.op, "allocator") or self.op.allocator is None:
6991 raise errors.OpPrereqError("Missing allocator name")
6992 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6993 raise errors.OpPrereqError("Wrong allocator test '%s'" %
6996 def Exec(self, feedback_fn):
6997 """Run the allocator test.
7000 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7001 ial = IAllocator(self,
7004 mem_size=self.op.mem_size,
7005 disks=self.op.disks,
7006 disk_template=self.op.disk_template,
7010 vcpus=self.op.vcpus,
7011 hypervisor=self.op.hypervisor,
7014 ial = IAllocator(self,
7017 relocate_from=list(self.relocate_from),
7020 if self.op.direction == constants.IALLOCATOR_DIR_IN:
7021 result = ial.in_text
7023 ial.Run(self.op.allocator, validate=False)
7024 result = ial.out_text