4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overriden in derived classes in order to check op
    validity.

    """
    self.cfg = context.cfg
    self.context = context
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    # by default every lock level is acquired exclusively (0 == not shared)
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    # logging shortcuts delegated to the processor
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo
    # validate that all required opcode parameters are present
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      # NOTE(review): this copy appears truncated -- the "attr_val is None"
      # guard and the raise's argument continuation are missing here
      raise errors.OpPrereqError("Required parameter '%s' missing" %

  # NOTE(review): the "def __GetSSH(self):" header appears to be missing from
  # this copy; the docstring and assignment below belong to that accessor
    """Returns the SshRunner object

    """
    self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())

  # expose the (lazily created) SshRunner as a read-only property
  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensure
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separate is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    # NOTE(review): a conditional (presumably on the LU's run requirements)
    # appears to be missing here -- the assignment and the raise below read
    # like its two alternative branches; confirm against the full source
    self.needed_locks = {} # Exclusive LUs don't need locks.
    raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
    @return: the new Exec result, based on the previous result
        and hooks results

    """
    # NOTE(review): per the docstring the default implementation should pass
    # lu_result back unchanged; the "return lu_result" statement appears to
    # be missing from this copy

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    # the caller must not have declared instance-level locks already
    assert locking.LEVEL_INSTANCE not in self.needed_locks, \
      "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    # declare the canonical name for locking, and store it back in the opcode
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    If should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we're really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    # NOTE(review): the "wanted_nodes" accumulator initialization appears to
    # be missing from this copy
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      # NOTE(review): secondaries are added unconditionally in this copy;
      # given the primary_only parameter a guard seems to be missing here
      wanted_nodes.extend(instance.secondary_nodes)

    # either replace the node lock list outright, or append to it
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    # recalculation is a one-shot request
    del self.recalculate_locks[locking.LEVEL_NODE]
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type or a
      name does not expand to a known node
  @raise errors.ProgrammerError: if called with an empty list of nodes

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
  # NOTE(review): this copy appears truncated -- the empty-list guard, the
  # expansion loop header, the "node is None" check and the "wanted"
  # accumulator are missing below
  raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
                               " non-empty list of nodes whose name is to be expanded.")
  node = lu.cfg.ExpandNodeName(name)
  raise errors.OpPrereqError("No such node name '%s'" % name)
  return utils.NiceSort(wanted)
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")
  # NOTE(review): this copy appears truncated -- the "wanted" accumulator
  # initialization, the "instance is None" guard before the raise, and the
  # else/return structure around the all-instances fallback are missing
  for name in instances:
    instance = lu.cfg.ExpandInstanceName(name)
    raise errors.OpPrereqError("No such instance name '%s'" % name)
    wanted.append(instance)
  # fallback path: no names given, use the full (sorted) instance list
  wanted = utils.NiceSort(lu.cfg.GetInstanceList())
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set
  @raise errors.OpPrereqError: if any selected field is not recognized

  """
  # NOTE(review): the construction of the combined field set "f" (extending
  # it with the static and dynamic sets) appears to be missing from this copy
  delta = f.NonMatching(selected)
  # NOTE(review): the "if delta:" guard and the raise's format-arguments
  # continuation appear to be missing here
  raise errors.OpPrereqError("Unknown output fields selected: %s"
417 def _CheckBooleanOpField(op, name):
418 """Validates boolean opcode parameters.
420 This will ensure that an opcode parameter is either a boolean value,
421 or None (but that it always exists).
424 val = getattr(op, name, None)
425 if not (val is None or isinstance(val, bool)):
426 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428 setattr(op, name, val)
431 def _CheckNodeOnline(lu, node):
432 """Ensure that a given node is online.
434 @param lu: the LU on behalf of which we make the check
435 @param node: the node to check
436 @raise errors.OpPrereqError: if the node is offline
439 if lu.cfg.GetNodeInfo(node).offline:
440 raise errors.OpPrereqError("Can't use offline node %s" % node)
443 def _CheckNodeNotDrained(lu, node):
444 """Ensure that a given node is not drained.
446 @param lu: the LU on behalf of which we make the check
447 @param node: the node to check
448 @raise errors.OpPrereqError: if the node is drained
451 if lu.cfg.GetNodeInfo(node).drained:
452 raise errors.OpPrereqError("Can't use drained node %s" % node)
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @param memory: the memory size of the instance
  @param vcpus: the count of VCPUs the instance has
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @param disks: the list of (size, mode) pairs
  @param bep: the backend parameters for the instance
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor: string
  @param hypervisor: the hypervisor for the instance
  @return: the hook environment for this instance

  """
  # NOTE(review): the "hypervisor" parameter shadows the ganeti.hypervisor
  # module import inside this function
  # NOTE(review): this copy appears truncated -- the computation of
  # str_status and the opening "env = {" of the environment dict are missing
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor,

  # per-NIC variables, indexed, plus the total count
  nic_count = len(nics)
  for idx, (ip, bridge, mac) in enumerate(nics):
    env["INSTANCE_NIC%d_IP" % idx] = ip
    env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
    env["INSTANCE_NIC%d_MAC" % idx] = mac

  env["INSTANCE_NIC_COUNT"] = nic_count

  # per-disk variables, indexed, plus the total count
  disk_count = len(disks)
  for idx, (size, mode) in enumerate(disks):
    env["INSTANCE_DISK%d_SIZE" % idx] = size
    env["INSTANCE_DISK%d_MODE" % idx] = mode

  env["INSTANCE_DISK_COUNT"] = disk_count

  # flatten backend (BE) and hypervisor (HV) parameters into the env
  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value
  # NOTE(review): the final "return env" appears to be missing from this copy
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  # fill in the cluster-level defaults for backend/hypervisor parameters
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  # NOTE(review): the opening "args = {" of the keyword-argument dict appears
  # to be missing from this copy, as does the guard before args.update()
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'hypervisor': instance.hypervisor,
  # caller-supplied values win over the instance-derived ones
  args.update(override)
  return _BuildInstanceHookEnv(**args)
def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  Lets the configuration maintain the master-candidate pool, re-adds any
  promoted node to the cluster context and warns when more nodes are
  candidates than desired.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  # NOTE(review): the "if mod_list:" guard appears to be missing here
  lu.LogInfo("Promoted nodes to master candidate role: %s",
             ", ".join(node.name for node in mod_list))
  for name in mod_list:
    lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  # NOTE(review): the "mc_now > mc_max" comparison guard and the
  # format-arguments continuation appear to be missing here
  lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  @param lu: the LU on behalf of which we make the check
  @param instance: the instance whose NIC bridges are checked on its
      primary node
  @raise errors.OpPrereqError: if a required bridge is missing

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  # NOTE(review): the checking of the RPC result (failure/data test) appears
  # to be missing before the raise in this copy
  raise errors.OpPrereqError("One or more target bridges %s does not"
                             " exist on destination node '%s'" %
                             (brlist, instance.primary_node))
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    # the master must be the only node left in the cluster
    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    # ... and no instances may remain
    instancelist = self.cfg.GetInstanceList()
    # NOTE(review): the "if instancelist:" guard appears to be missing here
    raise errors.OpPrereqError("There are still %d instance(s) in"
                               " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    # disable the master role on the last remaining node
    result = self.rpc.call_node_stop_master(master, False)
    # NOTE(review): the check of the RPC result appears to be missing before
    # this raise in this copy
    raise errors.OpExecError("Could not disable the master role")
    # keep backups of the cluster SSH key pair
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  # the only required opcode parameter: which optional checks to skip
  _OP_REQP = ["skip_checks"]

  def ExpandNames(self):
    # verification needs all nodes and all instances
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    # NOTE(review): the closing brace of the needed_locks dict appears to be
    # missing from this copy
    # all lock levels are taken shared (1), not exclusive
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
  # NOTE(review): this copy of _VerifyNode appears heavily truncated -- the
  # end of the signature (drbd_map/vg_name parameters and the closing "):"),
  # several guards and "else:" branches, the error bookkeeping and some
  # string-continuation lines are missing; the comments below document only
  # the logic that is visible
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used drbd minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn(" - ERROR: unable to verify node %s." % (node,))

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn(" - ERROR: connection to %s failed" % (node))

    if local_version != remote_version[0]:
      feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))

    # node seems compatible, we can actually try to look into its results

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn(" - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      feedback_fn(" - ERROR: node hasn't returned file checksum data")

    # compare each required file's remote checksum against the local one;
    # non-candidates are only required to carry non-master files
    for file_name in file_list:
      node_is_mc = nodeinfo.master_candidate
      must_have_file = file_name not in master_files
      if file_name not in remote_cksum:
        if node_is_mc or must_have_file:
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
      elif remote_cksum[file_name] != local_cksum[file_name]:
        if node_is_mc or must_have_file:
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
          # not candidate and this is not a must-have file
          feedback_fn(" - ERROR: non master-candidate has old/wrong file"
        # all good, except non-master/non-must have combination
        if not node_is_mc and not must_have_file:
          feedback_fn(" - ERROR: file '%s' should not exist on non master"
                      " candidates" % file_name)

    # ssh connectivity results reported by the node
    if constants.NV_NODELIST not in node_result:
      feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
    if node_result[constants.NV_NODELIST]:
      for node in node_result[constants.NV_NODELIST]:
        feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                    (node, node_result[constants.NV_NODELIST][node]))

    # tcp connectivity results reported by the node
    if constants.NV_NODENETTEST not in node_result:
      feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
    if node_result[constants.NV_NODENETTEST]:
      nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
      feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                  (node, node_result[constants.NV_NODENETTEST][node]))

    # per-hypervisor verification (a non-None result means failure)
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
      # every minor the config expects to be active must actually be in use
      for minor, (iname, must_exist) in drbd_map.items():
        if minor not in used_minors and must_exist:
          feedback_fn(" - ERROR: drbd minor %d of instance %s is"
                      " not active" % (minor, iname))
      # ... and every in-use minor must be known to the config
      for minor in used_minors:
        if minor not in drbd_map:
          feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    # NOTE(review): this copy appears truncated -- the "node_vol_should"
    # initialization, a "continue" for offline nodes, some format-argument
    # continuations and the error bookkeeping/return are missing
    node_current = instanceconfig.primary_node

    instanceconfig.MapLVsByNode(node_vol_should)

    # every volume the config maps to a node must be reported on that node
    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %

    # a running instance must be reported on its (reachable) primary node
    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
          not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))

    # ... and must not be reported running anywhere else
    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    # flag every volume present on a node that is not expected there
    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
          # NOTE(review): the format-arguments continuation and the error
          # bookkeeping/return appear to be missing from this copy
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    # anything a node reports as running that the cluster config doesn't
    # know about is an orphan
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          # NOTE(review): the error bookkeeping/return appear to be missing
          # from this copy
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        # NOTE(review): the "needed_mem" initialization appears to be missing
        # here in this copy
        # sum the memory every auto-balanced instance would need on this node
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          # NOTE(review): the error bookkeeping/return appear to be missing
          # from this copy
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    # remember the requested skips for the later verification phases
    self.skip_set = frozenset(self.op.skip_checks)
    # every requested skip must be one of the known optional checks
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # NOTE(review): the opening "env = {" of the environment dict and its
    # closing brace appear to be missing from this copy
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())

    # one NODE_TAGS_<name> entry per known node
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    # no pre-hook nodes; post hooks run on all nodes
    return env, [], all_nodes
943 def Exec(self, feedback_fn):
944 """Verify integrity of cluster, performing various test on nodes.
948 feedback_fn("* Verifying global settings")
949 for msg in self.cfg.VerifyConfig():
950 feedback_fn(" - ERROR: %s" % msg)
952 vg_name = self.cfg.GetVGName()
953 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
954 nodelist = utils.NiceSort(self.cfg.GetNodeList())
955 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
956 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
957 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
958 for iname in instancelist)
959 i_non_redundant = [] # Non redundant instances
960 i_non_a_balanced = [] # Non auto-balanced instances
961 n_offline = [] # List of offline nodes
962 n_drained = [] # List of nodes being drained
968 # FIXME: verify OS list
970 master_files = [constants.CLUSTER_CONF_FILE]
972 file_names = ssconf.SimpleStore().GetFileList()
973 file_names.append(constants.SSL_CERT_FILE)
974 file_names.append(constants.RAPI_CERT_FILE)
975 file_names.extend(master_files)
977 local_checksums = utils.FingerprintFiles(file_names)
979 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
980 node_verify_param = {
981 constants.NV_FILELIST: file_names,
982 constants.NV_NODELIST: [node.name for node in nodeinfo
983 if not node.offline],
984 constants.NV_HYPERVISOR: hypervisors,
985 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
986 node.secondary_ip) for node in nodeinfo
987 if not node.offline],
988 constants.NV_INSTANCELIST: hypervisors,
989 constants.NV_VERSION: None,
990 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
992 if vg_name is not None:
993 node_verify_param[constants.NV_VGLIST] = None
994 node_verify_param[constants.NV_LVLIST] = vg_name
995 node_verify_param[constants.NV_DRBDLIST] = None
996 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
997 self.cfg.GetClusterName())
999 cluster = self.cfg.GetClusterInfo()
1000 master_node = self.cfg.GetMasterNode()
1001 all_drbd_map = self.cfg.ComputeDRBDMap()
1003 for node_i in nodeinfo:
1005 nresult = all_nvinfo[node].data
1008 feedback_fn("* Skipping offline node %s" % (node,))
1009 n_offline.append(node)
1012 if node == master_node:
1014 elif node_i.master_candidate:
1015 ntype = "master candidate"
1016 elif node_i.drained:
1018 n_drained.append(node)
1021 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1023 if all_nvinfo[node].failed or not isinstance(nresult, dict):
1024 feedback_fn(" - ERROR: connection to %s failed" % (node,))
1029 for minor, instance in all_drbd_map[node].items():
1030 if instance not in instanceinfo:
1031 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1033 # ghost instance should not be running, but otherwise we
1034 # don't give double warnings (both ghost instance and
1035 # unallocated minor in use)
1036 node_drbd[minor] = (instance, False)
1038 instance = instanceinfo[instance]
1039 node_drbd[minor] = (instance.name, instance.admin_up)
1040 result = self._VerifyNode(node_i, file_names, local_checksums,
1041 nresult, feedback_fn, master_files,
1045 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1047 node_volume[node] = {}
1048 elif isinstance(lvdata, basestring):
1049 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1050 (node, utils.SafeEncode(lvdata)))
1052 node_volume[node] = {}
1053 elif not isinstance(lvdata, dict):
1054 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1058 node_volume[node] = lvdata
1061 idata = nresult.get(constants.NV_INSTANCELIST, None)
1062 if not isinstance(idata, list):
1063 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1068 node_instance[node] = idata
1071 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1072 if not isinstance(nodeinfo, dict):
1073 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1079 "mfree": int(nodeinfo['memory_free']),
1082 # dictionary holding all instances this node is secondary for,
1083 # grouped by their primary node. Each key is a cluster node, and each
1084 # value is a list of instances which have the key as primary and the
1085 # current node as secondary. this is handy to calculate N+1 memory
1086 # availability if you can only failover from a primary to its
1088 "sinst-by-pnode": {},
1090 # FIXME: devise a free space model for file based instances as well
1091 if vg_name is not None:
1092 if (constants.NV_VGLIST not in nresult or
1093 vg_name not in nresult[constants.NV_VGLIST]):
1094 feedback_fn(" - ERROR: node %s didn't return data for the"
1095 " volume group '%s' - it is either missing or broken" %
1099 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1100 except (ValueError, KeyError):
1101 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1102 " from node %s" % (node,))
1106 node_vol_should = {}
1108 for instance in instancelist:
1109 feedback_fn("* Verifying instance %s" % instance)
1110 inst_config = instanceinfo[instance]
1111 result = self._VerifyInstance(instance, inst_config, node_volume,
1112 node_instance, feedback_fn, n_offline)
1114 inst_nodes_offline = []
1116 inst_config.MapLVsByNode(node_vol_should)
1118 instance_cfg[instance] = inst_config
1120 pnode = inst_config.primary_node
1121 if pnode in node_info:
1122 node_info[pnode]['pinst'].append(instance)
1123 elif pnode not in n_offline:
1124 feedback_fn(" - ERROR: instance %s, connection to primary node"
1125 " %s failed" % (instance, pnode))
1128 if pnode in n_offline:
1129 inst_nodes_offline.append(pnode)
1131 # If the instance is non-redundant we cannot survive losing its primary
1132 # node, so we are not N+1 compliant. On the other hand we have no disk
1133 # templates with more than one secondary so that situation is not well
1135 # FIXME: does not support file-backed instances
1136 if len(inst_config.secondary_nodes) == 0:
1137 i_non_redundant.append(instance)
1138 elif len(inst_config.secondary_nodes) > 1:
1139 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1142 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1143 i_non_a_balanced.append(instance)
1145 for snode in inst_config.secondary_nodes:
1146 if snode in node_info:
1147 node_info[snode]['sinst'].append(instance)
1148 if pnode not in node_info[snode]['sinst-by-pnode']:
1149 node_info[snode]['sinst-by-pnode'][pnode] = []
1150 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1151 elif snode not in n_offline:
1152 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1153 " %s failed" % (instance, snode))
1155 if snode in n_offline:
1156 inst_nodes_offline.append(snode)
1158 if inst_nodes_offline:
1159 # warn that the instance lives on offline nodes, and set bad=True
1160 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1161 ", ".join(inst_nodes_offline))
1164 feedback_fn("* Verifying orphan volumes")
1165 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1169 feedback_fn("* Verifying remaining instances")
1170 result = self._VerifyOrphanInstances(instancelist, node_instance,
1174 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1175 feedback_fn("* Verifying N+1 Memory redundancy")
1176 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1179 feedback_fn("* Other Notes")
1181 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1182 % len(i_non_redundant))
1184 if i_non_a_balanced:
1185 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1186 % len(i_non_a_balanced))
1189 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1192 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1196 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1197 """Analyze the post-hooks' result
1199 This method analyses the hook result, handles it, and sends some
1200 nicely-formatted feedback back to the user.
1202 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1203 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1204 @param hooks_results: the results of the multi-node hooks rpc call
1205 @param feedback_fn: function used send feedback back to the caller
1206 @param lu_result: previous Exec result
1207 @return: the new Exec result, based on the previous result
# NOTE(review): the original line numbering here is non-contiguous, so some
# statements (e.g. the bodies of a few branches below) are not visible in
# this view; comments only describe what is shown.
1211 # We only really run POST phase hooks, and are only interested in
1213 if phase == constants.HOOKS_PHASE_POST:
1214 # Used to change hooks' output to proper indentation
1215 indent_re = re.compile('^', re.M)
1216 feedback_fn("* Hooks Results")
1217 if not hooks_results:
# empty result map means no node answered the hooks RPC at all
1218 feedback_fn(" - ERROR: general communication failure")
1221 for node_name in hooks_results:
1222 show_node_header = True
1223 res = hooks_results[node_name]
# on success res.data is a list of (script, status, output) tuples;
# a failed RPC or any other payload shape means this node's hooks
# could not be inspected
1224 if res.failed or res.data is False or not isinstance(res.data, list):
1226 # no need to warn or set fail return value
1228 feedback_fn(" Communication failure in hooks execution")
1231 for script, hkr, output in res.data:
1232 if hkr == constants.HKR_FAIL:
1233 # The node header is only shown once, if there are
1234 # failing hooks on that node
1235 if show_node_header:
1236 feedback_fn(" Node %s:" % node_name)
1237 show_node_header = False
1238 feedback_fn(" ERROR: Script %s failed, output:" % script)
# re-indent the script's output so it nests under the error line
1239 output = indent_re.sub(' ', output)
1240 feedback_fn("%s" % output)
1246 class LUVerifyDisks(NoHooksLU):
1247 """Verifies the cluster disks status.
# NOTE(review): original line numbering is non-contiguous in this view; some
# statements (e.g. the nv_dict initialisation and a few branch bodies) are
# not visible here.
1253 def ExpandNames(self):
# declare locking needs: read everything, so take all node and
# instance locks
1254 self.needed_locks = {
1255 locking.LEVEL_NODE: locking.ALL_SET,
1256 locking.LEVEL_INSTANCE: locking.ALL_SET,
# all lock levels in shared mode (1) — this LU only reads state
1258 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1260 def CheckPrereq(self):
1261 """Check prerequisites.
1263 This has no prerequisites.
1268 def Exec(self, feedback_fn):
1269 """Verify integrity of cluster disks.
# result is the 4-tuple that is simultaneously unpacked into the
# individual accumulators filled in below
1272 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1274 vg_name = self.cfg.GetVGName()
1275 nodes = utils.NiceSort(self.cfg.GetNodeList())
1276 instances = [self.cfg.GetInstanceInfo(name)
1277 for name in self.cfg.GetInstanceList()]
1280 for inst in instances:
# only running, network-mirrored (DRBD-style) instances are checked
1282 if (not inst.admin_up or
1283 inst.disk_template not in constants.DTS_NET_MIRROR):
1285 inst.MapLVsByNode(inst_lvs)
1286 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1287 for node, vol_list in inst_lvs.iteritems():
1288 for vol in vol_list:
1289 nv_dict[(node, vol)] = inst
1294 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1299 lvs = node_lvs[node]
1302 self.LogWarning("Connection to node %s failed: %s" %
# a string payload is an error message from the node's LVM layer
1306 if isinstance(lvs, basestring):
1307 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1308 res_nlvm[node] = lvs
1310 elif not isinstance(lvs, dict):
1311 logging.warning("Connection to node %s failed or invalid data"
1313 res_nodes.append(node)
# lv_name -> (attr, inactive-flag, online-flag); pop matches out of
# nv_dict so the leftovers below are the missing LVs
1316 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1317 inst = nv_dict.pop((node, lv_name), None)
1318 if (not lv_online and inst is not None
1319 and inst.name not in res_instances):
1320 res_instances.append(inst.name)
1322 # any leftover items in nv_dict are missing LVs, let's arrange the
1324 for key, inst in nv_dict.iteritems():
1325 if inst.name not in res_missing:
1326 res_missing[inst.name] = []
1327 res_missing[inst.name].append(key)
1332 class LURenameCluster(LogicalUnit):
1333 """Rename the cluster.
# NOTE(review): original line numbering is non-contiguous in this view; some
# statements (e.g. the try/finally around the master restart) are not
# visible here.
1336 HPATH = "cluster-rename"
1337 HTYPE = constants.HTYPE_CLUSTER
1340 def BuildHooksEnv(self):
# hooks run only on the master node, both pre and post
1345 "OP_TARGET": self.cfg.GetClusterName(),
1346 "NEW_NAME": self.op.name,
1348 mn = self.cfg.GetMasterNode()
1349 return env, [mn], [mn]
1351 def CheckPrereq(self):
1352 """Verify that the passed name is a valid one.
# resolve the requested name to a (name, ip) pair via DNS
1355 hostname = utils.HostInfo(self.op.name)
1357 new_name = hostname.name
1358 self.ip = new_ip = hostname.ip
1359 old_name = self.cfg.GetClusterName()
1360 old_ip = self.cfg.GetMasterIP()
1361 if new_name == old_name and new_ip == old_ip:
1362 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1363 " cluster has changed")
1364 if new_ip != old_ip:
# refuse an IP that already answers on the noded port — it is in use
1365 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1366 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1367 " reachable on the network. Aborting." %
1370 self.op.name = new_name
1372 def Exec(self, feedback_fn):
1373 """Rename the cluster.
1376 clustername = self.op.name
1379 # shutdown the master IP
1380 master = self.cfg.GetMasterNode()
1381 result = self.rpc.call_node_stop_master(master, False)
1382 if result.failed or not result.data:
1383 raise errors.OpExecError("Could not disable the master role")
1386 cluster = self.cfg.GetClusterInfo()
1387 cluster.cluster_name = clustername
1388 cluster.master_ip = ip
1389 self.cfg.Update(cluster)
1391 # update the known hosts file
1392 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1393 node_list = self.cfg.GetNodeList()
# the master already has the freshly written file, skip it
1395 node_list.remove(master)
1398 result = self.rpc.call_upload_file(node_list,
1399 constants.SSH_KNOWN_HOSTS_FILE)
1400 for to_node, to_result in result.iteritems():
1401 if to_result.failed or not to_result.data:
# distribution failures are logged but do not abort the rename
1402 logging.error("Copy of file %s to node %s failed",
1403 constants.SSH_KNOWN_HOSTS_FILE, to_node)
1406 result = self.rpc.call_node_start_master(master, False)
1407 if result.failed or not result.data:
1408 self.LogWarning("Could not re-enable the master role on"
1409 " the master, please restart manually.")
1412 def _RecursiveCheckIfLVMBased(disk):
1413 """Check if the given disk or its children are lvm-based.
1415 @type disk: L{objects.Disk}
1416 @param disk: the disk to check
1418 @return: boolean indicating whether a LD_LV dev_type was found or not
# NOTE(review): original line numbering is non-contiguous; the early
# "return True" presumably taken when a child matches is not visible here.
1422 for chdisk in disk.children:
1423 if _RecursiveCheckIfLVMBased(chdisk):
# finally check the disk itself
1425 return disk.dev_type == constants.LD_LV
1428 class LUSetClusterParams(LogicalUnit):
1429 """Change the parameters of the cluster.
# NOTE(review): original line numbering is non-contiguous in this view; a
# number of statements (e.g. _OP_REQP, the try around int(), some elif
# bodies) are not visible here.
1432 HPATH = "cluster-modify"
1433 HTYPE = constants.HTYPE_CLUSTER
1437 def CheckArguments(self):
# candidate_pool_size is optional on the opcode; default it to None
1441 if not hasattr(self.op, "candidate_pool_size"):
1442 self.op.candidate_pool_size = None
1443 if self.op.candidate_pool_size is not None:
1445 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1446 except (ValueError, TypeError), err:
1447 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1449 if self.op.candidate_pool_size < 1:
1450 raise errors.OpPrereqError("At least one master candidate needed")
1452 def ExpandNames(self):
1453 # FIXME: in the future maybe other cluster params won't require checking on
1454 # all nodes to be modified.
1455 self.needed_locks = {
1456 locking.LEVEL_NODE: locking.ALL_SET,
1458 self.share_locks[locking.LEVEL_NODE] = 1
1460 def BuildHooksEnv(self):
# hooks run only on the master node
1465 "OP_TARGET": self.cfg.GetClusterName(),
1466 "NEW_VG_NAME": self.op.vg_name,
1468 mn = self.cfg.GetMasterNode()
1469 return env, [mn], [mn]
1471 def CheckPrereq(self):
1472 """Check prerequisites.
1474 This checks whether the given params don't conflict and
1475 if the given volume group is valid.
# vg_name set to a false-y value (e.g. "") means "disable lvm storage";
# that is only allowed when no lvm-based instance disks exist
1478 if self.op.vg_name is not None and not self.op.vg_name:
1479 instances = self.cfg.GetAllInstancesInfo().values()
1480 for inst in instances:
1481 for disk in inst.disks:
1482 if _RecursiveCheckIfLVMBased(disk):
1483 raise errors.OpPrereqError("Cannot disable lvm storage while"
1484 " lvm-based instances exist")
1486 node_list = self.acquired_locks[locking.LEVEL_NODE]
1488 # if vg_name not None, checks given volume group on all nodes
1490 vglist = self.rpc.call_vg_list(node_list)
1491 for node in node_list:
1492 if vglist[node].failed:
1493 # ignoring down node
1494 self.LogWarning("Node %s unreachable/error, ignoring" % node)
1496 vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1498 constants.MIN_VG_SIZE)
1500 raise errors.OpPrereqError("Error on node '%s': %s" %
1503 self.cluster = cluster = self.cfg.GetClusterInfo()
1504 # validate beparams changes
1505 if self.op.beparams:
1506 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1507 self.new_beparams = cluster.FillDict(
1508 cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1510 # hypervisor list/parameters
# start from a copy of the current hvparams and overlay the requested
# per-hypervisor changes
1511 self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1512 if self.op.hvparams:
1513 if not isinstance(self.op.hvparams, dict):
1514 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1515 for hv_name, hv_dict in self.op.hvparams.items():
1516 if hv_name not in self.new_hvparams:
1517 self.new_hvparams[hv_name] = hv_dict
1519 self.new_hvparams[hv_name].update(hv_dict)
1521 if self.op.enabled_hypervisors is not None:
1522 self.hv_list = self.op.enabled_hypervisors
1524 self.hv_list = cluster.enabled_hypervisors
1526 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1527 # either the enabled list has changed, or the parameters have, validate
1528 for hv_name, hv_params in self.new_hvparams.items():
1529 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1530 (self.op.enabled_hypervisors and
1531 hv_name in self.op.enabled_hypervisors)):
1532 # either this is a new hypervisor, or its parameters have changed
1533 hv_class = hypervisor.GetHypervisor(hv_name)
1534 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1535 hv_class.CheckParameterSyntax(hv_params)
1536 _CheckHVParams(self, node_list, hv_name, hv_params)
1538 def Exec(self, feedback_fn):
1539 """Change the parameters of the cluster.
1542 if self.op.vg_name is not None:
1543 new_volume = self.op.vg_name
1546 if new_volume != self.cfg.GetVGName():
1547 self.cfg.SetVGName(new_volume)
1549 feedback_fn("Cluster LVM configuration already in desired"
1550 " state, not changing")
# apply the values pre-computed/validated in CheckPrereq
1551 if self.op.hvparams:
1552 self.cluster.hvparams = self.new_hvparams
1553 if self.op.enabled_hypervisors is not None:
1554 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1555 if self.op.beparams:
1556 self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1557 if self.op.candidate_pool_size is not None:
1558 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1560 self.cfg.Update(self.cluster)
1562 # we want to update nodes after the cluster so that if any errors
1563 # happen, we have recorded and saved the cluster info
1564 if self.op.candidate_pool_size is not None:
1565 _AdjustCandidatePool(self)
1568 class LURedistributeConfig(NoHooksLU):
1569 """Force the redistribution of cluster configuration.
1571 This is a very simple LU.
1577 def ExpandNames(self):
# lock all nodes in shared mode — redistribution touches every node
# but does not modify their state
1578 self.needed_locks = {
1579 locking.LEVEL_NODE: locking.ALL_SET,
1581 self.share_locks[locking.LEVEL_NODE] = 1
1583 def CheckPrereq(self):
1584 """Check prerequisites.
1588 def Exec(self, feedback_fn):
1589 """Redistribute the configuration.
# a no-op Update triggers the config writer's distribution machinery
1592 self.cfg.Update(self.cfg.GetClusterInfo())
1595 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1596 """Sleep and poll for an instance's disk to sync.
# NOTE(review): original line numbering is non-contiguous in this view; the
# polling loop header, the retry accounting and several branch bodies are
# not visible here. Comments describe only what is shown.
1599 if not instance.disks:
# nothing to wait for without disks; the early return itself is not
# visible in this view — TODO confirm
1603 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1605 node = instance.primary_node
1607 for dev in instance.disks:
1608 lu.cfg.SetDiskID(dev, node)
1614 cumul_degraded = False
1615 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1616 if rstats.failed or not rstats.data:
1617 lu.LogWarning("Can't get any data from node %s", node)
# presumably raised only after repeated failures; the retry counter is
# not visible in this view
1620 raise errors.RemoteError("Can't contact node %s for mirror data,"
1621 " aborting." % node)
1624 rstats = rstats.data
# rstats entries parallel instance.disks; each mstat describes one disk
1626 for i, mstat in enumerate(rstats):
1628 lu.LogWarning("Can't compute data for node %s/%s",
1629 node, instance.disks[i].iv_name)
1631 # we ignore the ldisk parameter
1632 perc_done, est_time, is_degraded, _ = mstat
1633 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1634 if perc_done is not None:
1636 if est_time is not None:
1637 rem_time = "%d estimated seconds remaining" % est_time
1640 rem_time = "no time estimate"
1641 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1642 (instance.disks[i].iv_name, perc_done, rem_time))
# cap the sleep between polls at one minute
1646 time.sleep(min(60, max_time))
1649 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1650 return not cumul_degraded
1653 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1654 """Check that mirrors are not degraded.
1656 The ldisk parameter, if True, will change the test from the
1657 is_degraded attribute (which represents overall non-ok status for
1658 the device(s)) to the ldisk (representing the local storage status).
# NOTE(review): original line numbering is non-contiguous; the idx selection
# based on ldisk, the result initialisation and the final return are not
# visible in this view.
1661 lu.cfg.SetDiskID(dev, node)
# only query the device where it is expected to be assembled
1668 if on_primary or dev.AssembleOnSecondary():
1669 rstats = lu.rpc.call_blockdev_find(node, dev)
1670 msg = rstats.RemoteFailMsg()
1672 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1674 elif not rstats.payload:
1675 lu.LogWarning("Can't find disk on node %s", node)
# payload[idx] is a degraded flag, so a healthy disk contributes True
1678 result = result and (not rstats.payload[idx])
# recurse into children; note the child check always uses the overall
# degraded flag (ldisk is not propagated here)
1680 for child in dev.children:
1681 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1686 class LUDiagnoseOS(NoHooksLU):
1687 """Logical unit for OS diagnose/query.
# NOTE(review): original line numbering is non-contiguous in this view; some
# statements (e.g. the all_os initialisation, the per-row output assembly)
# are not visible here.
1690 _OP_REQP = ["output_fields", "names"]
1692 _FIELDS_STATIC = utils.FieldSet()
1693 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1695 def ExpandNames(self):
# filtering by OS name is not implemented, only full listing
1697 raise errors.OpPrereqError("Selective OS query not supported")
1699 _CheckOutputFields(static=self._FIELDS_STATIC,
1700 dynamic=self._FIELDS_DYNAMIC,
1701 selected=self.op.output_fields)
1703 # Lock all nodes, in shared mode
1704 # Temporary removal of locks, should be reverted later
1705 # TODO: reintroduce locks when they are lighter-weight
1706 self.needed_locks = {}
1707 #self.share_locks[locking.LEVEL_NODE] = 1
1708 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1710 def CheckPrereq(self):
1711 """Check prerequisites.
1716 def _DiagnoseByOS(node_list, rlist):
1717 """Remaps a per-node return list into an a per-os per-node dictionary
1719 @param node_list: a list with the names of all nodes
1720 @param rlist: a map with node names as keys and OS objects as values
1723 @return: a dictionary with osnames as keys and as value another map, with
1724 nodes as keys and list of OS objects as values, eg::
1726 {"debian-etch": {"node1": [<object>,...],
1727 "node2": [<object>,]}
1732 # we build here the list of nodes that didn't fail the RPC (at RPC
1733 # level), so that nodes with a non-responding node daemon don't
1734 # make all OSes invalid
1735 good_nodes = [node_name for node_name in rlist
1736 if not rlist[node_name].failed]
1737 for node_name, nr in rlist.iteritems():
1738 if nr.failed or not nr.data:
1740 for os_obj in nr.data:
1741 if os_obj.name not in all_os:
1742 # build a list of nodes for this os containing empty lists
1743 # for each node in node_list
1744 all_os[os_obj.name] = {}
1745 for nname in good_nodes:
1746 all_os[os_obj.name][nname] = []
1747 all_os[os_obj.name][node_name].append(os_obj)
1750 def Exec(self, feedback_fn):
1751 """Compute the list of OSes.
# only online nodes are queried for their OS list
1754 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1755 node_data = self.rpc.call_os_diagnose(valid_nodes)
1756 if node_data == False:
1757 raise errors.OpExecError("Can't gather the list of OSes")
1758 pol = self._DiagnoseByOS(valid_nodes, node_data)
1760 for os_name, os_data in pol.iteritems():
1762 for field in self.op.output_fields:
1765 elif field == "valid":
# an OS is valid when every node returned at least one valid variant
1766 val = utils.all([osl and osl[0] for osl in os_data.values()])
1767 elif field == "node_status":
1769 for node_name, nos_list in os_data.iteritems():
1770 val[node_name] = [(v.status, v.path) for v in nos_list]
1772 raise errors.ParameterError(field)
1779 class LURemoveNode(LogicalUnit):
1780 """Logical unit for removing a node.
# NOTE(review): original line numbering is non-contiguous in this view; some
# statements (e.g. the "node is None" guard implied before line 1815) are
# not visible here.
1783 HPATH = "node-remove"
1784 HTYPE = constants.HTYPE_NODE
1785 _OP_REQP = ["node_name"]
1787 def BuildHooksEnv(self):
1790 This doesn't run on the target node in the pre phase as a failed
1791 node would then be impossible to remove.
1795 "OP_TARGET": self.op.node_name,
1796 "NODE_NAME": self.op.node_name,
1798 all_nodes = self.cfg.GetNodeList()
# exclude the node being removed from the hook node lists
1799 all_nodes.remove(self.op.node_name)
1800 return env, all_nodes, all_nodes
1802 def CheckPrereq(self):
1803 """Check prerequisites.
1806 - the node exists in the configuration
1807 - it does not have primary or secondary instances
1808 - it's not the master
1810 Any errors are signalled by raising errors.OpPrereqError.
1813 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
# NOTE(review): old-style "raise Class, (args)" syntax below — every other
# raise in this file uses the call form; worth normalising in a code change
1815 raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1817 instance_list = self.cfg.GetInstanceList()
1819 masternode = self.cfg.GetMasterNode()
1820 if node.name == masternode:
1821 raise errors.OpPrereqError("Node is the master node,"
1822 " you need to failover first.")
1824 for instance_name in instance_list:
1825 instance = self.cfg.GetInstanceInfo(instance_name)
1826 if node.name in instance.all_nodes:
1827 raise errors.OpPrereqError("Instance %s is still running on the node,"
1828 " please remove first." % instance_name)
# store the canonical (expanded) node name for Exec
1829 self.op.node_name = node.name
1832 def Exec(self, feedback_fn):
1833 """Removes the node from the cluster.
1837 logging.info("Stopping the node daemon and removing configs from node %s",
# remove from config/context first, then tell the node to leave
1840 self.context.RemoveNode(node.name)
1842 self.rpc.call_node_leave_cluster(node.name)
1844 # Promote nodes to master candidate as needed
1845 _AdjustCandidatePool(self)
1848 class LUQueryNodes(NoHooksLU):
1849 """Logical unit for querying nodes.
# NOTE(review): original line numbering is non-contiguous in this view; some
# statements (e.g. parts of the field sets, the live_data assignment target,
# several if/elif bodies) are not visible here.
1852 _OP_REQP = ["output_fields", "names", "use_locking"]
# dynamic fields require a live RPC to the nodes
1854 _FIELDS_DYNAMIC = utils.FieldSet(
1856 "mtotal", "mnode", "mfree",
1858 "ctotal", "cnodes", "csockets",
# static fields are answered from the configuration alone
1861 _FIELDS_STATIC = utils.FieldSet(
1862 "name", "pinst_cnt", "sinst_cnt",
1863 "pinst_list", "sinst_list",
1864 "pip", "sip", "tags",
1872 def ExpandNames(self):
1873 _CheckOutputFields(static=self._FIELDS_STATIC,
1874 dynamic=self._FIELDS_DYNAMIC,
1875 selected=self.op.output_fields)
1877 self.needed_locks = {}
1878 self.share_locks[locking.LEVEL_NODE] = 1
1881 self.wanted = _GetWantedNodes(self, self.op.names)
1883 self.wanted = locking.ALL_SET
# locking is only needed when live data was requested AND the caller
# asked for it
1885 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1886 self.do_locking = self.do_node_query and self.op.use_locking
1888 # if we don't request only static fields, we need to lock the nodes
1889 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1892 def CheckPrereq(self):
1893 """Check prerequisites.
1896 # The validation of the node list is done in the _GetWantedNodes,
1897 # if non empty, and if empty, there's no validation to do
1900 def Exec(self, feedback_fn):
1901 """Computes the list of nodes and their attributes.
1904 all_info = self.cfg.GetAllNodesInfo()
# with locking active, the acquired locks define the node set
1906 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1907 elif self.wanted != locking.ALL_SET:
1908 nodenames = self.wanted
# without locks, nodes may have vanished between opcode submission
# and execution — fail explicitly in that case
1909 missing = set(nodenames).difference(all_info.keys())
1911 raise errors.OpExecError(
1912 "Some nodes were removed before retrieving their data: %s" % missing)
1914 nodenames = all_info.keys()
1916 nodenames = utils.NiceSort(nodenames)
1917 nodelist = [all_info[name] for name in nodenames]
1919 # begin data gathering
1921 if self.do_node_query:
1923 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1924 self.cfg.GetHypervisorType())
1925 for name in nodenames:
1926 nodeinfo = node_data[name]
1927 if not nodeinfo.failed and nodeinfo.data:
1928 nodeinfo = nodeinfo.data
1929 fn = utils.TryConvert
# best-effort conversion: missing or malformed values become None
1931 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1932 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1933 "mfree": fn(int, nodeinfo.get('memory_free', None)),
1934 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1935 "dfree": fn(int, nodeinfo.get('vg_free', None)),
1936 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1937 "bootid": nodeinfo.get('bootid', None),
1938 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1939 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1942 live_data[name] = {}
1944 live_data = dict.fromkeys(nodenames, {})
# map each node to the instances it is primary/secondary for
1946 node_to_primary = dict([(name, set()) for name in nodenames])
1947 node_to_secondary = dict([(name, set()) for name in nodenames])
1949 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1950 "sinst_cnt", "sinst_list"))
# only walk the instance list if an instance-related field was asked for
1951 if inst_fields & frozenset(self.op.output_fields):
1952 instancelist = self.cfg.GetInstanceList()
1954 for instance_name in instancelist:
1955 inst = self.cfg.GetInstanceInfo(instance_name)
1956 if inst.primary_node in node_to_primary:
1957 node_to_primary[inst.primary_node].add(inst.name)
1958 for secnode in inst.secondary_nodes:
1959 if secnode in node_to_secondary:
1960 node_to_secondary[secnode].add(inst.name)
1962 master_node = self.cfg.GetMasterNode()
1964 # end data gathering
# build one output row per node, one value per requested field
1967 for node in nodelist:
1969 for field in self.op.output_fields:
1972 elif field == "pinst_list":
1973 val = list(node_to_primary[node.name])
1974 elif field == "sinst_list":
1975 val = list(node_to_secondary[node.name])
1976 elif field == "pinst_cnt":
1977 val = len(node_to_primary[node.name])
1978 elif field == "sinst_cnt":
1979 val = len(node_to_secondary[node.name])
1980 elif field == "pip":
1981 val = node.primary_ip
1982 elif field == "sip":
1983 val = node.secondary_ip
1984 elif field == "tags":
1985 val = list(node.GetTags())
1986 elif field == "serial_no":
1987 val = node.serial_no
1988 elif field == "master_candidate":
1989 val = node.master_candidate
1990 elif field == "master":
1991 val = node.name == master_node
1992 elif field == "offline":
1994 elif field == "drained":
1996 elif self._FIELDS_DYNAMIC.Matches(field):
1997 val = live_data[node.name].get(field, None)
1999 raise errors.ParameterError(field)
2000 node_output.append(val)
2001 output.append(node_output)
2006 class LUQueryNodeVolumes(NoHooksLU):
2007 """Logical unit for getting volumes on node(s).
# NOTE(review): original line numbering is non-contiguous in this view; some
# statements (e.g. a few elif bodies in Exec) are not visible here.
2010 _OP_REQP = ["nodes", "output_fields"]
2012 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2013 _FIELDS_STATIC = utils.FieldSet("node")
2015 def ExpandNames(self):
2016 _CheckOutputFields(static=self._FIELDS_STATIC,
2017 dynamic=self._FIELDS_DYNAMIC,
2018 selected=self.op.output_fields)
2020 self.needed_locks = {}
2021 self.share_locks[locking.LEVEL_NODE] = 1
# no explicit node list means "all nodes"
2022 if not self.op.nodes:
2023 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2025 self.needed_locks[locking.LEVEL_NODE] = \
2026 _GetWantedNodes(self, self.op.nodes)
2028 def CheckPrereq(self):
2029 """Check prerequisites.
2031 This checks that the fields required are valid output fields.
2034 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2036 def Exec(self, feedback_fn):
2037 """Computes the list of nodes and their attributes.
2040 nodenames = self.nodes
2041 volumes = self.rpc.call_node_volumes(nodenames)
# map each instance to its LVs per node, to answer the "instance" field
2043 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2044 in self.cfg.GetInstanceList()]
2046 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2049 for node in nodenames:
# skip nodes that did not answer or returned no data
2050 if node not in volumes or volumes[node].failed or not volumes[node].data:
# copy before sorting so the RPC result is left untouched
2053 node_vols = volumes[node].data[:]
2054 node_vols.sort(key=lambda vol: vol['dev'])
2056 for vol in node_vols:
2058 for field in self.op.output_fields:
2061 elif field == "phys":
2065 elif field == "name":
2067 elif field == "size":
2068 val = int(float(vol['size']))
2069 elif field == "instance":
# find which instance (if any) owns this LV on this node
2071 if node not in lv_by_node[inst]:
2073 if vol['name'] in lv_by_node[inst][node]:
2079 raise errors.ParameterError(field)
2080 node_output.append(str(val))
2082 output.append(node_output)
2087 class LUAddNode(LogicalUnit):
2088 """Logical unit for adding node to the cluster.
# NOTE(review): original line numbering is non-contiguous in this view; some
# statements (e.g. HPATH, the try around HostInfo, the keyfile read loop)
# are not visible here.
2092 HTYPE = constants.HTYPE_NODE
2093 _OP_REQP = ["node_name"]
2095 def BuildHooksEnv(self):
2098 This will run on all nodes before, and on all nodes + the new node after.
2102 "OP_TARGET": self.op.node_name,
2103 "NODE_NAME": self.op.node_name,
2104 "NODE_PIP": self.op.primary_ip,
2105 "NODE_SIP": self.op.secondary_ip,
2107 nodes_0 = self.cfg.GetNodeList()
2108 nodes_1 = nodes_0 + [self.op.node_name, ]
2109 return env, nodes_0, nodes_1
2111 def CheckPrereq(self):
2112 """Check prerequisites.
2115 - the new node is not already in the config
2117 - its parameters (single/dual homed) matches the cluster
2119 Any errors are signalled by raising errors.OpPrereqError.
2122 node_name = self.op.node_name
# resolve the node name via DNS to canonical name + primary IP
2125 dns_data = utils.HostInfo(node_name)
2127 node = dns_data.name
2128 primary_ip = self.op.primary_ip = dns_data.ip
2129 secondary_ip = getattr(self.op, "secondary_ip", None)
2130 if secondary_ip is None:
# single-homed setup: secondary defaults to the primary IP
2131 secondary_ip = primary_ip
2132 if not utils.IsValidIP(secondary_ip):
2133 raise errors.OpPrereqError("Invalid secondary IP given")
2134 self.op.secondary_ip = secondary_ip
2136 node_list = cfg.GetNodeList()
2137 if not self.op.readd and node in node_list:
2138 raise errors.OpPrereqError("Node %s is already in the configuration" %
2140 elif self.op.readd and node not in node_list:
2141 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2143 for existing_node_name in node_list:
2144 existing_node = cfg.GetNodeInfo(existing_node_name)
2146 if self.op.readd and node == existing_node_name:
# on readd the IPs must match what the cluster already knows
2147 if (existing_node.primary_ip != primary_ip or
2148 existing_node.secondary_ip != secondary_ip):
2149 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2150 " address configuration as before")
# otherwise, refuse any IP overlap with an existing node
2153 if (existing_node.primary_ip == primary_ip or
2154 existing_node.secondary_ip == primary_ip or
2155 existing_node.primary_ip == secondary_ip or
2156 existing_node.secondary_ip == secondary_ip):
2157 raise errors.OpPrereqError("New node ip address(es) conflict with"
2158 " existing node %s" % existing_node.name)
2160 # check that the type of the node (single versus dual homed) is the
2161 # same as for the master
2162 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2163 master_singlehomed = myself.secondary_ip == myself.primary_ip
2164 newbie_singlehomed = secondary_ip == primary_ip
2165 if master_singlehomed != newbie_singlehomed:
2166 if master_singlehomed:
2167 raise errors.OpPrereqError("The master has no private ip but the"
2168 " new node has one")
2170 raise errors.OpPrereqError("The master has a private ip but the"
2171 " new node doesn't have one")
2173 # checks reachability
2174 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2175 raise errors.OpPrereqError("Node not reachable by ping")
2177 if not newbie_singlehomed:
2178 # check reachability from my secondary ip to newbie's secondary ip
2179 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2180 source=myself.secondary_ip):
2181 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2182 " based ping to noded port")
# the new node becomes a master candidate only if the pool is not full
2184 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2185 mc_now, _ = self.cfg.GetMasterCandidateStats()
2186 master_candidate = mc_now < cp_size
2188 self.new_node = objects.Node(name=node,
2189 primary_ip=primary_ip,
2190 secondary_ip=secondary_ip,
2191 master_candidate=master_candidate,
2192 offline=False, drained=False)
2194 def Exec(self, feedback_fn):
2195 """Adds the new node to the cluster.
2198 new_node = self.new_node
2199 node = new_node.name
2201 # check connectivity
2202 result = self.rpc.call_version([node])[node]
# the node daemon must speak exactly our protocol version
2205 if constants.PROTOCOL_VERSION == result.data:
2206 logging.info("Communication to node %s fine, sw version %s match",
2209 raise errors.OpExecError("Version mismatch master version %s,"
2210 " node version %s" %
2211 (constants.PROTOCOL_VERSION, result.data))
2213 raise errors.OpExecError("Cannot get version from the new node")
2216 logging.info("Copy ssh key to node %s", node)
2217 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2219 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2220 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2226 keyarray.append(f.read())
# push the host ssh keys to the new node
2230 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2232 keyarray[3], keyarray[4], keyarray[5])
2234 msg = result.RemoteFailMsg()
2236 raise errors.OpExecError("Cannot transfer ssh keys to the"
2237 " new node: %s" % msg)
2239 # Add node to our /etc/hosts, and add key to known_hosts
2240 utils.AddHostToEtcHosts(new_node.name)
2242 if new_node.secondary_ip != new_node.primary_ip:
# verify the node actually owns the secondary IP it was declared with
2243 result = self.rpc.call_node_has_ip_address(new_node.name,
2244 new_node.secondary_ip)
2245 if result.failed or not result.data:
2246 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2247 " you gave (%s). Please fix and re-run this"
2248 " command." % new_node.secondary_ip)
2250 node_verify_list = [self.cfg.GetMasterNode()]
2251 node_verify_param = {
2253 # TODO: do a node-net-test as well?
# have the master verify ssh/hostname reachability of the new node
2256 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2257 self.cfg.GetClusterName())
2258 for verifier in node_verify_list:
2259 if result[verifier].failed or not result[verifier].data:
2260 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2261 " for remote verification" % verifier)
2262 if result[verifier].data['nodelist']:
2263 for failed in result[verifier].data['nodelist']:
2264 feedback_fn("ssh/hostname verification failed %s -> %s" %
2265 (verifier, result[verifier].data['nodelist'][failed]))
2266 raise errors.OpExecError("ssh/hostname verification failed.")
2268 # Distribute updated /etc/hosts and known_hosts to all nodes,
2269 # including the node just added
2270 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2271 dist_nodes = self.cfg.GetNodeList()
# on readd the node is already in GetNodeList(); only append when new
2272 if not self.op.readd:
2273 dist_nodes.append(node)
2274 if myself.name in dist_nodes:
2275 dist_nodes.remove(myself.name)
2277 logging.debug("Copying hosts and known_hosts to all nodes")
2278 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2279 result = self.rpc.call_upload_file(dist_nodes, fname)
2280 for to_node, to_result in result.iteritems():
2281 if to_result.failed or not to_result.data:
2282 logging.error("Copy of file %s to node %s failed", fname, to_node)
# some hypervisors need the VNC password file copied over as well
2285 enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2286 if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2287 to_copy.append(constants.VNC_PASSWORD_FILE)
2289 for fname in to_copy:
2290 result = self.rpc.call_upload_file([node], fname)
2291 if result[node].failed or not result[node]:
2292 logging.error("Could not copy file %s to node %s", fname, node)
# finally register the node with the cluster context
2295 self.context.ReaddNode(new_node)
2297 self.context.AddNode(new_node)
# NOTE(review): intermediate source lines are missing from this excerpt; the
# comments below only describe the visible statements.
2300 class LUSetNodeParams(LogicalUnit):
2301 """Modifies the parameters of a node.
2304 HPATH = "node-modify"
2305 HTYPE = constants.HTYPE_NODE
2306 _OP_REQP = ["node_name"]
2309 def CheckArguments(self):
# Resolve and normalize the node name; None means it is unknown.
2310 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2311 if node_name is None:
2312 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2313 self.op.node_name = node_name
2314 _CheckBooleanOpField(self.op, 'master_candidate')
2315 _CheckBooleanOpField(self.op, 'offline')
2316 _CheckBooleanOpField(self.op, 'drained')
# At least one of the three flags must be given, and at most one may be
# set to True (the states are mutually exclusive).
2317 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2318 if all_mods.count(None) == 3:
2319 raise errors.OpPrereqError("Please pass at least one modification")
2320 if all_mods.count(True) > 1:
2321 raise errors.OpPrereqError("Can't set the node into more than one"
2322 " state at the same time")
2324 def ExpandNames(self):
2325 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2327 def BuildHooksEnv(self):
2330 This runs on the master node.
2334 "OP_TARGET": self.op.node_name,
2335 "MASTER_CANDIDATE": str(self.op.master_candidate),
2336 "OFFLINE": str(self.op.offline),
2337 "DRAINED": str(self.op.drained),
2339 nl = [self.cfg.GetMasterNode(),
2343 def CheckPrereq(self):
2344 """Check prerequisites.
2346 This only checks the instance list against the existing names.
2349 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
# Demoting from master candidate (explicitly, or implicitly via offline or
# drained) is refused for the master node and checked against the pool size.
2351 if ((self.op.master_candidate == False or self.op.offline == True or
2352 self.op.drained == True) and node.master_candidate):
2353 # we will demote the node from master_candidate
2354 if self.op.node_name == self.cfg.GetMasterNode():
2355 raise errors.OpPrereqError("The master node has to be a"
2356 " master candidate, online and not drained")
2357 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2358 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2359 if num_candidates <= cp_size:
2360 msg = ("Not enough master candidates (desired"
2361 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2363 self.LogWarning(msg)
2365 raise errors.OpPrereqError(msg)
# An offline/drained node cannot be promoted unless the same request also
# clears that state.
2367 if (self.op.master_candidate == True and
2368 ((node.offline and not self.op.offline == False) or
2369 (node.drained and not self.op.drained == False))):
2370 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2371 " to master_candidate" % node.name)
2375 def Exec(self, feedback_fn):
2384 if self.op.offline is not None:
2385 node.offline = self.op.offline
2386 result.append(("offline", str(self.op.offline)))
# Going offline implies auto-demotion from master candidate and clearing
# of the drained state.
2387 if self.op.offline == True:
2388 if node.master_candidate:
2389 node.master_candidate = False
2391 result.append(("master_candidate", "auto-demotion due to offline"))
2393 node.drained = False
2394 result.append(("drained", "clear drained status due to offline"))
2396 if self.op.master_candidate is not None:
2397 node.master_candidate = self.op.master_candidate
2399 result.append(("master_candidate", str(self.op.master_candidate)))
2400 if self.op.master_candidate == False:
# Demotion failure on the node is logged but does not abort the change.
2401 rrc = self.rpc.call_node_demote_from_mc(node.name)
2402 msg = rrc.RemoteFailMsg()
2404 self.LogWarning("Node failed to demote itself: %s" % msg)
2406 if self.op.drained is not None:
2407 node.drained = self.op.drained
2408 result.append(("drained", str(self.op.drained)))
2409 if self.op.drained == True:
2410 if node.master_candidate:
2411 node.master_candidate = False
2413 result.append(("master_candidate", "auto-demotion due to drain"))
2415 node.offline = False
2416 result.append(("offline", "clear offline status due to drain"))
2418 # this will trigger configuration file update, if needed
2419 self.cfg.Update(node)
2420 # this will trigger job queue propagation or cleanup
2422 self.context.ReaddNode(node)
# NOTE(review): some source lines are elided here (notably the dict variable
# binding and the final return of Exec); comments cover only what is visible.
2427 class LUQueryClusterInfo(NoHooksLU):
2428 """Query cluster configuration.
# No locks are needed: the data comes straight from the local config.
2434 def ExpandNames(self):
2435 self.needed_locks = {}
2437 def CheckPrereq(self):
2438 """No prerequsites needed for this LU.
2443 def Exec(self, feedback_fn):
2444 """Return cluster config.
2447 cluster = self.cfg.GetClusterInfo()
2449 "software_version": constants.RELEASE_VERSION,
2450 "protocol_version": constants.PROTOCOL_VERSION,
2451 "config_version": constants.CONFIG_VERSION,
2452 "os_api_version": constants.OS_API_VERSION,
2453 "export_version": constants.EXPORT_VERSION,
2454 "architecture": (platform.architecture()[0], platform.machine()),
2455 "name": cluster.cluster_name,
2456 "master": cluster.master_node,
2457 "default_hypervisor": cluster.default_hypervisor,
2458 "enabled_hypervisors": cluster.enabled_hypervisors,
# Only parameters of the enabled hypervisors are exposed.
2459 "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2460 for hypervisor in cluster.enabled_hypervisors]),
2461 "beparams": cluster.beparams,
2462 "candidate_pool_size": cluster.candidate_pool_size,
2463 "default_bridge": cluster.default_bridge,
2464 "master_netdev": cluster.master_netdev,
2465 "volume_group_name": cluster.volume_group_name,
2466 "file_storage_dir": cluster.file_storage_dir,
# NOTE(review): the final return of Exec is on an elided line; comments cover
# only the visible statements.
2472 class LUQueryConfigValues(NoHooksLU):
2473 """Return configuration values.
2478 _FIELDS_DYNAMIC = utils.FieldSet()
2479 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2481 def ExpandNames(self):
# Lock-free: values are read from the local configuration/filesystem only.
2482 self.needed_locks = {}
2484 _CheckOutputFields(static=self._FIELDS_STATIC,
2485 dynamic=self._FIELDS_DYNAMIC,
2486 selected=self.op.output_fields)
2488 def CheckPrereq(self):
2489 """No prerequisites.
2494 def Exec(self, feedback_fn):
2495 """Dump a representation of the cluster config to the standard output.
# One entry per requested field, preserving the requested order.
2499 for field in self.op.output_fields:
2500 if field == "cluster_name":
2501 entry = self.cfg.GetClusterName()
2502 elif field == "master_node":
2503 entry = self.cfg.GetMasterNode()
2504 elif field == "drain_flag":
# The drain flag is represented by the existence of a marker file.
2505 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2507 raise errors.ParameterError(field)
2508 values.append(entry)
# NOTE(review): the guard between assembling the disks and the raise in Exec
# is on an elided line; comments cover only the visible statements.
2512 class LUActivateInstanceDisks(NoHooksLU):
2513 """Bring up an instance's disks.
2516 _OP_REQP = ["instance_name"]
2519 def ExpandNames(self):
2520 self._ExpandAndLockInstance()
# Node locks are computed later, once the instance's nodes are known.
2521 self.needed_locks[locking.LEVEL_NODE] = []
2522 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2524 def DeclareLocks(self, level):
2525 if level == locking.LEVEL_NODE:
2526 self._LockInstancesNodes()
2528 def CheckPrereq(self):
2529 """Check prerequisites.
2531 This checks that the instance is in the cluster.
2534 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2535 assert self.instance is not None, \
2536 "Cannot retrieve locked instance %s" % self.op.instance_name
2537 _CheckNodeOnline(self, self.instance.primary_node)
2539 def Exec(self, feedback_fn):
2540 """Activate the disks.
2543 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2545 raise errors.OpExecError("Cannot activate block devices")
# NOTE(review): several interior lines (accumulator initialization, the
# "if msg:" guards, loop control statements) are elided from this excerpt;
# the comments below only describe the visible statements.
2550 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2551 """Prepare the block devices for an instance.
2553 This sets up the block devices on all nodes.
2555 @type lu: L{LogicalUnit}
2556 @param lu: the logical unit on whose behalf we execute
2557 @type instance: L{objects.Instance}
2558 @param instance: the instance for whose disks we assemble
2559 @type ignore_secondaries: boolean
2560 @param ignore_secondaries: if true, errors on secondary nodes
2561 won't result in an error return from the function
2562 @return: False if the operation failed, otherwise a list of
2563 (host, instance_visible_name, node_visible_name)
2564 with the mapping from node devices to instance devices
2569 iname = instance.name
2570 # With the two passes mechanism we try to reduce the window of
2571 # opportunity for the race condition of switching DRBD to primary
2572 # before handshaking occured, but we do not eliminate it
2574 # The proper fix would be to wait (with some limits) until the
2575 # connection has been made and drbd transitions from WFConnection
2576 # into any other network-connected state (Connected, SyncTarget,
2579 # 1st pass, assemble on all nodes in secondary mode
2580 for inst_disk in instance.disks:
2581 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2582 lu.cfg.SetDiskID(node_disk, node)
2583 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2584 msg = result.RemoteFailMsg()
2586 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2587 " (is_primary=False, pass=1): %s",
2588 inst_disk.iv_name, node, msg)
# Secondary-node failures may be tolerated when requested by the caller.
2589 if not ignore_secondaries:
2592 # FIXME: race condition on drbd migration to primary
2594 # 2nd pass, do only the primary node
2595 for inst_disk in instance.disks:
2596 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2597 if node != instance.primary_node:
2599 lu.cfg.SetDiskID(node_disk, node)
2600 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2601 msg = result.RemoteFailMsg()
2603 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2604 " (is_primary=True, pass=2): %s",
2605 inst_disk.iv_name, node, msg)
2607 device_info.append((instance.primary_node, inst_disk.iv_name,
2610 # leave the disks configured for the primary node
2611 # this is a workaround that would be fixed better by
2612 # improving the logical/physical id handling
2613 for disk in instance.disks:
2614 lu.cfg.SetDiskID(disk, instance.primary_node)
2616 return disks_ok, device_info
# NOTE(review): the failure-branch guard and part of the hint string are on
# elided lines; the comments below only describe the visible statements.
2619 def _StartInstanceDisks(lu, instance, force):
2620 """Start the disks of an instance.
# Assemble all disks; a force request also tolerates secondary-node errors.
2623 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2624 ignore_secondaries=force)
# On failure the partially-assembled disks are torn down again before
# raising, so no half-started state is left behind.
2626 _ShutdownInstanceDisks(lu, instance)
2627 if force is not None and not force:
2628 lu.proc.LogWarning("", hint="If the message above refers to a"
2630 " you can retry the operation using '--force'.")
2631 raise errors.OpExecError("Disk consistency error")
# NOTE(review): a few source lines are elided here; comments cover only the
# visible statements.
2634 class LUDeactivateInstanceDisks(NoHooksLU):
2635 """Shutdown an instance's disks.
2638 _OP_REQP = ["instance_name"]
2641 def ExpandNames(self):
2642 self._ExpandAndLockInstance()
# Node locks are computed later, once the instance's nodes are known.
2643 self.needed_locks[locking.LEVEL_NODE] = []
2644 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2646 def DeclareLocks(self, level):
2647 if level == locking.LEVEL_NODE:
2648 self._LockInstancesNodes()
2650 def CheckPrereq(self):
2651 """Check prerequisites.
2653 This checks that the instance is in the cluster.
2656 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2657 assert self.instance is not None, \
2658 "Cannot retrieve locked instance %s" % self.op.instance_name
2660 def Exec(self, feedback_fn):
2661 """Deactivate the disks
# Delegates to the safe variant, which refuses to act on a running instance.
2664 instance = self.instance
2665 _SafeShutdownInstanceDisks(self, instance)
# NOTE(review): part of the second raise's message is on an elided line;
# comments cover only the visible statements.
2668 def _SafeShutdownInstanceDisks(lu, instance):
2669 """Shutdown block devices of an instance.
2671 This function checks if an instance is running, before calling
2672 _ShutdownInstanceDisks.
# Ask the primary node which instances its hypervisor is currently running.
2675 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2676 [instance.hypervisor])
2677 ins_l = ins_l[instance.primary_node]
2678 if ins_l.failed or not isinstance(ins_l.data, list):
2679 raise errors.OpExecError("Can't contact node '%s'" %
2680 instance.primary_node)
# Refuse to shut down the disks under a live instance.
2682 if instance.name in ins_l.data:
2683 raise errors.OpExecError("Instance is running, can't shutdown"
2686 _ShutdownInstanceDisks(lu, instance)
# NOTE(review): the result accumulator, the "if msg:" guard and the final
# return are on elided lines; comments cover only the visible statements.
2689 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2690 """Shutdown block devices of an instance.
2692 This does the shutdown on all nodes of the instance.
2694 If the ignore_primary is false, errors on the primary node are
# Walk every disk on every node of the instance and shut it down.
2699 for disk in instance.disks:
2700 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2701 lu.cfg.SetDiskID(top_disk, node)
2702 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2703 msg = result.RemoteFailMsg()
2705 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2706 disk.iv_name, node, msg)
# Primary-node failures are only tolerated when ignore_primary is set.
2707 if not ignore_primary or node != instance.primary_node:
2712 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2713 """Checks if a node has enough free memory.
2715 This function check if a given node has the needed amount of free
2716 memory. In case the node has less memory or we cannot get the
2717 information from the node, this function raise an OpPrereqError
2720 @type lu: C{LogicalUnit}
2721 @param lu: a logical unit from which we get configuration data
2723 @param node: the node to check
2724 @type reason: C{str}
2725 @param reason: string to use in the error message
2726 @type requested: C{int}
2727 @param requested: the amount of memory in MiB to check for
2728 @type hypervisor_name: C{str}
2729 @param hypervisor_name: the hypervisor to ask for memory stats
2730 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2731 we cannot check the node
2734 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2735 nodeinfo[node].Raise()
2736 free_mem = nodeinfo[node].data.get('memory_free')
2737 if not isinstance(free_mem, int):
2738 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2739 " was '%s'" % (node, free_mem))
2740 if requested > free_mem:
2741 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2742 " needed %s MiB, available %s MiB" %
2743 (node, reason, requested, free_mem))
# NOTE(review): intermediate source lines are elided in this excerpt; the
# comments below only describe the visible statements.
2746 class LUStartupInstance(LogicalUnit):
2747 """Starts an instance.
2750 HPATH = "instance-start"
2751 HTYPE = constants.HTYPE_INSTANCE
2752 _OP_REQP = ["instance_name", "force"]
2755 def ExpandNames(self):
2756 self._ExpandAndLockInstance()
2758 def BuildHooksEnv(self):
2761 This runs on master, primary and secondary nodes of the instance.
2765 "FORCE": self.op.force,
2767 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2768 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2771 def CheckPrereq(self):
2772 """Check prerequisites.
2774 This checks that the instance is in the cluster.
2777 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2778 assert self.instance is not None, \
2779 "Cannot retrieve locked instance %s" % self.op.instance_name
# Optional per-start backend parameter overrides; validated and type-forced.
2782 self.beparams = getattr(self.op, "beparams", {})
2784 if not isinstance(self.beparams, dict):
2785 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2786 " dict" % (type(self.beparams), ))
2787 # fill the beparams dict
2788 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2789 self.op.beparams = self.beparams
# Optional per-start hypervisor parameter overrides, checked syntactically
# against the fully-filled cluster defaults.
2792 self.hvparams = getattr(self.op, "hvparams", {})
2794 if not isinstance(self.hvparams, dict):
2795 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2796 " dict" % (type(self.hvparams), ))
2798 # check hypervisor parameter syntax (locally)
2799 cluster = self.cfg.GetClusterInfo()
2800 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2801 filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2803 filled_hvp.update(self.hvparams)
2804 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2805 hv_type.CheckParameterSyntax(filled_hvp)
2806 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2807 self.op.hvparams = self.hvparams
2809 _CheckNodeOnline(self, instance.primary_node)
2811 bep = self.cfg.GetClusterInfo().FillBE(instance)
2812 # check bridges existance
2813 _CheckInstanceBridgesExist(self, instance)
# Only check free memory when the instance is not already running there.
2815 remote_info = self.rpc.call_instance_info(instance.primary_node,
2817 instance.hypervisor)
2819 if not remote_info.data:
2820 _CheckNodeFreeMemory(self, instance.primary_node,
2821 "starting instance %s" % instance.name,
2822 bep[constants.BE_MEMORY], instance.hypervisor)
2824 def Exec(self, feedback_fn):
2825 """Start the instance.
2828 instance = self.instance
2829 force = self.op.force
# Mark the instance up in the configuration before actually starting it.
2831 self.cfg.MarkInstanceUp(instance.name)
2833 node_current = instance.primary_node
2835 _StartInstanceDisks(self, instance, force)
2837 result = self.rpc.call_instance_start(node_current, instance,
2838 self.hvparams, self.beparams)
2839 msg = result.RemoteFailMsg()
# On failure the disks are torn down again before raising.
2841 _ShutdownInstanceDisks(self, instance)
2842 raise errors.OpExecError("Could not start instance: %s" % msg)
# NOTE(review): intermediate source lines are elided in this excerpt; the
# comments below only describe the visible statements.
2845 class LURebootInstance(LogicalUnit):
2846 """Reboot an instance.
2849 HPATH = "instance-reboot"
2850 HTYPE = constants.HTYPE_INSTANCE
2851 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2854 def ExpandNames(self):
# Validate the reboot type before taking any locks.
2855 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2856 constants.INSTANCE_REBOOT_HARD,
2857 constants.INSTANCE_REBOOT_FULL]:
2858 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2859 (constants.INSTANCE_REBOOT_SOFT,
2860 constants.INSTANCE_REBOOT_HARD,
2861 constants.INSTANCE_REBOOT_FULL))
2862 self._ExpandAndLockInstance()
2864 def BuildHooksEnv(self):
2867 This runs on master, primary and secondary nodes of the instance.
2871 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2872 "REBOOT_TYPE": self.op.reboot_type,
2874 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2875 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2878 def CheckPrereq(self):
2879 """Check prerequisites.
2881 This checks that the instance is in the cluster.
2884 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2885 assert self.instance is not None, \
2886 "Cannot retrieve locked instance %s" % self.op.instance_name
2888 _CheckNodeOnline(self, instance.primary_node)
2890 # check bridges existance
2891 _CheckInstanceBridgesExist(self, instance)
2893 def Exec(self, feedback_fn):
2894 """Reboot the instance.
2897 instance = self.instance
2898 ignore_secondaries = self.op.ignore_secondaries
2899 reboot_type = self.op.reboot_type
2901 node_current = instance.primary_node
# Soft/hard reboots are delegated to the node daemon; a full reboot is
# emulated below as shutdown + disk recycle + start.
2903 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2904 constants.INSTANCE_REBOOT_HARD]:
2905 for disk in instance.disks:
2906 self.cfg.SetDiskID(disk, node_current)
2907 result = self.rpc.call_instance_reboot(node_current, instance,
2909 msg = result.RemoteFailMsg()
2911 raise errors.OpExecError("Could not reboot instance: %s" % msg)
2913 result = self.rpc.call_instance_shutdown(node_current, instance)
2914 msg = result.RemoteFailMsg()
2916 raise errors.OpExecError("Could not shutdown instance for"
2917 " full reboot: %s" % msg)
2918 _ShutdownInstanceDisks(self, instance)
2919 _StartInstanceDisks(self, instance, ignore_secondaries)
2920 result = self.rpc.call_instance_start(node_current, instance, None, None)
2921 msg = result.RemoteFailMsg()
# On a failed restart the disks are torn down again before raising.
2923 _ShutdownInstanceDisks(self, instance)
2924 raise errors.OpExecError("Could not start instance for"
2925 " full reboot: %s" % msg)
2927 self.cfg.MarkInstanceUp(instance.name)
# NOTE(review): intermediate source lines are elided in this excerpt; the
# comments below only describe the visible statements.
2930 class LUShutdownInstance(LogicalUnit):
2931 """Shutdown an instance.
2934 HPATH = "instance-stop"
2935 HTYPE = constants.HTYPE_INSTANCE
2936 _OP_REQP = ["instance_name"]
2939 def ExpandNames(self):
2940 self._ExpandAndLockInstance()
2942 def BuildHooksEnv(self):
2945 This runs on master, primary and secondary nodes of the instance.
2948 env = _BuildInstanceHookEnvByObject(self, self.instance)
2949 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2952 def CheckPrereq(self):
2953 """Check prerequisites.
2955 This checks that the instance is in the cluster.
2958 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2959 assert self.instance is not None, \
2960 "Cannot retrieve locked instance %s" % self.op.instance_name
2961 _CheckNodeOnline(self, self.instance.primary_node)
2963 def Exec(self, feedback_fn):
2964 """Shutdown the instance.
2967 instance = self.instance
2968 node_current = instance.primary_node
# Mark the instance down in the configuration before the actual shutdown.
2969 self.cfg.MarkInstanceDown(instance.name)
2970 result = self.rpc.call_instance_shutdown(node_current, instance)
2971 msg = result.RemoteFailMsg()
# A shutdown failure is only warned about; the disks are still deactivated.
2973 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2975 _ShutdownInstanceDisks(self, instance)
# NOTE(review): intermediate source lines are elided in this excerpt; the
# comments below only describe the visible statements.
2978 class LUReinstallInstance(LogicalUnit):
2979 """Reinstall an instance.
2982 HPATH = "instance-reinstall"
2983 HTYPE = constants.HTYPE_INSTANCE
2984 _OP_REQP = ["instance_name"]
2987 def ExpandNames(self):
2988 self._ExpandAndLockInstance()
2990 def BuildHooksEnv(self):
2993 This runs on master, primary and secondary nodes of the instance.
2996 env = _BuildInstanceHookEnvByObject(self, self.instance)
2997 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3000 def CheckPrereq(self):
3001 """Check prerequisites.
3003 This checks that the instance is in the cluster and is not running.
3006 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3007 assert instance is not None, \
3008 "Cannot retrieve locked instance %s" % self.op.instance_name
3009 _CheckNodeOnline(self, instance.primary_node)
# Reinstall requires disks and a fully stopped instance.
3011 if instance.disk_template == constants.DT_DISKLESS:
3012 raise errors.OpPrereqError("Instance '%s' has no disks" %
3013 self.op.instance_name)
3014 if instance.admin_up:
3015 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3016 self.op.instance_name)
3017 remote_info = self.rpc.call_instance_info(instance.primary_node,
3019 instance.hypervisor)
3021 if remote_info.data:
3022 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3023 (self.op.instance_name,
3024 instance.primary_node))
# Optional OS change: the new OS must exist on the primary node.
3026 self.op.os_type = getattr(self.op, "os_type", None)
3027 if self.op.os_type is not None:
3029 pnode = self.cfg.GetNodeInfo(
3030 self.cfg.ExpandNodeName(instance.primary_node))
3032 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3034 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3036 if not isinstance(result.data, objects.OS):
3037 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3038 " primary node" % self.op.os_type)
3040 self.instance = instance
3042 def Exec(self, feedback_fn):
3043 """Reinstall the instance.
3046 inst = self.instance
3048 if self.op.os_type is not None:
3049 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3050 inst.os = self.op.os_type
3051 self.cfg.Update(inst)
# Bring up the disks, run the OS create scripts, then shut the disks down
# again (elided control flow presumably guarantees the shutdown).
3053 _StartInstanceDisks(self, inst, None)
3055 feedback_fn("Running the instance OS create scripts...")
3056 result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3057 msg = result.RemoteFailMsg()
3059 raise errors.OpExecError("Could not install OS for instance %s"
3061 (inst.name, inst.primary_node, msg))
3063 _ShutdownInstanceDisks(self, inst)
# NOTE(review): intermediate source lines are elided in this excerpt; the
# comments below only describe the visible statements.
3066 class LURenameInstance(LogicalUnit):
3067 """Rename an instance.
3070 HPATH = "instance-rename"
3071 HTYPE = constants.HTYPE_INSTANCE
3072 _OP_REQP = ["instance_name", "new_name"]
3074 def BuildHooksEnv(self):
3077 This runs on master, primary and secondary nodes of the instance.
3080 env = _BuildInstanceHookEnvByObject(self, self.instance)
3081 env["INSTANCE_NEW_NAME"] = self.op.new_name
3082 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3085 def CheckPrereq(self):
3086 """Check prerequisites.
3088 This checks that the instance is in the cluster and is not running.
3091 instance = self.cfg.GetInstanceInfo(
3092 self.cfg.ExpandInstanceName(self.op.instance_name))
3093 if instance is None:
3094 raise errors.OpPrereqError("Instance '%s' not known" %
3095 self.op.instance_name)
3096 _CheckNodeOnline(self, instance.primary_node)
# The instance must be fully stopped before it can be renamed.
3098 if instance.admin_up:
3099 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3100 self.op.instance_name)
3101 remote_info = self.rpc.call_instance_info(instance.primary_node,
3103 instance.hypervisor)
3105 if remote_info.data:
3106 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3107 (self.op.instance_name,
3108 instance.primary_node))
3109 self.instance = instance
3111 # new name verification
3112 name_info = utils.HostInfo(self.op.new_name)
3114 self.op.new_name = new_name = name_info.name
3115 instance_list = self.cfg.GetInstanceList()
3116 if new_name in instance_list:
3117 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
# Unless explicitly ignored, make sure the new name's IP is not in use.
3120 if not getattr(self.op, "ignore_ip", False):
3121 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3122 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3123 (name_info.ip, new_name))
3126 def Exec(self, feedback_fn):
3127 """Reinstall the instance.
3130 inst = self.instance
3131 old_name = inst.name
# File-based instances also need their storage directory renamed on disk.
3133 if inst.disk_template == constants.DT_FILE:
3134 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3136 self.cfg.RenameInstance(inst.name, self.op.new_name)
3137 # Change the instance lock. This is definitely safe while we hold the BGL
3138 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3139 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3141 # re-read the instance from the configuration after rename
3142 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3144 if inst.disk_template == constants.DT_FILE:
3145 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3146 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3147 old_file_storage_dir,
3148 new_file_storage_dir)
3151 raise errors.OpExecError("Could not connect to node '%s' to rename"
3152 " directory '%s' to '%s' (but the instance"
3153 " has been renamed in Ganeti)" % (
3154 inst.primary_node, old_file_storage_dir,
3155 new_file_storage_dir))
3157 if not result.data[0]:
3158 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3159 " (but the instance has been renamed in"
3160 " Ganeti)" % (old_file_storage_dir,
3161 new_file_storage_dir))
# Run the OS-level rename script with the disks up; failure there is only
# a warning because the cluster-level rename has already happened.
3163 _StartInstanceDisks(self, inst, None)
3165 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3167 msg = result.RemoteFailMsg()
3169 msg = ("Could not run OS rename script for instance %s on node %s"
3170 " (but the instance has been renamed in Ganeti): %s" %
3171 (inst.name, inst.primary_node, msg))
3172 self.proc.LogWarning(msg)
3174 _ShutdownInstanceDisks(self, inst)
# NOTE(review): intermediate source lines are elided in this excerpt; the
# comments below only describe the visible statements.
3177 class LURemoveInstance(LogicalUnit):
3178 """Remove an instance.
3181 HPATH = "instance-remove"
3182 HTYPE = constants.HTYPE_INSTANCE
3183 _OP_REQP = ["instance_name", "ignore_failures"]
3186 def ExpandNames(self):
3187 self._ExpandAndLockInstance()
# Node locks are computed later, once the instance's nodes are known.
3188 self.needed_locks[locking.LEVEL_NODE] = []
3189 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3191 def DeclareLocks(self, level):
3192 if level == locking.LEVEL_NODE:
3193 self._LockInstancesNodes()
3195 def BuildHooksEnv(self):
3198 This runs on master, primary and secondary nodes of the instance.
# Only the master is in the post-hook node list here (the instance's own
# nodes no longer have it after removal).
3201 env = _BuildInstanceHookEnvByObject(self, self.instance)
3202 nl = [self.cfg.GetMasterNode()]
3205 def CheckPrereq(self):
3206 """Check prerequisites.
3208 This checks that the instance is in the cluster.
3211 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3212 assert self.instance is not None, \
3213 "Cannot retrieve locked instance %s" % self.op.instance_name
3215 def Exec(self, feedback_fn):
3216 """Remove the instance.
3219 instance = self.instance
3220 logging.info("Shutting down instance %s on node %s",
3221 instance.name, instance.primary_node)
3223 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3224 msg = result.RemoteFailMsg()
# ignore_failures downgrades shutdown/disk-removal errors to warnings.
3226 if self.op.ignore_failures:
3227 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3229 raise errors.OpExecError("Could not shutdown instance %s on"
3231 (instance.name, instance.primary_node, msg))
3233 logging.info("Removing block devices for instance %s", instance.name)
3235 if not _RemoveDisks(self, instance):
3236 if self.op.ignore_failures:
3237 feedback_fn("Warning: can't remove instance's disks")
3239 raise errors.OpExecError("Can't remove instance's disks")
3241 logging.info("Removing instance %s out of cluster config", instance.name)
3243 self.cfg.RemoveInstance(instance.name)
# Drop the instance lock together with the instance itself.
3244 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3247 class LUQueryInstances(NoHooksLU):
3248 """Logical unit for querying instances.
3251 _OP_REQP = ["output_fields", "names", "use_locking"]
3253 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3255 "disk_template", "ip", "mac", "bridge",
3256 "sda_size", "sdb_size", "vcpus", "tags",
3257 "network_port", "beparams",
3258 r"(disk)\.(size)/([0-9]+)",
3259 r"(disk)\.(sizes)", "disk_usage",
3260 r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3261 r"(nic)\.(macs|ips|bridges)",
3262 r"(disk|nic)\.(count)",
3263 "serial_no", "hypervisor", "hvparams",] +
3265 for name in constants.HVS_PARAMETERS] +
3267 for name in constants.BES_PARAMETERS])
3268 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3271 def ExpandNames(self):
3272 _CheckOutputFields(static=self._FIELDS_STATIC,
3273 dynamic=self._FIELDS_DYNAMIC,
3274 selected=self.op.output_fields)
3276 self.needed_locks = {}
3277 self.share_locks[locking.LEVEL_INSTANCE] = 1
3278 self.share_locks[locking.LEVEL_NODE] = 1
3281 self.wanted = _GetWantedInstances(self, self.op.names)
3283 self.wanted = locking.ALL_SET
3285 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3286 self.do_locking = self.do_node_query and self.op.use_locking
3288 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3289 self.needed_locks[locking.LEVEL_NODE] = []
3290 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3292 def DeclareLocks(self, level):
3293 if level == locking.LEVEL_NODE and self.do_locking:
3294 self._LockInstancesNodes()
3296 def CheckPrereq(self):
3297 """Check prerequisites.
3302 def Exec(self, feedback_fn):
3303 """Computes the list of nodes and their attributes.
# NOTE(review): this listing elides a number of lines throughout this
# method (if/else/try guards, the output-row accumulation); comments
# below describe only what the visible code demonstrates.
# Phase 1: decide which instances to report on.  When the caller gave no
# names, any ordering is fine; otherwise the caller's order is preserved.
3306 all_info = self.cfg.GetAllInstancesInfo()
3307 if self.wanted == locking.ALL_SET:
3308 # caller didn't specify instance names, so ordering is not important
3310 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3312 instance_names = all_info.keys()
3313 instance_names = utils.NiceSort(instance_names)
3315 # caller did specify names, so we must keep the ordering
3317 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3319 tgt_set = all_info.keys()
# Instances can disappear between name expansion and data retrieval;
# that is reported as an execution error, not a prerequisite failure.
3320 missing = set(self.wanted).difference(tgt_set)
3322 raise errors.OpExecError("Some instances were removed before"
3323 " retrieving their data: %s" % missing)
3324 instance_names = self.wanted
3326 instance_list = [all_info[iname] for iname in instance_names]
3328 # begin data gathering
3330 nodes = frozenset([inst.primary_node for inst in instance_list])
3331 hv_list = list(set([inst.hypervisor for inst in instance_list]))
# Phase 2: optionally query the primary nodes for live instance data.
# Nodes failing the RPC end up in bad_nodes, offline ones in off_nodes;
# instances on those nodes get degraded status values below.
3335 if self.do_node_query:
3337 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3339 result = node_data[name]
3341 # offline nodes will be in both lists
3342 off_nodes.append(name)
3344 bad_nodes.append(name)
3347 live_data.update(result.data)
3348 # else no instance is alive
# Without a node query, every instance gets an empty live-data dict.
3350 live_data = dict([(name, {}) for name in instance_names])
3352 # end data gathering
# Phase 3: one output row per instance, one value per requested field.
# st_match holds the regex match for parameterized fields (disk.size/N,
# nic.mac/N, ...), used in the "variable list" branch below.
3357 for instance in instance_list:
3359 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3360 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3361 for field in self.op.output_fields:
3362 st_match = self._FIELDS_STATIC.Matches(field)
3367 elif field == "pnode":
3368 val = instance.primary_node
3369 elif field == "snodes":
3370 val = list(instance.secondary_nodes)
3371 elif field == "admin_state":
3372 val = instance.admin_up
3373 elif field == "oper_state":
# Operational state is unknown when the primary node did not answer.
3374 if instance.primary_node in bad_nodes:
3377 val = bool(live_data.get(instance.name))
3378 elif field == "status":
3379 if instance.primary_node in off_nodes:
3380 val = "ERROR_nodeoffline"
3381 elif instance.primary_node in bad_nodes:
3382 val = "ERROR_nodedown"
3384 running = bool(live_data.get(instance.name))
3386 if instance.admin_up:
3391 if instance.admin_up:
3395 elif field == "oper_ram":
3396 if instance.primary_node in bad_nodes:
3398 elif instance.name in live_data:
3399 val = live_data[instance.name].get("memory", "?")
3402 elif field == "disk_template":
3403 val = instance.disk_template
3405 val = instance.nics[0].ip
3406 elif field == "bridge":
3407 val = instance.nics[0].bridge
3408 elif field == "mac":
3409 val = instance.nics[0].mac
# Legacy field names: sda/sdb map to disk indices 0/1.
3410 elif field == "sda_size" or field == "sdb_size":
3411 idx = ord(field[2]) - ord('a')
3413 val = instance.FindDisk(idx).size
3414 except errors.OpPrereqError:
3416 elif field == "disk_usage": # total disk usage per node
3417 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3418 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3419 elif field == "tags":
3420 val = list(instance.GetTags())
3421 elif field == "serial_no":
3422 val = instance.serial_no
3423 elif field == "network_port":
3424 val = instance.network_port
3425 elif field == "hypervisor":
3426 val = instance.hypervisor
3427 elif field == "hvparams":
# hv/<name> and be/<name> expose single filled hypervisor/backend
# parameters; a missing key yields None.
3429 elif (field.startswith(HVPREFIX) and
3430 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3431 val = i_hv.get(field[len(HVPREFIX):], None)
3432 elif field == "beparams":
3434 elif (field.startswith(BEPREFIX) and
3435 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3436 val = i_be.get(field[len(BEPREFIX):], None)
3437 elif st_match and st_match.groups():
3438 # matches a variable list
3439 st_groups = st_match.groups()
3440 if st_groups and st_groups[0] == "disk":
3441 if st_groups[1] == "count":
3442 val = len(instance.disks)
3443 elif st_groups[1] == "sizes":
3444 val = [disk.size for disk in instance.disks]
3445 elif st_groups[1] == "size":
3447 val = instance.FindDisk(st_groups[2]).size
3448 except errors.OpPrereqError:
3451 assert False, "Unhandled disk parameter"
3452 elif st_groups[0] == "nic":
3453 if st_groups[1] == "count":
3454 val = len(instance.nics)
3455 elif st_groups[1] == "macs":
3456 val = [nic.mac for nic in instance.nics]
3457 elif st_groups[1] == "ips":
3458 val = [nic.ip for nic in instance.nics]
3459 elif st_groups[1] == "bridges":
3460 val = [nic.bridge for nic in instance.nics]
# Indexed NIC access; the handling of an out-of-range index is elided
# in this listing (presumably val = None) -- TODO confirm.
3463 nic_idx = int(st_groups[2])
3464 if nic_idx >= len(instance.nics):
3467 if st_groups[1] == "mac":
3468 val = instance.nics[nic_idx].mac
3469 elif st_groups[1] == "ip":
3470 val = instance.nics[nic_idx].ip
3471 elif st_groups[1] == "bridge":
3472 val = instance.nics[nic_idx].bridge
3474 assert False, "Unhandled NIC parameter"
3476 assert False, "Unhandled variable parameter"
# Any field that matched neither a static name nor a known pattern is a
# caller error.
3478 raise errors.ParameterError(field)
3485 class LUFailoverInstance(LogicalUnit):
3486 """Failover an instance.

    Shuts the instance down on its primary node and restarts it on its
    (DRBD) secondary.  NOTE(review): several guard lines are elided in
    this listing; hedged notes below mark the gaps.
    """
3489 HPATH = "instance-failover"
3490 HTYPE = constants.HTYPE_INSTANCE
# Required opcode parameters, checked by the LU machinery.
3491 _OP_REQP = ["instance_name", "ignore_consistency"]
# Lock the instance; node locks are computed later from the instance.
3494 def ExpandNames(self):
3495 self._ExpandAndLockInstance()
3496 self.needed_locks[locking.LEVEL_NODE] = []
3497 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3499 def DeclareLocks(self, level):
3500 if level == locking.LEVEL_NODE:
3501 self._LockInstancesNodes()
# Build the environment for the instance-failover hooks.
3503 def BuildHooksEnv(self):
3506 This runs on master, primary and secondary nodes of the instance.
3510 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3512 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3513 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3516 def CheckPrereq(self):
3517 """Check prerequisites.
3519 This checks that the instance is in the cluster.
3522 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3523 assert self.instance is not None, \
3524 "Cannot retrieve locked instance %s" % self.op.instance_name
3526 bep = self.cfg.GetClusterInfo().FillBE(instance)
# Failover only makes sense for network-mirrored disk templates.
3527 if instance.disk_template not in constants.DTS_NET_MIRROR:
3528 raise errors.OpPrereqError("Instance's disk layout is not"
3529 " network mirrored, cannot failover.")
3531 secondary_nodes = instance.secondary_nodes
3532 if not secondary_nodes:
3533 raise errors.ProgrammerError("no secondary node but using "
3534 "a mirrored disk template")
3536 target_node = secondary_nodes[0]
3537 _CheckNodeOnline(self, target_node)
3538 _CheckNodeNotDrained(self, target_node)
3539 # check memory requirements on the secondary node
3540 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3541 instance.name, bep[constants.BE_MEMORY],
3542 instance.hypervisor)
3544 # check bridge existance
3545 brlist = [nic.bridge for nic in instance.nics]
3546 result = self.rpc.call_bridges_exist(target_node, brlist)
# NOTE(review): the failure test on `result` is elided before this raise
# (presumably "if result.failed or not result.data:") -- confirm.
3549 raise errors.OpPrereqError("One or more target bridges %s does not"
3550 " exist on destination node '%s'" %
3551 (brlist, target_node))
3553 def Exec(self, feedback_fn):
3554 """Failover an instance.
3556 The failover is done by shutting it down on its present node and
3557 starting it on the secondary.
3560 instance = self.instance
3562 source_node = instance.primary_node
3563 target_node = instance.secondary_nodes[0]
# Degraded disks abort the failover unless the caller explicitly asked
# to ignore consistency (or the instance is administratively down).
3565 feedback_fn("* checking disk consistency between source and target")
3566 for dev in instance.disks:
3567 # for drbd, these are drbd over lvm
3568 if not _CheckDiskConsistency(self, dev, target_node, False):
3569 if instance.admin_up and not self.op.ignore_consistency:
3570 raise errors.OpExecError("Disk %s is degraded on target node,"
3571 " aborting failover." % dev.iv_name)
3573 feedback_fn("* shutting down instance on source node")
3574 logging.info("Shutting down instance %s on node %s",
3575 instance.name, source_node)
3577 result = self.rpc.call_instance_shutdown(source_node, instance)
3578 msg = result.RemoteFailMsg()
# NOTE(review): an "if msg:" guard appears elided here -- a shutdown
# failure is only a warning when ignore_consistency is set.
3580 if self.op.ignore_consistency:
3581 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3582 " Proceeding anyway. Please make sure node"
3583 " %s is down. Error details: %s",
3584 instance.name, source_node, source_node, msg)
3586 raise errors.OpExecError("Could not shutdown instance %s on"
3588 (instance.name, source_node, msg))
3590 feedback_fn("* deactivating the instance's disks on source node")
3591 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3592 raise errors.OpExecError("Can't shut down the instance's disks.")
# Flip the primary node in the configuration before (re)starting.
3594 instance.primary_node = target_node
3595 # distribute new instance config to the other nodes
3596 self.cfg.Update(instance)
3598 # Only start the instance if it's marked as up
3599 if instance.admin_up:
3600 feedback_fn("* activating the instance's disks on target node")
3601 logging.info("Starting instance %s on node %s",
3602 instance.name, target_node)
3604 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3605 ignore_secondaries=True)
# On failure to assemble or start, the disks are shut down again so the
# instance is not left half-activated.
3607 _ShutdownInstanceDisks(self, instance)
3608 raise errors.OpExecError("Can't activate the instance's disks")
3610 feedback_fn("* starting the instance on the target node")
3611 result = self.rpc.call_instance_start(target_node, instance, None, None)
3612 msg = result.RemoteFailMsg()
3614 _ShutdownInstanceDisks(self, instance)
3615 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3616 (instance.name, target_node, msg))
3619 class LUMigrateInstance(LogicalUnit):
3620 """Migrate an instance.
3622 This is migration without shutting down, compared to the failover,
3623 which is done with shutdown.
3626 HPATH = "instance-migrate"
3627 HTYPE = constants.HTYPE_INSTANCE
# Required opcode parameters; "cleanup" selects recovery of a previously
# failed migration instead of a fresh one.
3628 _OP_REQP = ["instance_name", "live", "cleanup"]
3632 def ExpandNames(self):
3633 self._ExpandAndLockInstance()
3634 self.needed_locks[locking.LEVEL_NODE] = []
3635 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3637 def DeclareLocks(self, level):
3638 if level == locking.LEVEL_NODE:
3639 self._LockInstancesNodes()
# Build the environment for the instance-migrate hooks.
3641 def BuildHooksEnv(self):
3644 This runs on master, primary and secondary nodes of the instance.
3647 env = _BuildInstanceHookEnvByObject(self, self.instance)
3648 env["MIGRATE_LIVE"] = self.op.live
3649 env["MIGRATE_CLEANUP"] = self.op.cleanup
3650 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3653 def CheckPrereq(self):
3654 """Check prerequisites.
3656 This checks that the instance is in the cluster.
3659 instance = self.cfg.GetInstanceInfo(
3660 self.cfg.ExpandInstanceName(self.op.instance_name))
3661 if instance is None:
3662 raise errors.OpPrereqError("Instance '%s' not known" %
3663 self.op.instance_name)
# Live migration is only implemented for DRBD8-backed instances.
3665 if instance.disk_template != constants.DT_DRBD8:
3666 raise errors.OpPrereqError("Instance's disk layout is not"
3667 " drbd8, cannot migrate.")
3669 secondary_nodes = instance.secondary_nodes
3670 if not secondary_nodes:
3671 raise errors.ConfigurationError("No secondary node but using"
3672 " drbd8 disk template")
3674 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3676 target_node = secondary_nodes[0]
3677 # check memory requirements on the secondary node
3678 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3679 instance.name, i_be[constants.BE_MEMORY],
3680 instance.hypervisor)
3682 # check bridge existance
3683 brlist = [nic.bridge for nic in instance.nics]
3684 result = self.rpc.call_bridges_exist(target_node, brlist)
3685 if result.failed or not result.data:
3686 raise errors.OpPrereqError("One or more target bridges %s does not"
3687 " exist on destination node '%s'" %
3688 (brlist, target_node))
# For a fresh migration (not cleanup) also verify the target node is not
# drained and ask the hypervisor whether the instance can be migrated.
3690 if not self.op.cleanup:
3691 _CheckNodeNotDrained(self, target_node)
3692 result = self.rpc.call_instance_migratable(instance.primary_node,
3694 msg = result.RemoteFailMsg()
3696 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3699 self.instance = instance
3701 def _WaitUntilSync(self):
3702 """Poll with custom rpc for disk sync.
3704 This uses our own step-based rpc call.
# NOTE(review): the surrounding loop and the all_done/min_percent
# initialization are elided in this listing.
3707 self.feedback_fn("* wait until resync is done")
3711 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3713 self.instance.disks)
3715 for node, nres in result.items():
3716 msg = nres.RemoteFailMsg()
3718 raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3720 node_done, node_percent = nres.payload
3721 all_done = all_done and node_done
3722 if node_percent is not None:
3723 min_percent = min(min_percent, node_percent)
3725 if min_percent < 100:
3726 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3729 def _EnsureSecondary(self, node):
3730 """Demote a node to secondary.
3733 self.feedback_fn("* switching node %s to secondary mode" % node)
# Set the physical disk IDs for this node before the blockdev RPC.
3735 for dev in self.instance.disks:
3736 self.cfg.SetDiskID(dev, node)
3738 result = self.rpc.call_blockdev_close(node, self.instance.name,
3739 self.instance.disks)
3740 msg = result.RemoteFailMsg()
3742 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3743 " error %s" % (node, msg))
3745 def _GoStandalone(self):
3746 """Disconnect from the network.
3749 self.feedback_fn("* changing into standalone mode")
3750 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3751 self.instance.disks)
3752 for node, nres in result.items():
3753 msg = nres.RemoteFailMsg()
3755 raise errors.OpExecError("Cannot disconnect disks node %s,"
3756 " error %s" % (node, msg))
3758 def _GoReconnect(self, multimaster):
3759 """Reconnect to the network.
# NOTE(review): the multimaster branch ('msg = "dual-master"'?) is
# elided before the single-master assignment below -- confirm.
3765 msg = "single-master"
3766 self.feedback_fn("* changing disks into %s mode" % msg)
3767 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3768 self.instance.disks,
3769 self.instance.name, multimaster)
3770 for node, nres in result.items():
3771 msg = nres.RemoteFailMsg()
3773 raise errors.OpExecError("Cannot change disks config on node %s,"
3774 " error: %s" % (node, msg))
3776 def _ExecCleanup(self):
3777 """Try to cleanup after a failed migration.
3779 The cleanup is done by:
3780 - check that the instance is running only on one node
3781 (and update the config if needed)
3782 - change disks on its secondary node to secondary
3783 - wait until disks are fully synchronized
3784 - disconnect from the network
3785 - change disks into single-master mode
3786 - wait again until disks are fully synchronized
3789 instance = self.instance
3790 target_node = self.target_node
3791 source_node = self.source_node
3793 # check running on only one node
3794 self.feedback_fn("* checking where the instance actually runs"
3795 " (if this hangs, the hypervisor might be in"
3797 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3798 for node, result in ins_l.items():
3800 if not isinstance(result.data, list):
3801 raise errors.OpExecError("Can't contact node '%s'" % node)
3803 runningon_source = instance.name in ins_l[source_node].data
3804 runningon_target = instance.name in ins_l[target_node].data
# Running on both or on neither node cannot be repaired automatically.
3806 if runningon_source and runningon_target:
3807 raise errors.OpExecError("Instance seems to be running on two nodes,"
3808 " or the hypervisor is confused. You will have"
3809 " to ensure manually that it runs only on one"
3810 " and restart this operation.")
3812 if not (runningon_source or runningon_target):
3813 raise errors.OpExecError("Instance does not seem to be running at all."
3814 " In this case, it's safer to repair by"
3815 " running 'gnt-instance stop' to ensure disk"
3816 " shutdown, and then restarting it.")
3818 if runningon_target:
3819 # the migration has actually succeeded, we need to update the config
3820 self.feedback_fn("* instance running on secondary node (%s),"
3821 " updating config" % target_node)
3822 instance.primary_node = target_node
3823 self.cfg.Update(instance)
3824 demoted_node = source_node
3826 self.feedback_fn("* instance confirmed to be running on its"
3827 " primary node (%s)" % source_node)
3828 demoted_node = target_node
3830 self._EnsureSecondary(demoted_node)
3832 self._WaitUntilSync()
3833 except errors.OpExecError:
3834 # we ignore here errors, since if the device is standalone, it
3835 # won't be able to sync
3837 self._GoStandalone()
3838 self._GoReconnect(False)
3839 self._WaitUntilSync()
3841 self.feedback_fn("* done")
3843 def _RevertDiskStatus(self):
3844 """Try to revert the disk status after a failed migration.
3847 target_node = self.target_node
# Best effort: failures here are logged, not raised, since we are
# already on an error path.
3849 self._EnsureSecondary(target_node)
3850 self._GoStandalone()
3851 self._GoReconnect(False)
3852 self._WaitUntilSync()
3853 except errors.OpExecError, err:
3854 self.LogWarning("Migration failed and I can't reconnect the"
3855 " drives: error '%s'\n"
3856 "Please look and recover the instance status" %
3859 def _AbortMigration(self):
3860 """Call the hypervisor code to abort a started migration.
3863 instance = self.instance
3864 target_node = self.target_node
3865 migration_info = self.migration_info
3867 abort_result = self.rpc.call_finalize_migration(target_node,
3871 abort_msg = abort_result.RemoteFailMsg()
3873 logging.error("Aborting migration failed on target node %s: %s" %
3874 (target_node, abort_msg))
3875 # Don't raise an exception here, as we stil have to try to revert the
3876 # disk status, even if this step failed.
3878 def _ExecMigration(self):
3879 """Migrate an instance.
3881 The migrate is done by:
3882 - change the disks into dual-master mode
3883 - wait until disks are fully synchronized again
3884 - migrate the instance
3885 - change disks on the new secondary node (the old primary) to secondary
3886 - wait until disks are fully synchronized
3887 - change disks into single-master mode
3890 instance = self.instance
3891 target_node = self.target_node
3892 source_node = self.source_node
# Unlike failover, migration never proceeds with degraded disks.
3894 self.feedback_fn("* checking disk consistency between source and target")
3895 for dev in instance.disks:
3896 if not _CheckDiskConsistency(self, dev, target_node, False):
3897 raise errors.OpExecError("Disk %s is degraded or not fully"
3898 " synchronized on target node,"
3899 " aborting migrate." % dev.iv_name)
3901 # First get the migration information from the remote node
3902 result = self.rpc.call_migration_info(source_node, instance)
3903 msg = result.RemoteFailMsg()
3905 log_err = ("Failed fetching source migration information from %s: %s" %
3907 logging.error(log_err)
3908 raise errors.OpExecError(log_err)
3910 self.migration_info = migration_info = result.payload
3912 # Then switch the disks to master/master mode
3913 self._EnsureSecondary(target_node)
3914 self._GoStandalone()
3915 self._GoReconnect(True)
3916 self._WaitUntilSync()
3918 self.feedback_fn("* preparing %s to accept the instance" % target_node)
3919 result = self.rpc.call_accept_instance(target_node,
3922 self.nodes_ip[target_node])
3924 msg = result.RemoteFailMsg()
# Pre-migration failure: abort on the target and put disks back into
# single-master mode before giving up.
3926 logging.error("Instance pre-migration failed, trying to revert"
3927 " disk status: %s", msg)
3928 self._AbortMigration()
3929 self._RevertDiskStatus()
3930 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3931 (instance.name, msg))
3933 self.feedback_fn("* migrating instance to %s" % target_node)
3935 result = self.rpc.call_instance_migrate(source_node, instance,
3936 self.nodes_ip[target_node],
3938 msg = result.RemoteFailMsg()
3940 logging.error("Instance migration failed, trying to revert"
3941 " disk status: %s", msg)
3942 self._AbortMigration()
3943 self._RevertDiskStatus()
3944 raise errors.OpExecError("Could not migrate instance %s: %s" %
3945 (instance.name, msg))
# Migration succeeded: record the new primary node.
3948 instance.primary_node = target_node
3949 # distribute new instance config to the other nodes
3950 self.cfg.Update(instance)
3952 result = self.rpc.call_finalize_migration(target_node,
3956 msg = result.RemoteFailMsg()
3958 logging.error("Instance migration succeeded, but finalization failed:"
3960 raise errors.OpExecError("Could not finalize instance migration: %s" %
# Demote the old primary and return disks to single-master mode.
3963 self._EnsureSecondary(source_node)
3964 self._WaitUntilSync()
3965 self._GoStandalone()
3966 self._GoReconnect(False)
3967 self._WaitUntilSync()
3969 self.feedback_fn("* done")
3971 def Exec(self, feedback_fn):
3972 """Perform the migration.
# Cache per-run state (nodes, their secondary IPs, feedback function)
# used by all the helper methods above.
3975 self.feedback_fn = feedback_fn
3977 self.source_node = self.instance.primary_node
3978 self.target_node = self.instance.secondary_nodes[0]
3979 self.all_nodes = [self.source_node, self.target_node]
3981 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3982 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3985 return self._ExecCleanup()
3987 return self._ExecMigration()
3990 def _CreateBlockDev(lu, node, instance, device, force_create,
3992 """Create a tree of block devices on a given node.
3994 If this device type has to be created on secondaries, create it and
3997 If not, just recurse to children keeping the same 'force' value.
3999 @param lu: the lu on whose behalf we execute
4000 @param node: the node on which to create the device
4001 @type instance: L{objects.Instance}
4002 @param instance: the instance which owns the device
4003 @type device: L{objects.Disk}
4004 @param device: the device to create
4005 @type force_create: boolean
4006 @param force_create: whether to force creation of this device; this
4007 will be changed to True whenever we find a device which has
4008 CreateOnSecondary() attribute
4009 @param info: the extra 'metadata' we should attach to the device
4010 (this will be represented as a LVM tag)
4011 @type force_open: boolean
4012 @param force_open: this parameter will be passed to the
4013 L{backend.BlockdevCreate} function where it specifies
4014 whether we run on primary or not, and it affects both
4015 the child assembly and the device own Open() execution
# NOTE(review): the line forcing force_create to True under this branch
# is elided in this listing -- confirm.
4018 if device.CreateOnSecondary():
# Recurse first so children exist before the parent device is created.
4022 for child in device.children:
4023 _CreateBlockDev(lu, node, instance, child, force_create,
# Only create this device itself when forced (top-level call or a
# CreateOnSecondary ancestor).
4026 if not force_create:
4029 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4032 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4033 """Create a single block device on a given node.
4035 This will not recurse over children of the device, so they must be
4038 @param lu: the lu on whose behalf we execute
4039 @param node: the node on which to create the device
4040 @type instance: L{objects.Instance}
4041 @param instance: the instance which owns the device
4042 @type device: L{objects.Disk}
4043 @param device: the device to create
4044 @param info: the extra 'metadata' we should attach to the device
4045 (this will be represented as a LVM tag)
4046 @type force_open: boolean
4047 @param force_open: this parameter will be passed to the
4048 L{backend.BlockdevCreate} function where it specifies
4049 whether we run on primary or not, and it affects both
4050 the child assembly and the device own Open() execution
4053 lu.cfg.SetDiskID(device, node)
4054 result = lu.rpc.call_blockdev_create(node, device, device.size,
4055 instance.name, force_open, info)
4056 msg = result.RemoteFailMsg()
# NOTE(review): an "if msg:" guard appears elided before this raise.
4058 raise errors.OpExecError("Can't create block device %s on"
4059 " node %s for instance %s: %s" %
4060 (device, node, instance.name, msg))
# Record the physical ID returned by the node on first creation.
4061 if device.physical_id is None:
4062 device.physical_id = result.payload
4065 def _GenerateUniqueNames(lu, exts):
4066 """Generate a suitable LV name.
4068 This will generate a logical volume name for the given instance.
4073 new_id = lu.cfg.GenerateUniqueID()
4074 results.append("%s%s" % (new_id, val))
4078 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4080 """Generate a drbd8 device complete with its children.

    Builds the DRBD disk object on top of two LVs (data and metadata).
    NOTE(review): the trailing signature line (the minor-number
    parameters) and parts of the Disk constructor are elided in this
    listing -- confirm against the full source.
    """
# Allocate the DRBD network port and shared secret from the config.
4083 port = lu.cfg.AllocatePort()
4084 vgname = lu.cfg.GetVGName()
4085 shared_secret = lu.cfg.GenerateDRBDSecret()
4086 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4087 logical_id=(vgname, names[0]))
# The metadata volume has a fixed 128 MiB size.
4088 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4089 logical_id=(vgname, names[1]))
4090 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4091 logical_id=(primary, secondary, port,
4094 children=[dev_data, dev_meta],
4099 def _GenerateDiskTemplate(lu, template_name,
4100 instance_name, primary_node,
4101 secondary_nodes, disk_info,
4102 file_storage_dir, file_driver,
4104 """Generate the entire disk layout for a given template type.

    Returns the list of Disk objects for the instance, one entry per
    element of disk_info, depending on the disk template (diskless,
    plain LVM, DRBD8 or file-backed).
    """
4107 #TODO: compute space requirements
4109 vgname = lu.cfg.GetVGName()
4110 disk_count = len(disk_info)
# Diskless: no disk objects at all.
4112 if template_name == constants.DT_DISKLESS:
4114 elif template_name == constants.DT_PLAIN:
# Plain LVM volumes live only on the primary node.
4115 if len(secondary_nodes) != 0:
4116 raise errors.ProgrammerError("Wrong template configuration")
4118 names = _GenerateUniqueNames(lu, [".disk%d" % i
4119 for i in range(disk_count)])
4120 for idx, disk in enumerate(disk_info):
4121 disk_index = idx + base_index
4122 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4123 logical_id=(vgname, names[idx]),
4124 iv_name="disk/%d" % disk_index,
4126 disks.append(disk_dev)
4127 elif template_name == constants.DT_DRBD8:
# DRBD8 needs exactly one secondary node and two minors per disk (one
# on each node); each disk also gets a _data and a _meta LV name.
4128 if len(secondary_nodes) != 1:
4129 raise errors.ProgrammerError("Wrong template configuration")
4130 remote_node = secondary_nodes[0]
4131 minors = lu.cfg.AllocateDRBDMinor(
4132 [primary_node, remote_node] * len(disk_info), instance_name)
4135 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4136 for i in range(disk_count)]):
4137 names.append(lv_prefix + "_data")
4138 names.append(lv_prefix + "_meta")
4139 for idx, disk in enumerate(disk_info):
4140 disk_index = idx + base_index
4141 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4142 disk["size"], names[idx*2:idx*2+2],
4143 "disk/%d" % disk_index,
4144 minors[idx*2], minors[idx*2+1])
4145 disk_dev.mode = disk["mode"]
4146 disks.append(disk_dev)
4147 elif template_name == constants.DT_FILE:
# File-backed disks also live only on the primary node.
4148 if len(secondary_nodes) != 0:
4149 raise errors.ProgrammerError("Wrong template configuration")
4151 for idx, disk in enumerate(disk_info):
4152 disk_index = idx + base_index
4153 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4154 iv_name="disk/%d" % disk_index,
4155 logical_id=(file_driver,
4156 "%s/disk%d" % (file_storage_dir,
4159 disks.append(disk_dev)
4161 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4165 def _GetInstanceInfoText(instance):
4166 """Compute that text that should be added to the disk's metadata.
4169 return "originstname+%s" % instance.name
4172 def _CreateDisks(lu, instance):
4173 """Create all disks for an instance.
4175 This abstracts away some work from AddInstance.
4177 @type lu: L{LogicalUnit}
4178 @param lu: the logical unit on whose behalf we execute
4179 @type instance: L{objects.Instance}
4180 @param instance: the instance whose disks we should create
4182 @return: the success of the creation
4185 info = _GetInstanceInfoText(instance)
4186 pnode = instance.primary_node
# File-backed instances additionally need their storage directory
# created on the primary node first.
4188 if instance.disk_template == constants.DT_FILE:
4189 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4190 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4192 if result.failed or not result.data:
4193 raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4195 if not result.data[0]:
4196 raise errors.OpExecError("Failed to create directory '%s'" %
4199 # Note: this needs to be kept in sync with adding of disks in
4200 # LUSetInstanceParams
4201 for device in instance.disks:
4202 logging.info("Creating volume %s for instance %s",
4203 device.iv_name, instance.name)
# f_create/force_open are True only on the primary node.
4205 for node in instance.all_nodes:
4206 f_create = node == pnode
4207 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4210 def _RemoveDisks(lu, instance):
4211 """Remove all disks for an instance.
4213 This abstracts away some work from `AddInstance()` and
4214 `RemoveInstance()`. Note that in case some of the devices couldn't
4215 be removed, the removal will continue with the other ones (compare
4216 with `_CreateDisks()`).
4218 @type lu: L{LogicalUnit}
4219 @param lu: the logical unit on whose behalf we execute
4220 @type instance: L{objects.Instance}
4221 @param instance: the instance whose disks we should remove
4223 @return: the success of the removal
4226 logging.info("Removing block devices for instance %s", instance.name)
# Best effort: per-device failures are logged as warnings and removal
# continues with the remaining devices.
4229 for device in instance.disks:
4230 for node, disk in device.ComputeNodeTree(instance.primary_node):
4231 lu.cfg.SetDiskID(disk, node)
4232 msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4234 lu.LogWarning("Could not remove block device %s on node %s,"
4235 " continuing anyway: %s", device.iv_name, node, msg)
# File-backed instances also get their storage directory removed.
4238 if instance.disk_template == constants.DT_FILE:
4239 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4240 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4242 if result.failed or not result.data:
4243 logging.error("Could not remove directory '%s'", file_storage_dir)
def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of disk and swap space
  data_size = sum(d["size"] for d in disks)
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: data_size,
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: data_size + 128 * len(disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
4269 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4270 """Hypervisor parameter validation.
4272 This function abstract the hypervisor parameter validation to be
4273 used in both instance create and instance modify.
4275 @type lu: L{LogicalUnit}
4276 @param lu: the logical unit for which we check
4277 @type nodenames: list
4278 @param nodenames: the list of nodes on which we should check
4279 @type hvname: string
4280 @param hvname: the name of the hypervisor we should use
4281 @type hvparams: dict
4282 @param hvparams: the parameters which we need to check
4283 @raise errors.OpPrereqError: if the parameters are not valid
# Validation runs on every target node; any per-node failure becomes a
# prerequisite error.  NOTE(review): lines fetching the per-node result
# ("info = hvinfo[node]"?) are elided in this listing -- confirm.
4286 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4289 for node in nodenames:
4293 msg = info.RemoteFailMsg()
4295 raise errors.OpPrereqError("Hypervisor parameter validation"
4296 " failed on node %s: %s" % (node, msg))
4299 class LUCreateInstance(LogicalUnit):
4300 """Create an instance.

    LU implementing instance creation (gnt-instance add / import).
    """
4303 HPATH = "instance-add"
4304 HTYPE = constants.HTYPE_INSTANCE
# Required opcode parameters.  NOTE(review): one continuation line of
# this list (including, presumably, "mode") is elided in this listing.
4305 _OP_REQP = ["instance_name", "disks", "disk_template",
4307 "wait_for_sync", "ip_check", "nics",
4308 "hvparams", "beparams"]
4311 def _ExpandNode(self, node):
4312 """Expands and checks one node name.

    Raises OpPrereqError when the node name cannot be expanded.
    NOTE(review): the final "return node_full" line appears elided in
    this listing -- confirm against the full source.
    """
4315 node_full = self.cfg.ExpandNodeName(node)
4316 if node_full is None:
4317 raise errors.OpPrereqError("Unknown node %s" % node)
4320 def ExpandNames(self):
4321 """ExpandNames for CreateInstance.
4323 Figure out the right locks for instance creation.
4326 self.needed_locks = {}
4328 # set optional parameters to none if they don't exist
4329 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4330 if not hasattr(self.op, attr):
4331 setattr(self.op, attr, None)
4333 # cheap checks, mostly valid constants given
4335 # verify creation mode
4336 if self.op.mode not in (constants.INSTANCE_CREATE,
4337 constants.INSTANCE_IMPORT):
4338 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4341 # disk template and mirror node verification
4342 if self.op.disk_template not in constants.DISK_TEMPLATES:
4343 raise errors.OpPrereqError("Invalid disk template name")
4345 if self.op.hypervisor is None:
4346 self.op.hypervisor = self.cfg.GetHypervisorType()
4348 cluster = self.cfg.GetClusterInfo()
4349 enabled_hvs = cluster.enabled_hypervisors
4350 if self.op.hypervisor not in enabled_hvs:
4351 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4352 " cluster (%s)" % (self.op.hypervisor,
4353 ",".join(enabled_hvs)))
4355 # check hypervisor parameter syntax (locally)
4356 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4357 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4359 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4360 hv_type.CheckParameterSyntax(filled_hvp)
4361 self.hv_full = filled_hvp
4363 # fill and remember the beparams dict
4364 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4365 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4368 #### instance parameters check
4370 # instance name verification
4371 hostname1 = utils.HostInfo(self.op.instance_name)
4372 self.op.instance_name = instance_name = hostname1.name
4374 # this is just a preventive check, but someone might still add this
4375 # instance in the meantime, and creation will fail at lock-add time
4376 if instance_name in self.cfg.GetInstanceList():
4377 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4380 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4384 for nic in self.op.nics:
4385 # ip validity checks
4386 ip = nic.get("ip", None)
4387 if ip is None or ip.lower() == "none":
4389 elif ip.lower() == constants.VALUE_AUTO:
4390 nic_ip = hostname1.ip
4392 if not utils.IsValidIP(ip):
4393 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4394 " like a valid IP" % ip)
4397 # MAC address verification
4398 mac = nic.get("mac", constants.VALUE_AUTO)
4399 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4400 if not utils.IsValidMac(mac.lower()):
4401 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4403 # bridge verification
4404 bridge = nic.get("bridge", None)
4406 bridge = self.cfg.GetDefBridge()
4407 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4409 # disk checks/pre-build
4411 for disk in self.op.disks:
4412 mode = disk.get("mode", constants.DISK_RDWR)
4413 if mode not in constants.DISK_ACCESS_SET:
4414 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4416 size = disk.get("size", None)
4418 raise errors.OpPrereqError("Missing disk size")
4422 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4423 self.disks.append({"size": size, "mode": mode})
4425 # used in CheckPrereq for ip ping check
4426 self.check_ip = hostname1.ip
4428 # file storage checks
4429 if (self.op.file_driver and
4430 not self.op.file_driver in constants.FILE_DRIVER):
4431 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4432 self.op.file_driver)
4434 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4435 raise errors.OpPrereqError("File storage directory path not absolute")
4437 ### Node/iallocator related checks
4438 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4439 raise errors.OpPrereqError("One and only one of iallocator and primary"
4440 " node must be given")
4442 if self.op.iallocator:
4443 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4445 self.op.pnode = self._ExpandNode(self.op.pnode)
4446 nodelist = [self.op.pnode]
4447 if self.op.snode is not None:
4448 self.op.snode = self._ExpandNode(self.op.snode)
4449 nodelist.append(self.op.snode)
4450 self.needed_locks[locking.LEVEL_NODE] = nodelist
4452 # in case of import lock the source node too
4453 if self.op.mode == constants.INSTANCE_IMPORT:
4454 src_node = getattr(self.op, "src_node", None)
4455 src_path = getattr(self.op, "src_path", None)
4457 if src_path is None:
4458 self.op.src_path = src_path = self.op.instance_name
4460 if src_node is None:
4461 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4462 self.op.src_node = None
4463 if os.path.isabs(src_path):
4464 raise errors.OpPrereqError("Importing an instance from an absolute"
4465 " path requires a source node option.")
4467 self.op.src_node = src_node = self._ExpandNode(src_node)
4468 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4469 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4470 if not os.path.isabs(src_path):
4471 self.op.src_path = src_path = \
4472 os.path.join(constants.EXPORT_DIR, src_path)
4474 else: # INSTANCE_CREATE
4475 if getattr(self.op, "os_type", None) is None:
4476 raise errors.OpPrereqError("No guest OS specified")
# NOTE(review): corrupted listing — indentation stripped, original line numbers
# embedded at the start of each line; gaps in that numbering (e.g. 4487-4488,
# 4497-4498, 4501) show lines are missing from this excerpt.  Restore from the
# upstream source before editing the code itself.
#
# Purpose (from the visible code): ask the configured iallocator plugin to pick
# the node(s) for the new instance and store the answer in self.op.pnode (and
# self.op.snode when two nodes are required).
4478 def _RunAllocator(self):
4479 """Run the allocator based on input opcode.
# Build the allocation request from the already-filled nics/beparams.
4482 nics = [n.ToDict() for n in self.nics]
4483 ial = IAllocator(self,
4484 mode=constants.IALLOCATOR_MODE_ALLOC,
4485 name=self.op.instance_name,
4486 disk_template=self.op.disk_template,
4489 vcpus=self.be_full[constants.BE_VCPUS],
4490 mem_size=self.be_full[constants.BE_MEMORY],
4493 hypervisor=self.op.hypervisor,
4496 ial.Run(self.op.iallocator)
# Missing line(s) 4497-4498: presumably the failure guard (`if not
# ial.success:`) for the raise below — TODO confirm against upstream.
4499 raise errors.OpPrereqError("Can't compute nodes using"
4500 " iallocator '%s': %s" % (self.op.iallocator,
# The allocator must return exactly as many nodes as the disk template needs.
4502 if len(ial.nodes) != ial.required_nodes:
4503 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4504 " of nodes (%s), required %s" %
4505 (self.op.iallocator, len(ial.nodes),
4506 ial.required_nodes))
# First node becomes the primary; a second one (if required) the secondary.
4507 self.op.pnode = ial.nodes[0]
4508 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4509 self.op.instance_name, self.op.iallocator,
4510 ", ".join(ial.nodes))
4511 if ial.required_nodes == 2:
4512 self.op.snode = ial.nodes[1]
# NOTE(review): corrupted listing — indentation stripped, embedded line
# numbers, and gaps (e.g. 4515-4516, 4518-4520, 4539-4540, 4542-4543) show
# missing lines, including the `env = {` opener implied by the dict entries
# below and whatever follows the `nl = (...` expression.
#
# Purpose (from the visible code): build the hooks environment and node list
# for the instance-creation hooks; import mode additionally exports the
# source node/path/images.
4514 def BuildHooksEnv(self):
4517 This runs on master, primary and secondary nodes of the instance.
4521 "ADD_MODE": self.op.mode,
4523 if self.op.mode == constants.INSTANCE_IMPORT:
4524 env["SRC_NODE"] = self.op.src_node
4525 env["SRC_PATH"] = self.op.src_path
4526 env["SRC_IMAGES"] = self.src_images
# Merge in the per-instance hook variables computed from the opcode.
4528 env.update(_BuildInstanceHookEnv(
4529 name=self.op.instance_name,
4530 primary_node=self.op.pnode,
4531 secondary_nodes=self.secondaries,
4532 status=self.op.start,
4533 os_type=self.op.os_type,
4534 memory=self.be_full[constants.BE_MEMORY],
4535 vcpus=self.be_full[constants.BE_VCPUS],
4536 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4537 disk_template=self.op.disk_template,
4538 disks=[(d["size"], d["mode"]) for d in self.disks],
4541 hypervisor=self.op.hypervisor,
# Hooks run on the master, the primary and (presumably, given the truncated
# expression) the secondaries — confirm the elided tail of this line's RHS.
4544 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
# NOTE(review): corrupted listing — indentation stripped, embedded line
# numbers, and numbering gaps mark missing lines throughout.  Code below is
# kept byte-identical; comments only.
#
# Purpose (from the visible code): validate everything needed before creating
# the instance — export metadata for imports, IP conflicts, node selection via
# the iallocator, disk space, hypervisor params, OS and bridges on the
# primary, and free memory.
4549 def CheckPrereq(self):
4550 """Check prerequisites.
# LVM-backed disk templates need a cluster volume group.
4553 if (not self.cfg.GetVGName() and
4554 self.op.disk_template not in constants.DTS_NOT_LVM):
4555 raise errors.OpPrereqError("Cluster does not support lvm-based"
# --- import-mode checks: locate and validate the export ---
4558 if self.op.mode == constants.INSTANCE_IMPORT:
4559 src_node = self.op.src_node
4560 src_path = self.op.src_path
# No source node given: search all locked nodes for the (relative) export.
4562 if src_node is None:
4563 exp_list = self.rpc.call_export_list(
4564 self.acquired_locks[locking.LEVEL_NODE])
4566 for node in exp_list:
4567 if not exp_list[node].failed and src_path in exp_list[node].data:
4569 self.op.src_node = src_node = node
4570 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4574 raise errors.OpPrereqError("No export found for relative path %s" %
4577 _CheckNodeOnline(self, src_node)
# export_info is ConfigParser-like (has_section/get/getint below).
4578 result = self.rpc.call_export_info(src_node, src_path)
4581 raise errors.OpPrereqError("No export found in dir %s" % src_path)
4583 export_info = result.data
4584 if not export_info.has_section(constants.INISECT_EXP):
4585 raise errors.ProgrammerError("Corrupted export config")
4587 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4588 if (int(ei_version) != constants.EXPORT_VERSION):
4589 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4590 (ei_version, constants.EXPORT_VERSION))
4592 # Check that the new instance doesn't have less disks than the export
4593 instance_disks = len(self.disks)
4594 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4595 if instance_disks < export_disks:
4596 raise errors.OpPrereqError("Not enough disks to import."
4597 " (instance: %d, export: %d)" %
4598 (instance_disks, export_disks))
4600 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
# disk_images is appended to below but its initialisation (presumably
# `disk_images = []` at elided line 4601) is missing from this excerpt.
4602 for idx in range(export_disks):
4603 option = 'disk%d_dump' % idx
4604 if export_info.has_option(constants.INISECT_INS, option):
4605 # FIXME: are the old os-es, disk sizes, etc. useful?
4606 export_name = export_info.get(constants.INISECT_INS, option)
4607 image = os.path.join(src_path, export_name)
4608 disk_images.append(image)
# False marks "no dump for this disk index" (consumed in Exec/src_images).
4610 disk_images.append(False)
4612 self.src_images = disk_images
4614 old_name = export_info.get(constants.INISECT_INS, 'name')
4615 # FIXME: int() here could throw a ValueError on broken exports
4616 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
# Re-importing under the same name: reuse the exported MACs for auto nics.
4617 if self.op.instance_name == old_name:
4618 for idx, nic in enumerate(self.nics):
# NOTE(review): `exp_nic_count >= idx` admits idx == exp_nic_count, for which
# no nic%d_mac option exists — looks like it should be `> idx`; confirm.
4619 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4620 nic_mac_ini = 'nic%d_mac' % idx
4621 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4623 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4624 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4625 if self.op.start and not self.op.ip_check:
4626 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4627 " adding an instance in start mode")
4629 if self.op.ip_check:
4630 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4631 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4632 (self.check_ip, self.op.instance_name))
4634 #### mac address generation
4635 # By generating here the mac address both the allocator and the hooks get
4636 # the real final mac address rather than the 'auto' or 'generate' value.
4637 # There is a race condition between the generation and the instance object
4638 # creation, which means that we know the mac is valid now, but we're not
4639 # sure it will be when we actually add the instance. If things go bad
4640 # adding the instance will abort because of a duplicate mac, and the
4641 # creation job will fail.
4642 for nic in self.nics:
4643 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4644 nic.mac = self.cfg.GenerateMAC()
# Run the iallocator (fills self.op.pnode/snode) once all inputs are final.
4648 if self.op.iallocator is not None:
4649 self._RunAllocator()
4651 #### node related checks
4653 # check primary node
4654 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4655 assert self.pnode is not None, \
4656 "Cannot retrieve locked node %s" % self.op.pnode
# Missing guard lines (4657, 4659-4660): presumably offline/drained checks on
# pnode preceding these raises — confirm against upstream.
4658 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4661 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4664 self.secondaries = []
4666 # mirror node verification
4667 if self.op.disk_template in constants.DTS_NET_MIRROR:
4668 if self.op.snode is None:
4669 raise errors.OpPrereqError("The networked disk templates need"
4671 if self.op.snode == pnode.name:
4672 raise errors.OpPrereqError("The secondary node cannot be"
4673 " the primary node.")
4674 _CheckNodeOnline(self, self.op.snode)
4675 _CheckNodeNotDrained(self, self.op.snode)
4676 self.secondaries.append(self.op.snode)
4678 nodenames = [pnode.name] + self.secondaries
4680 req_size = _ComputeDiskSize(self.op.disk_template,
4683 # Check lv size requirements
4684 if req_size is not None:
4685 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4687 for node in nodenames:
4688 info = nodeinfo[node]
4692 raise errors.OpPrereqError("Cannot get current information"
4693 " from node '%s'" % node)
4694 vg_free = info.get('vg_free', None)
4695 if not isinstance(vg_free, int):
4696 raise errors.OpPrereqError("Can't compute free disk space on"
4698 if req_size > info['vg_free']:
4699 raise errors.OpPrereqError("Not enough disk space on target node %s."
4700 " %d MB available, %d MB required" %
4701 (node, info['vg_free'], req_size))
# Validate hypervisor parameters on every involved node.
4703 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
# OS must be known on the primary node.
4706 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4708 if not isinstance(result.data, objects.OS):
4709 raise errors.OpPrereqError("OS '%s' not in supported os list for"
4710 " primary node" % self.op.os_type)
4712 # bridge check on primary node
4713 bridges = [n.bridge for n in self.nics]
4714 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4717 raise errors.OpPrereqError("One of the target bridges '%s' does not"
4718 " exist on destination node '%s'" %
4719 (",".join(bridges), pnode.name))
4721 # memory check on primary node
4723 _CheckNodeFreeMemory(self, self.pnode.name,
4724 "creating instance %s" % self.op.instance_name,
4725 self.be_full[constants.BE_MEMORY],
# NOTE(review): corrupted listing — indentation stripped, embedded line
# numbers; numbering gaps mark missing lines (notably the `else:` branches at
# 4738-4740 and 4747, the `try:` before 4778, and several raise/`else`
# continuations).  Code kept byte-identical; comments only.
#
# Purpose (from the visible code): create the instance's disks, register the
# instance in the config, wait for disk sync, run the OS create/import
# scripts, and optionally start the instance.
4728 def Exec(self, feedback_fn):
4729 """Create and add the instance to the cluster.
4732 instance = self.op.instance_name
4733 pnode_name = self.pnode.name
# network_port is only assigned in this branch; the elided lines 4738-4740
# presumably set it to None otherwise — confirm.
4735 ht_kind = self.op.hypervisor
4736 if ht_kind in constants.HTS_REQ_PORT:
4737 network_port = self.cfg.AllocatePort()
4741 ##if self.op.vnc_bind_address is None:
4742 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4744 # this is needed because os.path.join does not accept None arguments
4745 if self.op.file_storage_dir is None:
4746 string_file_storage_dir = ""
4748 string_file_storage_dir = self.op.file_storage_dir
4750 # build the full file storage dir path
4751 file_storage_dir = os.path.normpath(os.path.join(
4752 self.cfg.GetFileStorageDir(),
4753 string_file_storage_dir, instance))
4756 disks = _GenerateDiskTemplate(self,
4757 self.op.disk_template,
4758 instance, pnode_name,
4762 self.op.file_driver,
4765 iobj = objects.Instance(name=instance, os=self.op.os_type,
4766 primary_node=pnode_name,
4767 nics=self.nics, disks=disks,
4768 disk_template=self.op.disk_template,
4770 network_port=network_port,
4771 beparams=self.op.beparams,
4772 hvparams=self.op.hvparams,
4773 hypervisor=self.op.hypervisor,
4776 feedback_fn("* creating instance disks...")
# The `try:` opening this block (elided, around line 4777) is missing here;
# on failure the partially-created disks are removed and minors released.
4778 _CreateDisks(self, iobj)
4779 except errors.OpExecError:
4780 self.LogWarning("Device creation failed, reverting...")
4782 _RemoveDisks(self, iobj)
4784 self.cfg.ReleaseDRBDMinors(instance)
4787 feedback_fn("adding instance %s to cluster config" % instance)
4789 self.cfg.AddInstance(iobj)
4790 # Declare that we don't want to remove the instance lock anymore, as we've
4791 # added the instance to the config
4792 del self.remove_locks[locking.LEVEL_INSTANCE]
4793 # Unlock all the nodes
4794 if self.op.mode == constants.INSTANCE_IMPORT:
# Imports keep the source node locked until the dump files are read.
4795 nodes_keep = [self.op.src_node]
4796 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4797 if node != self.op.src_node]
4798 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4799 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4801 self.context.glm.release(locking.LEVEL_NODE)
4802 del self.acquired_locks[locking.LEVEL_NODE]
4804 if self.op.wait_for_sync:
4805 disk_abort = not _WaitForSync(self, iobj)
4806 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4807 # make sure the disks are not degraded (still sync-ing is ok)
4809 feedback_fn("* checking mirrors status")
4810 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
# Elided lines 4811-4814: presumably `else: disk_abort = False` and the
# `if disk_abort:` guard for the cleanup below — confirm against upstream.
4815 _RemoveDisks(self, iobj)
4816 self.cfg.RemoveInstance(iobj.name)
4817 # Make sure the instance lock gets removed
4818 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4819 raise errors.OpExecError("There are some degraded disks for"
4822 feedback_fn("creating os for instance %s on node %s" %
4823 (instance, pnode_name))
4825 if iobj.disk_template != constants.DT_DISKLESS:
4826 if self.op.mode == constants.INSTANCE_CREATE:
4827 feedback_fn("* running the instance OS create scripts...")
4828 result = self.rpc.call_instance_os_add(pnode_name, iobj)
4829 msg = result.RemoteFailMsg()
4831 raise errors.OpExecError("Could not add os for instance %s"
4833 (instance, pnode_name, msg))
4835 elif self.op.mode == constants.INSTANCE_IMPORT:
4836 feedback_fn("* running the instance OS import scripts...")
4837 src_node = self.op.src_node
4838 src_images = self.src_images
4839 cluster_name = self.cfg.GetClusterName()
4840 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4841 src_node, src_images,
4843 import_result.Raise()
4844 for idx, result in enumerate(import_result.data):
4846 self.LogWarning("Could not import the image %s for instance"
4847 " %s, disk %d, on node %s" %
4848 (src_images[idx], instance, idx, pnode_name))
4850 # also checked in the prereq part
4851 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
# Mark the instance administratively up and start it if requested (the
# `if self.op.start:` guard is presumably in elided line 4853 — confirm).
4855 iobj.admin_up = True
4856 self.cfg.Update(iobj)
4857 logging.info("Starting instance %s on node %s", instance, pnode_name)
4858 feedback_fn("* starting instance...")
4859 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4860 msg = result.RemoteFailMsg()
4862 raise errors.OpExecError("Could not start instance: %s" % msg)
# NOTE(review): corrupted listing — indentation stripped, embedded line
# numbers, with gaps (e.g. 4870-4872, 4874-4875 which presumably held the
# docstring tail and a REQ_BGL setting).  Code kept byte-identical.
#
# Purpose (from the visible code): return the shell command the caller must
# run on the master node to attach to an instance's console; the actual
# console mechanics come from the per-hypervisor GetShellCommandForConsole.
4865 class LUConnectConsole(NoHooksLU):
4866 """Connect to an instance's console.
4868 This is somewhat special in that it returns the command line that
4869 you need to run on the master node in order to connect to the
4873 _OP_REQP = ["instance_name"]
4876 def ExpandNames(self):
4877 self._ExpandAndLockInstance()
4879 def CheckPrereq(self):
4880 """Check prerequisites.
4882 This checks that the instance is in the cluster.
4885 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4886 assert self.instance is not None, \
4887 "Cannot retrieve locked instance %s" % self.op.instance_name
4888 _CheckNodeOnline(self, self.instance.primary_node)
4890 def Exec(self, feedback_fn):
4891 """Connect to the console of an instance
4894 instance = self.instance
4895 node = instance.primary_node
# The instance must actually be running on its primary node.
4897 node_insts = self.rpc.call_instance_list([node],
4898 [instance.hypervisor])[node]
4901 if instance.name not in node_insts.data:
4902 raise errors.OpExecError("Instance %s is not running." % instance.name)
4904 logging.debug("Connecting to console of %s on %s", instance.name, node)
4906 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4907 cluster = self.cfg.GetClusterInfo()
4908 # beparams and hvparams are passed separately, to avoid editing the
4909 # instance and then saving the defaults in the instance itself.
4910 hvparams = cluster.FillHV(instance)
4911 beparams = cluster.FillBE(instance)
4912 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
# The returned ssh command is executed by the caller, not by this LU.
4915 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
# NOTE(review): corrupted listing — indentation stripped, embedded line
# numbers; gaps (4920-4921, 4925-4926) hide the docstring tail and presumably
# a REQ_BGL setting.  Code kept byte-identical.
4918 class LUReplaceDisks(LogicalUnit):
4919 """Replace the disks of an instance.
# HPATH/HTYPE configure the hooks run for this LU; _OP_REQP presumably lists
# the opcode attributes that must be present (verify against LogicalUnit).
4922 HPATH = "mirrors-replace"
4923 HTYPE = constants.HTYPE_INSTANCE
4924 _OP_REQP = ["instance_name", "mode", "disks"]
# NOTE(review): corrupted listing — indentation stripped, embedded line
# numbers; the guards that decide which raise fires (elided lines 4936,
# 4939-4940, 4944, 4947-4948 — presumably tests on `cnt`) are missing.
#
# Purpose (from the visible code): default remote_node/iallocator to None and
# enforce that REPLACE_DISK_CHG gets exactly one of them, while other modes
# get neither.
4927 def CheckArguments(self):
4928 if not hasattr(self.op, "remote_node"):
4929 self.op.remote_node = None
4930 if not hasattr(self.op, "iallocator"):
4931 self.op.iallocator = None
4933 # check for valid parameter combination
# cnt == 2 means neither given, cnt == 0 means both given.
4934 cnt = [self.op.remote_node, self.op.iallocator].count(None)
4935 if self.op.mode == constants.REPLACE_DISK_CHG:
4937 raise errors.OpPrereqError("When changing the secondary either an"
4938 " iallocator script must be used or the"
4941 raise errors.OpPrereqError("Give either the iallocator or the new"
4942 " secondary, not both")
4943 else: # not replacing the secondary
4945 raise errors.OpPrereqError("The iallocator and new node options can"
4946 " be used only when changing the"
# NOTE(review): corrupted listing — indentation stripped, embedded line
# numbers; line 4966 (the `else:` introducing the final two statements) is
# missing from this excerpt.  Code kept byte-identical.
#
# Purpose (from the visible code): lock the instance, then declare node
# locking — all nodes for iallocator mode, the named new secondary (appended
# later to the instance's own nodes) for explicit mode, or deferred
# declaration via DeclareLocks otherwise.
4949 def ExpandNames(self):
4950 self._ExpandAndLockInstance()
4952 if self.op.iallocator is not None:
4953 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4954 elif self.op.remote_node is not None:
4955 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4956 if remote_node is None:
4957 raise errors.OpPrereqError("Node '%s' not known" %
4958 self.op.remote_node)
4959 self.op.remote_node = remote_node
4960 # Warning: do not remove the locking of the new secondary here
4961 # unless DRBD8.AddChildren is changed to work in parallel;
4962 # currently it doesn't since parallel invocations of
4963 # FindUnusedMinor will conflict
4964 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4965 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
# The two lines below belong to the elided `else:` branch (line 4966).
4967 self.needed_locks[locking.LEVEL_NODE] = []
4968 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
  """Declare the node locks for this LU.

  When the node level is not already set to ALL_SET (i.e. no iallocator
  is in use), the needed node locks are computed from the instance's
  primary/secondary nodes via _LockInstancesNodes, which honours the
  recalculate_locks mode set in ExpandNames (APPEND keeps a pre-seeded
  new secondary in the list).

  """
  # If we're not already locking all nodes in the set we have to declare the
  # instance's primary/secondary nodes.
  if (level == locking.LEVEL_NODE and
      self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
    self._LockInstancesNodes()
def _RunAllocator(self):
  """Compute a new secondary node using an IAllocator.

  Runs the configured iallocator in relocation mode for this instance
  and stores the chosen node in self.op.remote_node.

  Raises:
    errors.OpPrereqError: if the allocator fails or returns a wrong
      number of nodes.

  """
  ial = IAllocator(self,
                   mode=constants.IALLOCATOR_MODE_RELOC,
                   name=self.op.instance_name,
                   relocate_from=[self.sec_node])

  ial.Run(self.op.iallocator)

  if not ial.success:
    raise errors.OpPrereqError("Can't compute nodes using"
                               " iallocator '%s': %s" % (self.op.iallocator,
                                                         ial.info))
  if len(ial.nodes) != ial.required_nodes:
    # BUGFIX(review): the original interpolation tuple was
    # (len(ial.nodes), ial.required_nodes) — two values for three %s
    # placeholders, which would raise TypeError instead of OpPrereqError.
    # The iallocator name is now included, matching the sibling check in
    # LUCreateInstance._RunAllocator.
    raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                               " of nodes (%s), required %s" %
                               (self.op.iallocator, len(ial.nodes),
                                ial.required_nodes))
  self.op.remote_node = ial.nodes[0]
  self.LogInfo("Selected new secondary for the instance: %s",
               self.op.remote_node)
# NOTE(review): corrupted listing — indentation stripped, embedded line
# numbers; gaps (5001-5002, 5004-5006, 5010, 5012, 5015, 5018-5019) hide the
# docstring delimiters, the `env = {` / `nl = [` openers, their closers and
# the return statement.  Code kept byte-identical.
#
# Purpose (from the visible code): expose the replace mode and the old/new
# secondary to the hooks, plus the standard per-instance variables; hooks run
# on the master, the primary, and the new secondary when one is given.
5000 def BuildHooksEnv(self):
5003 This runs on the master, the primary and all the secondaries.
5007 "MODE": self.op.mode,
5008 "NEW_SECONDARY": self.op.remote_node,
5009 "OLD_SECONDARY": self.instance.secondary_nodes[0],
5011 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5013 self.cfg.GetMasterNode(),
5014 self.instance.primary_node,
5016 if self.op.remote_node is not None:
5017 nl.append(self.op.remote_node)
# NOTE(review): corrupted listing — indentation stripped, embedded line
# numbers; gaps mark missing lines (e.g. the raise continuations at 5033-5034
# and 5054, the `else:` lines at 5050 and 5070).  Code kept byte-identical.
#
# Purpose (from the visible code): validate the replace-disks request and
# precompute tgt_node (node whose LVs are replaced), oth_node (its peer) and
# new_node (replacement secondary, CHG mode only).
5020 def CheckPrereq(self):
5021 """Check prerequisites.
5023 This checks that the instance is in the cluster.
5026 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5027 assert instance is not None, \
5028 "Cannot retrieve locked instance %s" % self.op.instance_name
5029 self.instance = instance
# Only DRBD8 instances with exactly one secondary are supported.
5031 if instance.disk_template != constants.DT_DRBD8:
5032 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5035 if len(instance.secondary_nodes) != 1:
5036 raise errors.OpPrereqError("The instance has a strange layout,"
5037 " expected one secondary but found %d" %
5038 len(instance.secondary_nodes))
5040 self.sec_node = instance.secondary_nodes[0]
# In iallocator mode this fills self.op.remote_node.
5042 if self.op.iallocator is not None:
5043 self._RunAllocator()
5045 remote_node = self.op.remote_node
5046 if remote_node is not None:
5047 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5048 assert self.remote_node_info is not None, \
5049 "Cannot retrieve locked node %s" % remote_node
# The line below belongs to the elided `else:` branch (line 5050).
5051 self.remote_node_info = None
5052 if remote_node == instance.primary_node:
5053 raise errors.OpPrereqError("The specified node is the primary node of"
5055 elif remote_node == self.sec_node:
5056 raise errors.OpPrereqError("The specified node is already the"
5057 " secondary node of the instance.")
# Per-mode role assignment: PRI replaces LVs on the primary, SEC on the
# secondary, CHG moves the secondary to remote_node.
5059 if self.op.mode == constants.REPLACE_DISK_PRI:
5060 n1 = self.tgt_node = instance.primary_node
5061 n2 = self.oth_node = self.sec_node
5062 elif self.op.mode == constants.REPLACE_DISK_SEC:
5063 n1 = self.tgt_node = self.sec_node
5064 n2 = self.oth_node = instance.primary_node
5065 elif self.op.mode == constants.REPLACE_DISK_CHG:
5066 n1 = self.new_node = remote_node
5067 n2 = self.oth_node = instance.primary_node
5068 self.tgt_node = self.sec_node
5069 _CheckNodeNotDrained(self, remote_node)
# This raise belongs to the elided final `else:` branch (line 5070).
5071 raise errors.ProgrammerError("Unhandled disk replace mode")
5073 _CheckNodeOnline(self, n1)
5074 _CheckNodeOnline(self, n2)
# Default to all disks; FindDisk validates each requested index.
5076 if not self.op.disks:
5077 self.op.disks = range(len(instance.disks))
5079 for disk_idx in self.op.disks:
5080 instance.FindDisk(disk_idx)
# NOTE(review): corrupted listing — indentation stripped, embedded line
# numbers; several setup lines are elided (numbering gaps), notably the
# assignments implied by later use: `steps_total` (used in LogStep),
# `cfg` (used as cfg.GetVGName at 5117; only self.cfg is visible),
# `iv_names` (a dict, filled at 5168) and `size` (used at 5162, presumably
# dev.size).  Code kept byte-identical; comments only.
#
# Purpose (from the visible docstring/code): replace the local LVs backing a
# DRBD8 device on one node — create new LVs, detach+rename old, attach new
# under the old names, resync, then delete the old LVs.
5082 def _ExecD8DiskOnly(self, feedback_fn):
5083 """Replace a disk on the primary or secondary for dbrd8.
5085 The algorithm for replace is quite complicated:
5087 1. for each disk to be replaced:
5089 1. create new LVs on the target node with unique names
5090 1. detach old LVs from the drbd device
5091 1. rename old LVs to name_replaced.<time_t>
5092 1. rename new LVs to old LVs
5093 1. attach the new LVs (with the old names now) to the drbd device
5095 1. wait for sync across all devices
5097 1. for each modified disk:
5099 1. remove old LVs (which have the name name_replaces.<time_t>)
5101 Failures are not very well handled.
5105 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5106 instance = self.instance
5108 vgname = self.cfg.GetVGName()
5111 tgt_node = self.tgt_node
5112 oth_node = self.oth_node
5114 # Step: check device activation
5115 self.proc.LogStep(1, steps_total, "check device existence")
5116 info("checking volume groups")
5117 my_vg = cfg.GetVGName()
5118 results = self.rpc.call_vg_list([oth_node, tgt_node])
5120 raise errors.OpExecError("Can't list volume groups on the nodes")
5121 for node in oth_node, tgt_node:
# `res` comes from an elided line (5122, presumably res = results[node]).
5123 if res.failed or not res.data or my_vg not in res.data:
5124 raise errors.OpExecError("Volume group '%s' not found on %s" %
5126 for idx, dev in enumerate(instance.disks):
5127 if idx not in self.op.disks:
5129 for node in tgt_node, oth_node:
5130 info("checking disk/%d on %s" % (idx, node))
5131 cfg.SetDiskID(dev, node)
5132 result = self.rpc.call_blockdev_find(node, dev)
5133 msg = result.RemoteFailMsg()
5134 if not msg and not result.payload:
5135 msg = "disk not found"
5137 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5140 # Step: check other node consistency
5141 self.proc.LogStep(2, steps_total, "check peer consistency")
5142 for idx, dev in enumerate(instance.disks):
5143 if idx not in self.op.disks:
5145 info("checking disk/%d consistency on %s" % (idx, oth_node))
5146 if not _CheckDiskConsistency(self, dev, oth_node,
5147 oth_node==instance.primary_node):
5148 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5149 " to replace disks on this node (%s)" %
5150 (oth_node, tgt_node))
5152 # Step: create new storage
5153 self.proc.LogStep(3, steps_total, "allocate new storage")
5154 for idx, dev in enumerate(instance.disks):
5155 if idx not in self.op.disks:
5158 cfg.SetDiskID(dev, tgt_node)
# New data/meta LV pair with cluster-unique names; the 128 below is the
# DRBD8 meta device size in MB (matches the meta LV created here).
5159 lv_names = [".disk%d_%s" % (idx, suf)
5160 for suf in ["data", "meta"]]
5161 names = _GenerateUniqueNames(self, lv_names)
5162 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5163 logical_id=(vgname, names[0]))
5164 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5165 logical_id=(vgname, names[1]))
5166 new_lvs = [lv_data, lv_meta]
5167 old_lvs = dev.children
5168 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5169 info("creating new local storage on %s for %s" %
5170 (tgt_node, dev.iv_name))
5171 # we pass force_create=True to force the LVM creation
5172 for new_lv in new_lvs:
5173 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5174 _GetInstanceInfoText(instance), False)
5176 # Step: for each lv, detach+rename*2+attach
5177 self.proc.LogStep(4, steps_total, "change drbd configuration")
5178 for dev, old_lvs, new_lvs in iv_names.itervalues():
5179 info("detaching %s drbd from local storage" % dev.iv_name)
5180 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5183 raise errors.OpExecError("Can't detach drbd from local storage on node"
5184 " %s for device %s" % (tgt_node, dev.iv_name))
5186 #cfg.Update(instance)
5188 # ok, we created the new LVs, so now we know we have the needed
5189 # storage; as such, we proceed on the target node to rename
5190 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5191 # using the assumption that logical_id == physical_id (which in
5192 # turn is the unique_id on that node)
5194 # FIXME(iustin): use a better name for the replaced LVs
5195 temp_suffix = int(time.time())
5196 ren_fn = lambda d, suff: (d.physical_id[0],
5197 d.physical_id[1] + "_replaced-%s" % suff)
5198 # build the rename list based on what LVs exist on the node
# `rlist` is initialised in an elided line (5199, presumably rlist = []).
5200 for to_ren in old_lvs:
5201 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5202 if not result.RemoteFailMsg() and result.payload:
5204 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5206 info("renaming the old LVs on the target node")
5207 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5210 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5211 # now we rename the new LVs to the old LVs
5212 info("renaming the new LVs on the target node")
5213 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5214 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5217 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
# Point the config objects at the swapped device names on the target node.
5219 for old, new in zip(old_lvs, new_lvs):
5220 new.logical_id = old.logical_id
5221 cfg.SetDiskID(new, tgt_node)
5223 for disk in old_lvs:
5224 disk.logical_id = ren_fn(disk, temp_suffix)
5225 cfg.SetDiskID(disk, tgt_node)
5227 # now that the new lvs have the old name, we can add them to the device
5228 info("adding new mirror component on %s" % tgt_node)
5229 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5230 if result.failed or not result.data:
# Attach failed: best-effort removal of the new LVs, then abort.
5231 for new_lv in new_lvs:
5232 msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5234 warning("Can't rollback device %s: %s", dev, msg,
5235 hint="cleanup manually the unused logical volumes")
5236 raise errors.OpExecError("Can't add local storage to drbd")
5238 dev.children = new_lvs
5239 cfg.Update(instance)
5241 # Step: wait for sync
5243 # this can fail as the old devices are degraded and _WaitForSync
5244 # does a combined result over all disks, so we don't check its
5246 self.proc.LogStep(5, steps_total, "sync devices")
5247 _WaitForSync(self, instance, unlock=True)
5249 # so check manually all the devices
5250 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5251 cfg.SetDiskID(dev, instance.primary_node)
5252 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5253 msg = result.RemoteFailMsg()
5254 if not msg and not result.payload:
5255 msg = "disk not found"
5257 raise errors.OpExecError("Can't find DRBD device %s: %s" %
# payload[5] is presumably the is_degraded flag of blockdev_find — confirm
# against the rpc/backend definition.
5259 if result.payload[5]:
5260 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5262 # Step: remove old storage
5263 self.proc.LogStep(6, steps_total, "removing old storage")
5264 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5265 info("remove logical volumes for %s" % name)
# `lv` comes from an elided loop header (5266, presumably over old_lvs).
5267 cfg.SetDiskID(lv, tgt_node)
5268 msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5270 warning("Can't remove old LV: %s" % msg,
5271 hint="manually remove unused LVs")
5274 def _ExecD8Secondary(self, feedback_fn):
5275 """Replace the secondary node for drbd8.
5277 The algorithm for replace is quite complicated:
5278 - for all disks of the instance:
5279 - create new LVs on the new node with same names
5280 - shutdown the drbd device on the old secondary
5281 - disconnect the drbd network on the primary
5282 - create the drbd device on the new secondary
5283 - network attach the drbd on the primary, using an artifice:
5284 the drbd code for Attach() will connect to the network if it
5285 finds a device which is connected to the good local disks but
5287 - wait for sync across all devices
5288 - remove all disks from the old secondary
5290 Failures are not very well handled.
5294 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5295 instance = self.instance
5299 old_node = self.tgt_node
5300 new_node = self.new_node
5301 pri_node = instance.primary_node
5303 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5304 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5305 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5308 # Step: check device activation
5309 self.proc.LogStep(1, steps_total, "check device existence")
5310 info("checking volume groups")
5311 my_vg = cfg.GetVGName()
5312 results = self.rpc.call_vg_list([pri_node, new_node])
5313 for node in pri_node, new_node:
5315 if res.failed or not res.data or my_vg not in res.data:
5316 raise errors.OpExecError("Volume group '%s' not found on %s" %
5318 for idx, dev in enumerate(instance.disks):
5319 if idx not in self.op.disks:
5321 info("checking disk/%d on %s" % (idx, pri_node))
5322 cfg.SetDiskID(dev, pri_node)
5323 result = self.rpc.call_blockdev_find(pri_node, dev)
5324 msg = result.RemoteFailMsg()
5325 if not msg and not result.payload:
5326 msg = "disk not found"
5328 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5329 (idx, pri_node, msg))
5331 # Step: check other node consistency
5332 self.proc.LogStep(2, steps_total, "check peer consistency")
5333 for idx, dev in enumerate(instance.disks):
5334 if idx not in self.op.disks:
5336 info("checking disk/%d consistency on %s" % (idx, pri_node))
5337 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5338 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5339 " unsafe to replace the secondary" %
5342 # Step: create new storage
5343 self.proc.LogStep(3, steps_total, "allocate new storage")
5344 for idx, dev in enumerate(instance.disks):
5345 info("adding new local storage on %s for disk/%d" %
5347 # we pass force_create=True to force LVM creation
5348 for new_lv in dev.children:
5349 _CreateBlockDev(self, new_node, instance, new_lv, True,
5350 _GetInstanceInfoText(instance), False)
5352 # Step 4: dbrd minors and drbd setups changes
5353 # after this, we must manually remove the drbd minors on both the
5354 # error and the success paths
5355 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5357 logging.debug("Allocated minors %s" % (minors,))
5358 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5359 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5361 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5362 # create new devices on new_node; note that we create two IDs:
5363 # one without port, so the drbd will be activated without
5364 # networking information on the new node at this stage, and one
5365 # with network, for the latter activation in step 4
5366 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5367 if pri_node == o_node1:
5372 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5373 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5375 iv_names[idx] = (dev, dev.children, new_net_id)
5376 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5378 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5379 logical_id=new_alone_id,
5380 children=dev.children)
5382 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5383 _GetInstanceInfoText(instance), False)
5384 except errors.GenericError:
5385 self.cfg.ReleaseDRBDMinors(instance.name)
5388 for idx, dev in enumerate(instance.disks):
5389 # we have new devices, shutdown the drbd on the old secondary
5390 info("shutting down drbd for disk/%d on old node" % idx)
5391 cfg.SetDiskID(dev, old_node)
5392 msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5394 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5396 hint="Please cleanup this device manually as soon as possible")
5398 info("detaching primary drbds from the network (=> standalone)")
5399 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5400 instance.disks)[pri_node]
5402 msg = result.RemoteFailMsg()
5404 # detaches didn't succeed (unlikely)
5405 self.cfg.ReleaseDRBDMinors(instance.name)
5406 raise errors.OpExecError("Can't detach the disks from the network on"
5407 " old node: %s" % (msg,))
5409 # if we managed to detach at least one, we update all the disks of
5410 # the instance to point to the new secondary
5411 info("updating instance configuration")
5412 for dev, _, new_logical_id in iv_names.itervalues():
5413 dev.logical_id = new_logical_id
5414 cfg.SetDiskID(dev, pri_node)
5415 cfg.Update(instance)
5417 # and now perform the drbd attach
5418 info("attaching primary drbds to new secondary (standalone => connected)")
5419 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5420 instance.disks, instance.name,
5422 for to_node, to_result in result.items():
5423 msg = to_result.RemoteFailMsg()
5425 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5426 hint="please do a gnt-instance info to see the"
5429 # this can fail as the old devices are degraded and _WaitForSync
5430 # does a combined result over all disks, so we don't check its
5432 self.proc.LogStep(5, steps_total, "sync devices")
5433 _WaitForSync(self, instance, unlock=True)
5435 # so check manually all the devices
5436 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5437 cfg.SetDiskID(dev, pri_node)
5438 result = self.rpc.call_blockdev_find(pri_node, dev)
5439 msg = result.RemoteFailMsg()
5440 if not msg and not result.payload:
5441 msg = "disk not found"
5443 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5445 if result.payload[5]:
5446 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5448 self.proc.LogStep(6, steps_total, "removing old storage")
5449 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5450 info("remove logical volumes for disk/%d" % idx)
5452 cfg.SetDiskID(lv, old_node)
5453 msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5455 warning("Can't remove LV on old secondary: %s", msg,
5456 hint="Cleanup stale volumes by hand")
# NOTE(review): partial excerpt -- intervening source lines are elided; some
# statements below are fragments of constructs whose remainder is not shown.
def Exec(self, feedback_fn):
  """Execute disk replacement.

  This dispatches the disk replacement to the appropriate handler.

  """
  instance = self.instance

  # Activate the instance disks if we're replacing them on a down instance
  if not instance.admin_up:
    _StartInstanceDisks(self, instance, True)

  if self.op.mode == constants.REPLACE_DISK_CHG:
    fn = self._ExecD8Secondary
    # NOTE(review): an `else:` arm selecting the disk-only handler appears to
    # be elided here; as shown, the second assignment would always win.
    fn = self._ExecD8DiskOnly

  ret = fn(feedback_fn)

  # Deactivate the instance disks if we're replacing them on a down instance
  if not instance.admin_up:
    _SafeShutdownInstanceDisks(self, instance)
  # NOTE(review): the return of `ret` is not visible in this excerpt
# NOTE(review): partial excerpt -- intervening source lines are elided; some
# statements below are fragments of constructs whose remainder is not shown.
class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]

  def ExpandNames(self):
    # Lock the instance now; node locks are computed later in DeclareLocks
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Lock the nodes of the already-locked instance
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """
    This runs on the master, the primary and all the secondaries.

    """
    # NOTE(review): the construction of the `env` dict and the node list is
    # partially elided; the fragments below belong to those literals.
    "DISK": self.op.disk,
    "AMOUNT": self.op.amount,
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    self.cfg.GetMasterNode(),
    self.instance.primary_node,

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      _CheckNodeOnline(self, node)

    self.instance = instance

    # Only LVM-backed disk templates (plain/drbd8) can be grown
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
    self.disk = instance.FindDisk(self.op.disk)

    # Verify every node holding the disk has enough free space in the VG
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo[node]
      if info.failed or not info.data:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.data.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    # NOTE(review): the binding of `disk` (presumably self.disk) is elided here
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      msg = result.RemoteFailMsg()
      raise errors.OpExecError("Grow request failed to node %s: %s" %
    # Record the new size in the configuration only after all nodes grew
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                           " status.\nPlease check the instance.")
# NOTE(review): partial excerpt -- intervening source lines are elided; some
# statements below are fragments of constructs whose remainder is not shown.
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]

  def ExpandNames(self):
    self.needed_locks = {}
    # All locks are acquired in shared mode: this is a read-only query
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      # Explicit instance list: expand each name and lock exactly those
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
      # NOTE(review): the `else:` arm (query all instances) is elided here
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      # No explicit list given: report on all instances we hold locks for
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
    if dev_pstatus.offline:
    msg = dev_pstatus.RemoteFailMsg()
      raise errors.OpExecError("Can't compute disk status for %s: %s" %
                               (instance.name, msg))
    dev_pstatus = dev_pstatus.payload
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
        # NOTE(review): the `else:` assigning the other end is elided here
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
      if dev_sstatus.offline:
      msg = dev_sstatus.RemoteFailMsg()
        raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                 (instance.name, msg))
      dev_sstatus = dev_sstatus.payload

    # Recurse so the whole device tree (e.g. DRBD over LVs) is reported
    dev_children = [self._ComputeDiskStatus(instance, snode, child)
                    for child in dev.children]

    # NOTE(review): the opening of the returned dict literal is elided here
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,

  def Exec(self, feedback_fn):
    """Gather and return data"""
    cluster = self.cfg.GetClusterInfo()
    for instance in self.wanted_instances:
      if not self.op.static:
        # Live query: ask the primary node for the runtime state
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.hypervisor)
        remote_info = remote_info.data
        if remote_info and "state" in remote_info:
          remote_state = "down"
      if instance.admin_up:
        config_state = "down"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      # NOTE(review): the opening of the `idict` literal is elided here
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),

      result[instance.name] = idict
# NOTE(review): partial excerpt -- intervening source lines are elided; some
# statements below are fragments of constructs whose remainder is not shown.
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def CheckArguments(self):
    # Normalize optional opcode attributes and validate the disk/nic
    # modification lists, before any locks are taken
    if not hasattr(self.op, 'nics'):
    if not hasattr(self.op, 'disks'):
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")

    # Disk modification syntax checks
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
      elif disk_op == constants.DDM_ADD:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
          raise errors.OpPrereqError("Required disk parameter size missing")
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
        disk_dict['size'] = size
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
    # NOTE(review): `disk_addremove` is incremented on elided lines above
    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC modification syntax checks (ip/bridge/mac fields)
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
      elif nic_op == constants.DDM_ADD:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)

      if nic_op == constants.DDM_ADD:
        nic_bridge = nic_dict.get('bridge', None)
        if nic_bridge is None:
          # Fall back to the cluster default bridge for new NICs
          nic_dict['bridge'] = self.cfg.GetDefBridge()
        nic_mac = nic_dict.get('mac', None)
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic")

    # NOTE(review): `nic_addremove` is incremented on elided lines above
    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """
    This runs on the master, primary and secondaries.

    """
    # NOTE(review): the initialization of `args` is elided above these checks
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    # Merge per-index NIC overrides onto the instance's current NICs
    nic_override = dict(self.op.nics)
    for idx, nic in enumerate(self.instance.nics):
      if idx in nic_override:
        this_nic_override = nic_override[idx]
        # NOTE(review): the `else:` arm (empty override) is elided here
        this_nic_override = {}
      if 'ip' in this_nic_override:
        ip = this_nic_override['ip']
      if 'bridge' in this_nic_override:
        bridge = this_nic_override['bridge']
      if 'mac' in this_nic_override:
        mac = this_nic_override['mac']
      args['nics'].append((ip, bridge, mac))
    if constants.DDM_ADD in nic_override:
      ip = nic_override[constants.DDM_ADD].get('ip', None)
      bridge = nic_override[constants.DDM_ADD]['bridge']
      mac = nic_override[constants.DDM_ADD]['mac']
      args['nics'].append((ip, bridge, mac))
    elif constants.DDM_REMOVE in nic_override:
      del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    force = self.force = self.op.force
    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      # Work on a copy so the config object is not mutated before Exec
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val == constants.VALUE_DEFAULT:
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
      # NOTE(review): the `else:` arm (no hvparams change) is elided here
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val == constants.VALUE_DEFAULT:
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
      # NOTE(review): the `else:` arm (no beparams change) is elided here
      self.be_new = self.be_inst = {}

    # Memory check: unless forced, verify the new memory size still fits on
    # the primary (and, with auto_balance, on the secondaries)
    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
        if not instance_info.failed and instance_info.data:
          current_mem = int(instance_info.data['memory'])
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode].data['memory_free'])
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.iteritems():
          if node not in instance.secondary_nodes:
          if nres.failed or not isinstance(nres.data, dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC prerequisite checks
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
      if nic_op != constants.DDM_ADD:
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     (nic_op, len(instance.nics)))
      if 'bridge' in nic_dict:
        nic_bridge = nic_dict['bridge']
        if nic_bridge is None:
          raise errors.OpPrereqError('Cannot set the nic bridge to None')
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
            self.warn.append(msg)
            raise errors.OpPrereqError(msg)
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
          raise errors.OpPrereqError('Cannot set the nic mac to None')
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC()
          # or validate/reserve the current one
          if self.cfg.IsMacInUse(nic_mac):
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac)

    # DISK prerequisite checks
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if ins_l.failed or not isinstance(ins_l.data, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l.data:
          raise errors.OpPrereqError("Instance is running, can't remove"
      # NOTE(review): this compares len(instance.nics) against MAX_DISKS --
      # looks like it should be len(instance.disks); cannot confirm the fix
      # from this excerpt, flagging only
      if (disk_op == constants.DDM_ADD and
          len(instance.nics) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     (disk_op, len(instance.disks)))

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    instance = self.instance
    # NOTE(review): the initialization of `result` is elided in this excerpt
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        if instance.disk_template == constants.DT_FILE:
          # File-based disks live next to the existing first disk
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
          # NOTE(review): the `else:` arm is elided here
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))

    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        bridge = nic_dict['bridge']
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,bridge=%s" %
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
        # change a given nic
        for key in 'mac', 'ip', 'bridge':
            setattr(instance.nics[nic_op], key, nic_dict[key])
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)
# NOTE(review): partial excerpt -- intervening source lines are elided; some
# statements below are fragments of constructs whose remainder is not shown.
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']

  def ExpandNames(self):
    self.needed_locks = {}
    # Shared node locks: this is a read-only operation
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
      # NOTE(review): the `else:` arm (explicit node list) is elided here
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    # NOTE(review): the initialization of `result` is elided in this excerpt
    for node in rpcresult:
      if rpcresult[node].failed:
        # Per-node RPC failure is reported as False, not raised
        result[node] = False
        result[node] = rpcresult[node].data
# NOTE(review): partial excerpt -- intervening source lines are elided; some
# statements below are fragments of constructs whose remainder is not shown.
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have do lock all nodes, as we don't know where
    # the previous export might be, and and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altoghether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """
    This will run on the master, primary node and target node.

    """
    # NOTE(review): the opening of the `env` dict literal is elided here
    "EXPORT_NODE": self.op.target_node,
    "EXPORT_DO_SHUTDOWN": self.op.shutdown,
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      msg = result.RemoteFailMsg()
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 (instance.name, src_node, msg))

    vgname = self.cfg.GetVGName()

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

      # Snapshot each disk into a temporary LV on the source node
      for disk in instance.disks:
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
        if new_dev_name.failed or not new_dev_name.data:
          self.LogWarning("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
          # False marks a disk whose snapshot failed; the export continues
          snap_disks.append(False)
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name.data),
                                 physical_id=(vgname, new_dev_name.data),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

      # Restart the instance if it was running before we shut it down
      if self.op.shutdown and instance.admin_up:
        result = self.rpc.call_instance_start(src_node, instance, None, None)
        msg = result.RemoteFailMsg()
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        if result.failed or not result.data:
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
          # The snapshot LV is temporary; always try to remove it afterwards
          msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
            self.LogWarning("Could not remove snapshot block device %s from node"
                            " %s: %s", dev.logical_id[1], src_node, msg)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
        if instance.name in exportlist[node].data:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)
# NOTE(review): partial excerpt -- intervening source lines are elided; some
# statements below are fragments of constructs whose remainder is not shown.
class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    # NOTE(review): `fqdn_warn` is set on lines elided from this excerpt
    if not instance_name:
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
      if instance_name in exportlist[node].data:
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
# NOTE(review): partial excerpt -- intervening source lines are elided; some
# statements below are fragments of constructs whose remainder is not shown.
class TagsLU(NoHooksLU):
  """
  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
        raise errors.OpPrereqError("Invalid node name (%s)" %
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
        raise errors.OpPrereqError("Invalid instance name (%s)" %
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Resolve the tag target object from the opcode's kind/name pair
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
      # NOTE(review): the `else:` arm for unknown kinds is elided here
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
# Read-only LU: returns the tags of the object resolved by TagsLU.
6441 class LUGetTags(TagsLU):
6442 """Returns the tags of a given object.
6445 _OP_REQP = ["kind", "name"]
6448 def Exec(self, feedback_fn):
6449 """Returns the tag list.
# self.target was set by TagsLU.CheckPrereq; return a plain list copy.
6452 return list(self.target.GetTags())
# LU that greps all cluster/node/instance tags against a regex pattern.
6455 class LUSearchTags(NoHooksLU):
6456 """Searches the tags for a given pattern.
6459 _OP_REQP = ["pattern"]
6462 def ExpandNames(self):
# Read-only over config data: no locks needed.
6463 self.needed_locks = {}
6465 def CheckPrereq(self):
6466 """Check prerequisites.
6468 This checks the pattern passed for validity by compiling it.
# Compile once here; reused for every tag in Exec. Invalid patterns
# are rejected early as an OpPrereqError.
6472 self.re = re.compile(self.op.pattern)
6473 except re.error, err:
6474 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6475 (self.op.pattern, err))
6477 def Exec(self, feedback_fn):
6478 """Returns the tag list.
# Build (path, object) targets for the cluster, every instance and
# every node. NOTE(review): the bindings of cfg and results occur on
# lines elided from this view -- confirm against the full file.
6482 tgts = [("/cluster", cfg.GetClusterInfo())]
6483 ilist = cfg.GetAllInstancesInfo().values()
6484 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6485 nlist = cfg.GetAllNodesInfo().values()
6486 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
# Collect every (path, tag) pair whose tag matches the pattern.
6488 for path, target in tgts:
6489 for tag in target.GetTags():
6490 if self.re.search(tag):
6491 results.append((path, tag))
# LU that adds one or more tags to the object resolved by TagsLU.
6495 class LUAddTags(TagsLU):
6496 """Sets a tag on a given object.
6499 _OP_REQP = ["kind", "name", "tags"]
6502 def CheckPrereq(self):
6503 """Check prerequisites.
6505 This checks the type and length of the tag name and value.
# Parent resolves self.target; then validate each tag before Exec.
6508 TagsLU.CheckPrereq(self)
6509 for tag in self.op.tags:
6510 objects.TaggableObject.ValidateTag(tag)
6512 def Exec(self, feedback_fn):
# NOTE(review): the try: opener for this block is elided from this view.
6517 for tag in self.op.tags:
6518 self.target.AddTag(tag)
6519 except errors.TagError, err:
6520 raise errors.OpExecError("Error while setting tag: %s" % str(err))
# Persist the modified object; a concurrent config change surfaces as
# ConfigurationError and is converted to a retryable error.
6522 self.cfg.Update(self.target)
6523 except errors.ConfigurationError:
6524 raise errors.OpRetryError("There has been a modification to the"
6525 " config file and the operation has been"
6526 " aborted. Please retry.")
# LU that removes a list of tags from the object resolved by TagsLU.
6529 class LUDelTags(TagsLU):
6530 """Delete a list of tags from a given object.
6533 _OP_REQP = ["kind", "name", "tags"]
6536 def CheckPrereq(self):
6537 """Check prerequisites.
6539 This checks that we have the given tag.
6542 TagsLU.CheckPrereq(self)
6543 for tag in self.op.tags:
6544 objects.TaggableObject.ValidateTag(tag)
# All requested tags must already be present on the target; report
# the exact set difference otherwise.
6545 del_tags = frozenset(self.op.tags)
6546 cur_tags = self.target.GetTags()
6547 if not del_tags <= cur_tags:
6548 diff_tags = del_tags - cur_tags
6549 diff_names = ["'%s'" % tag for tag in diff_tags]
6551 raise errors.OpPrereqError("Tag(s) %s not found" %
6552 (",".join(diff_names)))
6554 def Exec(self, feedback_fn):
6555 """Remove the tag from the object.
6558 for tag in self.op.tags:
6559 self.target.RemoveTag(tag)
# NOTE(review): the try: opener for this update is elided from this view.
6561 self.cfg.Update(self.target)
6562 except errors.ConfigurationError:
6563 raise errors.OpRetryError("There has been a modification to the"
6564 " config file and the operation has been"
6565 " aborted. Please retry.")
# Test LU: sleeps for op.duration on the master and/or a set of nodes.
6568 class LUTestDelay(NoHooksLU):
6569 """Sleep for a specified amount of time.
6571 This LU sleeps on the master and/or nodes for a specified amount of
6575 _OP_REQP = ["duration", "on_master", "on_nodes"]
6578 def ExpandNames(self):
6579 """Expand names and set required locks.
6581 This expands the node list, if any.
6584 self.needed_locks = {}
6585 if self.op.on_nodes:
6586 # _GetWantedNodes can be used here, but is not always appropriate to use
6587 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6589 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6590 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6592 def CheckPrereq(self):
6593 """Check prerequisites.
6597 def Exec(self, feedback_fn):
6598 """Do the actual sleep.
# Local delay on the master daemon first, if requested.
6601 if self.op.on_master:
6602 if not utils.TestDelay(self.op.duration):
6603 raise errors.OpExecError("Error during master delay test")
# Then fan out the delay RPC to the requested nodes and check each
# per-node result. NOTE(review): the check guarding the "Complete
# failure" raise is on a line elided from this view.
6604 if self.op.on_nodes:
6605 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6607 raise errors.OpExecError("Complete failure from rpc call")
6608 for node, node_result in result.items():
6610 if not node_result.data:
6611 raise errors.OpExecError("Failure during rpc call to node %s,"
6612 " result: %s" % (node, node_result.data))
# Driver for external instance-allocator scripts: gathers cluster state,
# serializes it, runs the script via RPC on the master node, and parses
# and validates the script's reply.
# NOTE(review): the embedded numbering is gapped throughout this class --
# dict-literal openers, try: lines and some loop headers are elided from
# this view; confirm against the full file before editing.
6615 class IAllocator(object):
6616 """IAllocator framework.
6618 An IAllocator instance has three sets of attributes:
6619 - cfg that is needed to query the cluster
6620 - input data (all members of the _KEYS class attribute are required)
6621 - four buffer attributes (in|out_data|text), that represent the
6622 input (to the external script) in text and data structure format,
6623 and the output from it, again in two formats
6624 - the result variables from the script (success, info, nodes) for
# Required keyword arguments for the "allocate" mode.
6629 "mem_size", "disks", "disk_template",
6630 "os", "tags", "nics", "vcpus", "hypervisor",
6636 def __init__(self, lu, mode, name, **kwargs):
6638 # init buffer variables
6639 self.in_text = self.out_text = self.in_data = self.out_data = None
6640 # init all input fields so that pylint is happy
6643 self.mem_size = self.disks = self.disk_template = None
6644 self.os = self.tags = self.nics = self.vcpus = None
6645 self.hypervisor = None
6646 self.relocate_from = None
6648 self.required_nodes = None
6649 # init result fields
6650 self.success = self.info = self.nodes = None
# Pick the key set that the caller must supply for this mode.
6651 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6652 keyset = self._ALLO_KEYS
6653 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6654 keyset = self._RELO_KEYS
6656 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6657 " IAllocator" % self.mode)
# Reject unknown kwargs, store known ones as attributes...
6659 if key not in keyset:
6660 raise errors.ProgrammerError("Invalid input parameter '%s' to"
6661 " IAllocator" % key)
6662 setattr(self, key, kwargs[key])
# ...and then verify every required key was actually supplied.
6664 if key not in kwargs:
6665 raise errors.ProgrammerError("Missing input parameter '%s' to"
6666 " IAllocator" % key)
# Eagerly build the serialized input so it is ready before Run().
6667 self._BuildInputData()
6669 def _ComputeClusterData(self):
6670 """Compute the generic allocator input data.
6672 This is the data that is independent of the actual operation.
6676 cluster_info = cfg.GetClusterInfo()
# Cluster-wide header of the allocator input structure.
6679 "version": constants.IALLOCATOR_VERSION,
6680 "cluster_name": cfg.GetClusterName(),
6681 "cluster_tags": list(cluster_info.GetTags()),
6682 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6683 # we don't have job IDs
# Pair each instance with its filled-in backend parameters.
6685 iinfo = cfg.GetAllInstancesInfo().values()
6686 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6690 node_list = cfg.GetNodeList()
# Choose which hypervisor's data to query: the requested one when
# allocating, the instance's own when relocating.
6692 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6693 hypervisor_name = self.hypervisor
6694 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6695 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
# Live data from the nodes: free/total resources and running instances.
6697 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6699 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6700 cluster_info.enabled_hypervisors)
6701 for nname, nresult in node_data.items():
6702 # first fill in static (config-based) values
6703 ninfo = cfg.GetNodeInfo(nname)
6705 "tags": list(ninfo.GetTags()),
6706 "primary_ip": ninfo.primary_ip,
6707 "secondary_ip": ninfo.secondary_ip,
6708 "offline": ninfo.offline,
6709 "drained": ninfo.drained,
6710 "master_candidate": ninfo.master_candidate,
# Dynamic (RPC-derived) data is only collected for online nodes.
6713 if not ninfo.offline:
6715 if not isinstance(nresult.data, dict):
6716 raise errors.OpExecError("Can't get data for node %s" % nname)
6717 remote_info = nresult.data
# All six resource attributes must be present and integral.
6718 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6719 'vg_size', 'vg_free', 'cpu_total']:
6720 if attr not in remote_info:
6721 raise errors.OpExecError("Node '%s' didn't return attribute"
6722 " '%s'" % (nname, attr))
6724 remote_info[attr] = int(remote_info[attr])
6725 except ValueError, err:
6726 raise errors.OpExecError("Node '%s' returned invalid value"
6727 " for '%s': %s" % (nname, attr, err))
6728 # compute memory used by primary instances
6729 i_p_mem = i_p_up_mem = 0
6730 for iinfo, beinfo in i_list:
6731 if iinfo.primary_node == nname:
6732 i_p_mem += beinfo[constants.BE_MEMORY]
6733 if iinfo.name not in node_iinfo[nname].data:
# For running instances, account for the gap between the
# configured memory and what the instance actually uses.
6736 i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6737 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6738 remote_info['memory_free'] -= max(0, i_mem_diff)
6741 i_p_up_mem += beinfo[constants.BE_MEMORY]
6743 # compute memory used by instances
6745 "total_memory": remote_info['memory_total'],
6746 "reserved_memory": remote_info['memory_dom0'],
6747 "free_memory": remote_info['memory_free'],
6748 "total_disk": remote_info['vg_size'],
6749 "free_disk": remote_info['vg_free'],
6750 "total_cpus": remote_info['cpu_total'],
6751 "i_pri_memory": i_p_mem,
6752 "i_pri_up_memory": i_p_up_mem,
6756 node_results[nname] = pnr
6757 data["nodes"] = node_results
# Per-instance section of the input structure.
6761 for iinfo, beinfo in i_list:
6762 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6763 for n in iinfo.nics]
6765 "tags": list(iinfo.GetTags()),
6766 "admin_up": iinfo.admin_up,
6767 "vcpus": beinfo[constants.BE_VCPUS],
6768 "memory": beinfo[constants.BE_MEMORY],
6770 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6772 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6773 "disk_template": iinfo.disk_template,
6774 "hypervisor": iinfo.hypervisor,
6776 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6778 instance_data[iinfo.name] = pir
6780 data["instances"] = instance_data
6784 def _AddNewInstance(self):
6785 """Add new instance data to allocator structure.
6787 This in combination with _AllocatorGetClusterData will create the
6788 correct structure needed as input for the allocator.
6790 The checks for the completeness of the opcode must have already been
6796 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
# Mirrored templates need a primary plus a secondary node.
6798 if self.disk_template in constants.DTS_NET_MIRROR:
6799 self.required_nodes = 2
6801 self.required_nodes = 1
6805 "disk_template": self.disk_template,
6808 "vcpus": self.vcpus,
6809 "memory": self.mem_size,
6810 "disks": self.disks,
6811 "disk_space_total": disk_space,
6813 "required_nodes": self.required_nodes,
6815 data["request"] = request
6817 def _AddRelocateInstance(self):
6818 """Add relocate instance data to allocator structure.
6820 This in combination with _IAllocatorGetClusterData will create the
6821 correct structure needed as input for the allocator.
6823 The checks for the completeness of the opcode must have already been
6827 instance = self.lu.cfg.GetInstanceInfo(self.name)
6828 if instance is None:
6829 raise errors.ProgrammerError("Unknown instance '%s' passed to"
6830 " IAllocator" % self.name)
# Relocation only makes sense for network-mirrored disk templates
# with exactly one secondary node.
6832 if instance.disk_template not in constants.DTS_NET_MIRROR:
6833 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6835 if len(instance.secondary_nodes) != 1:
6836 raise errors.OpPrereqError("Instance has not exactly one secondary node")
6838 self.required_nodes = 1
6839 disk_sizes = [{'size': disk.size} for disk in instance.disks]
6840 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6845 "disk_space_total": disk_space,
6846 "required_nodes": self.required_nodes,
6847 "relocate_from": self.relocate_from,
6849 self.in_data["request"] = request
6851 def _BuildInputData(self):
6852 """Build input data structures.
# Generic cluster data first, then the mode-specific request section.
6855 self._ComputeClusterData()
6857 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6858 self._AddNewInstance()
6860 self._AddRelocateInstance()
6862 self.in_text = serializer.Dump(self.in_data)
6864 def Run(self, name, validate=True, call_fn=None):
6865 """Run an instance allocator and return the results.
# Default to the iallocator-runner RPC on the master node; call_fn is
# overridable for testing.
6869 call_fn = self.lu.rpc.call_iallocator_runner
6872 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
# The runner is expected to return (rcode, stdout, stderr, fail).
6875 if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6876 raise errors.OpExecError("Invalid result from master iallocator runner")
6878 rcode, stdout, stderr, fail = result.data
6880 if rcode == constants.IARUN_NOTFOUND:
6881 raise errors.OpExecError("Can't find allocator '%s'" % name)
6882 elif rcode == constants.IARUN_FAILURE:
6883 raise errors.OpExecError("Instance allocator call failed: %s,"
6884 " output: %s" % (fail, stdout+stderr))
6885 self.out_text = stdout
6887 self._ValidateResult()
6889 def _ValidateResult(self):
6890 """Process the allocator results.
6892 This will process and if successful save the result in
6893 self.out_data and the other parameters.
# Parse the script's serialized reply; any parse failure is fatal.
6897 rdict = serializer.Load(self.out_text)
6898 except Exception, err:
6899 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6901 if not isinstance(rdict, dict):
6902 raise errors.OpExecError("Can't parse iallocator results: not a dict")
# The reply must carry success/info/nodes; copy them onto self.
6904 for key in "success", "info", "nodes":
6905 if key not in rdict:
6906 raise errors.OpExecError("Can't parse iallocator results:"
6907 " missing key '%s'" % key)
6908 setattr(self, key, rdict[key])
6910 if not isinstance(rdict["nodes"], list):
6911 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6913 self.out_data = rdict
6916 class LUTestAllocator(NoHooksLU):
6917 """Run allocator tests.
6919 This LU runs the allocator tests
6922 _OP_REQP = ["direction", "mode", "name"]
6924 def CheckPrereq(self):
6925 """Check prerequisites.
6927 This checks the opcode parameters depending on the director and mode test.
6930 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6931 for attr in ["name", "mem_size", "disks", "disk_template",
6932 "os", "tags", "nics", "vcpus"]:
6933 if not hasattr(self.op, attr):
6934 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6936 iname = self.cfg.ExpandInstanceName(self.op.name)
6937 if iname is not None:
6938 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6940 if not isinstance(self.op.nics, list):
6941 raise errors.OpPrereqError("Invalid parameter 'nics'")
6942 for row in self.op.nics:
6943 if (not isinstance(row, dict) or
6946 "bridge" not in row):
6947 raise errors.OpPrereqError("Invalid contents of the"
6948 " 'nics' parameter")
6949 if not isinstance(self.op.disks, list):
6950 raise errors.OpPrereqError("Invalid parameter 'disks'")
6951 for row in self.op.disks:
6952 if (not isinstance(row, dict) or
6953 "size" not in row or
6954 not isinstance(row["size"], int) or
6955 "mode" not in row or
6956 row["mode"] not in ['r', 'w']):
6957 raise errors.OpPrereqError("Invalid contents of the"
6958 " 'disks' parameter")
6959 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6960 self.op.hypervisor = self.cfg.GetHypervisorType()
6961 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6962 if not hasattr(self.op, "name"):
6963 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6964 fname = self.cfg.ExpandInstanceName(self.op.name)
6966 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6968 self.op.name = fname
6969 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6971 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6974 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6975 if not hasattr(self.op, "allocator") or self.op.allocator is None:
6976 raise errors.OpPrereqError("Missing allocator name")
6977 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6978 raise errors.OpPrereqError("Wrong allocator test '%s'" %
6981 def Exec(self, feedback_fn):
6982 """Run the allocator test.
6985 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6986 ial = IAllocator(self,
6989 mem_size=self.op.mem_size,
6990 disks=self.op.disks,
6991 disk_template=self.op.disk_template,
6995 vcpus=self.op.vcpus,
6996 hypervisor=self.op.hypervisor,
6999 ial = IAllocator(self,
7002 relocate_from=list(self.relocate_from),
7005 if self.op.direction == constants.IALLOCATOR_DIR_IN:
7006 result = ial.in_text
7008 ial.Run(self.op.allocator, validate=False)
7009 result = ial.out_text