4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement ExpandNames
- implement CheckPrereq
- implement Exec
- implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements:
58 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60 Note that all commands require root permissions.
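# Illustrative sketch (not part of the original module) of a minimal
# concurrent LU following the rules above; LUDoNothing and its behaviour are
# made-up examples.
#
#   class LUDoNothing(LogicalUnit):
#     HPATH = "do-nothing"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = []              # no required opcode parameters
#     REQ_BGL = False            # runs concurrently, so declares its own locks
#
#     def ExpandNames(self):
#       self.needed_locks = {}   # this LU needs no locks at all
#
#     def CheckPrereq(self):
#       pass                     # nothing to verify against the cluster
#
#     def BuildHooksEnv(self):
#       return {}, [], []        # empty environment, no pre/post hook nodes
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Doing nothing")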
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
This needs to be overridden in derived classes in order to check op validity.
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
93 for attr_name in self._OP_REQP:
94 attr_val = getattr(op, attr_name, None)
96 raise errors.OpPrereqError("Required parameter '%s' missing" %
101 """Returns the SshRunner object
105 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
108 ssh = property(fget=__GetSSH)
110 def CheckArguments(self):
111 """Check syntactic validity for the opcode arguments.
This method is for doing a simple syntactic check and ensuring the
validity of opcode parameters, without any cluster-related
checks. While the same can be accomplished in ExpandNames and/or
CheckPrereq, doing these separately is better because:
- ExpandNames is left as purely a lock-related function
- CheckPrereq is run after we have acquired locks (and possibly
waited for them)
The function is allowed to change the self.op attribute so that
later methods don't have to worry about missing parameters.
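# Illustrative sketch (not part of the original module): a purely syntactic
# check of the kind described above, run before any locks are taken. The
# "timeout" parameter is a made-up example.
#
#   def CheckArguments(self):
#     if getattr(self.op, "timeout", None) is None:
#       self.op.timeout = 60        # fill in a default value
#     if not isinstance(self.op.timeout, int) or self.op.timeout <= 0:
#       raise errors.OpPrereqError("timeout must be a positive integer")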
128 def ExpandNames(self):
129 """Expand names for this LU.
131 This method is called before starting to execute the opcode, and it should
132 update all the parameters of the opcode to their canonical form (e.g. a
133 short node name must be fully expanded after this method has successfully
completed). This way locking, hooks, logging, etc. can work correctly.
136 LUs which implement this method must also populate the self.needed_locks
member, as a dict with lock levels as keys, and a list of needed lock names as values. Rules:
140 - use an empty dict if you don't need any lock
141 - if you don't need any lock at a particular level omit that level
142 - don't put anything for the BGL level
143 - if you want all locks at a level use locking.ALL_SET as a value
145 If you need to share locks (rather than acquire them exclusively) at one
146 level you can modify self.share_locks, setting a true value (usually 1) for
147 that level. By default locks are not shared.
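For example, to acquire the node locks in shared mode (an illustrative
sketch, mirroring what LUVerifyCluster below does)::

self.share_locks[locking.LEVEL_NODE] = 1

Examples of populating self.needed_locks::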
151 # Acquire all nodes and one instance
152 self.needed_locks = {
153 locking.LEVEL_NODE: locking.ALL_SET,
154 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156 # Acquire just two nodes
157 self.needed_locks = {
158 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
161 self.needed_locks = {} # No, you can't leave it to the default value None
164 # The implementation of this method is mandatory only if the new LU is
# concurrent, so that old LUs don't need to be changed all at the same time.
168 self.needed_locks = {} # Exclusive LUs don't need locks.
170 raise NotImplementedError
172 def DeclareLocks(self, level):
173 """Declare LU locking needs for a level
175 While most LUs can just declare their locking needs at ExpandNames time,
176 sometimes there's the need to calculate some locks after having acquired
177 the ones before. This function is called just before acquiring locks at a
178 particular level, but after acquiring the ones at lower levels, and permits
179 such calculations. It can be used to modify self.needed_locks, and by
180 default it does nothing.
182 This function is only called if you have something already set in
183 self.needed_locks for the level.
185 @param level: Locking level which is going to be locked
186 @type level: member of ganeti.locking.LEVELS
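Example (an illustrative sketch, assuming an LU that locked one or more
instances in ExpandNames and now needs their primary nodes)::

def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
self._LockInstancesNodes(primary_only=True)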
190 def CheckPrereq(self):
191 """Check prerequisites for this LU.
193 This method should check that the prerequisites for the execution
194 of this LU are fulfilled. It can do internode communication, but
195 it should be idempotent - no cluster or system changes are
198 The method should raise errors.OpPrereqError in case something is
199 not fulfilled. Its return value is ignored.
201 This method should also update all the parameters of the opcode to
202 their canonical form if it hasn't been done by ExpandNames before.
205 raise NotImplementedError
207 def Exec(self, feedback_fn):
210 This method should implement the actual work. It should raise
errors.OpExecError for failures that are somewhat dealt with in code, or expected.
215 raise NotImplementedError
217 def BuildHooksEnv(self):
218 """Build hooks environment for this LU.
This method should return a three-element tuple consisting of: a dict
221 containing the environment that will be used for running the
222 specific hook for this LU, a list of node names on which the hook
223 should run before the execution, and a list of node names on which
224 the hook should run after the execution.
226 The keys of the dict must not have 'GANETI_' prefixed as this will
227 be handled in the hooks runner. Also note additional keys will be
228 added by the hooks runner. If the LU doesn't define any
229 environment, an empty dict (and not None) should be returned.
Empty node lists should be returned as an empty list (and not None).
Note that if the HPATH for a LU class is None, this function will not be called.
237 raise NotImplementedError
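# Illustrative sketch (not part of the original module) of a typical
# BuildHooksEnv implementation for an assumed node-level LU: an environment
# dict plus the lists of nodes on which to run the pre- and post-execution
# hooks.
#
#   def BuildHooksEnv(self):
#     env = {
#       "OP_TARGET": self.op.node_name,
#       "NODE_NAME": self.op.node_name,
#     }
#     mn = self.cfg.GetMasterNode()
#     return env, [mn], [mn, self.op.node_name]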
239 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240 """Notify the LU about the results of its hooks.
242 This method is called every time a hooks phase is executed, and notifies
243 the Logical Unit about the hooks' result. The LU can then use it to alter
244 its result based on the hooks. By default the method does nothing and the
245 previous result is passed back unchanged but any LU can define it if it
246 wants to use the local cluster hook-scripts somehow.
248 @param phase: one of L{constants.HOOKS_PHASE_POST} or
249 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250 @param hook_results: the results of the multi-node hooks rpc call
@param feedback_fn: function used to send feedback back to the caller
252 @param lu_result: the previous Exec result this LU had, or None
254 @return: the new Exec result, based on the previous result
260 def _ExpandAndLockInstance(self):
261 """Helper function to expand and lock an instance.
263 Many LUs that work on an instance take its name in self.op.instance_name
264 and need to expand it and then declare the expanded name for locking. This
265 function does it, and then updates self.op.instance_name to the expanded
name. It also initializes needed_locks as a dict, if this hasn't been done before.
270 if self.needed_locks is None:
271 self.needed_locks = {}
273 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274 "_ExpandAndLockInstance called with instance-level locks set"
275 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276 if expanded_name is None:
277 raise errors.OpPrereqError("Instance '%s' not known" %
278 self.op.instance_name)
279 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280 self.op.instance_name = expanded_name
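# Illustrative usage sketch (assumed, not from the original module): an
# instance-level LU would typically call this helper from ExpandNames and
# defer the node locks to DeclareLocks:
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     self.needed_locks[locking.LEVEL_NODE] = []
#     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE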
282 def _LockInstancesNodes(self, primary_only=False):
283 """Helper function to declare instances' nodes for locking.
285 This function should be called after locking one or more instances to lock
286 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287 with all primary or secondary nodes for instances already locked and
288 present in self.needed_locks[locking.LEVEL_INSTANCE].
290 It should be called from DeclareLocks, and for safety only works if
291 self.recalculate_locks[locking.LEVEL_NODE] is set.
293 In the future it may grow parameters to just lock some instance's nodes, or
294 to just lock primaries or secondary nodes, if needed.
It should be called from DeclareLocks in a way similar to::
298 if level == locking.LEVEL_NODE:
299 self._LockInstancesNodes()
301 @type primary_only: boolean
302 @param primary_only: only lock primary nodes of locked instances
305 assert locking.LEVEL_NODE in self.recalculate_locks, \
306 "_LockInstancesNodes helper function called with no nodes to recalculate"
# TODO: check if we've really been called with the instance locks held
310 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311 # future we might want to have different behaviors depending on the value
312 # of self.recalculate_locks[locking.LEVEL_NODE]
314 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315 instance = self.context.cfg.GetInstanceInfo(instance_name)
316 wanted_nodes.append(instance.primary_node)
318 wanted_nodes.extend(instance.secondary_nodes)
320 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325 del self.recalculate_locks[locking.LEVEL_NODE]
328 class NoHooksLU(LogicalUnit):
329 """Simple LU which runs no hooks.
331 This LU is intended as a parent for other LogicalUnits which will
332 run no hooks, in order to reduce duplicate code.
339 def _GetWantedNodes(lu, nodes):
340 """Returns list of checked and expanded node names.
342 @type lu: L{LogicalUnit}
343 @param lu: the logical unit on whose behalf we execute
345 @param nodes: list of node names or None for all nodes
347 @return: the list of nodes, sorted
@raise errors.ProgrammerError: if the nodes parameter is of the wrong type
351 if not isinstance(nodes, list):
352 raise errors.OpPrereqError("Invalid argument type 'nodes'")
355 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356 " non-empty list of nodes whose name is to be expanded.")
360 node = lu.cfg.ExpandNodeName(name)
362 raise errors.OpPrereqError("No such node name '%s'" % name)
365 return utils.NiceSort(wanted)
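# Illustrative usage sketch: node-level query LUs call this from ExpandNames
# or CheckPrereq, e.g. (as LUQueryNodes below does):
#
#   self.wanted = _GetWantedNodes(self, self.op.names)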
368 def _GetWantedInstances(lu, instances):
369 """Returns list of checked and expanded instance names.
371 @type lu: L{LogicalUnit}
372 @param lu: the logical unit on whose behalf we execute
373 @type instances: list
374 @param instances: list of instance names or None for all instances
376 @return: the list of instances, sorted
377 @raise errors.OpPrereqError: if the instances parameter is wrong type
378 @raise errors.OpPrereqError: if any of the passed instances is not found
381 if not isinstance(instances, list):
382 raise errors.OpPrereqError("Invalid argument type 'instances'")
387 for name in instances:
388 instance = lu.cfg.ExpandInstanceName(name)
390 raise errors.OpPrereqError("No such instance name '%s'" % name)
391 wanted.append(instance)
394 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
398 def _CheckOutputFields(static, dynamic, selected):
399 """Checks whether all selected fields are valid.
401 @type static: L{utils.FieldSet}
402 @param static: static fields set
403 @type dynamic: L{utils.FieldSet}
404 @param dynamic: dynamic fields set
411 delta = f.NonMatching(selected)
raise errors.OpPrereqError("Unknown output fields selected: %s" % ",".join(delta))
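# Illustrative usage sketch, as the query LUs below do from ExpandNames:
#
#   _CheckOutputFields(static=self._FIELDS_STATIC,
#                      dynamic=self._FIELDS_DYNAMIC,
#                      selected=self.op.output_fields)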
417 def _CheckBooleanOpField(op, name):
418 """Validates boolean opcode parameters.
420 This will ensure that an opcode parameter is either a boolean value,
421 or None (but that it always exists).
424 val = getattr(op, name, None)
425 if not (val is None or isinstance(val, bool)):
426 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428 setattr(op, name, val)
431 def _CheckNodeOnline(lu, node):
432 """Ensure that a given node is online.
434 @param lu: the LU on behalf of which we make the check
435 @param node: the node to check
436 @raise errors.OpPrereqError: if the node is offline
439 if lu.cfg.GetNodeInfo(node).offline:
440 raise errors.OpPrereqError("Can't use offline node %s" % node)
443 def _CheckNodeNotDrained(lu, node):
444 """Ensure that a given node is not drained.
446 @param lu: the LU on behalf of which we make the check
447 @param node: the node to check
448 @raise errors.OpPrereqError: if the node is drained
451 if lu.cfg.GetNodeInfo(node).drained:
452 raise errors.OpPrereqError("Can't use drained node %s" % node)
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456 memory, vcpus, nics, disk_template, disks,
457 bep, hvp, hypervisor):
458 """Builds instance related env variables for hooks
460 This builds the hook environment from individual variables.
463 @param name: the name of the instance
464 @type primary_node: string
465 @param primary_node: the name of the instance's primary node
466 @type secondary_nodes: list
467 @param secondary_nodes: list of secondary nodes as strings
468 @type os_type: string
469 @param os_type: the name of the instance's OS
470 @type status: boolean
471 @param status: the should_run status of the instance
473 @param memory: the memory size of the instance
475 @param vcpus: the count of VCPUs the instance has
477 @param nics: list of tuples (ip, bridge, mac) representing
478 the NICs the instance has
479 @type disk_template: string
@param disk_template: the disk template of the instance
482 @param disks: the list of (size, mode) pairs
484 @param bep: the backend parameters for the instance
486 @param hvp: the hypervisor parameters for the instance
487 @type hypervisor: string
488 @param hypervisor: the hypervisor for the instance
490 @return: the hook environment for this instance
499 "INSTANCE_NAME": name,
500 "INSTANCE_PRIMARY": primary_node,
501 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
502 "INSTANCE_OS_TYPE": os_type,
503 "INSTANCE_STATUS": str_status,
504 "INSTANCE_MEMORY": memory,
505 "INSTANCE_VCPUS": vcpus,
506 "INSTANCE_DISK_TEMPLATE": disk_template,
507 "INSTANCE_HYPERVISOR": hypervisor,
511 nic_count = len(nics)
512 for idx, (ip, bridge, mac) in enumerate(nics):
515 env["INSTANCE_NIC%d_IP" % idx] = ip
516 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
517 env["INSTANCE_NIC%d_MAC" % idx] = mac
521 env["INSTANCE_NIC_COUNT"] = nic_count
524 disk_count = len(disks)
525 for idx, (size, mode) in enumerate(disks):
526 env["INSTANCE_DISK%d_SIZE" % idx] = size
527 env["INSTANCE_DISK%d_MODE" % idx] = mode
531 env["INSTANCE_DISK_COUNT"] = disk_count
533 for source, kind in [(bep, "BE"), (hvp, "HV")]:
534 for key, value in source.items():
535 env["INSTANCE_%s_%s" % (kind, key)] = value
540 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
541 """Builds instance related env variables for hooks from an object.
543 @type lu: L{LogicalUnit}
544 @param lu: the logical unit on whose behalf we execute
545 @type instance: L{objects.Instance}
@param instance: the instance for which we should build the environment
@param override: dictionary with key/values that will override the defaults
552 @return: the hook environment dictionary
555 cluster = lu.cfg.GetClusterInfo()
556 bep = cluster.FillBE(instance)
557 hvp = cluster.FillHV(instance)
559 'name': instance.name,
560 'primary_node': instance.primary_node,
561 'secondary_nodes': instance.secondary_nodes,
562 'os_type': instance.os,
563 'status': instance.admin_up,
564 'memory': bep[constants.BE_MEMORY],
565 'vcpus': bep[constants.BE_VCPUS],
566 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
567 'disk_template': instance.disk_template,
568 'disks': [(disk.size, disk.mode) for disk in instance.disks],
571 'hypervisor': instance.hypervisor,
574 args.update(override)
575 return _BuildInstanceHookEnv(**args)
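# Illustrative usage sketch (assumed): instance LUs typically build their hook
# environment directly from the configuration object in BuildHooksEnv, e.g.:
#
#   env = _BuildInstanceHookEnvByObject(self, self.instance)
#   nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
#   return env, nl, nl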
578 def _AdjustCandidatePool(lu):
579 """Adjust the candidate pool after node operations.
582 mod_list = lu.cfg.MaintainCandidatePool()
584 lu.LogInfo("Promoted nodes to master candidate role: %s",
585 ", ".join(node.name for node in mod_list))
586 for name in mod_list:
587 lu.context.ReaddNode(name)
588 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
590 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
594 def _CheckInstanceBridgesExist(lu, instance):
595 """Check that the brigdes needed by an instance exist.
598 # check bridges existance
599 brlist = [nic.bridge for nic in instance.nics]
600 result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
603 raise errors.OpPrereqError("One or more target bridges %s does not"
604 " exist on destination node '%s'" %
605 (brlist, instance.primary_node))
608 class LUDestroyCluster(NoHooksLU):
609 """Logical unit for destroying the cluster.
614 def CheckPrereq(self):
615 """Check prerequisites.
617 This checks whether the cluster is empty.
619 Any errors are signalled by raising errors.OpPrereqError.
622 master = self.cfg.GetMasterNode()
624 nodelist = self.cfg.GetNodeList()
625 if len(nodelist) != 1 or nodelist[0] != master:
626 raise errors.OpPrereqError("There are still %d node(s) in"
627 " this cluster." % (len(nodelist) - 1))
628 instancelist = self.cfg.GetInstanceList()
630 raise errors.OpPrereqError("There are still %d instance(s) in"
631 " this cluster." % len(instancelist))
633 def Exec(self, feedback_fn):
634 """Destroys the cluster.
637 master = self.cfg.GetMasterNode()
638 result = self.rpc.call_node_stop_master(master, False)
641 raise errors.OpExecError("Could not disable the master role")
642 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
643 utils.CreateBackup(priv_key)
644 utils.CreateBackup(pub_key)
648 class LUVerifyCluster(LogicalUnit):
649 """Verifies the cluster status.
652 HPATH = "cluster-verify"
653 HTYPE = constants.HTYPE_CLUSTER
654 _OP_REQP = ["skip_checks"]
657 def ExpandNames(self):
658 self.needed_locks = {
659 locking.LEVEL_NODE: locking.ALL_SET,
660 locking.LEVEL_INSTANCE: locking.ALL_SET,
662 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
664 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
665 node_result, feedback_fn, master_files,
667 """Run multiple tests against a node.
671 - compares ganeti version
- checks vg existence and size > 20G
673 - checks config file checksum
674 - checks ssh to other nodes
676 @type nodeinfo: L{objects.Node}
677 @param nodeinfo: the node to check
678 @param file_list: required list of files
679 @param local_cksum: dictionary of local files and their checksums
680 @param node_result: the results from the node
681 @param feedback_fn: function used to accumulate results
682 @param master_files: list of files that only masters should have
@param drbd_map: the used DRBD minors for this node, in the
form of minor: (instance, must_exist), which correspond to instances
and their running status
686 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
691 # main result, node_result should be a non-empty dict
692 if not node_result or not isinstance(node_result, dict):
693 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
696 # compares ganeti version
697 local_version = constants.PROTOCOL_VERSION
698 remote_version = node_result.get('version', None)
699 if not (remote_version and isinstance(remote_version, (list, tuple)) and
700 len(remote_version) == 2):
701 feedback_fn(" - ERROR: connection to %s failed" % (node))
704 if local_version != remote_version[0]:
705 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
706 " node %s %s" % (local_version, node, remote_version[0]))
709 # node seems compatible, we can actually try to look into its results
713 # full package version
714 if constants.RELEASE_VERSION != remote_version[1]:
715 feedback_fn(" - WARNING: software version mismatch: master %s,"
717 (constants.RELEASE_VERSION, node, remote_version[1]))
719 # checks vg existence and size > 20G
720 if vg_name is not None:
721 vglist = node_result.get(constants.NV_VGLIST, None)
723 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
727 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
728 constants.MIN_VG_SIZE)
730 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
733 # checks config file checksum
735 remote_cksum = node_result.get(constants.NV_FILELIST, None)
736 if not isinstance(remote_cksum, dict):
738 feedback_fn(" - ERROR: node hasn't returned file checksum data")
740 for file_name in file_list:
741 node_is_mc = nodeinfo.master_candidate
742 must_have_file = file_name not in master_files
743 if file_name not in remote_cksum:
744 if node_is_mc or must_have_file:
746 feedback_fn(" - ERROR: file '%s' missing" % file_name)
747 elif remote_cksum[file_name] != local_cksum[file_name]:
748 if node_is_mc or must_have_file:
750 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
752 # not candidate and this is not a must-have file
754 feedback_fn(" - ERROR: non master-candidate has old/wrong file"
757 # all good, except non-master/non-must have combination
758 if not node_is_mc and not must_have_file:
759 feedback_fn(" - ERROR: file '%s' should not exist on non master"
760 " candidates" % file_name)
764 if constants.NV_NODELIST not in node_result:
766 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
768 if node_result[constants.NV_NODELIST]:
770 for node in node_result[constants.NV_NODELIST]:
771 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
772 (node, node_result[constants.NV_NODELIST][node]))
774 if constants.NV_NODENETTEST not in node_result:
776 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
778 if node_result[constants.NV_NODENETTEST]:
780 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
782 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
783 (node, node_result[constants.NV_NODENETTEST][node]))
785 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
786 if isinstance(hyp_result, dict):
787 for hv_name, hv_result in hyp_result.iteritems():
788 if hv_result is not None:
789 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
790 (hv_name, hv_result))
792 # check used drbd list
793 if vg_name is not None:
794 used_minors = node_result.get(constants.NV_DRBDLIST, [])
795 if not isinstance(used_minors, (tuple, list)):
796 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
799 for minor, (iname, must_exist) in drbd_map.items():
800 if minor not in used_minors and must_exist:
801 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
802 " not active" % (minor, iname))
804 for minor in used_minors:
805 if minor not in drbd_map:
806 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
812 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
813 node_instance, feedback_fn, n_offline):
814 """Verify an instance.
816 This function checks to see if the required block devices are
817 available on the instance's node.
822 node_current = instanceconfig.primary_node
825 instanceconfig.MapLVsByNode(node_vol_should)
827 for node in node_vol_should:
828 if node in n_offline:
829 # ignore missing volumes on offline nodes
831 for volume in node_vol_should[node]:
832 if node not in node_vol_is or volume not in node_vol_is[node]:
833 feedback_fn(" - ERROR: volume %s missing on node %s" %
837 if instanceconfig.admin_up:
838 if ((node_current not in node_instance or
839 not instance in node_instance[node_current]) and
840 node_current not in n_offline):
841 feedback_fn(" - ERROR: instance %s not running on node %s" %
842 (instance, node_current))
845 for node in node_instance:
846 if (not node == node_current):
847 if instance in node_instance[node]:
848 feedback_fn(" - ERROR: instance %s should not run on node %s" %
854 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
855 """Verify if there are any unknown volumes in the cluster.
The .os, .swap and backup volumes are ignored. All other volumes are reported as unknown.
863 for node in node_vol_is:
864 for volume in node_vol_is[node]:
865 if node not in node_vol_should or volume not in node_vol_should[node]:
866 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
871 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
872 """Verify the list of running instances.
874 This checks what instances are running but unknown to the cluster.
878 for node in node_instance:
879 for runninginstance in node_instance[node]:
880 if runninginstance not in instancelist:
881 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
882 (runninginstance, node))
886 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
887 """Verify N+1 Memory Resilience.
Check that if one single node dies we can still start all the instances it was primary for.
895 for node, nodeinfo in node_info.iteritems():
896 # This code checks that every node which is now listed as secondary has
897 # enough memory to host all instances it is supposed to should a single
898 # other node in the cluster fail.
899 # FIXME: not ready for failover to an arbitrary node
900 # FIXME: does not support file-backed instances
901 # WARNING: we currently take into account down instances as well as up
902 # ones, considering that even if they're down someone might want to start
903 # them even in the event of a node failure.
904 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
906 for instance in instances:
907 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
908 if bep[constants.BE_AUTO_BALANCE]:
909 needed_mem += bep[constants.BE_MEMORY]
910 if nodeinfo['mfree'] < needed_mem:
feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
912 " failovers should node %s fail" % (node, prinode))
916 def CheckPrereq(self):
917 """Check prerequisites.
919 Transform the list of checks we're going to skip into a set and check that
920 all its members are valid.
923 self.skip_set = frozenset(self.op.skip_checks)
924 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
925 raise errors.OpPrereqError("Invalid checks to be skipped specified")
927 def BuildHooksEnv(self):
Cluster-Verify hooks just run in the post phase; their failure is logged in
the verify output and makes the verification fail.
934 all_nodes = self.cfg.GetNodeList()
936 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
938 for node in self.cfg.GetAllNodesInfo().values():
939 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
941 return env, [], all_nodes
943 def Exec(self, feedback_fn):
944 """Verify integrity of cluster, performing various test on nodes.
948 feedback_fn("* Verifying global settings")
949 for msg in self.cfg.VerifyConfig():
950 feedback_fn(" - ERROR: %s" % msg)
952 vg_name = self.cfg.GetVGName()
953 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
954 nodelist = utils.NiceSort(self.cfg.GetNodeList())
955 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
956 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
957 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
958 for iname in instancelist)
959 i_non_redundant = [] # Non redundant instances
960 i_non_a_balanced = [] # Non auto-balanced instances
961 n_offline = [] # List of offline nodes
962 n_drained = [] # List of nodes being drained
968 # FIXME: verify OS list
970 master_files = [constants.CLUSTER_CONF_FILE]
972 file_names = ssconf.SimpleStore().GetFileList()
973 file_names.append(constants.SSL_CERT_FILE)
974 file_names.append(constants.RAPI_CERT_FILE)
975 file_names.extend(master_files)
977 local_checksums = utils.FingerprintFiles(file_names)
979 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
980 node_verify_param = {
981 constants.NV_FILELIST: file_names,
982 constants.NV_NODELIST: [node.name for node in nodeinfo
983 if not node.offline],
984 constants.NV_HYPERVISOR: hypervisors,
985 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
986 node.secondary_ip) for node in nodeinfo
987 if not node.offline],
988 constants.NV_INSTANCELIST: hypervisors,
989 constants.NV_VERSION: None,
990 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
992 if vg_name is not None:
993 node_verify_param[constants.NV_VGLIST] = None
994 node_verify_param[constants.NV_LVLIST] = vg_name
995 node_verify_param[constants.NV_DRBDLIST] = None
996 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
997 self.cfg.GetClusterName())
999 cluster = self.cfg.GetClusterInfo()
1000 master_node = self.cfg.GetMasterNode()
1001 all_drbd_map = self.cfg.ComputeDRBDMap()
1003 for node_i in nodeinfo:
1005 nresult = all_nvinfo[node].data
1008 feedback_fn("* Skipping offline node %s" % (node,))
1009 n_offline.append(node)
1012 if node == master_node:
1014 elif node_i.master_candidate:
1015 ntype = "master candidate"
1016 elif node_i.drained:
1018 n_drained.append(node)
1021 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1023 if all_nvinfo[node].failed or not isinstance(nresult, dict):
1024 feedback_fn(" - ERROR: connection to %s failed" % (node,))
1029 for minor, instance in all_drbd_map[node].items():
1030 if instance not in instanceinfo:
1031 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1033 # ghost instance should not be running, but otherwise we
1034 # don't give double warnings (both ghost instance and
1035 # unallocated minor in use)
1036 node_drbd[minor] = (instance, False)
1038 instance = instanceinfo[instance]
1039 node_drbd[minor] = (instance.name, instance.admin_up)
1040 result = self._VerifyNode(node_i, file_names, local_checksums,
1041 nresult, feedback_fn, master_files,
1045 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1047 node_volume[node] = {}
1048 elif isinstance(lvdata, basestring):
1049 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1050 (node, utils.SafeEncode(lvdata)))
1052 node_volume[node] = {}
1053 elif not isinstance(lvdata, dict):
1054 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1058 node_volume[node] = lvdata
1061 idata = nresult.get(constants.NV_INSTANCELIST, None)
1062 if not isinstance(idata, list):
1063 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1068 node_instance[node] = idata
1071 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1072 if not isinstance(nodeinfo, dict):
1073 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1079 "mfree": int(nodeinfo['memory_free']),
1082 # dictionary holding all instances this node is secondary for,
1083 # grouped by their primary node. Each key is a cluster node, and each
1084 # value is a list of instances which have the key as primary and the
# current node as secondary. This is handy to calculate N+1 memory
# availability if you can only failover from a primary to its secondary.
1088 "sinst-by-pnode": {},
1090 # FIXME: devise a free space model for file based instances as well
1091 if vg_name is not None:
1092 if (constants.NV_VGLIST not in nresult or
1093 vg_name not in nresult[constants.NV_VGLIST]):
1094 feedback_fn(" - ERROR: node %s didn't return data for the"
1095 " volume group '%s' - it is either missing or broken" %
1099 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1100 except (ValueError, KeyError):
1101 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1102 " from node %s" % (node,))
1106 node_vol_should = {}
1108 for instance in instancelist:
1109 feedback_fn("* Verifying instance %s" % instance)
1110 inst_config = instanceinfo[instance]
1111 result = self._VerifyInstance(instance, inst_config, node_volume,
1112 node_instance, feedback_fn, n_offline)
1114 inst_nodes_offline = []
1116 inst_config.MapLVsByNode(node_vol_should)
1118 instance_cfg[instance] = inst_config
1120 pnode = inst_config.primary_node
1121 if pnode in node_info:
1122 node_info[pnode]['pinst'].append(instance)
1123 elif pnode not in n_offline:
1124 feedback_fn(" - ERROR: instance %s, connection to primary node"
1125 " %s failed" % (instance, pnode))
1128 if pnode in n_offline:
1129 inst_nodes_offline.append(pnode)
1131 # If the instance is non-redundant we cannot survive losing its primary
1132 # node, so we are not N+1 compliant. On the other hand we have no disk
1133 # templates with more than one secondary so that situation is not well
1135 # FIXME: does not support file-backed instances
1136 if len(inst_config.secondary_nodes) == 0:
1137 i_non_redundant.append(instance)
1138 elif len(inst_config.secondary_nodes) > 1:
1139 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1142 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1143 i_non_a_balanced.append(instance)
1145 for snode in inst_config.secondary_nodes:
1146 if snode in node_info:
1147 node_info[snode]['sinst'].append(instance)
1148 if pnode not in node_info[snode]['sinst-by-pnode']:
1149 node_info[snode]['sinst-by-pnode'][pnode] = []
1150 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1151 elif snode not in n_offline:
1152 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1153 " %s failed" % (instance, snode))
1155 if snode in n_offline:
1156 inst_nodes_offline.append(snode)
1158 if inst_nodes_offline:
1159 # warn that the instance lives on offline nodes, and set bad=True
1160 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1161 ", ".join(inst_nodes_offline))
1164 feedback_fn("* Verifying orphan volumes")
1165 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1169 feedback_fn("* Verifying remaining instances")
1170 result = self._VerifyOrphanInstances(instancelist, node_instance,
1174 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1175 feedback_fn("* Verifying N+1 Memory redundancy")
1176 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1179 feedback_fn("* Other Notes")
1181 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1182 % len(i_non_redundant))
1184 if i_non_a_balanced:
1185 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1186 % len(i_non_a_balanced))
1189 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1192 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1196 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1197 """Analize the post-hooks' result
1199 This method analyses the hook result, handles it, and sends some
1200 nicely-formatted feedback back to the user.
1202 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1203 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1204 @param hooks_results: the results of the multi-node hooks rpc call
@param feedback_fn: function used to send feedback back to the caller
1206 @param lu_result: previous Exec result
1207 @return: the new Exec result, based on the previous result
# We only really run POST phase hooks, and are only interested in their results
1213 if phase == constants.HOOKS_PHASE_POST:
1214 # Used to change hooks' output to proper indentation
1215 indent_re = re.compile('^', re.M)
1216 feedback_fn("* Hooks Results")
1217 if not hooks_results:
1218 feedback_fn(" - ERROR: general communication failure")
1221 for node_name in hooks_results:
1222 show_node_header = True
1223 res = hooks_results[node_name]
1224 if res.failed or res.data is False or not isinstance(res.data, list):
1226 # no need to warn or set fail return value
1228 feedback_fn(" Communication failure in hooks execution")
1231 for script, hkr, output in res.data:
1232 if hkr == constants.HKR_FAIL:
1233 # The node header is only shown once, if there are
1234 # failing hooks on that node
1235 if show_node_header:
1236 feedback_fn(" Node %s:" % node_name)
1237 show_node_header = False
1238 feedback_fn(" ERROR: Script %s failed, output:" % script)
1239 output = indent_re.sub(' ', output)
1240 feedback_fn("%s" % output)
1246 class LUVerifyDisks(NoHooksLU):
1247 """Verifies the cluster disks status.
1253 def ExpandNames(self):
1254 self.needed_locks = {
1255 locking.LEVEL_NODE: locking.ALL_SET,
1256 locking.LEVEL_INSTANCE: locking.ALL_SET,
1258 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1260 def CheckPrereq(self):
1261 """Check prerequisites.
1263 This has no prerequisites.
1268 def Exec(self, feedback_fn):
1269 """Verify integrity of cluster disks.
1272 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1274 vg_name = self.cfg.GetVGName()
1275 nodes = utils.NiceSort(self.cfg.GetNodeList())
1276 instances = [self.cfg.GetInstanceInfo(name)
1277 for name in self.cfg.GetInstanceList()]
1280 for inst in instances:
1282 if (not inst.admin_up or
1283 inst.disk_template not in constants.DTS_NET_MIRROR):
1285 inst.MapLVsByNode(inst_lvs)
1286 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1287 for node, vol_list in inst_lvs.iteritems():
1288 for vol in vol_list:
1289 nv_dict[(node, vol)] = inst
1294 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1299 lvs = node_lvs[node]
1302 self.LogWarning("Connection to node %s failed: %s" %
1306 if isinstance(lvs, basestring):
1307 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1308 res_nlvm[node] = lvs
1310 elif not isinstance(lvs, dict):
1311 logging.warning("Connection to node %s failed or invalid data"
1313 res_nodes.append(node)
1316 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1317 inst = nv_dict.pop((node, lv_name), None)
1318 if (not lv_online and inst is not None
1319 and inst.name not in res_instances):
1320 res_instances.append(inst.name)
# any leftover items in nv_dict are missing LVs, let's arrange the data better
1324 for key, inst in nv_dict.iteritems():
1325 if inst.name not in res_missing:
1326 res_missing[inst.name] = []
1327 res_missing[inst.name].append(key)
1332 class LURenameCluster(LogicalUnit):
1333 """Rename the cluster.
1336 HPATH = "cluster-rename"
1337 HTYPE = constants.HTYPE_CLUSTER
1340 def BuildHooksEnv(self):
1345 "OP_TARGET": self.cfg.GetClusterName(),
1346 "NEW_NAME": self.op.name,
1348 mn = self.cfg.GetMasterNode()
1349 return env, [mn], [mn]
1351 def CheckPrereq(self):
1352 """Verify that the passed name is a valid one.
1355 hostname = utils.HostInfo(self.op.name)
1357 new_name = hostname.name
1358 self.ip = new_ip = hostname.ip
1359 old_name = self.cfg.GetClusterName()
1360 old_ip = self.cfg.GetMasterIP()
1361 if new_name == old_name and new_ip == old_ip:
1362 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1363 " cluster has changed")
1364 if new_ip != old_ip:
1365 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1366 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1367 " reachable on the network. Aborting." %
1370 self.op.name = new_name
1372 def Exec(self, feedback_fn):
1373 """Rename the cluster.
1376 clustername = self.op.name
1379 # shutdown the master IP
1380 master = self.cfg.GetMasterNode()
1381 result = self.rpc.call_node_stop_master(master, False)
1382 if result.failed or not result.data:
1383 raise errors.OpExecError("Could not disable the master role")
1386 cluster = self.cfg.GetClusterInfo()
1387 cluster.cluster_name = clustername
1388 cluster.master_ip = ip
1389 self.cfg.Update(cluster)
1391 # update the known hosts file
1392 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1393 node_list = self.cfg.GetNodeList()
1395 node_list.remove(master)
1398 result = self.rpc.call_upload_file(node_list,
1399 constants.SSH_KNOWN_HOSTS_FILE)
1400 for to_node, to_result in result.iteritems():
1401 if to_result.failed or not to_result.data:
1402 logging.error("Copy of file %s to node %s failed",
1403 constants.SSH_KNOWN_HOSTS_FILE, to_node)
1406 result = self.rpc.call_node_start_master(master, False)
1407 if result.failed or not result.data:
1408 self.LogWarning("Could not re-enable the master role on"
1409 " the master, please restart manually.")
1412 def _RecursiveCheckIfLVMBased(disk):
1413 """Check if the given disk or its children are lvm-based.
1415 @type disk: L{objects.Disk}
1416 @param disk: the disk to check
1418 @return: boolean indicating whether a LD_LV dev_type was found or not
1422 for chdisk in disk.children:
1423 if _RecursiveCheckIfLVMBased(chdisk):
1425 return disk.dev_type == constants.LD_LV
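# Illustrative usage sketch: LUSetClusterParams below walks every instance
# disk with this helper to refuse disabling LVM storage while LVM-backed
# disks still exist:
#
#   if _RecursiveCheckIfLVMBased(disk):
#     raise errors.OpPrereqError("Cannot disable lvm storage while"
#                                " lvm-based instances exist")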
1428 class LUSetClusterParams(LogicalUnit):
1429 """Change the parameters of the cluster.
1432 HPATH = "cluster-modify"
1433 HTYPE = constants.HTYPE_CLUSTER
1437 def CheckArguments(self):
1441 if not hasattr(self.op, "candidate_pool_size"):
1442 self.op.candidate_pool_size = None
1443 if self.op.candidate_pool_size is not None:
1445 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1446 except (ValueError, TypeError), err:
1447 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1449 if self.op.candidate_pool_size < 1:
1450 raise errors.OpPrereqError("At least one master candidate needed")
1452 def ExpandNames(self):
1453 # FIXME: in the future maybe other cluster params won't require checking on
1454 # all nodes to be modified.
1455 self.needed_locks = {
1456 locking.LEVEL_NODE: locking.ALL_SET,
1458 self.share_locks[locking.LEVEL_NODE] = 1
1460 def BuildHooksEnv(self):
1465 "OP_TARGET": self.cfg.GetClusterName(),
1466 "NEW_VG_NAME": self.op.vg_name,
1468 mn = self.cfg.GetMasterNode()
1469 return env, [mn], [mn]
1471 def CheckPrereq(self):
1472 """Check prerequisites.
1474 This checks whether the given params don't conflict and
1475 if the given volume group is valid.
1478 if self.op.vg_name is not None and not self.op.vg_name:
1479 instances = self.cfg.GetAllInstancesInfo().values()
1480 for inst in instances:
1481 for disk in inst.disks:
1482 if _RecursiveCheckIfLVMBased(disk):
1483 raise errors.OpPrereqError("Cannot disable lvm storage while"
1484 " lvm-based instances exist")
1486 node_list = self.acquired_locks[locking.LEVEL_NODE]
1488 # if vg_name not None, checks given volume group on all nodes
1490 vglist = self.rpc.call_vg_list(node_list)
1491 for node in node_list:
1492 if vglist[node].failed:
1493 # ignoring down node
1494 self.LogWarning("Node %s unreachable/error, ignoring" % node)
1496 vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1498 constants.MIN_VG_SIZE)
1500 raise errors.OpPrereqError("Error on node '%s': %s" %
1503 self.cluster = cluster = self.cfg.GetClusterInfo()
1504 # validate beparams changes
1505 if self.op.beparams:
1506 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1507 self.new_beparams = cluster.FillDict(
1508 cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1510 # hypervisor list/parameters
1511 self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1512 if self.op.hvparams:
1513 if not isinstance(self.op.hvparams, dict):
1514 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1515 for hv_name, hv_dict in self.op.hvparams.items():
1516 if hv_name not in self.new_hvparams:
1517 self.new_hvparams[hv_name] = hv_dict
1519 self.new_hvparams[hv_name].update(hv_dict)
1521 if self.op.enabled_hypervisors is not None:
1522 self.hv_list = self.op.enabled_hypervisors
1524 self.hv_list = cluster.enabled_hypervisors
1526 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1527 # either the enabled list has changed, or the parameters have, validate
1528 for hv_name, hv_params in self.new_hvparams.items():
1529 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1530 (self.op.enabled_hypervisors and
1531 hv_name in self.op.enabled_hypervisors)):
1532 # either this is a new hypervisor, or its parameters have changed
1533 hv_class = hypervisor.GetHypervisor(hv_name)
1534 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1535 hv_class.CheckParameterSyntax(hv_params)
1536 _CheckHVParams(self, node_list, hv_name, hv_params)
1538 def Exec(self, feedback_fn):
1539 """Change the parameters of the cluster.
1542 if self.op.vg_name is not None:
1543 new_volume = self.op.vg_name
1546 if new_volume != self.cfg.GetVGName():
1547 self.cfg.SetVGName(new_volume)
1549 feedback_fn("Cluster LVM configuration already in desired"
1550 " state, not changing")
1551 if self.op.hvparams:
1552 self.cluster.hvparams = self.new_hvparams
1553 if self.op.enabled_hypervisors is not None:
1554 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1555 if self.op.beparams:
1556 self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1557 if self.op.candidate_pool_size is not None:
1558 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1560 self.cfg.Update(self.cluster)
1562 # we want to update nodes after the cluster so that if any errors
1563 # happen, we have recorded and saved the cluster info
1564 if self.op.candidate_pool_size is not None:
1565 _AdjustCandidatePool(self)
1568 class LURedistributeConfig(NoHooksLU):
1569 """Force the redistribution of cluster configuration.
1571 This is a very simple LU.
1577 def ExpandNames(self):
1578 self.needed_locks = {
1579 locking.LEVEL_NODE: locking.ALL_SET,
1581 self.share_locks[locking.LEVEL_NODE] = 1
1583 def CheckPrereq(self):
1584 """Check prerequisites.
1588 def Exec(self, feedback_fn):
1589 """Redistribute the configuration.
1592 self.cfg.Update(self.cfg.GetClusterInfo())
1595 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1596 """Sleep and poll for an instance's disk to sync.
1599 if not instance.disks:
1603 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1605 node = instance.primary_node
1607 for dev in instance.disks:
1608 lu.cfg.SetDiskID(dev, node)
1614 cumul_degraded = False
1615 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1616 if rstats.failed or not rstats.data:
1617 lu.LogWarning("Can't get any data from node %s", node)
1620 raise errors.RemoteError("Can't contact node %s for mirror data,"
1621 " aborting." % node)
1624 rstats = rstats.data
1626 for i, mstat in enumerate(rstats):
1628 lu.LogWarning("Can't compute data for node %s/%s",
1629 node, instance.disks[i].iv_name)
1631 # we ignore the ldisk parameter
1632 perc_done, est_time, is_degraded, _ = mstat
1633 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1634 if perc_done is not None:
1636 if est_time is not None:
1637 rem_time = "%d estimated seconds remaining" % est_time
1640 rem_time = "no time estimate"
1641 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1642 (instance.disks[i].iv_name, perc_done, rem_time))
1646 time.sleep(min(60, max_time))
1649 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1650 return not cumul_degraded
1653 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1654 """Check that mirrors are not degraded.
1656 The ldisk parameter, if True, will change the test from the
1657 is_degraded attribute (which represents overall non-ok status for
1658 the device(s)) to the ldisk (representing the local storage status).
1661 lu.cfg.SetDiskID(dev, node)
1668 if on_primary or dev.AssembleOnSecondary():
1669 rstats = lu.rpc.call_blockdev_find(node, dev)
1670 msg = rstats.RemoteFailMsg()
1672 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1674 elif not rstats.payload:
1675 lu.LogWarning("Can't find disk on node %s", node)
1678 result = result and (not rstats.payload[idx])
1680 for child in dev.children:
1681 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1686 class LUDiagnoseOS(NoHooksLU):
1687 """Logical unit for OS diagnose/query.
1690 _OP_REQP = ["output_fields", "names"]
1692 _FIELDS_STATIC = utils.FieldSet()
1693 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1695 def ExpandNames(self):
1697 raise errors.OpPrereqError("Selective OS query not supported")
1699 _CheckOutputFields(static=self._FIELDS_STATIC,
1700 dynamic=self._FIELDS_DYNAMIC,
1701 selected=self.op.output_fields)
1703 # Lock all nodes, in shared mode
1704 # Temporary removal of locks, should be reverted later
1705 # TODO: reintroduce locks when they are lighter-weight
1706 self.needed_locks = {}
1707 #self.share_locks[locking.LEVEL_NODE] = 1
1708 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1710 def CheckPrereq(self):
1711 """Check prerequisites.
1716 def _DiagnoseByOS(node_list, rlist):
1717 """Remaps a per-node return list into an a per-os per-node dictionary
1719 @param node_list: a list with the names of all nodes
1720 @param rlist: a map with node names as keys and OS objects as values
1723 @return: a dictionary with osnames as keys and as value another map, with
1724 nodes as keys and list of OS objects as values, eg::
1726 {"debian-etch": {"node1": [<object>,...],
1727 "node2": [<object>,]}
1732 # we build here the list of nodes that didn't fail the RPC (at RPC
1733 # level), so that nodes with a non-responding node daemon don't
1734 # make all OSes invalid
1735 good_nodes = [node_name for node_name in rlist
1736 if not rlist[node_name].failed]
1737 for node_name, nr in rlist.iteritems():
1738 if nr.failed or not nr.data:
1740 for os_obj in nr.data:
1741 if os_obj.name not in all_os:
1742 # build a list of nodes for this os containing empty lists
1743 # for each node in node_list
1744 all_os[os_obj.name] = {}
1745 for nname in good_nodes:
1746 all_os[os_obj.name][nname] = []
1747 all_os[os_obj.name][node_name].append(os_obj)
1750 def Exec(self, feedback_fn):
1751 """Compute the list of OSes.
1754 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1755 node_data = self.rpc.call_os_diagnose(valid_nodes)
1756 if node_data == False:
1757 raise errors.OpExecError("Can't gather the list of OSes")
1758 pol = self._DiagnoseByOS(valid_nodes, node_data)
1760 for os_name, os_data in pol.iteritems():
1762 for field in self.op.output_fields:
1765 elif field == "valid":
1766 val = utils.all([osl and osl[0] for osl in os_data.values()])
1767 elif field == "node_status":
1769 for node_name, nos_list in os_data.iteritems():
1770 val[node_name] = [(v.status, v.path) for v in nos_list]
1772 raise errors.ParameterError(field)
1779 class LURemoveNode(LogicalUnit):
1780 """Logical unit for removing a node.
1783 HPATH = "node-remove"
1784 HTYPE = constants.HTYPE_NODE
1785 _OP_REQP = ["node_name"]
1787 def BuildHooksEnv(self):
1790 This doesn't run on the target node in the pre phase as a failed
1791 node would then be impossible to remove.
1795 "OP_TARGET": self.op.node_name,
1796 "NODE_NAME": self.op.node_name,
1798 all_nodes = self.cfg.GetNodeList()
1799 all_nodes.remove(self.op.node_name)
1800 return env, all_nodes, all_nodes
1802 def CheckPrereq(self):
1803 """Check prerequisites.
1806 - the node exists in the configuration
1807 - it does not have primary or secondary instances
1808 - it's not the master
1810 Any errors are signalled by raising errors.OpPrereqError.
1813 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1817 instance_list = self.cfg.GetInstanceList()
1819 masternode = self.cfg.GetMasterNode()
1820 if node.name == masternode:
1821 raise errors.OpPrereqError("Node is the master node,"
1822 " you need to failover first.")
1824 for instance_name in instance_list:
1825 instance = self.cfg.GetInstanceInfo(instance_name)
1826 if node.name in instance.all_nodes:
1827 raise errors.OpPrereqError("Instance %s is still running on the node,"
1828 " please remove first." % instance_name)
1829 self.op.node_name = node.name
1832 def Exec(self, feedback_fn):
1833 """Removes the node from the cluster.
1837 logging.info("Stopping the node daemon and removing configs from node %s",
1840 self.context.RemoveNode(node.name)
1842 self.rpc.call_node_leave_cluster(node.name)
1844 # Promote nodes to master candidate as needed
1845 _AdjustCandidatePool(self)
1848 class LUQueryNodes(NoHooksLU):
1849 """Logical unit for querying nodes.
1852 _OP_REQP = ["output_fields", "names", "use_locking"]
1854 _FIELDS_DYNAMIC = utils.FieldSet(
1856 "mtotal", "mnode", "mfree",
1858 "ctotal", "cnodes", "csockets",
1861 _FIELDS_STATIC = utils.FieldSet(
1862 "name", "pinst_cnt", "sinst_cnt",
1863 "pinst_list", "sinst_list",
1864 "pip", "sip", "tags",
1872 def ExpandNames(self):
1873 _CheckOutputFields(static=self._FIELDS_STATIC,
1874 dynamic=self._FIELDS_DYNAMIC,
1875 selected=self.op.output_fields)
1877 self.needed_locks = {}
1878 self.share_locks[locking.LEVEL_NODE] = 1
1881 self.wanted = _GetWantedNodes(self, self.op.names)
1883 self.wanted = locking.ALL_SET
1885 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1886 self.do_locking = self.do_node_query and self.op.use_locking
1888 # if we don't request only static fields, we need to lock the nodes
1889 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1892 def CheckPrereq(self):
1893 """Check prerequisites.
# The validation of the node list is done in _GetWantedNodes, if it is
# non-empty; if empty, there's no validation to do
1900 def Exec(self, feedback_fn):
1901 """Computes the list of nodes and their attributes.
1904 all_info = self.cfg.GetAllNodesInfo()
1906 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1907 elif self.wanted != locking.ALL_SET:
1908 nodenames = self.wanted
1909 missing = set(nodenames).difference(all_info.keys())
1911 raise errors.OpExecError(
1912 "Some nodes were removed before retrieving their data: %s" % missing)
1914 nodenames = all_info.keys()
1916 nodenames = utils.NiceSort(nodenames)
1917 nodelist = [all_info[name] for name in nodenames]
1919 # begin data gathering
1921 if self.do_node_query:
1923 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1924 self.cfg.GetHypervisorType())
1925 for name in nodenames:
1926 nodeinfo = node_data[name]
1927 if not nodeinfo.failed and nodeinfo.data:
1928 nodeinfo = nodeinfo.data
1929 fn = utils.TryConvert
1931 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1932 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1933 "mfree": fn(int, nodeinfo.get('memory_free', None)),
1934 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1935 "dfree": fn(int, nodeinfo.get('vg_free', None)),
1936 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1937 "bootid": nodeinfo.get('bootid', None),
1938 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1939 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1942 live_data[name] = {}
1944 live_data = dict.fromkeys(nodenames, {})
1946 node_to_primary = dict([(name, set()) for name in nodenames])
1947 node_to_secondary = dict([(name, set()) for name in nodenames])
1949 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1950 "sinst_cnt", "sinst_list"))
1951 if inst_fields & frozenset(self.op.output_fields):
1952 instancelist = self.cfg.GetInstanceList()
1954 for instance_name in instancelist:
1955 inst = self.cfg.GetInstanceInfo(instance_name)
1956 if inst.primary_node in node_to_primary:
1957 node_to_primary[inst.primary_node].add(inst.name)
1958 for secnode in inst.secondary_nodes:
1959 if secnode in node_to_secondary:
1960 node_to_secondary[secnode].add(inst.name)
1962 master_node = self.cfg.GetMasterNode()
1964 # end data gathering
1967 for node in nodelist:
1969 for field in self.op.output_fields:
1972 elif field == "pinst_list":
1973 val = list(node_to_primary[node.name])
1974 elif field == "sinst_list":
1975 val = list(node_to_secondary[node.name])
1976 elif field == "pinst_cnt":
1977 val = len(node_to_primary[node.name])
1978 elif field == "sinst_cnt":
1979 val = len(node_to_secondary[node.name])
1980 elif field == "pip":
1981 val = node.primary_ip
1982 elif field == "sip":
1983 val = node.secondary_ip
1984 elif field == "tags":
1985 val = list(node.GetTags())
1986 elif field == "serial_no":
1987 val = node.serial_no
1988 elif field == "master_candidate":
1989 val = node.master_candidate
1990 elif field == "master":
1991 val = node.name == master_node
1992 elif field == "offline":
1994 elif field == "drained":
1996 elif self._FIELDS_DYNAMIC.Matches(field):
1997 val = live_data[node.name].get(field, None)
1999 raise errors.ParameterError(field)
2000 node_output.append(val)
2001 output.append(node_output)
2006 class LUQueryNodeVolumes(NoHooksLU):
2007 """Logical unit for getting volumes on node(s).
2010 _OP_REQP = ["nodes", "output_fields"]
2012 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2013 _FIELDS_STATIC = utils.FieldSet("node")
2015 def ExpandNames(self):
2016 _CheckOutputFields(static=self._FIELDS_STATIC,
2017 dynamic=self._FIELDS_DYNAMIC,
2018 selected=self.op.output_fields)
2020 self.needed_locks = {}
2021 self.share_locks[locking.LEVEL_NODE] = 1
2022 if not self.op.nodes:
2023 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2025 self.needed_locks[locking.LEVEL_NODE] = \
2026 _GetWantedNodes(self, self.op.nodes)
2028 def CheckPrereq(self):
2029 """Check prerequisites.
2031 This checks that the fields required are valid output fields.
2034 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2036 def Exec(self, feedback_fn):
2037 """Computes the list of nodes and their attributes.
2040 nodenames = self.nodes
2041 volumes = self.rpc.call_node_volumes(nodenames)
2043 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2044 in self.cfg.GetInstanceList()]
2046 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
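# precompute, per instance, its logical volumes grouped by node, so each
# reported volume below can be attributed to the instance owning it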
2049 for node in nodenames:
2050 if node not in volumes or volumes[node].failed or not volumes[node].data:
2053 node_vols = volumes[node].data[:]
2054 node_vols.sort(key=lambda vol: vol['dev'])
2056 for vol in node_vols:
2058 for field in self.op.output_fields:
2061 elif field == "phys":
2065 elif field == "name":
2067 elif field == "size":
2068 val = int(float(vol['size']))
2069 elif field == "instance":
2071 if node not in lv_by_node[inst]:
2073 if vol['name'] in lv_by_node[inst][node]:
2079 raise errors.ParameterError(field)
2080 node_output.append(str(val))
2082 output.append(node_output)
2087 class LUAddNode(LogicalUnit):
2088 """Logical unit for adding node to the cluster.
2092 HTYPE = constants.HTYPE_NODE
2093 _OP_REQP = ["node_name"]
2095 def BuildHooksEnv(self):
2098 This will run on all nodes before, and on all nodes + the new node after.
2102 "OP_TARGET": self.op.node_name,
2103 "NODE_NAME": self.op.node_name,
2104 "NODE_PIP": self.op.primary_ip,
2105 "NODE_SIP": self.op.secondary_ip,
2107 nodes_0 = self.cfg.GetNodeList()
2108 nodes_1 = nodes_0 + [self.op.node_name, ]
2109 return env, nodes_0, nodes_1
2111 def CheckPrereq(self):
2112 """Check prerequisites.
2115 - the new node is not already in the config
2117 - its parameters (single/dual homed) match the cluster
2119 Any errors are signalled by raising errors.OpPrereqError.
2122 node_name = self.op.node_name
2125 dns_data = utils.HostInfo(node_name)
2127 node = dns_data.name
2128 primary_ip = self.op.primary_ip = dns_data.ip
2129 secondary_ip = getattr(self.op, "secondary_ip", None)
2130 if secondary_ip is None:
2131 secondary_ip = primary_ip
2132 if not utils.IsValidIP(secondary_ip):
2133 raise errors.OpPrereqError("Invalid secondary IP given")
2134 self.op.secondary_ip = secondary_ip
2136 node_list = cfg.GetNodeList()
2137 if not self.op.readd and node in node_list:
2138 raise errors.OpPrereqError("Node %s is already in the configuration" %
2140 elif self.op.readd and node not in node_list:
2141 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2143 for existing_node_name in node_list:
2144 existing_node = cfg.GetNodeInfo(existing_node_name)
2146 if self.op.readd and node == existing_node_name:
2147 if (existing_node.primary_ip != primary_ip or
2148 existing_node.secondary_ip != secondary_ip):
2149 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2150 " address configuration as before")
2153 if (existing_node.primary_ip == primary_ip or
2154 existing_node.secondary_ip == primary_ip or
2155 existing_node.primary_ip == secondary_ip or
2156 existing_node.secondary_ip == secondary_ip):
2157 raise errors.OpPrereqError("New node ip address(es) conflict with"
2158 " existing node %s" % existing_node.name)
2160 # check that the type of the node (single versus dual homed) is the
2161 # same as for the master
2162 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2163 master_singlehomed = myself.secondary_ip == myself.primary_ip
2164 newbie_singlehomed = secondary_ip == primary_ip
2165 if master_singlehomed != newbie_singlehomed:
2166 if master_singlehomed:
2167 raise errors.OpPrereqError("The master has no private ip but the"
2168 " new node has one")
2170 raise errors.OpPrereqError("The master has a private ip but the"
2171 " new node doesn't have one")
2173 # check reachability
2174 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2175 raise errors.OpPrereqError("Node not reachable by ping")
2177 if not newbie_singlehomed:
2178 # check reachability from my secondary ip to newbie's secondary ip
2179 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2180 source=myself.secondary_ip):
2181 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2182 " based ping to noded port")
2184 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2185 mc_now, _ = self.cfg.GetMasterCandidateStats()
2186 master_candidate = mc_now < cp_size
2188 self.new_node = objects.Node(name=node,
2189 primary_ip=primary_ip,
2190 secondary_ip=secondary_ip,
2191 master_candidate=master_candidate,
2192 offline=False, drained=False)
2194 def Exec(self, feedback_fn):
2195 """Adds the new node to the cluster.
2198 new_node = self.new_node
2199 node = new_node.name
2201 # check connectivity
2202 result = self.rpc.call_version([node])[node]
2205 if constants.PROTOCOL_VERSION == result.data:
2206 logging.info("Communication to node %s fine, sw version %s match",
2209 raise errors.OpExecError("Version mismatch master version %s,"
2210 " node version %s" %
2211 (constants.PROTOCOL_VERSION, result.data))
2213 raise errors.OpExecError("Cannot get version from the new node")
2216 logging.info("Copy ssh key to node %s", node)
2217 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2219 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2220 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2226 keyarray.append(f.read())
2230 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2232 keyarray[3], keyarray[4], keyarray[5])
2234 msg = result.RemoteFailMsg()
2236 raise errors.OpExecError("Cannot transfer ssh keys to the"
2237 " new node: %s" % msg)
2239 # Add node to our /etc/hosts, and add key to known_hosts
2240 utils.AddHostToEtcHosts(new_node.name)
2242 if new_node.secondary_ip != new_node.primary_ip:
2243 result = self.rpc.call_node_has_ip_address(new_node.name,
2244 new_node.secondary_ip)
2245 if result.failed or not result.data:
2246 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2247 " you gave (%s). Please fix and re-run this"
2248 " command." % new_node.secondary_ip)
2250 node_verify_list = [self.cfg.GetMasterNode()]
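# have the master node verify that it can reach the newly added node and
# that the ssh/hostname setup is consistent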
2251 node_verify_param = {
2253 # TODO: do a node-net-test as well?
2256 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2257 self.cfg.GetClusterName())
2258 for verifier in node_verify_list:
2259 if result[verifier].failed or not result[verifier].data:
2260 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2261 " for remote verification" % verifier)
2262 if result[verifier].data['nodelist']:
2263 for failed in result[verifier].data['nodelist']:
2264 feedback_fn("ssh/hostname verification failed %s -> %s" %
2265 (verifier, result[verifier].data['nodelist'][failed]))
2266 raise errors.OpExecError("ssh/hostname verification failed.")
2268 # Distribute updated /etc/hosts and known_hosts to all nodes,
2269 # including the node just added
2270 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2271 dist_nodes = self.cfg.GetNodeList()
2272 if not self.op.readd:
2273 dist_nodes.append(node)
2274 if myself.name in dist_nodes:
2275 dist_nodes.remove(myself.name)
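# the master node itself already holds the up-to-date files, so it is
# excluded from the distribution list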
2277 logging.debug("Copying hosts and known_hosts to all nodes")
2278 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2279 result = self.rpc.call_upload_file(dist_nodes, fname)
2280 for to_node, to_result in result.iteritems():
2281 if to_result.failed or not to_result.data:
2282 logging.error("Copy of file %s to node %s failed", fname, to_node)
2285 enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2286 if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2287 to_copy.append(constants.VNC_PASSWORD_FILE)
2289 for fname in to_copy:
2290 result = self.rpc.call_upload_file([node], fname)
2291 if result[node].failed or not result[node]:
2292 logging.error("Could not copy file %s to node %s", fname, node)
2295 self.context.ReaddNode(new_node)
2297 self.context.AddNode(new_node)
2300 class LUSetNodeParams(LogicalUnit):
2301 """Modifies the parameters of a node.
2304 HPATH = "node-modify"
2305 HTYPE = constants.HTYPE_NODE
2306 _OP_REQP = ["node_name"]
2309 def CheckArguments(self):
2310 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2311 if node_name is None:
2312 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2313 self.op.node_name = node_name
2314 _CheckBooleanOpField(self.op, 'master_candidate')
2315 _CheckBooleanOpField(self.op, 'offline')
2316 _CheckBooleanOpField(self.op, 'drained')
2317 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2318 if all_mods.count(None) == 3:
2319 raise errors.OpPrereqError("Please pass at least one modification")
2320 if all_mods.count(True) > 1:
2321 raise errors.OpPrereqError("Can't set the node into more than one"
2322 " state at the same time")
2324 def ExpandNames(self):
2325 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2327 def BuildHooksEnv(self):
2330 This runs on the master node.
2334 "OP_TARGET": self.op.node_name,
2335 "MASTER_CANDIDATE": str(self.op.master_candidate),
2336 "OFFLINE": str(self.op.offline),
2337 "DRAINED": str(self.op.drained),
2339 nl = [self.cfg.GetMasterNode(),
2343 def CheckPrereq(self):
2344 """Check prerequisites.
2346 This checks the requested node state changes against the cluster and the node's current state.
2349 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2351 if ((self.op.master_candidate == False or self.op.offline == True or
2352 self.op.drained == True) and node.master_candidate):
2353 # we will demote the node from master_candidate
2354 if self.op.node_name == self.cfg.GetMasterNode():
2355 raise errors.OpPrereqError("The master node has to be a"
2356 " master candidate, online and not drained")
2357 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2358 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2359 if num_candidates <= cp_size:
2360 msg = ("Not enough master candidates (desired"
2361 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2363 self.LogWarning(msg)
2365 raise errors.OpPrereqError(msg)
2367 if (self.op.master_candidate == True and
2368 ((node.offline and not self.op.offline == False) or
2369 (node.drained and not self.op.drained == False))):
2370 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2371 " to master_candidate" % node.name)
2375 def Exec(self, feedback_fn):
2384 if self.op.offline is not None:
2385 node.offline = self.op.offline
2386 result.append(("offline", str(self.op.offline)))
2387 if self.op.offline == True:
2388 if node.master_candidate:
2389 node.master_candidate = False
2391 result.append(("master_candidate", "auto-demotion due to offline"))
2393 node.drained = False
2394 result.append(("drained", "clear drained status due to offline"))
2396 if self.op.master_candidate is not None:
2397 node.master_candidate = self.op.master_candidate
2399 result.append(("master_candidate", str(self.op.master_candidate)))
2400 if self.op.master_candidate == False:
2401 rrc = self.rpc.call_node_demote_from_mc(node.name)
2402 msg = rrc.RemoteFailMsg()
2404 self.LogWarning("Node failed to demote itself: %s" % msg)
2406 if self.op.drained is not None:
2407 node.drained = self.op.drained
2408 result.append(("drained", str(self.op.drained)))
2409 if self.op.drained == True:
2410 if node.master_candidate:
2411 node.master_candidate = False
2413 result.append(("master_candidate", "auto-demotion due to drain"))
2415 node.offline = False
2416 result.append(("offline", "clear offline status due to drain"))
2418 # this will trigger configuration file update, if needed
2419 self.cfg.Update(node)
2420 # this will trigger job queue propagation or cleanup
2422 self.context.ReaddNode(node)
2427 class LUQueryClusterInfo(NoHooksLU):
2428 """Query cluster configuration.
2434 def ExpandNames(self):
2435 self.needed_locks = {}
2437 def CheckPrereq(self):
2438 """No prerequsites needed for this LU.
2443 def Exec(self, feedback_fn):
2444 """Return cluster config.
2447 cluster = self.cfg.GetClusterInfo()
2449 "software_version": constants.RELEASE_VERSION,
2450 "protocol_version": constants.PROTOCOL_VERSION,
2451 "config_version": constants.CONFIG_VERSION,
2452 "os_api_version": constants.OS_API_VERSION,
2453 "export_version": constants.EXPORT_VERSION,
2454 "architecture": (platform.architecture()[0], platform.machine()),
2455 "name": cluster.cluster_name,
2456 "master": cluster.master_node,
2457 "default_hypervisor": cluster.default_hypervisor,
2458 "enabled_hypervisors": cluster.enabled_hypervisors,
2459 "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2460 for hypervisor in cluster.enabled_hypervisors]),
2461 "beparams": cluster.beparams,
2462 "candidate_pool_size": cluster.candidate_pool_size,
2463 "default_bridge": cluster.default_bridge,
2464 "master_netdev": cluster.master_netdev,
2465 "volume_group_name": cluster.volume_group_name,
2466 "file_storage_dir": cluster.file_storage_dir,
2472 class LUQueryConfigValues(NoHooksLU):
2473 """Return configuration values.
2478 _FIELDS_DYNAMIC = utils.FieldSet()
2479 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2481 def ExpandNames(self):
2482 self.needed_locks = {}
2484 _CheckOutputFields(static=self._FIELDS_STATIC,
2485 dynamic=self._FIELDS_DYNAMIC,
2486 selected=self.op.output_fields)
2488 def CheckPrereq(self):
2489 """No prerequisites.
2494 def Exec(self, feedback_fn):
2495 """Dump a representation of the cluster config to the standard output.
2499 for field in self.op.output_fields:
2500 if field == "cluster_name":
2501 entry = self.cfg.GetClusterName()
2502 elif field == "master_node":
2503 entry = self.cfg.GetMasterNode()
2504 elif field == "drain_flag":
2505 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2507 raise errors.ParameterError(field)
2508 values.append(entry)
2512 class LUActivateInstanceDisks(NoHooksLU):
2513 """Bring up an instance's disks.
2516 _OP_REQP = ["instance_name"]
2519 def ExpandNames(self):
2520 self._ExpandAndLockInstance()
2521 self.needed_locks[locking.LEVEL_NODE] = []
2522 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2524 def DeclareLocks(self, level):
2525 if level == locking.LEVEL_NODE:
2526 self._LockInstancesNodes()
2528 def CheckPrereq(self):
2529 """Check prerequisites.
2531 This checks that the instance is in the cluster.
2534 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2535 assert self.instance is not None, \
2536 "Cannot retrieve locked instance %s" % self.op.instance_name
2537 _CheckNodeOnline(self, self.instance.primary_node)
2539 def Exec(self, feedback_fn):
2540 """Activate the disks.
2543 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2545 raise errors.OpExecError("Cannot activate block devices")
2550 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2551 """Prepare the block devices for an instance.
2553 This sets up the block devices on all nodes.
2555 @type lu: L{LogicalUnit}
2556 @param lu: the logical unit on whose behalf we execute
2557 @type instance: L{objects.Instance}
2558 @param instance: the instance for whose disks we assemble
2559 @type ignore_secondaries: boolean
2560 @param ignore_secondaries: if true, errors on secondary nodes
2561 won't result in an error return from the function
2562 @return: False if the operation failed, otherwise a list of
2563 (host, instance_visible_name, node_visible_name)
2564 with the mapping from node devices to instance devices
2569 iname = instance.name
2570 # With the two-pass mechanism we try to reduce the window of
2571 # opportunity for the race condition of switching DRBD to primary
2572 # before handshaking has occurred, but we do not eliminate it
2574 # The proper fix would be to wait (with some limits) until the
2575 # connection has been made and drbd transitions from WFConnection
2576 # into any other network-connected state (Connected, SyncTarget,
2579 # 1st pass, assemble on all nodes in secondary mode
2580 for inst_disk in instance.disks:
2581 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2582 lu.cfg.SetDiskID(node_disk, node)
2583 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2584 msg = result.RemoteFailMsg()
2586 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2587 " (is_primary=False, pass=1): %s",
2588 inst_disk.iv_name, node, msg)
2589 if not ignore_secondaries:
2592 # FIXME: race condition on drbd migration to primary
2594 # 2nd pass, do only the primary node
2595 for inst_disk in instance.disks:
2596 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2597 if node != instance.primary_node:
2599 lu.cfg.SetDiskID(node_disk, node)
2600 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2601 msg = result.RemoteFailMsg()
2603 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2604 " (is_primary=True, pass=2): %s",
2605 inst_disk.iv_name, node, msg)
2607 device_info.append((instance.primary_node, inst_disk.iv_name,
2610 # leave the disks configured for the primary node
2611 # this is a workaround that would better be fixed by
2612 # improving the logical/physical id handling
2613 for disk in instance.disks:
2614 lu.cfg.SetDiskID(disk, instance.primary_node)
2616 return disks_ok, device_info
2619 def _StartInstanceDisks(lu, instance, force):
2620 """Start the disks of an instance.
2623 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2624 ignore_secondaries=force)
2626 _ShutdownInstanceDisks(lu, instance)
2627 if force is not None and not force:
2628 lu.proc.LogWarning("", hint="If the message above refers to a"
2630 " you can retry the operation using '--force'.")
2631 raise errors.OpExecError("Disk consistency error")
2634 class LUDeactivateInstanceDisks(NoHooksLU):
2635 """Shutdown an instance's disks.
2638 _OP_REQP = ["instance_name"]
2641 def ExpandNames(self):
2642 self._ExpandAndLockInstance()
2643 self.needed_locks[locking.LEVEL_NODE] = []
2644 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2646 def DeclareLocks(self, level):
2647 if level == locking.LEVEL_NODE:
2648 self._LockInstancesNodes()
2650 def CheckPrereq(self):
2651 """Check prerequisites.
2653 This checks that the instance is in the cluster.
2656 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2657 assert self.instance is not None, \
2658 "Cannot retrieve locked instance %s" % self.op.instance_name
2660 def Exec(self, feedback_fn):
2661 """Deactivate the disks
2664 instance = self.instance
2665 _SafeShutdownInstanceDisks(self, instance)
2668 def _SafeShutdownInstanceDisks(lu, instance):
2669 """Shutdown block devices of an instance.
2671 This function checks whether the instance is running before calling
2672 _ShutdownInstanceDisks.
2675 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2676 [instance.hypervisor])
2677 ins_l = ins_l[instance.primary_node]
2678 if ins_l.failed or not isinstance(ins_l.data, list):
2679 raise errors.OpExecError("Can't contact node '%s'" %
2680 instance.primary_node)
2682 if instance.name in ins_l.data:
2683 raise errors.OpExecError("Instance is running, can't shutdown"
2686 _ShutdownInstanceDisks(lu, instance)
2689 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2690 """Shutdown block devices of an instance.
2692 This does the shutdown on all nodes of the instance.
2694 If ignore_primary is false, errors on the primary node make the shutdown fail.
2699 for disk in instance.disks:
2700 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2701 lu.cfg.SetDiskID(top_disk, node)
2702 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2703 msg = result.RemoteFailMsg()
2705 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2706 disk.iv_name, node, msg)
2707 if not ignore_primary or node != instance.primary_node:
2712 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2713 """Checks if a node has enough free memory.
2715 This function checks if a given node has the needed amount of free
2716 memory. In case the node has less memory or we cannot get the
2717 information from the node, this function raises an OpPrereqError
2720 @type lu: C{LogicalUnit}
2721 @param lu: a logical unit from which we get configuration data
2723 @param node: the node to check
2724 @type reason: C{str}
2725 @param reason: string to use in the error message
2726 @type requested: C{int}
2727 @param requested: the amount of memory in MiB to check for
2728 @type hypervisor_name: C{str}
2729 @param hypervisor_name: the hypervisor to ask for memory stats
2730 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2731 we cannot check the node
2734 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2735 nodeinfo[node].Raise()
2736 free_mem = nodeinfo[node].data.get('memory_free')
2737 if not isinstance(free_mem, int):
2738 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2739 " was '%s'" % (node, free_mem))
2740 if requested > free_mem:
2741 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2742 " needed %s MiB, available %s MiB" %
2743 (node, reason, requested, free_mem))
2746 class LUStartupInstance(LogicalUnit):
2747 """Starts an instance.
2750 HPATH = "instance-start"
2751 HTYPE = constants.HTYPE_INSTANCE
2752 _OP_REQP = ["instance_name", "force"]
2755 def ExpandNames(self):
2756 self._ExpandAndLockInstance()
2758 def BuildHooksEnv(self):
2761 This runs on master, primary and secondary nodes of the instance.
2765 "FORCE": self.op.force,
2767 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2768 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2771 def CheckPrereq(self):
2772 """Check prerequisites.
2774 This checks that the instance is in the cluster.
2777 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2778 assert self.instance is not None, \
2779 "Cannot retrieve locked instance %s" % self.op.instance_name
2782 self.beparams = getattr(self.op, "beparams", {})
2784 if not isinstance(self.beparams, dict):
2785 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2786 " dict" % (type(self.beparams), ))
2787 # fill the beparams dict
2788 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2789 self.op.beparams = self.beparams
2792 self.hvparams = getattr(self.op, "hvparams", {})
2794 if not isinstance(self.hvparams, dict):
2795 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2796 " dict" % (type(self.hvparams), ))
2798 # check hypervisor parameter syntax (locally)
2799 cluster = self.cfg.GetClusterInfo()
2800 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2801 filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2803 filled_hvp.update(self.hvparams)
2804 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2805 hv_type.CheckParameterSyntax(filled_hvp)
2806 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2807 self.op.hvparams = self.hvparams
2809 _CheckNodeOnline(self, instance.primary_node)
2811 bep = self.cfg.GetClusterInfo().FillBE(instance)
2812 # check bridge existence
2813 _CheckInstanceBridgesExist(self, instance)
2815 remote_info = self.rpc.call_instance_info(instance.primary_node,
2817 instance.hypervisor)
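# if the instance is already reported as running on its primary node, its
# memory is already accounted for there and the free-memory check below
# can be skipped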
2819 if not remote_info.data:
2820 _CheckNodeFreeMemory(self, instance.primary_node,
2821 "starting instance %s" % instance.name,
2822 bep[constants.BE_MEMORY], instance.hypervisor)
2824 def Exec(self, feedback_fn):
2825 """Start the instance.
2828 instance = self.instance
2829 force = self.op.force
2831 self.cfg.MarkInstanceUp(instance.name)
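# record the desired "up" state first, so the configuration reflects that
# the instance is supposed to be running even if the start below fails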
2833 node_current = instance.primary_node
2835 _StartInstanceDisks(self, instance, force)
2837 result = self.rpc.call_instance_start(node_current, instance,
2838 self.hvparams, self.beparams)
2839 msg = result.RemoteFailMsg()
2841 _ShutdownInstanceDisks(self, instance)
2842 raise errors.OpExecError("Could not start instance: %s" % msg)
2845 class LURebootInstance(LogicalUnit):
2846 """Reboot an instance.
2849 HPATH = "instance-reboot"
2850 HTYPE = constants.HTYPE_INSTANCE
2851 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2854 def ExpandNames(self):
2855 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2856 constants.INSTANCE_REBOOT_HARD,
2857 constants.INSTANCE_REBOOT_FULL]:
2858 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2859 (constants.INSTANCE_REBOOT_SOFT,
2860 constants.INSTANCE_REBOOT_HARD,
2861 constants.INSTANCE_REBOOT_FULL))
2862 self._ExpandAndLockInstance()
2864 def BuildHooksEnv(self):
2867 This runs on master, primary and secondary nodes of the instance.
2871 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2872 "REBOOT_TYPE": self.op.reboot_type,
2874 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2875 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2878 def CheckPrereq(self):
2879 """Check prerequisites.
2881 This checks that the instance is in the cluster.
2884 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2885 assert self.instance is not None, \
2886 "Cannot retrieve locked instance %s" % self.op.instance_name
2888 _CheckNodeOnline(self, instance.primary_node)
2890 # check bridge existence
2891 _CheckInstanceBridgesExist(self, instance)
2893 def Exec(self, feedback_fn):
2894 """Reboot the instance.
2897 instance = self.instance
2898 ignore_secondaries = self.op.ignore_secondaries
2899 reboot_type = self.op.reboot_type
2901 node_current = instance.primary_node
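# soft and hard reboots are delegated to the hypervisor on the primary
# node; a full reboot is implemented as shutdown, disk re-activation and
# a fresh start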
2903 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2904 constants.INSTANCE_REBOOT_HARD]:
2905 for disk in instance.disks:
2906 self.cfg.SetDiskID(disk, node_current)
2907 result = self.rpc.call_instance_reboot(node_current, instance,
2909 msg = result.RemoteFailMsg()
2911 raise errors.OpExecError("Could not reboot instance: %s" % msg)
2913 result = self.rpc.call_instance_shutdown(node_current, instance)
2914 msg = result.RemoteFailMsg()
2916 raise errors.OpExecError("Could not shutdown instance for"
2917 " full reboot: %s" % msg)
2918 _ShutdownInstanceDisks(self, instance)
2919 _StartInstanceDisks(self, instance, ignore_secondaries)
2920 result = self.rpc.call_instance_start(node_current, instance, None, None)
2921 msg = result.RemoteFailMsg()
2923 _ShutdownInstanceDisks(self, instance)
2924 raise errors.OpExecError("Could not start instance for"
2925 " full reboot: %s" % msg)
2927 self.cfg.MarkInstanceUp(instance.name)
2930 class LUShutdownInstance(LogicalUnit):
2931 """Shutdown an instance.
2934 HPATH = "instance-stop"
2935 HTYPE = constants.HTYPE_INSTANCE
2936 _OP_REQP = ["instance_name"]
2939 def ExpandNames(self):
2940 self._ExpandAndLockInstance()
2942 def BuildHooksEnv(self):
2945 This runs on master, primary and secondary nodes of the instance.
2948 env = _BuildInstanceHookEnvByObject(self, self.instance)
2949 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2952 def CheckPrereq(self):
2953 """Check prerequisites.
2955 This checks that the instance is in the cluster.
2958 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2959 assert self.instance is not None, \
2960 "Cannot retrieve locked instance %s" % self.op.instance_name
2961 _CheckNodeOnline(self, self.instance.primary_node)
2963 def Exec(self, feedback_fn):
2964 """Shutdown the instance.
2967 instance = self.instance
2968 node_current = instance.primary_node
2969 self.cfg.MarkInstanceDown(instance.name)
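# the desired "down" state is recorded first; a failed shutdown RPC only
# produces a warning before the disks are deactivated below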
2970 result = self.rpc.call_instance_shutdown(node_current, instance)
2971 msg = result.RemoteFailMsg()
2973 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2975 _ShutdownInstanceDisks(self, instance)
2978 class LUReinstallInstance(LogicalUnit):
2979 """Reinstall an instance.
2982 HPATH = "instance-reinstall"
2983 HTYPE = constants.HTYPE_INSTANCE
2984 _OP_REQP = ["instance_name"]
2987 def ExpandNames(self):
2988 self._ExpandAndLockInstance()
2990 def BuildHooksEnv(self):
2993 This runs on master, primary and secondary nodes of the instance.
2996 env = _BuildInstanceHookEnvByObject(self, self.instance)
2997 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3000 def CheckPrereq(self):
3001 """Check prerequisites.
3003 This checks that the instance is in the cluster and is not running.
3006 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3007 assert instance is not None, \
3008 "Cannot retrieve locked instance %s" % self.op.instance_name
3009 _CheckNodeOnline(self, instance.primary_node)
3011 if instance.disk_template == constants.DT_DISKLESS:
3012 raise errors.OpPrereqError("Instance '%s' has no disks" %
3013 self.op.instance_name)
3014 if instance.admin_up:
3015 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3016 self.op.instance_name)
3017 remote_info = self.rpc.call_instance_info(instance.primary_node,
3019 instance.hypervisor)
3021 if remote_info.data:
3022 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3023 (self.op.instance_name,
3024 instance.primary_node))
3026 self.op.os_type = getattr(self.op, "os_type", None)
3027 if self.op.os_type is not None:
3029 pnode = self.cfg.GetNodeInfo(
3030 self.cfg.ExpandNodeName(instance.primary_node))
3032 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3034 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3036 if not isinstance(result.data, objects.OS):
3037 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3038 " primary node" % self.op.os_type)
3040 self.instance = instance
3042 def Exec(self, feedback_fn):
3043 """Reinstall the instance.
3046 inst = self.instance
3048 if self.op.os_type is not None:
3049 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3050 inst.os = self.op.os_type
3051 self.cfg.Update(inst)
3053 _StartInstanceDisks(self, inst, None)
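# the disks must be active so the OS creation scripts can write to them;
# they are shut down again further down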
3055 feedback_fn("Running the instance OS create scripts...")
3056 result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3057 msg = result.RemoteFailMsg()
3059 raise errors.OpExecError("Could not install OS for instance %s"
3061 (inst.name, inst.primary_node, msg))
3063 _ShutdownInstanceDisks(self, inst)
3066 class LURenameInstance(LogicalUnit):
3067 """Rename an instance.
3070 HPATH = "instance-rename"
3071 HTYPE = constants.HTYPE_INSTANCE
3072 _OP_REQP = ["instance_name", "new_name"]
3074 def BuildHooksEnv(self):
3077 This runs on master, primary and secondary nodes of the instance.
3080 env = _BuildInstanceHookEnvByObject(self, self.instance)
3081 env["INSTANCE_NEW_NAME"] = self.op.new_name
3082 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3085 def CheckPrereq(self):
3086 """Check prerequisites.
3088 This checks that the instance is in the cluster and is not running.
3091 instance = self.cfg.GetInstanceInfo(
3092 self.cfg.ExpandInstanceName(self.op.instance_name))
3093 if instance is None:
3094 raise errors.OpPrereqError("Instance '%s' not known" %
3095 self.op.instance_name)
3096 _CheckNodeOnline(self, instance.primary_node)
3098 if instance.admin_up:
3099 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3100 self.op.instance_name)
3101 remote_info = self.rpc.call_instance_info(instance.primary_node,
3103 instance.hypervisor)
3105 if remote_info.data:
3106 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3107 (self.op.instance_name,
3108 instance.primary_node))
3109 self.instance = instance
3111 # new name verification
3112 name_info = utils.HostInfo(self.op.new_name)
3114 self.op.new_name = new_name = name_info.name
3115 instance_list = self.cfg.GetInstanceList()
3116 if new_name in instance_list:
3117 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3120 if not getattr(self.op, "ignore_ip", False):
3121 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3122 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3123 (name_info.ip, new_name))
3126 def Exec(self, feedback_fn):
3127 """Reinstall the instance.
3130 inst = self.instance
3131 old_name = inst.name
3133 if inst.disk_template == constants.DT_FILE:
3134 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3136 self.cfg.RenameInstance(inst.name, self.op.new_name)
3137 # Change the instance lock. This is definitely safe while we hold the BGL
3138 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3139 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3141 # re-read the instance from the configuration after rename
3142 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3144 if inst.disk_template == constants.DT_FILE:
3145 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3146 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3147 old_file_storage_dir,
3148 new_file_storage_dir)
3151 raise errors.OpExecError("Could not connect to node '%s' to rename"
3152 " directory '%s' to '%s' (but the instance"
3153 " has been renamed in Ganeti)" % (
3154 inst.primary_node, old_file_storage_dir,
3155 new_file_storage_dir))
3157 if not result.data[0]:
3158 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3159 " (but the instance has been renamed in"
3160 " Ganeti)" % (old_file_storage_dir,
3161 new_file_storage_dir))
3163 _StartInstanceDisks(self, inst, None)
3165 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3167 msg = result.RemoteFailMsg()
3169 msg = ("Could not run OS rename script for instance %s on node %s"
3170 " (but the instance has been renamed in Ganeti): %s" %
3171 (inst.name, inst.primary_node, msg))
3172 self.proc.LogWarning(msg)
3174 _ShutdownInstanceDisks(self, inst)
3177 class LURemoveInstance(LogicalUnit):
3178 """Remove an instance.
3181 HPATH = "instance-remove"
3182 HTYPE = constants.HTYPE_INSTANCE
3183 _OP_REQP = ["instance_name", "ignore_failures"]
3186 def ExpandNames(self):
3187 self._ExpandAndLockInstance()
3188 self.needed_locks[locking.LEVEL_NODE] = []
3189 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3191 def DeclareLocks(self, level):
3192 if level == locking.LEVEL_NODE:
3193 self._LockInstancesNodes()
3195 def BuildHooksEnv(self):
3198 This runs on master, primary and secondary nodes of the instance.
3201 env = _BuildInstanceHookEnvByObject(self, self.instance)
3202 nl = [self.cfg.GetMasterNode()]
3205 def CheckPrereq(self):
3206 """Check prerequisites.
3208 This checks that the instance is in the cluster.
3211 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3212 assert self.instance is not None, \
3213 "Cannot retrieve locked instance %s" % self.op.instance_name
3215 def Exec(self, feedback_fn):
3216 """Remove the instance.
3219 instance = self.instance
3220 logging.info("Shutting down instance %s on node %s",
3221 instance.name, instance.primary_node)
3223 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3224 msg = result.RemoteFailMsg()
3226 if self.op.ignore_failures:
3227 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3229 raise errors.OpExecError("Could not shutdown instance %s on"
3231 (instance.name, instance.primary_node, msg))
3233 logging.info("Removing block devices for instance %s", instance.name)
3235 if not _RemoveDisks(self, instance):
3236 if self.op.ignore_failures:
3237 feedback_fn("Warning: can't remove instance's disks")
3239 raise errors.OpExecError("Can't remove instance's disks")
3241 logging.info("Removing instance %s out of cluster config", instance.name)
3243 self.cfg.RemoveInstance(instance.name)
3244 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3247 class LUQueryInstances(NoHooksLU):
3248 """Logical unit for querying instances.
3251 _OP_REQP = ["output_fields", "names", "use_locking"]
3253 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3255 "disk_template", "ip", "mac", "bridge",
3256 "sda_size", "sdb_size", "vcpus", "tags",
3257 "network_port", "beparams",
3258 r"(disk)\.(size)/([0-9]+)",
3259 r"(disk)\.(sizes)", "disk_usage",
3260 r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3261 r"(nic)\.(macs|ips|bridges)",
3262 r"(disk|nic)\.(count)",
3263 "serial_no", "hypervisor", "hvparams",] +
3265 for name in constants.HVS_PARAMETERS] +
3267 for name in constants.BES_PARAMETERS])
3268 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3271 def ExpandNames(self):
3272 _CheckOutputFields(static=self._FIELDS_STATIC,
3273 dynamic=self._FIELDS_DYNAMIC,
3274 selected=self.op.output_fields)
3276 self.needed_locks = {}
3277 self.share_locks[locking.LEVEL_INSTANCE] = 1
3278 self.share_locks[locking.LEVEL_NODE] = 1
3281 self.wanted = _GetWantedInstances(self, self.op.names)
3283 self.wanted = locking.ALL_SET
3285 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3286 self.do_locking = self.do_node_query and self.op.use_locking
3288 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3289 self.needed_locks[locking.LEVEL_NODE] = []
3290 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3292 def DeclareLocks(self, level):
3293 if level == locking.LEVEL_NODE and self.do_locking:
3294 self._LockInstancesNodes()
3296 def CheckPrereq(self):
3297 """Check prerequisites.
3302 def Exec(self, feedback_fn):
3303 """Computes the list of nodes and their attributes.
3306 all_info = self.cfg.GetAllInstancesInfo()
3307 if self.wanted == locking.ALL_SET:
3308 # caller didn't specify instance names, so ordering is not important
3310 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3312 instance_names = all_info.keys()
3313 instance_names = utils.NiceSort(instance_names)
3315 # caller did specify names, so we must keep the ordering
3317 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3319 tgt_set = all_info.keys()
3320 missing = set(self.wanted).difference(tgt_set)
3322 raise errors.OpExecError("Some instances were removed before"
3323 " retrieving their data: %s" % missing)
3324 instance_names = self.wanted
3326 instance_list = [all_info[iname] for iname in instance_names]
3328 # begin data gathering
3330 nodes = frozenset([inst.primary_node for inst in instance_list])
3331 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3335 if self.do_node_query:
3337 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3339 result = node_data[name]
3341 # offline nodes will be in both lists
3342 off_nodes.append(name)
3344 bad_nodes.append(name)
3347 live_data.update(result.data)
3348 # else no instance is alive
3350 live_data = dict([(name, {}) for name in instance_names])
3352 # end data gathering
3357 for instance in instance_list:
3359 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3360 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3361 for field in self.op.output_fields:
3362 st_match = self._FIELDS_STATIC.Matches(field)
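# st_match holds the regex match for parameterized static fields such as
# disk.size/0 or nic.mac/1; its groups drive the variable-list handling
# further down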
3367 elif field == "pnode":
3368 val = instance.primary_node
3369 elif field == "snodes":
3370 val = list(instance.secondary_nodes)
3371 elif field == "admin_state":
3372 val = instance.admin_up
3373 elif field == "oper_state":
3374 if instance.primary_node in bad_nodes:
3377 val = bool(live_data.get(instance.name))
3378 elif field == "status":
3379 if instance.primary_node in off_nodes:
3380 val = "ERROR_nodeoffline"
3381 elif instance.primary_node in bad_nodes:
3382 val = "ERROR_nodedown"
3384 running = bool(live_data.get(instance.name))
3386 if instance.admin_up:
3391 if instance.admin_up:
3395 elif field == "oper_ram":
3396 if instance.primary_node in bad_nodes:
3398 elif instance.name in live_data:
3399 val = live_data[instance.name].get("memory", "?")
3402 elif field == "disk_template":
3403 val = instance.disk_template
3405 val = instance.nics[0].ip
3406 elif field == "bridge":
3407 val = instance.nics[0].bridge
3408 elif field == "mac":
3409 val = instance.nics[0].mac
3410 elif field == "sda_size" or field == "sdb_size":
3411 idx = ord(field[2]) - ord('a')
3413 val = instance.FindDisk(idx).size
3414 except errors.OpPrereqError:
3416 elif field == "disk_usage": # total disk usage per node
3417 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3418 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3419 elif field == "tags":
3420 val = list(instance.GetTags())
3421 elif field == "serial_no":
3422 val = instance.serial_no
3423 elif field == "network_port":
3424 val = instance.network_port
3425 elif field == "hypervisor":
3426 val = instance.hypervisor
3427 elif field == "hvparams":
3429 elif (field.startswith(HVPREFIX) and
3430 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3431 val = i_hv.get(field[len(HVPREFIX):], None)
3432 elif field == "beparams":
3434 elif (field.startswith(BEPREFIX) and
3435 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3436 val = i_be.get(field[len(BEPREFIX):], None)
3437 elif st_match and st_match.groups():
3438 # matches a variable list
3439 st_groups = st_match.groups()
3440 if st_groups and st_groups[0] == "disk":
3441 if st_groups[1] == "count":
3442 val = len(instance.disks)
3443 elif st_groups[1] == "sizes":
3444 val = [disk.size for disk in instance.disks]
3445 elif st_groups[1] == "size":
3447 val = instance.FindDisk(st_groups[2]).size
3448 except errors.OpPrereqError:
3451 assert False, "Unhandled disk parameter"
3452 elif st_groups[0] == "nic":
3453 if st_groups[1] == "count":
3454 val = len(instance.nics)
3455 elif st_groups[1] == "macs":
3456 val = [nic.mac for nic in instance.nics]
3457 elif st_groups[1] == "ips":
3458 val = [nic.ip for nic in instance.nics]
3459 elif st_groups[1] == "bridges":
3460 val = [nic.bridge for nic in instance.nics]
3463 nic_idx = int(st_groups[2])
3464 if nic_idx >= len(instance.nics):
3467 if st_groups[1] == "mac":
3468 val = instance.nics[nic_idx].mac
3469 elif st_groups[1] == "ip":
3470 val = instance.nics[nic_idx].ip
3471 elif st_groups[1] == "bridge":
3472 val = instance.nics[nic_idx].bridge
3474 assert False, "Unhandled NIC parameter"
3476 assert False, "Unhandled variable parameter"
3478 raise errors.ParameterError(field)
3485 class LUFailoverInstance(LogicalUnit):
3486 """Failover an instance.
3489 HPATH = "instance-failover"
3490 HTYPE = constants.HTYPE_INSTANCE
3491 _OP_REQP = ["instance_name", "ignore_consistency"]
3494 def ExpandNames(self):
3495 self._ExpandAndLockInstance()
3496 self.needed_locks[locking.LEVEL_NODE] = []
3497 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3499 def DeclareLocks(self, level):
3500 if level == locking.LEVEL_NODE:
3501 self._LockInstancesNodes()
3503 def BuildHooksEnv(self):
3506 This runs on master, primary and secondary nodes of the instance.
3510 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3512 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3513 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3516 def CheckPrereq(self):
3517 """Check prerequisites.
3519 This checks that the instance is in the cluster.
3522 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3523 assert self.instance is not None, \
3524 "Cannot retrieve locked instance %s" % self.op.instance_name
3526 bep = self.cfg.GetClusterInfo().FillBE(instance)
3527 if instance.disk_template not in constants.DTS_NET_MIRROR:
3528 raise errors.OpPrereqError("Instance's disk layout is not"
3529 " network mirrored, cannot failover.")
3531 secondary_nodes = instance.secondary_nodes
3532 if not secondary_nodes:
3533 raise errors.ProgrammerError("no secondary node but using "
3534 "a mirrored disk template")
3536 target_node = secondary_nodes[0]
3537 _CheckNodeOnline(self, target_node)
3538 _CheckNodeNotDrained(self, target_node)
3540 if instance.admin_up:
3541 # check memory requirements on the secondary node
3542 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3543 instance.name, bep[constants.BE_MEMORY],
3544 instance.hypervisor)
3546 self.LogInfo("Not checking memory on the secondary node as"
3547 " instance will not be started")
3549 # check bridge existence
3550 brlist = [nic.bridge for nic in instance.nics]
3551 result = self.rpc.call_bridges_exist(target_node, brlist)
3554 raise errors.OpPrereqError("One or more target bridges %s does not"
3555 " exist on destination node '%s'" %
3556 (brlist, target_node))
3558 def Exec(self, feedback_fn):
3559 """Failover an instance.
3561 The failover is done by shutting it down on its present node and
3562 starting it on the secondary.
3565 instance = self.instance
3567 source_node = instance.primary_node
3568 target_node = instance.secondary_nodes[0]
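# failover sequence: verify disk consistency, shut the instance down on
# the source node, deactivate its disks there, flip the primary node in
# the configuration and, if the instance was marked up, re-activate the
# disks and start it on the target node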
3570 feedback_fn("* checking disk consistency between source and target")
3571 for dev in instance.disks:
3572 # for drbd, these are drbd over lvm
3573 if not _CheckDiskConsistency(self, dev, target_node, False):
3574 if instance.admin_up and not self.op.ignore_consistency:
3575 raise errors.OpExecError("Disk %s is degraded on target node,"
3576 " aborting failover." % dev.iv_name)
3578 feedback_fn("* shutting down instance on source node")
3579 logging.info("Shutting down instance %s on node %s",
3580 instance.name, source_node)
3582 result = self.rpc.call_instance_shutdown(source_node, instance)
3583 msg = result.RemoteFailMsg()
3585 if self.op.ignore_consistency:
3586 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3587 " Proceeding anyway. Please make sure node"
3588 " %s is down. Error details: %s",
3589 instance.name, source_node, source_node, msg)
3591 raise errors.OpExecError("Could not shutdown instance %s on"
3593 (instance.name, source_node, msg))
3595 feedback_fn("* deactivating the instance's disks on source node")
3596 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3597 raise errors.OpExecError("Can't shut down the instance's disks.")
3599 instance.primary_node = target_node
3600 # distribute new instance config to the other nodes
3601 self.cfg.Update(instance)
3603 # Only start the instance if it's marked as up
3604 if instance.admin_up:
3605 feedback_fn("* activating the instance's disks on target node")
3606 logging.info("Starting instance %s on node %s",
3607 instance.name, target_node)
3609 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3610 ignore_secondaries=True)
3612 _ShutdownInstanceDisks(self, instance)
3613 raise errors.OpExecError("Can't activate the instance's disks")
3615 feedback_fn("* starting the instance on the target node")
3616 result = self.rpc.call_instance_start(target_node, instance, None, None)
3617 msg = result.RemoteFailMsg()
3619 _ShutdownInstanceDisks(self, instance)
3620 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3621 (instance.name, target_node, msg))
3624 class LUMigrateInstance(LogicalUnit):
3625 """Migrate an instance.
3627 This is migration without shutting down the instance; failover, by
3628 contrast, shuts the instance down first.
3631 HPATH = "instance-migrate"
3632 HTYPE = constants.HTYPE_INSTANCE
3633 _OP_REQP = ["instance_name", "live", "cleanup"]
3637 def ExpandNames(self):
3638 self._ExpandAndLockInstance()
3639 self.needed_locks[locking.LEVEL_NODE] = []
3640 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3642 def DeclareLocks(self, level):
3643 if level == locking.LEVEL_NODE:
3644 self._LockInstancesNodes()
3646 def BuildHooksEnv(self):
3649 This runs on master, primary and secondary nodes of the instance.
3652 env = _BuildInstanceHookEnvByObject(self, self.instance)
3653 env["MIGRATE_LIVE"] = self.op.live
3654 env["MIGRATE_CLEANUP"] = self.op.cleanup
3655 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3658 def CheckPrereq(self):
3659 """Check prerequisites.
3661 This checks that the instance is in the cluster.
3664 instance = self.cfg.GetInstanceInfo(
3665 self.cfg.ExpandInstanceName(self.op.instance_name))
3666 if instance is None:
3667 raise errors.OpPrereqError("Instance '%s' not known" %
3668 self.op.instance_name)
3670 if instance.disk_template != constants.DT_DRBD8:
3671 raise errors.OpPrereqError("Instance's disk layout is not"
3672 " drbd8, cannot migrate.")
3674 secondary_nodes = instance.secondary_nodes
3675 if not secondary_nodes:
3676 raise errors.ConfigurationError("No secondary node but using"
3677 " drbd8 disk template")
3679 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3681 target_node = secondary_nodes[0]
3682 # check memory requirements on the secondary node
3683 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3684 instance.name, i_be[constants.BE_MEMORY],
3685 instance.hypervisor)
3687 # check bridge existence
3688 brlist = [nic.bridge for nic in instance.nics]
3689 result = self.rpc.call_bridges_exist(target_node, brlist)
3690 if result.failed or not result.data:
3691 raise errors.OpPrereqError("One or more target bridges %s does not"
3692 " exist on destination node '%s'" %
3693 (brlist, target_node))
3695 if not self.op.cleanup:
3696 _CheckNodeNotDrained(self, target_node)
3697 result = self.rpc.call_instance_migratable(instance.primary_node,
3699 msg = result.RemoteFailMsg()
3701 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3704 self.instance = instance
3706 def _WaitUntilSync(self):
3707 """Poll with custom rpc for disk sync.
3709 This uses our own step-based rpc call.
3712 self.feedback_fn("* wait until resync is done")
3716 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3718 self.instance.disks)
3720 for node, nres in result.items():
3721 msg = nres.RemoteFailMsg()
3723 raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3725 node_done, node_percent = nres.payload
3726 all_done = all_done and node_done
3727 if node_percent is not None:
3728 min_percent = min(min_percent, node_percent)
3730 if min_percent < 100:
3731 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3734 def _EnsureSecondary(self, node):
3735 """Demote a node to secondary.
3738 self.feedback_fn("* switching node %s to secondary mode" % node)
3740 for dev in self.instance.disks:
3741 self.cfg.SetDiskID(dev, node)
3743 result = self.rpc.call_blockdev_close(node, self.instance.name,
3744 self.instance.disks)
3745 msg = result.RemoteFailMsg()
3747 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3748 " error %s" % (node, msg))
3750 def _GoStandalone(self):
3751 """Disconnect from the network.
3754 self.feedback_fn("* changing into standalone mode")
3755 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3756 self.instance.disks)
3757 for node, nres in result.items():
3758 msg = nres.RemoteFailMsg()
3760 raise errors.OpExecError("Cannot disconnect disks node %s,"
3761 " error %s" % (node, msg))
3763 def _GoReconnect(self, multimaster):
3764 """Reconnect to the network.
3770 msg = "single-master"
3771 self.feedback_fn("* changing disks into %s mode" % msg)
3772 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3773 self.instance.disks,
3774 self.instance.name, multimaster)
3775 for node, nres in result.items():
3776 msg = nres.RemoteFailMsg()
3778 raise errors.OpExecError("Cannot change disks config on node %s,"
3779 " error: %s" % (node, msg))
3781 def _ExecCleanup(self):
3782 """Try to cleanup after a failed migration.
3784 The cleanup is done by:
3785 - check that the instance is running only on one node
3786 (and update the config if needed)
3787 - change disks on its secondary node to secondary
3788 - wait until disks are fully synchronized
3789 - disconnect from the network
3790 - change disks into single-master mode
3791 - wait again until disks are fully synchronized
3794 instance = self.instance
3795 target_node = self.target_node
3796 source_node = self.source_node
3798 # check running on only one node
3799 self.feedback_fn("* checking where the instance actually runs"
3800 " (if this hangs, the hypervisor might be in"
3802 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3803 for node, result in ins_l.items():
3805 if not isinstance(result.data, list):
3806 raise errors.OpExecError("Can't contact node '%s'" % node)
3808 runningon_source = instance.name in ins_l[source_node].data
3809 runningon_target = instance.name in ins_l[target_node].data
3811 if runningon_source and runningon_target:
3812 raise errors.OpExecError("Instance seems to be running on two nodes,"
3813 " or the hypervisor is confused. You will have"
3814 " to ensure manually that it runs only on one"
3815 " and restart this operation.")
3817 if not (runningon_source or runningon_target):
3818 raise errors.OpExecError("Instance does not seem to be running at all."
3819 " In this case, it's safer to repair by"
3820 " running 'gnt-instance stop' to ensure disk"
3821 " shutdown, and then restarting it.")
3823 if runningon_target:
3824 # the migration has actually succeeded, we need to update the config
3825 self.feedback_fn("* instance running on secondary node (%s),"
3826 " updating config" % target_node)
3827 instance.primary_node = target_node
3828 self.cfg.Update(instance)
3829 demoted_node = source_node
3831 self.feedback_fn("* instance confirmed to be running on its"
3832 " primary node (%s)" % source_node)
3833 demoted_node = target_node
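# whichever node is not running the instance is demoted to secondary;
# afterwards the disks are disconnected, reconnected in single-master
# mode and allowed to resync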
3835 self._EnsureSecondary(demoted_node)
3837 self._WaitUntilSync()
3838 except errors.OpExecError:
3839 # we ignore errors here, since if the device is standalone, it
3840 # won't be able to sync
3842 self._GoStandalone()
3843 self._GoReconnect(False)
3844 self._WaitUntilSync()
3846 self.feedback_fn("* done")
3848 def _RevertDiskStatus(self):
3849 """Try to revert the disk status after a failed migration.
3852 target_node = self.target_node
3854 self._EnsureSecondary(target_node)
3855 self._GoStandalone()
3856 self._GoReconnect(False)
3857 self._WaitUntilSync()
3858 except errors.OpExecError, err:
3859 self.LogWarning("Migration failed and I can't reconnect the"
3860 " drives: error '%s'\n"
3861 "Please look and recover the instance status" %
3864 def _AbortMigration(self):
3865 """Call the hypervisor code to abort a started migration.
3868 instance = self.instance
3869 target_node = self.target_node
3870 migration_info = self.migration_info
3872 abort_result = self.rpc.call_finalize_migration(target_node,
3876 abort_msg = abort_result.RemoteFailMsg()
3878 logging.error("Aborting migration failed on target node %s: %s" %
3879 (target_node, abort_msg))
3880 # Don't raise an exception here, as we still have to try to revert the
3881 # disk status, even if this step failed.
3883 def _ExecMigration(self):
3884 """Migrate an instance.
3886 The migrate is done by:
3887 - change the disks into dual-master mode
3888 - wait until disks are fully synchronized again
3889 - migrate the instance
3890 - change disks on the new secondary node (the old primary) to secondary
3891 - wait until disks are fully synchronized
3892 - change disks into single-master mode
3895 instance = self.instance
3896 target_node = self.target_node
3897 source_node = self.source_node
3899 self.feedback_fn("* checking disk consistency between source and target")
3900 for dev in instance.disks:
3901 if not _CheckDiskConsistency(self, dev, target_node, False):
3902 raise errors.OpExecError("Disk %s is degraded or not fully"
3903 " synchronized on target node,"
3904 " aborting migrate." % dev.iv_name)
3906 # First get the migration information from the remote node
3907 result = self.rpc.call_migration_info(source_node, instance)
3908 msg = result.RemoteFailMsg()
3910 log_err = ("Failed fetching source migration information from %s: %s" %
3912 logging.error(log_err)
3913 raise errors.OpExecError(log_err)
3915 self.migration_info = migration_info = result.payload
3917 # Then switch the disks to master/master mode
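# (the target is first demoted to secondary and both nodes disconnected,
# so the reconnection can establish a clean dual-primary DRBD link)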
3918 self._EnsureSecondary(target_node)
3919 self._GoStandalone()
3920 self._GoReconnect(True)
3921 self._WaitUntilSync()
3923 self.feedback_fn("* preparing %s to accept the instance" % target_node)
3924 result = self.rpc.call_accept_instance(target_node,
3927 self.nodes_ip[target_node])
3929 msg = result.RemoteFailMsg()
3930 if msg:
3931 logging.error("Instance pre-migration failed, trying to revert"
3932 " disk status: %s", msg)
3933 self._AbortMigration()
3934 self._RevertDiskStatus()
3935 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3936 (instance.name, msg))
3938 self.feedback_fn("* migrating instance to %s" % target_node)
3940 result = self.rpc.call_instance_migrate(source_node, instance,
3941 self.nodes_ip[target_node],
3943 msg = result.RemoteFailMsg()
3944 if msg:
3945 logging.error("Instance migration failed, trying to revert"
3946 " disk status: %s", msg)
3947 self._AbortMigration()
3948 self._RevertDiskStatus()
3949 raise errors.OpExecError("Could not migrate instance %s: %s" %
3950 (instance.name, msg))
3953 instance.primary_node = target_node
3954 # distribute new instance config to the other nodes
3955 self.cfg.Update(instance)
3957 result = self.rpc.call_finalize_migration(target_node,
3961 msg = result.RemoteFailMsg()
3963 logging.error("Instance migration succeeded, but finalization failed:"
3965 raise errors.OpExecError("Could not finalize instance migration: %s" %
3968 self._EnsureSecondary(source_node)
3969 self._WaitUntilSync()
3970 self._GoStandalone()
3971 self._GoReconnect(False)
3972 self._WaitUntilSync()
3974 self.feedback_fn("* done")
3976 def Exec(self, feedback_fn):
3977 """Perform the migration.
3980 self.feedback_fn = feedback_fn
3982 self.source_node = self.instance.primary_node
3983 self.target_node = self.instance.secondary_nodes[0]
3984 self.all_nodes = [self.source_node, self.target_node]
3986 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3987 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3990 return self._ExecCleanup()
3992 return self._ExecMigration()
3995 def _CreateBlockDev(lu, node, instance, device, force_create,
3997 """Create a tree of block devices on a given node.
3999 If this device type has to be created on secondaries, create it and
4000 all its children.
4002 If not, just recurse to children keeping the same 'force' value.
4004 @param lu: the lu on whose behalf we execute
4005 @param node: the node on which to create the device
4006 @type instance: L{objects.Instance}
4007 @param instance: the instance which owns the device
4008 @type device: L{objects.Disk}
4009 @param device: the device to create
4010 @type force_create: boolean
4011 @param force_create: whether to force creation of this device; this
4012 will be changed to True whenever we find a device for which
4013 CreateOnSecondary() returns True
4014 @param info: the extra 'metadata' we should attach to the device
4015 (this will be represented as a LVM tag)
4016 @type force_open: boolean
4017 @param force_open: this parameter will be passed to the
4018 L{backend.BlockdevCreate} function where it specifies
4019 whether we run on primary or not, and it affects both
4020 the child assembly and the device's own Open() execution
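# Illustrative walk-through (not a literal trace; it assumes, as the
# parameter description above implies, that DRBD8 devices report
# CreateOnSecondary() as True): for a DRBD8 device with two LV children,
# the recursion below switches force_create to True for the children, so
# the LVs are created on the secondary node as well; force_open is passed
# through unchanged and only influences how each device is opened.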
4023 if device.CreateOnSecondary():
4027 for child in device.children:
4028 _CreateBlockDev(lu, node, instance, child, force_create,
4031 if not force_create:
4034 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4037 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4038 """Create a single block device on a given node.
4040 This will not recurse over children of the device, so they must be
4041 created in advance.
4043 @param lu: the lu on whose behalf we execute
4044 @param node: the node on which to create the device
4045 @type instance: L{objects.Instance}
4046 @param instance: the instance which owns the device
4047 @type device: L{objects.Disk}
4048 @param device: the device to create
4049 @param info: the extra 'metadata' we should attach to the device
4050 (this will be represented as a LVM tag)
4051 @type force_open: boolean
4052 @param force_open: this parameter will be passed to the
4053 L{backend.BlockdevCreate} function where it specifies
4054 whether we run on primary or not, and it affects both
4055 the child assembly and the device's own Open() execution
4058 lu.cfg.SetDiskID(device, node)
4059 result = lu.rpc.call_blockdev_create(node, device, device.size,
4060 instance.name, force_open, info)
4061 msg = result.RemoteFailMsg()
4062 if msg:
4063 raise errors.OpExecError("Can't create block device %s on"
4064 " node %s for instance %s: %s" %
4065 (device, node, instance.name, msg))
4066 if device.physical_id is None:
4067 device.physical_id = result.payload
4070 def _GenerateUniqueNames(lu, exts):
4071 """Generate a suitable LV name.
4073 This will generate a logical volume name for the given instance.
4078 new_id = lu.cfg.GenerateUniqueID()
4079 results.append("%s%s" % (new_id, val))
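# Illustrative example (the unique IDs shown are made up): with
# exts == [".disk0", ".disk1"] this returns something like
# ["3f0c...91ab.disk0", "b7d2...04ee.disk1"], i.e. one cluster-unique LV
# name per requested extension.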
4083 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4085 """Generate a drbd8 device complete with its children.
4088 port = lu.cfg.AllocatePort()
4089 vgname = lu.cfg.GetVGName()
4090 shared_secret = lu.cfg.GenerateDRBDSecret()
4091 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4092 logical_id=(vgname, names[0]))
4093 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4094 logical_id=(vgname, names[1]))
4095 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4096 logical_id=(primary, secondary, port,
4099 children=[dev_data, dev_meta],
4104 def _GenerateDiskTemplate(lu, template_name,
4105 instance_name, primary_node,
4106 secondary_nodes, disk_info,
4107 file_storage_dir, file_driver,
4109 """Generate the entire disk layout for a given template type.
4112 #TODO: compute space requirements
4114 vgname = lu.cfg.GetVGName()
4115 disk_count = len(disk_info)
4117 if template_name == constants.DT_DISKLESS:
4119 elif template_name == constants.DT_PLAIN:
4120 if len(secondary_nodes) != 0:
4121 raise errors.ProgrammerError("Wrong template configuration")
4123 names = _GenerateUniqueNames(lu, [".disk%d" % i
4124 for i in range(disk_count)])
4125 for idx, disk in enumerate(disk_info):
4126 disk_index = idx + base_index
4127 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4128 logical_id=(vgname, names[idx]),
4129 iv_name="disk/%d" % disk_index,
4131 disks.append(disk_dev)
4132 elif template_name == constants.DT_DRBD8:
4133 if len(secondary_nodes) != 1:
4134 raise errors.ProgrammerError("Wrong template configuration")
4135 remote_node = secondary_nodes[0]
4136 minors = lu.cfg.AllocateDRBDMinor(
4137 [primary_node, remote_node] * len(disk_info), instance_name)
4140 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4141 for i in range(disk_count)]):
4142 names.append(lv_prefix + "_data")
4143 names.append(lv_prefix + "_meta")
4144 for idx, disk in enumerate(disk_info):
4145 disk_index = idx + base_index
4146 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4147 disk["size"], names[idx*2:idx*2+2],
4148 "disk/%d" % disk_index,
4149 minors[idx*2], minors[idx*2+1])
4150 disk_dev.mode = disk["mode"]
4151 disks.append(disk_dev)
4152 elif template_name == constants.DT_FILE:
4153 if len(secondary_nodes) != 0:
4154 raise errors.ProgrammerError("Wrong template configuration")
4156 for idx, disk in enumerate(disk_info):
4157 disk_index = idx + base_index
4158 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4159 iv_name="disk/%d" % disk_index,
4160 logical_id=(file_driver,
4161 "%s/disk%d" % (file_storage_dir,
4164 disks.append(disk_dev)
4166 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
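# Illustrative example (values are hypothetical): called with
# template_name == constants.DT_PLAIN, base_index == 0 and
# disk_info == [{"size": 1024, "mode": constants.DISK_RDWR}] the function
# returns a single LD_LV Disk object with iv_name "disk/0", backed by an LV
# named "<unique-id>.disk0" in the cluster volume group; with DT_DRBD8 the
# same input instead yields one LD_DRBD8 device per disk (built by
# _GenerateDRBD8Branch) whose children are a data LV and a 128 MB meta LV.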
4170 def _GetInstanceInfoText(instance):
4171 """Compute that text that should be added to the disk's metadata.
4174 return "originstname+%s" % instance.name
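# Example: for an instance named "web1.example.com" (a hypothetical name)
# the text attached to its disks would be "originstname+web1.example.com".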
4177 def _CreateDisks(lu, instance):
4178 """Create all disks for an instance.
4180 This abstracts away some work from AddInstance.
4182 @type lu: L{LogicalUnit}
4183 @param lu: the logical unit on whose behalf we execute
4184 @type instance: L{objects.Instance}
4185 @param instance: the instance whose disks we should create
4187 @return: the success of the creation
4190 info = _GetInstanceInfoText(instance)
4191 pnode = instance.primary_node
4193 if instance.disk_template == constants.DT_FILE:
4194 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4195 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4197 if result.failed or not result.data:
4198 raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4200 if not result.data[0]:
4201 raise errors.OpExecError("Failed to create directory '%s'" %
4204 # Note: this needs to be kept in sync with adding of disks in
4205 # LUSetInstanceParams
4206 for device in instance.disks:
4207 logging.info("Creating volume %s for instance %s",
4208 device.iv_name, instance.name)
4210 for node in instance.all_nodes:
4211 f_create = node == pnode
4212 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4215 def _RemoveDisks(lu, instance):
4216 """Remove all disks for an instance.
4218 This abstracts away some work from `AddInstance()` and
4219 `RemoveInstance()`. Note that in case some of the devices couldn't
4220 be removed, the removal will continue with the other ones (compare
4221 with `_CreateDisks()`).
4223 @type lu: L{LogicalUnit}
4224 @param lu: the logical unit on whose behalf we execute
4225 @type instance: L{objects.Instance}
4226 @param instance: the instance whose disks we should remove
4228 @return: the success of the removal
4231 logging.info("Removing block devices for instance %s", instance.name)
4234 for device in instance.disks:
4235 for node, disk in device.ComputeNodeTree(instance.primary_node):
4236 lu.cfg.SetDiskID(disk, node)
4237 msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4239 lu.LogWarning("Could not remove block device %s on node %s,"
4240 " continuing anyway: %s", device.iv_name, node, msg)
4243 if instance.disk_template == constants.DT_FILE:
4244 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4245 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4247 if result.failed or not result.data:
4248 logging.error("Could not remove directory '%s'", file_storage_dir)
4254 def _ComputeDiskSize(disk_template, disks):
4255 """Compute disk size requirements in the volume group
4258 # Required free disk space as a function of the disk template and disk sizes
4260 constants.DT_DISKLESS: None,
4261 constants.DT_PLAIN: sum(d["size"] for d in disks),
4262 # 128 MB are added for drbd metadata for each disk
4263 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4264 constants.DT_FILE: None,
4267 if disk_template not in req_size_dict:
4268 raise errors.ProgrammerError("Disk template '%s' size requirement"
4269 " is unknown" % disk_template)
4271 return req_size_dict[disk_template]
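# Worked example (hypothetical sizes): for two disks of 1024 MB and 2048 MB,
#
#   _ComputeDiskSize(constants.DT_PLAIN, [{"size": 1024}, {"size": 2048}])
#   # => 1024 + 2048 = 3072 (MB needed in the volume group)
#   _ComputeDiskSize(constants.DT_DRBD8, [{"size": 1024}, {"size": 2048}])
#   # => (1024 + 128) + (2048 + 128) = 3328, i.e. 128 MB of DRBD metadata
#   #    is added per disk
#
# while DT_DISKLESS and DT_FILE need no space in the volume group (None).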
4274 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4275 """Hypervisor parameter validation.
4277 This function abstracts the hypervisor parameter validation to be
4278 used in both instance create and instance modify.
4280 @type lu: L{LogicalUnit}
4281 @param lu: the logical unit for which we check
4282 @type nodenames: list
4283 @param nodenames: the list of nodes on which we should check
4284 @type hvname: string
4285 @param hvname: the name of the hypervisor we should use
4286 @type hvparams: dict
4287 @param hvparams: the parameters which we need to check
4288 @raise errors.OpPrereqError: if the parameters are not valid
4291 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4294 for node in nodenames:
4298 msg = info.RemoteFailMsg()
4300 raise errors.OpPrereqError("Hypervisor parameter validation"
4301 " failed on node %s: %s" % (node, msg))
4304 class LUCreateInstance(LogicalUnit):
4305 """Create an instance.
4308 HPATH = "instance-add"
4309 HTYPE = constants.HTYPE_INSTANCE
4310 _OP_REQP = ["instance_name", "disks", "disk_template",
4312 "wait_for_sync", "ip_check", "nics",
4313 "hvparams", "beparams"]
4316 def _ExpandNode(self, node):
4317 """Expands and checks one node name.
4320 node_full = self.cfg.ExpandNodeName(node)
4321 if node_full is None:
4322 raise errors.OpPrereqError("Unknown node %s" % node)
4325 def ExpandNames(self):
4326 """ExpandNames for CreateInstance.
4328 Figure out the right locks for instance creation.
4331 self.needed_locks = {}
4333 # set optional parameters to None if they don't exist
4334 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4335 if not hasattr(self.op, attr):
4336 setattr(self.op, attr, None)
4338 # cheap checks, mostly valid constants given
4340 # verify creation mode
4341 if self.op.mode not in (constants.INSTANCE_CREATE,
4342 constants.INSTANCE_IMPORT):
4343 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4346 # disk template and mirror node verification
4347 if self.op.disk_template not in constants.DISK_TEMPLATES:
4348 raise errors.OpPrereqError("Invalid disk template name")
4350 if self.op.hypervisor is None:
4351 self.op.hypervisor = self.cfg.GetHypervisorType()
4353 cluster = self.cfg.GetClusterInfo()
4354 enabled_hvs = cluster.enabled_hypervisors
4355 if self.op.hypervisor not in enabled_hvs:
4356 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4357 " cluster (%s)" % (self.op.hypervisor,
4358 ",".join(enabled_hvs)))
4360 # check hypervisor parameter syntax (locally)
4361 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4362 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4364 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4365 hv_type.CheckParameterSyntax(filled_hvp)
4366 self.hv_full = filled_hvp
4368 # fill and remember the beparams dict
4369 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4370 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4373 #### instance parameters check
4375 # instance name verification
4376 hostname1 = utils.HostInfo(self.op.instance_name)
4377 self.op.instance_name = instance_name = hostname1.name
4379 # this is just a preventive check, but someone might still add this
4380 # instance in the meantime, and creation will fail at lock-add time
4381 if instance_name in self.cfg.GetInstanceList():
4382 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4385 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4389 for nic in self.op.nics:
4390 # ip validity checks
4391 ip = nic.get("ip", None)
4392 if ip is None or ip.lower() == "none":
4394 elif ip.lower() == constants.VALUE_AUTO:
4395 nic_ip = hostname1.ip
4397 if not utils.IsValidIP(ip):
4398 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4399 " like a valid IP" % ip)
4402 # MAC address verification
4403 mac = nic.get("mac", constants.VALUE_AUTO)
4404 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4405 if not utils.IsValidMac(mac.lower()):
4406 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4408 # bridge verification
4409 bridge = nic.get("bridge", None)
4410 if bridge is None:
4411 bridge = self.cfg.GetDefBridge()
4412 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4414 # disk checks/pre-build
4416 for disk in self.op.disks:
4417 mode = disk.get("mode", constants.DISK_RDWR)
4418 if mode not in constants.DISK_ACCESS_SET:
4419 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4421 size = disk.get("size", None)
4422 if size is None:
4423 raise errors.OpPrereqError("Missing disk size")
4424 try:
4425 size = int(size)
4426 except ValueError:
4427 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4428 self.disks.append({"size": size, "mode": mode})
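# Example of the expected self.op.disks format (values are hypothetical):
#
#   [{"size": "10240"}, {"size": 2048, "mode": constants.DISK_RDWR}]
#
# after the loop above each entry is normalized into self.disks as
# {"size": <int, in MB>, "mode": <access mode>}, the mode defaulting to
# constants.DISK_RDWR when not given.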
4430 # used in CheckPrereq for ip ping check
4431 self.check_ip = hostname1.ip
4433 # file storage checks
4434 if (self.op.file_driver and
4435 not self.op.file_driver in constants.FILE_DRIVER):
4436 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4437 self.op.file_driver)
4439 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4440 raise errors.OpPrereqError("File storage directory path not absolute")
4442 ### Node/iallocator related checks
4443 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4444 raise errors.OpPrereqError("One and only one of iallocator and primary"
4445 " node must be given")
4447 if self.op.iallocator:
4448 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4450 self.op.pnode = self._ExpandNode(self.op.pnode)
4451 nodelist = [self.op.pnode]
4452 if self.op.snode is not None:
4453 self.op.snode = self._ExpandNode(self.op.snode)
4454 nodelist.append(self.op.snode)
4455 self.needed_locks[locking.LEVEL_NODE] = nodelist
4457 # in case of import lock the source node too
4458 if self.op.mode == constants.INSTANCE_IMPORT:
4459 src_node = getattr(self.op, "src_node", None)
4460 src_path = getattr(self.op, "src_path", None)
4462 if src_path is None:
4463 self.op.src_path = src_path = self.op.instance_name
4465 if src_node is None:
4466 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4467 self.op.src_node = None
4468 if os.path.isabs(src_path):
4469 raise errors.OpPrereqError("Importing an instance from an absolute"
4470 " path requires a source node option.")
4472 self.op.src_node = src_node = self._ExpandNode(src_node)
4473 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4474 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4475 if not os.path.isabs(src_path):
4476 self.op.src_path = src_path = \
4477 os.path.join(constants.EXPORT_DIR, src_path)
4479 else: # INSTANCE_CREATE
4480 if getattr(self.op, "os_type", None) is None:
4481 raise errors.OpPrereqError("No guest OS specified")
4483 def _RunAllocator(self):
4484 """Run the allocator based on input opcode.
4487 nics = [n.ToDict() for n in self.nics]
4488 ial = IAllocator(self,
4489 mode=constants.IALLOCATOR_MODE_ALLOC,
4490 name=self.op.instance_name,
4491 disk_template=self.op.disk_template,
4494 vcpus=self.be_full[constants.BE_VCPUS],
4495 mem_size=self.be_full[constants.BE_MEMORY],
4498 hypervisor=self.op.hypervisor,
4501 ial.Run(self.op.iallocator)
4504 raise errors.OpPrereqError("Can't compute nodes using"
4505 " iallocator '%s': %s" % (self.op.iallocator,
4507 if len(ial.nodes) != ial.required_nodes:
4508 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4509 " of nodes (%s), required %s" %
4510 (self.op.iallocator, len(ial.nodes),
4511 ial.required_nodes))
4512 self.op.pnode = ial.nodes[0]
4513 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4514 self.op.instance_name, self.op.iallocator,
4515 ", ".join(ial.nodes))
4516 if ial.required_nodes == 2:
4517 self.op.snode = ial.nodes[1]
4519 def BuildHooksEnv(self):
4522 This runs on master, primary and secondary nodes of the instance.
4526 "ADD_MODE": self.op.mode,
4528 if self.op.mode == constants.INSTANCE_IMPORT:
4529 env["SRC_NODE"] = self.op.src_node
4530 env["SRC_PATH"] = self.op.src_path
4531 env["SRC_IMAGES"] = self.src_images
4533 env.update(_BuildInstanceHookEnv(
4534 name=self.op.instance_name,
4535 primary_node=self.op.pnode,
4536 secondary_nodes=self.secondaries,
4537 status=self.op.start,
4538 os_type=self.op.os_type,
4539 memory=self.be_full[constants.BE_MEMORY],
4540 vcpus=self.be_full[constants.BE_VCPUS],
4541 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4542 disk_template=self.op.disk_template,
4543 disks=[(d["size"], d["mode"]) for d in self.disks],
4546 hypervisor=self.op.hypervisor,
4549 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4554 def CheckPrereq(self):
4555 """Check prerequisites.
4558 if (not self.cfg.GetVGName() and
4559 self.op.disk_template not in constants.DTS_NOT_LVM):
4560 raise errors.OpPrereqError("Cluster does not support lvm-based"
4563 if self.op.mode == constants.INSTANCE_IMPORT:
4564 src_node = self.op.src_node
4565 src_path = self.op.src_path
4567 if src_node is None:
4568 exp_list = self.rpc.call_export_list(
4569 self.acquired_locks[locking.LEVEL_NODE])
4571 for node in exp_list:
4572 if not exp_list[node].failed and src_path in exp_list[node].data:
4574 self.op.src_node = src_node = node
4575 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4579 raise errors.OpPrereqError("No export found for relative path %s" %
4582 _CheckNodeOnline(self, src_node)
4583 result = self.rpc.call_export_info(src_node, src_path)
4586 raise errors.OpPrereqError("No export found in dir %s" % src_path)
4588 export_info = result.data
4589 if not export_info.has_section(constants.INISECT_EXP):
4590 raise errors.ProgrammerError("Corrupted export config")
4592 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4593 if (int(ei_version) != constants.EXPORT_VERSION):
4594 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4595 (ei_version, constants.EXPORT_VERSION))
4597 # Check that the new instance doesn't have less disks than the export
4598 instance_disks = len(self.disks)
4599 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4600 if instance_disks < export_disks:
4601 raise errors.OpPrereqError("Not enough disks to import."
4602 " (instance: %d, export: %d)" %
4603 (instance_disks, export_disks))
4605 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4607 for idx in range(export_disks):
4608 option = 'disk%d_dump' % idx
4609 if export_info.has_option(constants.INISECT_INS, option):
4610 # FIXME: are the old os-es, disk sizes, etc. useful?
4611 export_name = export_info.get(constants.INISECT_INS, option)
4612 image = os.path.join(src_path, export_name)
4613 disk_images.append(image)
4615 disk_images.append(False)
4617 self.src_images = disk_images
4619 old_name = export_info.get(constants.INISECT_INS, 'name')
4620 # FIXME: int() here could throw a ValueError on broken exports
4621 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4622 if self.op.instance_name == old_name:
4623 for idx, nic in enumerate(self.nics):
4624 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4625 nic_mac_ini = 'nic%d_mac' % idx
4626 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4628 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4629 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4630 if self.op.start and not self.op.ip_check:
4631 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4632 " adding an instance in start mode")
4634 if self.op.ip_check:
4635 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4636 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4637 (self.check_ip, self.op.instance_name))
4639 #### mac address generation
4640 # By generating the mac address here, both the allocator and the hooks get
4641 # the real final mac address rather than the 'auto' or 'generate' value.
4642 # There is a race condition between the generation and the instance object
4643 # creation, which means that we know the mac is valid now, but we're not
4644 # sure it will be when we actually add the instance. If things go bad
4645 # adding the instance will abort because of a duplicate mac, and the
4646 # creation job will fail.
4647 for nic in self.nics:
4648 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4649 nic.mac = self.cfg.GenerateMAC()
4653 if self.op.iallocator is not None:
4654 self._RunAllocator()
4656 #### node related checks
4658 # check primary node
4659 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4660 assert self.pnode is not None, \
4661 "Cannot retrieve locked node %s" % self.op.pnode
4663 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4666 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4669 self.secondaries = []
4671 # mirror node verification
4672 if self.op.disk_template in constants.DTS_NET_MIRROR:
4673 if self.op.snode is None:
4674 raise errors.OpPrereqError("The networked disk templates need"
4676 if self.op.snode == pnode.name:
4677 raise errors.OpPrereqError("The secondary node cannot be"
4678 " the primary node.")
4679 _CheckNodeOnline(self, self.op.snode)
4680 _CheckNodeNotDrained(self, self.op.snode)
4681 self.secondaries.append(self.op.snode)
4683 nodenames = [pnode.name] + self.secondaries
4685 req_size = _ComputeDiskSize(self.op.disk_template,
4688 # Check lv size requirements
4689 if req_size is not None:
4690 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4692 for node in nodenames:
4693 info = nodeinfo[node]
4697 raise errors.OpPrereqError("Cannot get current information"
4698 " from node '%s'" % node)
4699 vg_free = info.get('vg_free', None)
4700 if not isinstance(vg_free, int):
4701 raise errors.OpPrereqError("Can't compute free disk space on"
4703 if req_size > info['vg_free']:
4704 raise errors.OpPrereqError("Not enough disk space on target node %s."
4705 " %d MB available, %d MB required" %
4706 (node, info['vg_free'], req_size))
4708 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4711 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4713 if not isinstance(result.data, objects.OS):
4714 raise errors.OpPrereqError("OS '%s' not in supported os list for"
4715 " primary node" % self.op.os_type)
4717 # bridge check on primary node
4718 bridges = [n.bridge for n in self.nics]
4719 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4722 raise errors.OpPrereqError("One of the target bridges '%s' does not"
4723 " exist on destination node '%s'" %
4724 (",".join(bridges), pnode.name))
4726 # memory check on primary node
4728 _CheckNodeFreeMemory(self, self.pnode.name,
4729 "creating instance %s" % self.op.instance_name,
4730 self.be_full[constants.BE_MEMORY],
4733 def Exec(self, feedback_fn):
4734 """Create and add the instance to the cluster.
4737 instance = self.op.instance_name
4738 pnode_name = self.pnode.name
4740 ht_kind = self.op.hypervisor
4741 if ht_kind in constants.HTS_REQ_PORT:
4742 network_port = self.cfg.AllocatePort()
4746 ##if self.op.vnc_bind_address is None:
4747 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4749 # this is needed because os.path.join does not accept None arguments
4750 if self.op.file_storage_dir is None:
4751 string_file_storage_dir = ""
4753 string_file_storage_dir = self.op.file_storage_dir
4755 # build the full file storage dir path
4756 file_storage_dir = os.path.normpath(os.path.join(
4757 self.cfg.GetFileStorageDir(),
4758 string_file_storage_dir, instance))
4761 disks = _GenerateDiskTemplate(self,
4762 self.op.disk_template,
4763 instance, pnode_name,
4767 self.op.file_driver,
4770 iobj = objects.Instance(name=instance, os=self.op.os_type,
4771 primary_node=pnode_name,
4772 nics=self.nics, disks=disks,
4773 disk_template=self.op.disk_template,
4775 network_port=network_port,
4776 beparams=self.op.beparams,
4777 hvparams=self.op.hvparams,
4778 hypervisor=self.op.hypervisor,
4781 feedback_fn("* creating instance disks...")
4783 _CreateDisks(self, iobj)
4784 except errors.OpExecError:
4785 self.LogWarning("Device creation failed, reverting...")
4787 _RemoveDisks(self, iobj)
4789 self.cfg.ReleaseDRBDMinors(instance)
4792 feedback_fn("adding instance %s to cluster config" % instance)
4794 self.cfg.AddInstance(iobj)
4795 # Declare that we don't want to remove the instance lock anymore, as we've
4796 # added the instance to the config
4797 del self.remove_locks[locking.LEVEL_INSTANCE]
4798 # Unlock all the nodes
4799 if self.op.mode == constants.INSTANCE_IMPORT:
4800 nodes_keep = [self.op.src_node]
4801 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4802 if node != self.op.src_node]
4803 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4804 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4806 self.context.glm.release(locking.LEVEL_NODE)
4807 del self.acquired_locks[locking.LEVEL_NODE]
4809 if self.op.wait_for_sync:
4810 disk_abort = not _WaitForSync(self, iobj)
4811 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4812 # make sure the disks are not degraded (still sync-ing is ok)
4814 feedback_fn("* checking mirrors status")
4815 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4820 _RemoveDisks(self, iobj)
4821 self.cfg.RemoveInstance(iobj.name)
4822 # Make sure the instance lock gets removed
4823 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4824 raise errors.OpExecError("There are some degraded disks for"
4827 feedback_fn("creating os for instance %s on node %s" %
4828 (instance, pnode_name))
4830 if iobj.disk_template != constants.DT_DISKLESS:
4831 if self.op.mode == constants.INSTANCE_CREATE:
4832 feedback_fn("* running the instance OS create scripts...")
4833 result = self.rpc.call_instance_os_add(pnode_name, iobj)
4834 msg = result.RemoteFailMsg()
4836 raise errors.OpExecError("Could not add os for instance %s"
4838 (instance, pnode_name, msg))
4840 elif self.op.mode == constants.INSTANCE_IMPORT:
4841 feedback_fn("* running the instance OS import scripts...")
4842 src_node = self.op.src_node
4843 src_images = self.src_images
4844 cluster_name = self.cfg.GetClusterName()
4845 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4846 src_node, src_images,
4848 import_result.Raise()
4849 for idx, result in enumerate(import_result.data):
4851 self.LogWarning("Could not import the image %s for instance"
4852 " %s, disk %d, on node %s" %
4853 (src_images[idx], instance, idx, pnode_name))
4855 # also checked in the prereq part
4856 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4860 iobj.admin_up = True
4861 self.cfg.Update(iobj)
4862 logging.info("Starting instance %s on node %s", instance, pnode_name)
4863 feedback_fn("* starting instance...")
4864 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4865 msg = result.RemoteFailMsg()
4867 raise errors.OpExecError("Could not start instance: %s" % msg)
4870 class LUConnectConsole(NoHooksLU):
4871 """Connect to an instance's console.
4873 This is somewhat special in that it returns the command line that
4874 you need to run on the master node in order to connect to the
4875 console.
4878 _OP_REQP = ["instance_name"]
4881 def ExpandNames(self):
4882 self._ExpandAndLockInstance()
4884 def CheckPrereq(self):
4885 """Check prerequisites.
4887 This checks that the instance is in the cluster.
4890 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4891 assert self.instance is not None, \
4892 "Cannot retrieve locked instance %s" % self.op.instance_name
4893 _CheckNodeOnline(self, self.instance.primary_node)
4895 def Exec(self, feedback_fn):
4896 """Connect to the console of an instance
4899 instance = self.instance
4900 node = instance.primary_node
4902 node_insts = self.rpc.call_instance_list([node],
4903 [instance.hypervisor])[node]
4906 if instance.name not in node_insts.data:
4907 raise errors.OpExecError("Instance %s is not running." % instance.name)
4909 logging.debug("Connecting to console of %s on %s", instance.name, node)
4911 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4912 cluster = self.cfg.GetClusterInfo()
4913 # beparams and hvparams are passed separately, to avoid editing the
4914 # instance and then saving the defaults in the instance itself.
4915 hvparams = cluster.FillHV(instance)
4916 beparams = cluster.FillBE(instance)
4917 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4920 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4923 class LUReplaceDisks(LogicalUnit):
4924 """Replace the disks of an instance.
4927 HPATH = "mirrors-replace"
4928 HTYPE = constants.HTYPE_INSTANCE
4929 _OP_REQP = ["instance_name", "mode", "disks"]
4932 def CheckArguments(self):
4933 if not hasattr(self.op, "remote_node"):
4934 self.op.remote_node = None
4935 if not hasattr(self.op, "iallocator"):
4936 self.op.iallocator = None
4938 # check for valid parameter combination
4939 cnt = [self.op.remote_node, self.op.iallocator].count(None)
4940 if self.op.mode == constants.REPLACE_DISK_CHG:
4942 raise errors.OpPrereqError("When changing the secondary either an"
4943 " iallocator script must be used or the"
4946 raise errors.OpPrereqError("Give either the iallocator or the new"
4947 " secondary, not both")
4948 else: # not replacing the secondary
4950 raise errors.OpPrereqError("The iallocator and new node options can"
4951 " be used only when changing the"
4954 def ExpandNames(self):
4955 self._ExpandAndLockInstance()
4957 if self.op.iallocator is not None:
4958 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4959 elif self.op.remote_node is not None:
4960 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4961 if remote_node is None:
4962 raise errors.OpPrereqError("Node '%s' not known" %
4963 self.op.remote_node)
4964 self.op.remote_node = remote_node
4965 # Warning: do not remove the locking of the new secondary here
4966 # unless DRBD8.AddChildren is changed to work in parallel;
4967 # currently it doesn't since parallel invocations of
4968 # FindUnusedMinor will conflict
4969 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4970 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4972 self.needed_locks[locking.LEVEL_NODE] = []
4973 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4975 def DeclareLocks(self, level):
4976 # If we're not already locking all nodes in the set we have to declare the
4977 # instance's primary/secondary nodes.
4978 if (level == locking.LEVEL_NODE and
4979 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4980 self._LockInstancesNodes()
4982 def _RunAllocator(self):
4983 """Compute a new secondary node using an IAllocator.
4986 ial = IAllocator(self,
4987 mode=constants.IALLOCATOR_MODE_RELOC,
4988 name=self.op.instance_name,
4989 relocate_from=[self.sec_node])
4991 ial.Run(self.op.iallocator)
4994 raise errors.OpPrereqError("Can't compute nodes using"
4995 " iallocator '%s': %s" % (self.op.iallocator,
4997 if len(ial.nodes) != ial.required_nodes:
4998 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4999 " of nodes (%s), required %s" %
5000 (len(ial.nodes), ial.required_nodes))
5001 self.op.remote_node = ial.nodes[0]
5002 self.LogInfo("Selected new secondary for the instance: %s",
5003 self.op.remote_node)
5005 def BuildHooksEnv(self):
5008 This runs on the master, the primary and all the secondaries.
5012 "MODE": self.op.mode,
5013 "NEW_SECONDARY": self.op.remote_node,
5014 "OLD_SECONDARY": self.instance.secondary_nodes[0],
5016 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5018 self.cfg.GetMasterNode(),
5019 self.instance.primary_node,
5021 if self.op.remote_node is not None:
5022 nl.append(self.op.remote_node)
5025 def CheckPrereq(self):
5026 """Check prerequisites.
5028 This checks that the instance is in the cluster.
5031 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5032 assert instance is not None, \
5033 "Cannot retrieve locked instance %s" % self.op.instance_name
5034 self.instance = instance
5036 if instance.disk_template != constants.DT_DRBD8:
5037 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5040 if len(instance.secondary_nodes) != 1:
5041 raise errors.OpPrereqError("The instance has a strange layout,"
5042 " expected one secondary but found %d" %
5043 len(instance.secondary_nodes))
5045 self.sec_node = instance.secondary_nodes[0]
5047 if self.op.iallocator is not None:
5048 self._RunAllocator()
5050 remote_node = self.op.remote_node
5051 if remote_node is not None:
5052 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5053 assert self.remote_node_info is not None, \
5054 "Cannot retrieve locked node %s" % remote_node
5056 self.remote_node_info = None
5057 if remote_node == instance.primary_node:
5058 raise errors.OpPrereqError("The specified node is the primary node of"
5060 elif remote_node == self.sec_node:
5061 raise errors.OpPrereqError("The specified node is already the"
5062 " secondary node of the instance.")
5064 if self.op.mode == constants.REPLACE_DISK_PRI:
5065 n1 = self.tgt_node = instance.primary_node
5066 n2 = self.oth_node = self.sec_node
5067 elif self.op.mode == constants.REPLACE_DISK_SEC:
5068 n1 = self.tgt_node = self.sec_node
5069 n2 = self.oth_node = instance.primary_node
5070 elif self.op.mode == constants.REPLACE_DISK_CHG:
5071 n1 = self.new_node = remote_node
5072 n2 = self.oth_node = instance.primary_node
5073 self.tgt_node = self.sec_node
5074 _CheckNodeNotDrained(self, remote_node)
5076 raise errors.ProgrammerError("Unhandled disk replace mode")
5078 _CheckNodeOnline(self, n1)
5079 _CheckNodeOnline(self, n2)
5081 if not self.op.disks:
5082 self.op.disks = range(len(instance.disks))
5084 for disk_idx in self.op.disks:
5085 instance.FindDisk(disk_idx)
5087 def _ExecD8DiskOnly(self, feedback_fn):
5088 """Replace a disk on the primary or secondary for dbrd8.
5090 The algorithm for replace is quite complicated:
5092 1. for each disk to be replaced:
5094 1. create new LVs on the target node with unique names
5095 1. detach old LVs from the drbd device
5096 1. rename old LVs to name_replaced.<time_t>
5097 1. rename new LVs to old LVs
5098 1. attach the new LVs (with the old names now) to the drbd device
5100 1. wait for sync across all devices
5102 1. for each modified disk:
5104 1. remove old LVs (which have the name name_replaced.<time_t>)
5106 Failures are not very well handled.
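# Illustrative sketch of steps 3-5 for a single disk (the LV names and the
# time_t value are made up):
#
#   old data LV "0faa...11.disk0_data" -> renamed "..._replaced-1215432000"
#   old meta LV "0faa...11.disk0_meta" -> renamed "..._replaced-1215432000"
#   new data LV "77bc...9e.disk0_data" -> renamed to the old data LV's name
#   new meta LV "77bc...9e.disk0_meta" -> renamed to the old meta LV's name
#
# the renamed new LVs are then re-attached as children of the same DRBD
# device, and the "_replaced-<time_t>" LVs are removed after the resync.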
5110 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5111 instance = self.instance
5113 vgname = self.cfg.GetVGName()
5116 tgt_node = self.tgt_node
5117 oth_node = self.oth_node
5119 # Step: check device activation
5120 self.proc.LogStep(1, steps_total, "check device existence")
5121 info("checking volume groups")
5122 my_vg = cfg.GetVGName()
5123 results = self.rpc.call_vg_list([oth_node, tgt_node])
5125 raise errors.OpExecError("Can't list volume groups on the nodes")
5126 for node in oth_node, tgt_node:
5128 if res.failed or not res.data or my_vg not in res.data:
5129 raise errors.OpExecError("Volume group '%s' not found on %s" %
5131 for idx, dev in enumerate(instance.disks):
5132 if idx not in self.op.disks:
5134 for node in tgt_node, oth_node:
5135 info("checking disk/%d on %s" % (idx, node))
5136 cfg.SetDiskID(dev, node)
5137 result = self.rpc.call_blockdev_find(node, dev)
5138 msg = result.RemoteFailMsg()
5139 if not msg and not result.payload:
5140 msg = "disk not found"
5142 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5145 # Step: check other node consistency
5146 self.proc.LogStep(2, steps_total, "check peer consistency")
5147 for idx, dev in enumerate(instance.disks):
5148 if idx not in self.op.disks:
5150 info("checking disk/%d consistency on %s" % (idx, oth_node))
5151 if not _CheckDiskConsistency(self, dev, oth_node,
5152 oth_node==instance.primary_node):
5153 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5154 " to replace disks on this node (%s)" %
5155 (oth_node, tgt_node))
5157 # Step: create new storage
5158 self.proc.LogStep(3, steps_total, "allocate new storage")
5159 for idx, dev in enumerate(instance.disks):
5160 if idx not in self.op.disks:
5163 cfg.SetDiskID(dev, tgt_node)
5164 lv_names = [".disk%d_%s" % (idx, suf)
5165 for suf in ["data", "meta"]]
5166 names = _GenerateUniqueNames(self, lv_names)
5167 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5168 logical_id=(vgname, names[0]))
5169 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5170 logical_id=(vgname, names[1]))
5171 new_lvs = [lv_data, lv_meta]
5172 old_lvs = dev.children
5173 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5174 info("creating new local storage on %s for %s" %
5175 (tgt_node, dev.iv_name))
5176 # we pass force_create=True to force the LVM creation
5177 for new_lv in new_lvs:
5178 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5179 _GetInstanceInfoText(instance), False)
5181 # Step: for each lv, detach+rename*2+attach
5182 self.proc.LogStep(4, steps_total, "change drbd configuration")
5183 for dev, old_lvs, new_lvs in iv_names.itervalues():
5184 info("detaching %s drbd from local storage" % dev.iv_name)
5185 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5188 raise errors.OpExecError("Can't detach drbd from local storage on node"
5189 " %s for device %s" % (tgt_node, dev.iv_name))
5191 #cfg.Update(instance)
5193 # ok, we created the new LVs, so now we know we have the needed
5194 # storage; as such, we proceed on the target node to rename
5195 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5196 # using the assumption that logical_id == physical_id (which in
5197 # turn is the unique_id on that node)
5199 # FIXME(iustin): use a better name for the replaced LVs
5200 temp_suffix = int(time.time())
5201 ren_fn = lambda d, suff: (d.physical_id[0],
5202 d.physical_id[1] + "_replaced-%s" % suff)
5203 # build the rename list based on what LVs exist on the node
5205 for to_ren in old_lvs:
5206 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5207 if not result.RemoteFailMsg() and result.payload:
5209 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5211 info("renaming the old LVs on the target node")
5212 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5215 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5216 # now we rename the new LVs to the old LVs
5217 info("renaming the new LVs on the target node")
5218 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5219 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5222 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5224 for old, new in zip(old_lvs, new_lvs):
5225 new.logical_id = old.logical_id
5226 cfg.SetDiskID(new, tgt_node)
5228 for disk in old_lvs:
5229 disk.logical_id = ren_fn(disk, temp_suffix)
5230 cfg.SetDiskID(disk, tgt_node)
5232 # now that the new lvs have the old name, we can add them to the device
5233 info("adding new mirror component on %s" % tgt_node)
5234 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5235 if result.failed or not result.data:
5236 for new_lv in new_lvs:
5237 msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5239 warning("Can't rollback device %s: %s", dev, msg,
5240 hint="cleanup manually the unused logical volumes")
5241 raise errors.OpExecError("Can't add local storage to drbd")
5243 dev.children = new_lvs
5244 cfg.Update(instance)
5246 # Step: wait for sync
5248 # this can fail as the old devices are degraded and _WaitForSync
5249 # does a combined result over all disks, so we don't check its
5250 # return value
5251 self.proc.LogStep(5, steps_total, "sync devices")
5252 _WaitForSync(self, instance, unlock=True)
5254 # so check manually all the devices
5255 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5256 cfg.SetDiskID(dev, instance.primary_node)
5257 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5258 msg = result.RemoteFailMsg()
5259 if not msg and not result.payload:
5260 msg = "disk not found"
5262 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5264 if result.payload[5]:
5265 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5267 # Step: remove old storage
5268 self.proc.LogStep(6, steps_total, "removing old storage")
5269 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5270 info("remove logical volumes for %s" % name)
5272 cfg.SetDiskID(lv, tgt_node)
5273 msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5275 warning("Can't remove old LV: %s" % msg,
5276 hint="manually remove unused LVs")
5279 def _ExecD8Secondary(self, feedback_fn):
5280 """Replace the secondary node for drbd8.
5282 The algorithm for replace is quite complicated:
5283 - for all disks of the instance:
5284 - create new LVs on the new node with same names
5285 - shutdown the drbd device on the old secondary
5286 - disconnect the drbd network on the primary
5287 - create the drbd device on the new secondary
5288 - network attach the drbd on the primary, using an artifice:
5289 the drbd code for Attach() will connect to the network if it
5290 finds a device which is connected to the good local disks but
5292 - wait for sync across all devices
5293 - remove all disks from the old secondary
5295 Failures are not very well handled.
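# Illustrative sketch of how one disk's DRBD logical_id evolves when the
# secondary moves from old_node to new_node (minor numbers are made up and
# the primary is assumed to be the first node of the tuple):
#
#   before:            (pri_node, old_node, port, p_minor, old_minor, secret)
#   created on target: (pri_node, new_node, None, p_minor, new_minor, secret)
#   after re-attach:   (pri_node, new_node, port, p_minor, new_minor, secret)
#
# matching the new_alone_id / new_net_id pair built in step 4 below.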
5299 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5300 instance = self.instance
5304 old_node = self.tgt_node
5305 new_node = self.new_node
5306 pri_node = instance.primary_node
5308 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5309 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5310 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5313 # Step: check device activation
5314 self.proc.LogStep(1, steps_total, "check device existence")
5315 info("checking volume groups")
5316 my_vg = cfg.GetVGName()
5317 results = self.rpc.call_vg_list([pri_node, new_node])
5318 for node in pri_node, new_node:
5320 if res.failed or not res.data or my_vg not in res.data:
5321 raise errors.OpExecError("Volume group '%s' not found on %s" %
5323 for idx, dev in enumerate(instance.disks):
5324 if idx not in self.op.disks:
5326 info("checking disk/%d on %s" % (idx, pri_node))
5327 cfg.SetDiskID(dev, pri_node)
5328 result = self.rpc.call_blockdev_find(pri_node, dev)
5329 msg = result.RemoteFailMsg()
5330 if not msg and not result.payload:
5331 msg = "disk not found"
5333 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5334 (idx, pri_node, msg))
5336 # Step: check other node consistency
5337 self.proc.LogStep(2, steps_total, "check peer consistency")
5338 for idx, dev in enumerate(instance.disks):
5339 if idx not in self.op.disks:
5341 info("checking disk/%d consistency on %s" % (idx, pri_node))
5342 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5343 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5344 " unsafe to replace the secondary" %
5347 # Step: create new storage
5348 self.proc.LogStep(3, steps_total, "allocate new storage")
5349 for idx, dev in enumerate(instance.disks):
5350 info("adding new local storage on %s for disk/%d" %
5352 # we pass force_create=True to force LVM creation
5353 for new_lv in dev.children:
5354 _CreateBlockDev(self, new_node, instance, new_lv, True,
5355 _GetInstanceInfoText(instance), False)
5357 # Step 4: drbd minors and drbd setup changes
5358 # after this, we must manually remove the drbd minors on both the
5359 # error and the success paths
5360 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5362 logging.debug("Allocated minors %s" % (minors,))
5363 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5364 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5366 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5367 # create new devices on new_node; note that we create two IDs:
5368 # one without port, so the drbd will be activated without
5369 # networking information on the new node at this stage, and one
5370 # with network, for the later activation in step 4
5371 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5372 if pri_node == o_node1:
5377 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5378 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5380 iv_names[idx] = (dev, dev.children, new_net_id)
5381 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5383 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5384 logical_id=new_alone_id,
5385 children=dev.children,
5388 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5389 _GetInstanceInfoText(instance), False)
5390 except errors.GenericError:
5391 self.cfg.ReleaseDRBDMinors(instance.name)
5394 for idx, dev in enumerate(instance.disks):
5395 # we have new devices, shutdown the drbd on the old secondary
5396 info("shutting down drbd for disk/%d on old node" % idx)
5397 cfg.SetDiskID(dev, old_node)
5398 msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5400 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5402 hint="Please cleanup this device manually as soon as possible")
5404 info("detaching primary drbds from the network (=> standalone)")
5405 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5406 instance.disks)[pri_node]
5408 msg = result.RemoteFailMsg()
5410 # detaches didn't succeed (unlikely)
5411 self.cfg.ReleaseDRBDMinors(instance.name)
5412 raise errors.OpExecError("Can't detach the disks from the network on"
5413 " old node: %s" % (msg,))
5415 # if we managed to detach at least one, we update all the disks of
5416 # the instance to point to the new secondary
5417 info("updating instance configuration")
5418 for dev, _, new_logical_id in iv_names.itervalues():
5419 dev.logical_id = new_logical_id
5420 cfg.SetDiskID(dev, pri_node)
5421 cfg.Update(instance)
5423 # and now perform the drbd attach
5424 info("attaching primary drbds to new secondary (standalone => connected)")
5425 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5426 instance.disks, instance.name,
5428 for to_node, to_result in result.items():
5429 msg = to_result.RemoteFailMsg()
5431 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5432 hint="please do a gnt-instance info to see the"
5435 # this can fail as the old devices are degraded and _WaitForSync
5436 # does a combined result over all disks, so we don't check its
5437 # return value
5438 self.proc.LogStep(5, steps_total, "sync devices")
5439 _WaitForSync(self, instance, unlock=True)
5441 # so check manually all the devices
5442 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5443 cfg.SetDiskID(dev, pri_node)
5444 result = self.rpc.call_blockdev_find(pri_node, dev)
5445 msg = result.RemoteFailMsg()
5446 if not msg and not result.payload:
5447 msg = "disk not found"
5449 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5451 if result.payload[5]:
5452 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5454 self.proc.LogStep(6, steps_total, "removing old storage")
5455 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5456 info("remove logical volumes for disk/%d" % idx)
5458 cfg.SetDiskID(lv, old_node)
5459 msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5461 warning("Can't remove LV on old secondary: %s", msg,
5462 hint="Cleanup stale volumes by hand")
5464 def Exec(self, feedback_fn):
5465 """Execute disk replacement.
5467 This dispatches the disk replacement to the appropriate handler.
5470 instance = self.instance
5472 # Activate the instance disks if we're replacing them on a down instance
5473 if not instance.admin_up:
5474 _StartInstanceDisks(self, instance, True)
5476 if self.op.mode == constants.REPLACE_DISK_CHG:
5477 fn = self._ExecD8Secondary
5479 fn = self._ExecD8DiskOnly
5481 ret = fn(feedback_fn)
5483 # Deactivate the instance disks if we're replacing them on a down instance
5484 if not instance.admin_up:
5485 _SafeShutdownInstanceDisks(self, instance)
5490 class LUGrowDisk(LogicalUnit):
5491 """Grow a disk of an instance.
5495 HTYPE = constants.HTYPE_INSTANCE
5496 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5499 def ExpandNames(self):
5500 self._ExpandAndLockInstance()
5501 self.needed_locks[locking.LEVEL_NODE] = []
5502 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5504 def DeclareLocks(self, level):
5505 if level == locking.LEVEL_NODE:
5506 self._LockInstancesNodes()
5508 def BuildHooksEnv(self):
5511 This runs on the master, the primary and all the secondaries.
5515 "DISK": self.op.disk,
5516 "AMOUNT": self.op.amount,
5518 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5520 self.cfg.GetMasterNode(),
5521 self.instance.primary_node,
5525 def CheckPrereq(self):
5526 """Check prerequisites.
5528 This checks that the instance is in the cluster.
5531 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5532 assert instance is not None, \
5533 "Cannot retrieve locked instance %s" % self.op.instance_name
5534 nodenames = list(instance.all_nodes)
5535 for node in nodenames:
5536 _CheckNodeOnline(self, node)
5539 self.instance = instance
5541 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5542 raise errors.OpPrereqError("Instance's disk layout does not support"
5545 self.disk = instance.FindDisk(self.op.disk)
5547 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5548 instance.hypervisor)
5549 for node in nodenames:
5550 info = nodeinfo[node]
5551 if info.failed or not info.data:
5552 raise errors.OpPrereqError("Cannot get current information"
5553 " from node '%s'" % node)
5554 vg_free = info.data.get('vg_free', None)
5555 if not isinstance(vg_free, int):
5556 raise errors.OpPrereqError("Can't compute free disk space on"
5558 if self.op.amount > vg_free:
5559 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5560 " %d MiB available, %d MiB required" %
5561 (node, vg_free, self.op.amount))
5563 def Exec(self, feedback_fn):
5564 """Execute disk grow.
5567 instance = self.instance
5569 for node in instance.all_nodes:
5570 self.cfg.SetDiskID(disk, node)
5571 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5572 msg = result.RemoteFailMsg()
5574 raise errors.OpExecError("Grow request failed to node %s: %s" %
5576 disk.RecordGrow(self.op.amount)
5577 self.cfg.Update(instance)
5578 if self.op.wait_for_sync:
5579 disk_abort = not _WaitForSync(self, instance)
5581 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5582 " status.\nPlease check the instance.")
5585 class LUQueryInstanceData(NoHooksLU):
5586 """Query runtime instance data.
5589 _OP_REQP = ["instances", "static"]
5592 def ExpandNames(self):
5593 self.needed_locks = {}
5594 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5596 if not isinstance(self.op.instances, list):
5597 raise errors.OpPrereqError("Invalid argument type 'instances'")
5599 if self.op.instances:
5600 self.wanted_names = []
5601 for name in self.op.instances:
5602 full_name = self.cfg.ExpandInstanceName(name)
5603 if full_name is None:
5604 raise errors.OpPrereqError("Instance '%s' not known" % name)
5605 self.wanted_names.append(full_name)
5606 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5608 self.wanted_names = None
5609 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5611 self.needed_locks[locking.LEVEL_NODE] = []
5612 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5614 def DeclareLocks(self, level):
5615 if level == locking.LEVEL_NODE:
5616 self._LockInstancesNodes()
5618 def CheckPrereq(self):
5619 """Check prerequisites.
5621 This only checks the optional instance list against the existing names.
5624 if self.wanted_names is None:
5625 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5627 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5628 in self.wanted_names]
5631 def _ComputeDiskStatus(self, instance, snode, dev):
5632 """Compute block device status.
5635 static = self.op.static
5637 self.cfg.SetDiskID(dev, instance.primary_node)
5638 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5639 if dev_pstatus.offline:
5642 msg = dev_pstatus.RemoteFailMsg()
5644 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5645 (instance.name, msg))
5646 dev_pstatus = dev_pstatus.payload
5650 if dev.dev_type in constants.LDS_DRBD:
5651 # we change the snode then (otherwise we use the one passed in)
5652 if dev.logical_id[0] == instance.primary_node:
5653 snode = dev.logical_id[1]
5655 snode = dev.logical_id[0]
5657 if snode and not static:
5658 self.cfg.SetDiskID(dev, snode)
5659 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5660 if dev_sstatus.offline:
5663 msg = dev_sstatus.RemoteFailMsg()
5665 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5666 (instance.name, msg))
5667 dev_sstatus = dev_sstatus.payload
5672 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5673 for child in dev.children]
5678 "iv_name": dev.iv_name,
5679 "dev_type": dev.dev_type,
5680 "logical_id": dev.logical_id,
5681 "physical_id": dev.physical_id,
5682 "pstatus": dev_pstatus,
5683 "sstatus": dev_sstatus,
5684 "children": dev_children,
5690 def Exec(self, feedback_fn):
5691 """Gather and return data"""
5694 cluster = self.cfg.GetClusterInfo()
5696 for instance in self.wanted_instances:
5697 if not self.op.static:
5698 remote_info = self.rpc.call_instance_info(instance.primary_node,
5700 instance.hypervisor)
5702 remote_info = remote_info.data
5703 if remote_info and "state" in remote_info:
5706 remote_state = "down"
5709 if instance.admin_up:
5712 config_state = "down"
5714 disks = [self._ComputeDiskStatus(instance, None, device)
5715 for device in instance.disks]
5718 "name": instance.name,
5719 "config_state": config_state,
5720 "run_state": remote_state,
5721 "pnode": instance.primary_node,
5722 "snodes": instance.secondary_nodes,
5724 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5726 "hypervisor": instance.hypervisor,
5727 "network_port": instance.network_port,
5728 "hv_instance": instance.hvparams,
5729 "hv_actual": cluster.FillHV(instance),
5730 "be_instance": instance.beparams,
5731 "be_actual": cluster.FillBE(instance),
5734 result[instance.name] = idict
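# Illustrative sketch, not part of the original code: each entry added to
# 'result' above maps an instance name to the dict built for it; with
# made-up example values it would look roughly like:
#   result["instance1.example.com"] = {
#     "name": "instance1.example.com",
#     "config_state": "up",
#     "run_state": "up",
#     "pnode": "node1.example.com",
#     "snodes": ["node2.example.com"],
#     "nics": [("aa:00:00:12:34:56", "192.0.2.10", "xen-br0")],
#     "hypervisor": "xen-pvm",
#   }
# plus the "disks", "network_port" and hv/be parameter entries built above.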
5739 class LUSetInstanceParams(LogicalUnit):
5740 """Modifies an instance's parameters.
5743 HPATH = "instance-modify"
5744 HTYPE = constants.HTYPE_INSTANCE
5745 _OP_REQP = ["instance_name"]
5748 def CheckArguments(self):
5749 if not hasattr(self.op, 'nics'):
5751 if not hasattr(self.op, 'disks'):
5753 if not hasattr(self.op, 'beparams'):
5754 self.op.beparams = {}
5755 if not hasattr(self.op, 'hvparams'):
5756 self.op.hvparams = {}
5757 self.op.force = getattr(self.op, "force", False)
5758 if not (self.op.nics or self.op.disks or
5759 self.op.hvparams or self.op.beparams):
5760 raise errors.OpPrereqError("No changes submitted")
5764 for disk_op, disk_dict in self.op.disks:
5765 if disk_op == constants.DDM_REMOVE:
5768 elif disk_op == constants.DDM_ADD:
5771 if not isinstance(disk_op, int):
5772 raise errors.OpPrereqError("Invalid disk index")
5773 if disk_op == constants.DDM_ADD:
5774 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5775 if mode not in constants.DISK_ACCESS_SET:
5776 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5777 size = disk_dict.get('size', None)
5779 raise errors.OpPrereqError("Required disk parameter size missing")
5782 except ValueError, err:
5783 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5785 disk_dict['size'] = size
5787 # modification of disk
5788 if 'size' in disk_dict:
5789 raise errors.OpPrereqError("Disk size change not possible, use"
5792 if disk_addremove > 1:
5793 raise errors.OpPrereqError("Only one disk add or remove operation"
5794 " supported at a time")
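# Illustrative sketch, not part of the original code: after this parsing
# self.op.disks is a list of (operation, parameters) pairs, e.g. adding a
# disk, or changing the access mode of the second disk:
#   [(constants.DDM_ADD, {'size': 1024, 'mode': constants.DISK_RDWR})]
#   [(1, {'mode': constants.DISK_RDONLY})]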
5798 for nic_op, nic_dict in self.op.nics:
5799 if nic_op == constants.DDM_REMOVE:
5802 elif nic_op == constants.DDM_ADD:
5805 if not isinstance(nic_op, int):
5806 raise errors.OpPrereqError("Invalid nic index")
5808 # nic_dict should be a dict
5809 nic_ip = nic_dict.get('ip', None)
5810 if nic_ip is not None:
5811 if nic_ip.lower() == constants.VALUE_NONE:
5812 nic_dict['ip'] = None
5814 if not utils.IsValidIP(nic_ip):
5815 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5817 if nic_op == constants.DDM_ADD:
5818 nic_bridge = nic_dict.get('bridge', None)
5819 if nic_bridge is None:
5820 nic_dict['bridge'] = self.cfg.GetDefBridge()
5821 nic_mac = nic_dict.get('mac', None)
5823 nic_dict['mac'] = constants.VALUE_AUTO
5825 if 'mac' in nic_dict:
5826 nic_mac = nic_dict['mac']
5827 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5828 if not utils.IsValidMac(nic_mac):
5829 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5830 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5831 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5832 " modifying an existing nic")
5834 if nic_addremove > 1:
5835 raise errors.OpPrereqError("Only one NIC add or remove operation"
5836 " supported at a time")
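# Illustrative sketch, not part of the original code: self.op.nics uses the
# same (operation, parameters) shape, e.g. adding a NIC with an
# auto-generated MAC, or moving NIC 0 to another bridge:
#   [(constants.DDM_ADD, {'mac': constants.VALUE_AUTO, 'bridge': 'xen-br0'})]
#   [(0, {'bridge': 'xen-br1'})]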
5838 def ExpandNames(self):
5839 self._ExpandAndLockInstance()
5840 self.needed_locks[locking.LEVEL_NODE] = []
5841 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5843 def DeclareLocks(self, level):
5844 if level == locking.LEVEL_NODE:
5845 self._LockInstancesNodes()
5847 def BuildHooksEnv(self):
5850 This runs on the master, primary and secondaries.
5854 if constants.BE_MEMORY in self.be_new:
5855 args['memory'] = self.be_new[constants.BE_MEMORY]
5856 if constants.BE_VCPUS in self.be_new:
5857 args['vcpus'] = self.be_new[constants.BE_VCPUS]
5858 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5859 # information at all.
5862 nic_override = dict(self.op.nics)
5863 for idx, nic in enumerate(self.instance.nics):
5864 if idx in nic_override:
5865 this_nic_override = nic_override[idx]
5867 this_nic_override = {}
5868 if 'ip' in this_nic_override:
5869 ip = this_nic_override['ip']
5872 if 'bridge' in this_nic_override:
5873 bridge = this_nic_override['bridge']
5876 if 'mac' in this_nic_override:
5877 mac = this_nic_override['mac']
5880 args['nics'].append((ip, bridge, mac))
5881 if constants.DDM_ADD in nic_override:
5882 ip = nic_override[constants.DDM_ADD].get('ip', None)
5883 bridge = nic_override[constants.DDM_ADD]['bridge']
5884 mac = nic_override[constants.DDM_ADD]['mac']
5885 args['nics'].append((ip, bridge, mac))
5886 elif constants.DDM_REMOVE in nic_override:
5887 del args['nics'][-1]
5889 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5890 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5893 def CheckPrereq(self):
5894 """Check prerequisites.
5896 This checks that the instance exists and validates the new parameters against the cluster configuration and the instance's nodes.
5899 force = self.force = self.op.force
5901 # checking the new params on the primary/secondary nodes
5903 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5904 assert self.instance is not None, \
5905 "Cannot retrieve locked instance %s" % self.op.instance_name
5906 pnode = instance.primary_node
5907 nodelist = list(instance.all_nodes)
5909 # hvparams processing
5910 if self.op.hvparams:
5911 i_hvdict = copy.deepcopy(instance.hvparams)
5912 for key, val in self.op.hvparams.iteritems():
5913 if val == constants.VALUE_DEFAULT:
5920 cluster = self.cfg.GetClusterInfo()
5921 utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5922 hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5925 hypervisor.GetHypervisor(
5926 instance.hypervisor).CheckParameterSyntax(hv_new)
5927 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5928 self.hv_new = hv_new # the new actual values
5929 self.hv_inst = i_hvdict # the new dict (without defaults)
5931 self.hv_new = self.hv_inst = {}
5933 # beparams processing
5934 if self.op.beparams:
5935 i_bedict = copy.deepcopy(instance.beparams)
5936 for key, val in self.op.beparams.iteritems():
5937 if val == constants.VALUE_DEFAULT:
5944 cluster = self.cfg.GetClusterInfo()
5945 utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5946 be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5948 self.be_new = be_new # the new actual values
5949 self.be_inst = i_bedict # the new dict (without defaults)
5951 self.be_new = self.be_inst = {}
5955 if constants.BE_MEMORY in self.op.beparams and not self.force:
5956 mem_check_list = [pnode]
5957 if be_new[constants.BE_AUTO_BALANCE]:
5958 # either we changed auto_balance to yes, or it was already enabled before
5959 mem_check_list.extend(instance.secondary_nodes)
5960 instance_info = self.rpc.call_instance_info(pnode, instance.name,
5961 instance.hypervisor)
5962 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5963 instance.hypervisor)
5964 if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5965 # Assume the primary node is unreachable and go ahead
5966 self.warn.append("Can't get info from primary node %s" % pnode)
5968 if not instance_info.failed and instance_info.data:
5969 current_mem = int(instance_info.data['memory'])
5971 # Assume instance not running
5972 # (there is a slight race condition here, but it's not very probable,
5973 # and we have no other way to check)
5975 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5976 nodeinfo[pnode].data['memory_free'])
5978 raise errors.OpPrereqError("This change will prevent the instance"
5979 " from starting, due to %d MB of memory"
5980 " missing on its primary node" % miss_mem)
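# Worked example with made-up numbers: if the new memory setting is 1024 MB,
# current_mem is 512 MB and the primary node reports 256 MB free, then
# miss_mem = 1024 - 512 - 256 = 256 MB > 0 and the change is refused.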
5982 if be_new[constants.BE_AUTO_BALANCE]:
5983 for node, nres in nodeinfo.iteritems():
5984 if node not in instance.secondary_nodes:
5986 if nres.failed or not isinstance(nres.data, dict):
5987 self.warn.append("Can't get info from secondary node %s" % node)
5988 elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5989 self.warn.append("Not enough memory to failover instance to"
5990 " secondary node %s" % node)
5993 for nic_op, nic_dict in self.op.nics:
5994 if nic_op == constants.DDM_REMOVE:
5995 if not instance.nics:
5996 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5998 if nic_op != constants.DDM_ADD:
6000 if nic_op < 0 or nic_op >= len(instance.nics):
6001 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6003 (nic_op, len(instance.nics)))
6004 if 'bridge' in nic_dict:
6005 nic_bridge = nic_dict['bridge']
6006 if nic_bridge is None:
6007 raise errors.OpPrereqError('Cannot set the nic bridge to None')
6008 if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6009 msg = ("Bridge '%s' doesn't exist on one of"
6010 " the instance nodes" % nic_bridge)
6012 self.warn.append(msg)
6014 raise errors.OpPrereqError(msg)
6015 if 'mac' in nic_dict:
6016 nic_mac = nic_dict['mac']
6018 raise errors.OpPrereqError('Cannot set the nic mac to None')
6019 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6020 # otherwise generate the mac
6021 nic_dict['mac'] = self.cfg.GenerateMAC()
6023 # or validate/reserve the current one
6024 if self.cfg.IsMacInUse(nic_mac):
6025 raise errors.OpPrereqError("MAC address %s already in use"
6026 " in cluster" % nic_mac)
6029 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6030 raise errors.OpPrereqError("Disk operations not supported for"
6031 " diskless instances")
6032 for disk_op, disk_dict in self.op.disks:
6033 if disk_op == constants.DDM_REMOVE:
6034 if len(instance.disks) == 1:
6035 raise errors.OpPrereqError("Cannot remove the last disk of"
6037 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6038 ins_l = ins_l[pnode]
6039 if ins_l.failed or not isinstance(ins_l.data, list):
6040 raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6041 if instance.name in ins_l.data:
6042 raise errors.OpPrereqError("Instance is running, can't remove"
6045 if (disk_op == constants.DDM_ADD and
6046 len(instance.disks) >= constants.MAX_DISKS):
6047 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6048 " add more" % constants.MAX_DISKS)
6049 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6051 if disk_op < 0 or disk_op >= len(instance.disks):
6052 raise errors.OpPrereqError("Invalid disk index %s, valid values"
6054 (disk_op, len(instance.disks)))
6058 def Exec(self, feedback_fn):
6059 """Modifies an instance.
6061 All parameters take effect only at the next restart of the instance.
6064 # Process here the warnings from CheckPrereq, as we don't have a
6065 # feedback_fn there.
6066 for warn in self.warn:
6067 feedback_fn("WARNING: %s" % warn)
6070 instance = self.instance
6072 for disk_op, disk_dict in self.op.disks:
6073 if disk_op == constants.DDM_REMOVE:
6074 # remove the last disk
6075 device = instance.disks.pop()
6076 device_idx = len(instance.disks)
6077 for node, disk in device.ComputeNodeTree(instance.primary_node):
6078 self.cfg.SetDiskID(disk, node)
6079 msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6081 self.LogWarning("Could not remove disk/%d on node %s: %s,"
6082 " continuing anyway", device_idx, node, msg)
6083 result.append(("disk/%d" % device_idx, "remove"))
6084 elif disk_op == constants.DDM_ADD:
6086 if instance.disk_template == constants.DT_FILE:
6087 file_driver, file_path = instance.disks[0].logical_id
6088 file_path = os.path.dirname(file_path)
6090 file_driver = file_path = None
6091 disk_idx_base = len(instance.disks)
6092 new_disk = _GenerateDiskTemplate(self,
6093 instance.disk_template,
6094 instance.name, instance.primary_node,
6095 instance.secondary_nodes,
6100 instance.disks.append(new_disk)
6101 info = _GetInstanceInfoText(instance)
6103 logging.info("Creating volume %s for instance %s",
6104 new_disk.iv_name, instance.name)
6105 # Note: this needs to be kept in sync with _CreateDisks
6107 for node in instance.all_nodes:
6108 f_create = node == instance.primary_node
6110 _CreateBlockDev(self, node, instance, new_disk,
6111 f_create, info, f_create)
6112 except errors.OpExecError, err:
6113 self.LogWarning("Failed to create volume %s (%s) on"
6115 new_disk.iv_name, new_disk, node, err)
6116 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6117 (new_disk.size, new_disk.mode)))
6119 # change a given disk
6120 instance.disks[disk_op].mode = disk_dict['mode']
6121 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6123 for nic_op, nic_dict in self.op.nics:
6124 if nic_op == constants.DDM_REMOVE:
6125 # remove the last nic
6126 del instance.nics[-1]
6127 result.append(("nic.%d" % len(instance.nics), "remove"))
6128 elif nic_op == constants.DDM_ADD:
6129 # mac and bridge should be set by now
6130 mac = nic_dict['mac']
6131 bridge = nic_dict['bridge']
6132 new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6134 instance.nics.append(new_nic)
6135 result.append(("nic.%d" % (len(instance.nics) - 1),
6136 "add:mac=%s,ip=%s,bridge=%s" %
6137 (new_nic.mac, new_nic.ip, new_nic.bridge)))
6139 # change a given nic
6140 for key in 'mac', 'ip', 'bridge':
6142 setattr(instance.nics[nic_op], key, nic_dict[key])
6143 result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6146 if self.op.hvparams:
6147 instance.hvparams = self.hv_inst
6148 for key, val in self.op.hvparams.iteritems():
6149 result.append(("hv/%s" % key, val))
6152 if self.op.beparams:
6153 instance.beparams = self.be_inst
6154 for key, val in self.op.beparams.iteritems():
6155 result.append(("be/%s" % key, val))
6157 self.cfg.Update(instance)
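# Illustrative sketch, not part of the original code: the 'result' list
# handed back to the caller collects (parameter, new value) pairs for every
# applied change, e.g.:
#   [("disk/1", "add:size=1024,mode=rw"),
#    ("nic.bridge/0", "xen-br1"),
#    ("be/memory", 512)]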
6162 class LUQueryExports(NoHooksLU):
6163 """Query the exports list
6166 _OP_REQP = ['nodes']
6169 def ExpandNames(self):
6170 self.needed_locks = {}
6171 self.share_locks[locking.LEVEL_NODE] = 1
6172 if not self.op.nodes:
6173 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6175 self.needed_locks[locking.LEVEL_NODE] = \
6176 _GetWantedNodes(self, self.op.nodes)
6178 def CheckPrereq(self):
6179 """Check prerequisites.
6182 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6184 def Exec(self, feedback_fn):
6185 """Compute the list of all the exported system images.
6188 @return: a dictionary with the structure node->(export-list)
6189 where export-list is a list of the instances exported on
6193 rpcresult = self.rpc.call_export_list(self.nodes)
6195 for node in rpcresult:
6196 if rpcresult[node].failed:
6197 result[node] = False
6199 result[node] = rpcresult[node].data
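# Illustrative sketch, not part of the original code: the mapping built here
# pairs each queried node with its export list, or False when the RPC to
# that node failed, e.g.:
#   {"node1.example.com": ["instance1.example.com"],
#    "node2.example.com": False}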
6204 class LUExportInstance(LogicalUnit):
6205 """Export an instance to an image in the cluster.
6208 HPATH = "instance-export"
6209 HTYPE = constants.HTYPE_INSTANCE
6210 _OP_REQP = ["instance_name", "target_node", "shutdown"]
6213 def ExpandNames(self):
6214 self._ExpandAndLockInstance()
6215 # FIXME: lock only instance primary and destination node
6217 # Sad but true, for now we have to lock all nodes, as we don't know where
6218 # the previous export might be, and in this LU we search for it and
6219 # remove it from its current node. In the future we could fix this by:
6220 # - making a tasklet to search (share-lock all), then create the new one,
6221 # then one to remove, after
6222 # - removing the removal operation altogether
6223 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6225 def DeclareLocks(self, level):
6226 """Last minute lock declaration."""
6227 # All nodes are locked anyway, so nothing to do here.
6229 def BuildHooksEnv(self):
6232 This will run on the master, primary node and target node.
6236 "EXPORT_NODE": self.op.target_node,
6237 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6239 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6240 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6241 self.op.target_node]
6244 def CheckPrereq(self):
6245 """Check prerequisites.
6247 This checks that the instance and node names are valid.
6250 instance_name = self.op.instance_name
6251 self.instance = self.cfg.GetInstanceInfo(instance_name)
6252 assert self.instance is not None, \
6253 "Cannot retrieve locked instance %s" % self.op.instance_name
6254 _CheckNodeOnline(self, self.instance.primary_node)
6256 self.dst_node = self.cfg.GetNodeInfo(
6257 self.cfg.ExpandNodeName(self.op.target_node))
6259 if self.dst_node is None:
6260 # This means the node name is wrong, not that the node is unlocked
6261 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6262 _CheckNodeOnline(self, self.dst_node.name)
6263 _CheckNodeNotDrained(self, self.dst_node.name)
6265 # instance disk type verification
6266 for disk in self.instance.disks:
6267 if disk.dev_type == constants.LD_FILE:
6268 raise errors.OpPrereqError("Export not supported for instances with"
6269 " file-based disks")
6271 def Exec(self, feedback_fn):
6272 """Export an instance to an image in the cluster.
6275 instance = self.instance
6276 dst_node = self.dst_node
6277 src_node = instance.primary_node
6278 if self.op.shutdown:
6279 # shutdown the instance, but not the disks
6280 result = self.rpc.call_instance_shutdown(src_node, instance)
6281 msg = result.RemoteFailMsg()
6283 raise errors.OpExecError("Could not shutdown instance %s on"
6285 (instance.name, src_node, msg))
6287 vgname = self.cfg.GetVGName()
6291 # set the disk IDs correctly since call_instance_start needs the
6292 # correct drbd minor to create the symlinks
6293 for disk in instance.disks:
6294 self.cfg.SetDiskID(disk, src_node)
6297 for disk in instance.disks:
6298 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6299 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6300 if new_dev_name.failed or not new_dev_name.data:
6301 self.LogWarning("Could not snapshot block device %s on node %s",
6302 disk.logical_id[1], src_node)
6303 snap_disks.append(False)
6305 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6306 logical_id=(vgname, new_dev_name.data),
6307 physical_id=(vgname, new_dev_name.data),
6308 iv_name=disk.iv_name)
6309 snap_disks.append(new_dev)
6312 if self.op.shutdown and instance.admin_up:
6313 result = self.rpc.call_instance_start(src_node, instance, None, None)
6314 msg = result.RemoteFailMsg()
6316 _ShutdownInstanceDisks(self, instance)
6317 raise errors.OpExecError("Could not start instance: %s" % msg)
6319 # TODO: check for size
6321 cluster_name = self.cfg.GetClusterName()
6322 for idx, dev in enumerate(snap_disks):
6324 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6325 instance, cluster_name, idx)
6326 if result.failed or not result.data:
6327 self.LogWarning("Could not export block device %s from node %s to"
6328 " node %s", dev.logical_id[1], src_node,
6330 msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6332 self.LogWarning("Could not remove snapshot block device %s from node"
6333 " %s: %s", dev.logical_id[1], src_node, msg)
6335 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6336 if result.failed or not result.data:
6337 self.LogWarning("Could not finalize export for instance %s on node %s",
6338 instance.name, dst_node.name)
6340 nodelist = self.cfg.GetNodeList()
6341 nodelist.remove(dst_node.name)
6343 # on one-node clusters, nodelist will be empty after the removal;
6344 # if we proceeded, the backup would be removed because OpQueryExports
6345 # substitutes an empty list with the full cluster node list.
6347 exportlist = self.rpc.call_export_list(nodelist)
6348 for node in exportlist:
6349 if exportlist[node].failed:
6351 if instance.name in exportlist[node].data:
6352 if not self.rpc.call_export_remove(node, instance.name):
6353 self.LogWarning("Could not remove older export for instance %s"
6354 " on node %s", instance.name, node)
6357 class LURemoveExport(NoHooksLU):
6358 """Remove exports related to the named instance.
6361 _OP_REQP = ["instance_name"]
6364 def ExpandNames(self):
6365 self.needed_locks = {}
6366 # We need all nodes to be locked in order for RemoveExport to work, but we
6367 # don't need to lock the instance itself, as nothing will happen to it (and
6368 # we can remove exports also for a removed instance)
6369 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6371 def CheckPrereq(self):
6372 """Check prerequisites.
6376 def Exec(self, feedback_fn):
6377 """Remove any export.
6380 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6381 # If the instance was not found we'll try with the name that was passed in.
6382 # This will only work if it was an FQDN, though.
6384 if not instance_name:
6386 instance_name = self.op.instance_name
6388 exportlist = self.rpc.call_export_list(self.acquired_locks[
6389 locking.LEVEL_NODE])
6391 for node in exportlist:
6392 if exportlist[node].failed:
6393 self.LogWarning("Failed to query node %s, continuing" % node)
6395 if instance_name in exportlist[node].data:
6397 result = self.rpc.call_export_remove(node, instance_name)
6398 if result.failed or not result.data:
6399 logging.error("Could not remove export for instance %s"
6400 " on node %s", instance_name, node)
6402 if fqdn_warn and not found:
6403 feedback_fn("Export not found. If trying to remove an export belonging"
6404 " to a deleted instance please use its Fully Qualified"
6408 class TagsLU(NoHooksLU):
6411 This is an abstract class which is the parent of all the other tags LUs.
6415 def ExpandNames(self):
6416 self.needed_locks = {}
6417 if self.op.kind == constants.TAG_NODE:
6418 name = self.cfg.ExpandNodeName(self.op.name)
6420 raise errors.OpPrereqError("Invalid node name (%s)" %
6423 self.needed_locks[locking.LEVEL_NODE] = name
6424 elif self.op.kind == constants.TAG_INSTANCE:
6425 name = self.cfg.ExpandInstanceName(self.op.name)
6427 raise errors.OpPrereqError("Invalid instance name (%s)" %
6430 self.needed_locks[locking.LEVEL_INSTANCE] = name
6432 def CheckPrereq(self):
6433 """Check prerequisites.
6436 if self.op.kind == constants.TAG_CLUSTER:
6437 self.target = self.cfg.GetClusterInfo()
6438 elif self.op.kind == constants.TAG_NODE:
6439 self.target = self.cfg.GetNodeInfo(self.op.name)
6440 elif self.op.kind == constants.TAG_INSTANCE:
6441 self.target = self.cfg.GetInstanceInfo(self.op.name)
6443 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6447 class LUGetTags(TagsLU):
6448 """Returns the tags of a given object.
6451 _OP_REQP = ["kind", "name"]
6454 def Exec(self, feedback_fn):
6455 """Returns the tag list.
6458 return list(self.target.GetTags())
6461 class LUSearchTags(NoHooksLU):
6462 """Searches the tags for a given pattern.
6465 _OP_REQP = ["pattern"]
6468 def ExpandNames(self):
6469 self.needed_locks = {}
6471 def CheckPrereq(self):
6472 """Check prerequisites.
6474 This checks the pattern passed for validity by compiling it.
6478 self.re = re.compile(self.op.pattern)
6479 except re.error, err:
6480 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6481 (self.op.pattern, err))
6483 def Exec(self, feedback_fn):
6484 """Returns the tag list.
6488 tgts = [("/cluster", cfg.GetClusterInfo())]
6489 ilist = cfg.GetAllInstancesInfo().values()
6490 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6491 nlist = cfg.GetAllNodesInfo().values()
6492 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6494 for path, target in tgts:
6495 for tag in target.GetTags():
6496 if self.re.search(tag):
6497 results.append((path, tag))
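# Illustrative sketch, not part of the original code: every match is
# reported as a (path, tag) pair; searching for "prod" could, for example,
# return:
#   [("/cluster", "env:prod"),
#    ("/instances/instance1.example.com", "env:prod")]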
6501 class LUAddTags(TagsLU):
6502 """Sets one or more tags on a given object.
6505 _OP_REQP = ["kind", "name", "tags"]
6508 def CheckPrereq(self):
6509 """Check prerequisites.
6511 This checks the type and length of the tag name and value.
6514 TagsLU.CheckPrereq(self)
6515 for tag in self.op.tags:
6516 objects.TaggableObject.ValidateTag(tag)
6518 def Exec(self, feedback_fn):
6523 for tag in self.op.tags:
6524 self.target.AddTag(tag)
6525 except errors.TagError, err:
6526 raise errors.OpExecError("Error while setting tag: %s" % str(err))
6528 self.cfg.Update(self.target)
6529 except errors.ConfigurationError:
6530 raise errors.OpRetryError("There has been a modification to the"
6531 " config file and the operation has been"
6532 " aborted. Please retry.")
6535 class LUDelTags(TagsLU):
6536 """Delete a list of tags from a given object.
6539 _OP_REQP = ["kind", "name", "tags"]
6542 def CheckPrereq(self):
6543 """Check prerequisites.
6545 This checks that we have the given tag.
6548 TagsLU.CheckPrereq(self)
6549 for tag in self.op.tags:
6550 objects.TaggableObject.ValidateTag(tag)
6551 del_tags = frozenset(self.op.tags)
6552 cur_tags = self.target.GetTags()
6553 if not del_tags <= cur_tags:
6554 diff_tags = del_tags - cur_tags
6555 diff_names = ["'%s'" % tag for tag in diff_tags]
6557 raise errors.OpPrereqError("Tag(s) %s not found" %
6558 (",".join(diff_names)))
6560 def Exec(self, feedback_fn):
6561 """Remove the tag from the object.
6564 for tag in self.op.tags:
6565 self.target.RemoveTag(tag)
6567 self.cfg.Update(self.target)
6568 except errors.ConfigurationError:
6569 raise errors.OpRetryError("There has been a modification to the"
6570 " config file and the operation has been"
6571 " aborted. Please retry.")
6574 class LUTestDelay(NoHooksLU):
6575 """Sleep for a specified amount of time.
6577 This LU sleeps on the master and/or nodes for a specified amount of time.
6581 _OP_REQP = ["duration", "on_master", "on_nodes"]
6584 def ExpandNames(self):
6585 """Expand names and set required locks.
6587 This expands the node list, if any.
6590 self.needed_locks = {}
6591 if self.op.on_nodes:
6592 # _GetWantedNodes can be used here, but is not always appropriate to use
6593 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for more details.
6595 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6596 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6598 def CheckPrereq(self):
6599 """Check prerequisites.
6603 def Exec(self, feedback_fn):
6604 """Do the actual sleep.
6607 if self.op.on_master:
6608 if not utils.TestDelay(self.op.duration):
6609 raise errors.OpExecError("Error during master delay test")
6610 if self.op.on_nodes:
6611 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6613 raise errors.OpExecError("Complete failure from rpc call")
6614 for node, node_result in result.items():
6616 if not node_result.data:
6617 raise errors.OpExecError("Failure during rpc call to node %s,"
6618 " result: %s" % (node, node_result.data))
6621 class IAllocator(object):
6622 """IAllocator framework.
6624 An IAllocator instance has four sets of attributes:
6625 - cfg that is needed to query the cluster
6626 - input data (all members of the _KEYS class attribute are required)
6627 - four buffer attributes (in_text, out_text, in_data, out_data), that represent the
6628 input (to the external script) in text and data structure format,
6629 and the output from it, again in two formats
6630 - the result variables from the script (success, info, nodes) for easy usage
6635 "mem_size", "disks", "disk_template",
6636 "os", "tags", "nics", "vcpus", "hypervisor",
6642 def __init__(self, lu, mode, name, **kwargs):
6644 # init buffer variables
6645 self.in_text = self.out_text = self.in_data = self.out_data = None
6646 # init all input fields so that pylint is happy
6649 self.mem_size = self.disks = self.disk_template = None
6650 self.os = self.tags = self.nics = self.vcpus = None
6651 self.hypervisor = None
6652 self.relocate_from = None
6654 self.required_nodes = None
6655 # init result fields
6656 self.success = self.info = self.nodes = None
6657 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6658 keyset = self._ALLO_KEYS
6659 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6660 keyset = self._RELO_KEYS
6662 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6663 " IAllocator" % self.mode)
6665 if key not in keyset:
6666 raise errors.ProgrammerError("Invalid input parameter '%s' to"
6667 " IAllocator" % key)
6668 setattr(self, key, kwargs[key])
6670 if key not in kwargs:
6671 raise errors.ProgrammerError("Missing input parameter '%s' to"
6672 " IAllocator" % key)
6673 self._BuildInputData()
6675 def _ComputeClusterData(self):
6676 """Compute the generic allocator input data.
6678 This is the data that is independent of the actual operation.
6682 cluster_info = cfg.GetClusterInfo()
6685 "version": constants.IALLOCATOR_VERSION,
6686 "cluster_name": cfg.GetClusterName(),
6687 "cluster_tags": list(cluster_info.GetTags()),
6688 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6689 # we don't have job IDs
6691 iinfo = cfg.GetAllInstancesInfo().values()
6692 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6696 node_list = cfg.GetNodeList()
6698 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6699 hypervisor_name = self.hypervisor
6700 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6701 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6703 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6705 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6706 cluster_info.enabled_hypervisors)
6707 for nname, nresult in node_data.items():
6708 # first fill in static (config-based) values
6709 ninfo = cfg.GetNodeInfo(nname)
6711 "tags": list(ninfo.GetTags()),
6712 "primary_ip": ninfo.primary_ip,
6713 "secondary_ip": ninfo.secondary_ip,
6714 "offline": ninfo.offline,
6715 "drained": ninfo.drained,
6716 "master_candidate": ninfo.master_candidate,
6719 if not ninfo.offline:
6721 if not isinstance(nresult.data, dict):
6722 raise errors.OpExecError("Can't get data for node %s" % nname)
6723 remote_info = nresult.data
6724 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6725 'vg_size', 'vg_free', 'cpu_total']:
6726 if attr not in remote_info:
6727 raise errors.OpExecError("Node '%s' didn't return attribute"
6728 " '%s'" % (nname, attr))
6730 remote_info[attr] = int(remote_info[attr])
6731 except ValueError, err:
6732 raise errors.OpExecError("Node '%s' returned invalid value"
6733 " for '%s': %s" % (nname, attr, err))
6734 # compute memory used by primary instances
6735 i_p_mem = i_p_up_mem = 0
6736 for iinfo, beinfo in i_list:
6737 if iinfo.primary_node == nname:
6738 i_p_mem += beinfo[constants.BE_MEMORY]
6739 if iinfo.name not in node_iinfo[nname].data:
6742 i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6743 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6744 remote_info['memory_free'] -= max(0, i_mem_diff)
6747 i_p_up_mem += beinfo[constants.BE_MEMORY]
6749 # compute memory used by instances
6751 "total_memory": remote_info['memory_total'],
6752 "reserved_memory": remote_info['memory_dom0'],
6753 "free_memory": remote_info['memory_free'],
6754 "total_disk": remote_info['vg_size'],
6755 "free_disk": remote_info['vg_free'],
6756 "total_cpus": remote_info['cpu_total'],
6757 "i_pri_memory": i_p_mem,
6758 "i_pri_up_memory": i_p_up_mem,
6762 node_results[nname] = pnr
6763 data["nodes"] = node_results
6767 for iinfo, beinfo in i_list:
6768 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6769 for n in iinfo.nics]
6771 "tags": list(iinfo.GetTags()),
6772 "admin_up": iinfo.admin_up,
6773 "vcpus": beinfo[constants.BE_VCPUS],
6774 "memory": beinfo[constants.BE_MEMORY],
6776 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6778 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6779 "disk_template": iinfo.disk_template,
6780 "hypervisor": iinfo.hypervisor,
6782 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6784 instance_data[iinfo.name] = pir
6786 data["instances"] = instance_data
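# Illustrative sketch, not part of the original code: the structure built
# here is a plain dict, later serialized as text for the external allocator
# script; with made-up example values it is roughly:
#   {"version": constants.IALLOCATOR_VERSION,
#    "cluster_name": "cluster.example.com",
#    "cluster_tags": [],
#    "enabled_hypervisors": ["xen-pvm"],
#    "nodes": {"node1.example.com": {"total_memory": 4096, "free_memory": 2048,
#                                    "total_disk": 102400, "free_disk": 51200,
#                                    "total_cpus": 4, "offline": False}},
#    "instances": {"instance1.example.com": {"memory": 512, "vcpus": 1,
#                                            "admin_up": True}}}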
6790 def _AddNewInstance(self):
6791 """Add new instance data to allocator structure.
6793 This in combination with _ComputeClusterData will create the
6794 correct structure needed as input for the allocator.
6796 The checks for the completeness of the opcode must have already been
6802 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6804 if self.disk_template in constants.DTS_NET_MIRROR:
6805 self.required_nodes = 2
6807 self.required_nodes = 1
6811 "disk_template": self.disk_template,
6814 "vcpus": self.vcpus,
6815 "memory": self.mem_size,
6816 "disks": self.disks,
6817 "disk_space_total": disk_space,
6819 "required_nodes": self.required_nodes,
6821 data["request"] = request
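# Illustrative sketch (an assumption; part of the request dict is elided in
# this excerpt): an allocation request carries at least the fields set
# above, shown here with made-up example values:
#   {"disk_template": "drbd8",
#    "disks": [{'size': 1024, 'mode': 'w'}],
#    "disk_space_total": 2304,
#    "memory": 512, "vcpus": 1,
#    "required_nodes": 2}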
6823 def _AddRelocateInstance(self):
6824 """Add relocate instance data to allocator structure.
6826 This in combination with _ComputeClusterData will create the
6827 correct structure needed as input for the allocator.
6829 The checks for the completeness of the opcode must have already been
6833 instance = self.lu.cfg.GetInstanceInfo(self.name)
6834 if instance is None:
6835 raise errors.ProgrammerError("Unknown instance '%s' passed to"
6836 " IAllocator" % self.name)
6838 if instance.disk_template not in constants.DTS_NET_MIRROR:
6839 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6841 if len(instance.secondary_nodes) != 1:
6842 raise errors.OpPrereqError("Instance has not exactly one secondary node")
6844 self.required_nodes = 1
6845 disk_sizes = [{'size': disk.size} for disk in instance.disks]
6846 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6851 "disk_space_total": disk_space,
6852 "required_nodes": self.required_nodes,
6853 "relocate_from": self.relocate_from,
6855 self.in_data["request"] = request
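# Illustrative sketch, not part of the original code: a relocation request,
# with made-up example values, carries roughly:
#   {"disk_space_total": 1024,
#    "required_nodes": 1,
#    "relocate_from": ["node2.example.com"]}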
6857 def _BuildInputData(self):
6858 """Build input data structures.
6861 self._ComputeClusterData()
6863 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6864 self._AddNewInstance()
6866 self._AddRelocateInstance()
6868 self.in_text = serializer.Dump(self.in_data)
6870 def Run(self, name, validate=True, call_fn=None):
6871 """Run an instance allocator and return the results.
6875 call_fn = self.lu.rpc.call_iallocator_runner
6878 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6881 if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6882 raise errors.OpExecError("Invalid result from master iallocator runner")
6884 rcode, stdout, stderr, fail = result.data
6886 if rcode == constants.IARUN_NOTFOUND:
6887 raise errors.OpExecError("Can't find allocator '%s'" % name)
6888 elif rcode == constants.IARUN_FAILURE:
6889 raise errors.OpExecError("Instance allocator call failed: %s,"
6890 " output: %s" % (fail, stdout+stderr))
6891 self.out_text = stdout
6893 self._ValidateResult()
6895 def _ValidateResult(self):
6896 """Process the allocator results.
6898 This will process and if successful save the result in
6899 self.out_data and the other parameters.
6903 rdict = serializer.Load(self.out_text)
6904 except Exception, err:
6905 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6907 if not isinstance(rdict, dict):
6908 raise errors.OpExecError("Can't parse iallocator results: not a dict")
6910 for key in "success", "info", "nodes":
6911 if key not in rdict:
6912 raise errors.OpExecError("Can't parse iallocator results:"
6913 " missing key '%s'" % key)
6914 setattr(self, key, rdict[key])
6916 if not isinstance(rdict["nodes"], list):
6917 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6919 self.out_data = rdict
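# Illustrative sketch, not part of the original code: a well-formed reply
# parsed here must carry the three keys checked above, with "nodes" being a
# list; for example:
#   {"success": True,
#    "info": "allocation successful",
#    "nodes": ["node2.example.com", "node3.example.com"]}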
6922 class LUTestAllocator(NoHooksLU):
6923 """Run allocator tests.
6925 This LU runs the allocator tests
6928 _OP_REQP = ["direction", "mode", "name"]
6930 def CheckPrereq(self):
6931 """Check prerequisites.
6933 This checks the opcode parameters depending on the direction and mode of the test.
6936 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6937 for attr in ["name", "mem_size", "disks", "disk_template",
6938 "os", "tags", "nics", "vcpus"]:
6939 if not hasattr(self.op, attr):
6940 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6942 iname = self.cfg.ExpandInstanceName(self.op.name)
6943 if iname is not None:
6944 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6946 if not isinstance(self.op.nics, list):
6947 raise errors.OpPrereqError("Invalid parameter 'nics'")
6948 for row in self.op.nics:
6949 if (not isinstance(row, dict) or
6952 "bridge" not in row):
6953 raise errors.OpPrereqError("Invalid contents of the"
6954 " 'nics' parameter")
6955 if not isinstance(self.op.disks, list):
6956 raise errors.OpPrereqError("Invalid parameter 'disks'")
6957 for row in self.op.disks:
6958 if (not isinstance(row, dict) or
6959 "size" not in row or
6960 not isinstance(row["size"], int) or
6961 "mode" not in row or
6962 row["mode"] not in ['r', 'w']):
6963 raise errors.OpPrereqError("Invalid contents of the"
6964 " 'disks' parameter")
6965 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6966 self.op.hypervisor = self.cfg.GetHypervisorType()
6967 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6968 if not hasattr(self.op, "name"):
6969 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6970 fname = self.cfg.ExpandInstanceName(self.op.name)
6972 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6974 self.op.name = fname
6975 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6977 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6980 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6981 if not hasattr(self.op, "allocator") or self.op.allocator is None:
6982 raise errors.OpPrereqError("Missing allocator name")
6983 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6984 raise errors.OpPrereqError("Wrong allocator test '%s'" %
6987 def Exec(self, feedback_fn):
6988 """Run the allocator test.
6991 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6992 ial = IAllocator(self,
6995 mem_size=self.op.mem_size,
6996 disks=self.op.disks,
6997 disk_template=self.op.disk_template,
7001 vcpus=self.op.vcpus,
7002 hypervisor=self.op.hypervisor,
7005 ial = IAllocator(self,
7008 relocate_from=list(self.relocate_from),
7011 if self.op.direction == constants.IALLOCATOR_DIR_IN:
7012 result = ial.in_text
7014 ial.Run(self.op.allocator, validate=False)
7015 result = ial.out_text