4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement ExpandNames
53 - implement CheckPrereq
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements:
58 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60 Note that all commands require root permissions.
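  A minimal sketch of a hypothetical subclass following these rules (the
  opcode name and behavior are purely illustrative, not part of this
  module)::

    class LUExampleNoop(LogicalUnit):
      HPATH = "example-noop"
      HTYPE = constants.HTYPE_CLUSTER
      _OP_REQP = []

      def ExpandNames(self):
        self.needed_locks = {}       # exclusive LU, no locks needed

      def CheckPrereq(self):
        pass                         # nothing to verify

      def BuildHooksEnv(self):
        return {}, [], []            # empty env, no pre/post hook nodes

      def Exec(self, feedback_fn):
        feedback_fn("nothing to do")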
68 def __init__(self, processor, op, context, rpc):
69 """Constructor for LogicalUnit.
This needs to be overridden in derived classes in order to check op
77 self.cfg = context.cfg
78 self.context = context
80 # Dicts used to declare locking needs to mcpu
81 self.needed_locks = None
82 self.acquired_locks = {}
83 self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85 self.remove_locks = {}
86 # Used to force good behavior when calling helper functions
87 self.recalculate_locks = {}
90 self.LogWarning = processor.LogWarning
91 self.LogInfo = processor.LogInfo
93 for attr_name in self._OP_REQP:
94 attr_val = getattr(op, attr_name, None)
96 raise errors.OpPrereqError("Required parameter '%s' missing" %
101 """Returns the SshRunner object
105 self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
108 ssh = property(fget=__GetSSH)
110 def CheckArguments(self):
111 """Check syntactic validity for the opcode arguments.
This method is for doing a simple syntactic check and ensuring
validity of opcode parameters, without any cluster-related
checks. While the same can be accomplished in ExpandNames and/or
CheckPrereq, doing these separately is better because:

  - ExpandNames is left purely as a lock-related function
  - CheckPrereq is run after we have acquired locks (and possible
The function is allowed to change the self.op attribute so that
later methods need no longer worry about missing parameters.
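  For example, a hypothetical LU taking an optional "force" parameter could
  normalize it here (sketch only)::

    def CheckArguments(self):
      if not hasattr(self.op, "force"):
        self.op.force = False
      elif not isinstance(self.op.force, bool):
        raise errors.OpPrereqError("Parameter 'force' must be a boolean")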
128 def ExpandNames(self):
129 """Expand names for this LU.
131 This method is called before starting to execute the opcode, and it should
132 update all the parameters of the opcode to their canonical form (e.g. a
133 short node name must be fully expanded after this method has successfully
completed). This way locking, hooks, logging, etc. can work correctly.
136 LUs which implement this method must also populate the self.needed_locks
137 member, as a dict with lock levels as keys, and a list of needed lock names
140 - use an empty dict if you don't need any lock
141 - if you don't need any lock at a particular level omit that level
142 - don't put anything for the BGL level
143 - if you want all locks at a level use locking.ALL_SET as a value
145 If you need to share locks (rather than acquire them exclusively) at one
146 level you can modify self.share_locks, setting a true value (usually 1) for
147 that level. By default locks are not shared.
151 # Acquire all nodes and one instance
152 self.needed_locks = {
153 locking.LEVEL_NODE: locking.ALL_SET,
154 locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156 # Acquire just two nodes
157 self.needed_locks = {
158 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
161 self.needed_locks = {} # No, you can't leave it to the default value None
164 # The implementation of this method is mandatory only if the new LU is
# concurrent, so that old LUs don't need to be changed all at the
# same time.
168 self.needed_locks = {} # Exclusive LUs don't need locks.
170 raise NotImplementedError
172 def DeclareLocks(self, level):
173 """Declare LU locking needs for a level
175 While most LUs can just declare their locking needs at ExpandNames time,
176 sometimes there's the need to calculate some locks after having acquired
177 the ones before. This function is called just before acquiring locks at a
178 particular level, but after acquiring the ones at lower levels, and permits
179 such calculations. It can be used to modify self.needed_locks, and by
180 default it does nothing.
182 This function is only called if you have something already set in
183 self.needed_locks for the level.
185 @param level: Locking level which is going to be locked
186 @type level: member of ganeti.locking.LEVELS
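    A hypothetical implementation that computes its node locks only once the
    instance locks are held might look like (sketch only)::

      def DeclareLocks(self, level):
        if level == locking.LEVEL_NODE:
          # instance locks are already held at this point
          self._LockInstancesNodes()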
190 def CheckPrereq(self):
191 """Check prerequisites for this LU.
193 This method should check that the prerequisites for the execution
194 of this LU are fulfilled. It can do internode communication, but
it should be idempotent - no cluster or system changes are allowed.
198 The method should raise errors.OpPrereqError in case something is
199 not fulfilled. Its return value is ignored.
201 This method should also update all the parameters of the opcode to
202 their canonical form if it hasn't been done by ExpandNames before.
205 raise NotImplementedError
207 def Exec(self, feedback_fn):
210 This method should implement the actual work. It should raise
211 errors.OpExecError for failures that are somewhat dealt with in
215 raise NotImplementedError
217 def BuildHooksEnv(self):
218 """Build hooks environment for this LU.
This method should return a three-element tuple consisting of: a dict
221 containing the environment that will be used for running the
222 specific hook for this LU, a list of node names on which the hook
223 should run before the execution, and a list of node names on which
224 the hook should run after the execution.
226 The keys of the dict must not have 'GANETI_' prefixed as this will
227 be handled in the hooks runner. Also note additional keys will be
228 added by the hooks runner. If the LU doesn't define any
229 environment, an empty dict (and not None) should be returned.
231 No nodes should be returned as an empty list (and not None).
Note that if the HPATH for a LU class is None, this function will not
be called.
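    As an illustration, a hypothetical instance-related LU could return
    something along these lines (self.instance is assumed to have been set
    by CheckPrereq; the key names are only an example)::

      env = {"INSTANCE_NAME": self.op.instance_name}
      nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
      return env, nl, nl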
237 raise NotImplementedError
239 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240 """Notify the LU about the results of its hooks.
242 This method is called every time a hooks phase is executed, and notifies
243 the Logical Unit about the hooks' result. The LU can then use it to alter
244 its result based on the hooks. By default the method does nothing and the
previous result is passed back unchanged, but any LU can define it if it
246 wants to use the local cluster hook-scripts somehow.
248 @param phase: one of L{constants.HOOKS_PHASE_POST} or
249 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250 @param hook_results: the results of the multi-node hooks rpc call
@param feedback_fn: function used to send feedback back to the caller
252 @param lu_result: the previous Exec result this LU had, or None
254 @return: the new Exec result, based on the previous result
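    A minimal sketch (hypothetical) that only reports the post phase and
    leaves the result untouched::

      def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
        if phase == constants.HOOKS_PHASE_POST:
          feedback_fn("* post-hooks ran on %d node(s)" % len(hook_results))
        return lu_result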
260 def _ExpandAndLockInstance(self):
261 """Helper function to expand and lock an instance.
263 Many LUs that work on an instance take its name in self.op.instance_name
264 and need to expand it and then declare the expanded name for locking. This
265 function does it, and then updates self.op.instance_name to the expanded
name. It also initializes needed_locks as a dict, if this hasn't been
done before.
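    A typical caller's ExpandNames could therefore look like this sketch::

      def ExpandNames(self):
        self._ExpandAndLockInstance()
        self.needed_locks[locking.LEVEL_NODE] = []
        self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE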
270 if self.needed_locks is None:
271 self.needed_locks = {}
273 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274 "_ExpandAndLockInstance called with instance-level locks set"
275 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276 if expanded_name is None:
277 raise errors.OpPrereqError("Instance '%s' not known" %
278 self.op.instance_name)
279 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280 self.op.instance_name = expanded_name
282 def _LockInstancesNodes(self, primary_only=False):
283 """Helper function to declare instances' nodes for locking.
285 This function should be called after locking one or more instances to lock
286 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287 with all primary or secondary nodes for instances already locked and
288 present in self.needed_locks[locking.LEVEL_INSTANCE].
290 It should be called from DeclareLocks, and for safety only works if
291 self.recalculate_locks[locking.LEVEL_NODE] is set.
293 In the future it may grow parameters to just lock some instance's nodes, or
to just lock primary or secondary nodes, if needed.
It should be called in DeclareLocks in a way similar to::
298 if level == locking.LEVEL_NODE:
299 self._LockInstancesNodes()
301 @type primary_only: boolean
302 @param primary_only: only lock primary nodes of locked instances
305 assert locking.LEVEL_NODE in self.recalculate_locks, \
306 "_LockInstancesNodes helper function called with no nodes to recalculate"
# TODO: check if we've really been called with the instance locks held
310 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311 # future we might want to have different behaviors depending on the value
312 # of self.recalculate_locks[locking.LEVEL_NODE]
314 for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315 instance = self.context.cfg.GetInstanceInfo(instance_name)
316 wanted_nodes.append(instance.primary_node)
318 wanted_nodes.extend(instance.secondary_nodes)
320 if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321 self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322 elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323 self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325 del self.recalculate_locks[locking.LEVEL_NODE]
328 class NoHooksLU(LogicalUnit):
329 """Simple LU which runs no hooks.
331 This LU is intended as a parent for other LogicalUnits which will
332 run no hooks, in order to reduce duplicate code.
339 def _GetWantedNodes(lu, nodes):
340 """Returns list of checked and expanded node names.
342 @type lu: L{LogicalUnit}
343 @param lu: the logical unit on whose behalf we execute
345 @param nodes: list of node names or None for all nodes
347 @return: the list of nodes, sorted
@raise errors.OpPrereqError: if the nodes parameter is wrong type
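  Typical usage from an LU, as an illustrative sketch::

    # expands short names and raises OpPrereqError for unknown nodes
    self.wanted = _GetWantedNodes(self, self.op.nodes)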
351 if not isinstance(nodes, list):
352 raise errors.OpPrereqError("Invalid argument type 'nodes'")
355 raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356 " non-empty list of nodes whose name is to be expanded.")
360 node = lu.cfg.ExpandNodeName(name)
362 raise errors.OpPrereqError("No such node name '%s'" % name)
365 return utils.NiceSort(wanted)
368 def _GetWantedInstances(lu, instances):
369 """Returns list of checked and expanded instance names.
371 @type lu: L{LogicalUnit}
372 @param lu: the logical unit on whose behalf we execute
373 @type instances: list
374 @param instances: list of instance names or None for all instances
376 @return: the list of instances, sorted
377 @raise errors.OpPrereqError: if the instances parameter is wrong type
378 @raise errors.OpPrereqError: if any of the passed instances is not found
381 if not isinstance(instances, list):
382 raise errors.OpPrereqError("Invalid argument type 'instances'")
387 for name in instances:
388 instance = lu.cfg.ExpandInstanceName(name)
390 raise errors.OpPrereqError("No such instance name '%s'" % name)
391 wanted.append(instance)
394 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
398 def _CheckOutputFields(static, dynamic, selected):
399 """Checks whether all selected fields are valid.
401 @type static: L{utils.FieldSet}
402 @param static: static fields set
403 @type dynamic: L{utils.FieldSet}
404 @param dynamic: dynamic fields set
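  For example, a query LU would validate its requested fields like this
  (sketch, mirroring how the helper is used below)::

    _CheckOutputFields(static=utils.FieldSet("name"),
                       dynamic=utils.FieldSet("mfree"),
                       selected=self.op.output_fields)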
411 delta = f.NonMatching(selected)
413 raise errors.OpPrereqError("Unknown output fields selected: %s"
417 def _CheckBooleanOpField(op, name):
418 """Validates boolean opcode parameters.
420 This will ensure that an opcode parameter is either a boolean value,
421 or None (but that it always exists).
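  For instance, a hypothetical "ignore_failures" flag could be validated
  with::

    _CheckBooleanOpField(self.op, "ignore_failures")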
424 val = getattr(op, name, None)
425 if not (val is None or isinstance(val, bool)):
426 raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428 setattr(op, name, val)
431 def _CheckNodeOnline(lu, node):
432 """Ensure that a given node is online.
434 @param lu: the LU on behalf of which we make the check
435 @param node: the node to check
436 @raise errors.OpPrereqError: if the node is offline
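  Typically called from an LU's CheckPrereq, for example (sketch)::

    _CheckNodeOnline(self, instance.primary_node)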
439 if lu.cfg.GetNodeInfo(node).offline:
440 raise errors.OpPrereqError("Can't use offline node %s" % node)
443 def _CheckNodeNotDrained(lu, node):
444 """Ensure that a given node is not drained.
446 @param lu: the LU on behalf of which we make the check
447 @param node: the node to check
448 @raise errors.OpPrereqError: if the node is drained
451 if lu.cfg.GetNodeInfo(node).drained:
452 raise errors.OpPrereqError("Can't use drained node %s" % node)
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456 memory, vcpus, nics, disk_template, disks,
457 bep, hvp, hypervisor):
458 """Builds instance related env variables for hooks
460 This builds the hook environment from individual variables.
463 @param name: the name of the instance
464 @type primary_node: string
465 @param primary_node: the name of the instance's primary node
466 @type secondary_nodes: list
467 @param secondary_nodes: list of secondary nodes as strings
468 @type os_type: string
469 @param os_type: the name of the instance's OS
470 @type status: boolean
471 @param status: the should_run status of the instance
473 @param memory: the memory size of the instance
475 @param vcpus: the count of VCPUs the instance has
477 @param nics: list of tuples (ip, bridge, mac) representing
478 the NICs the instance has
479 @type disk_template: string
@param disk_template: the disk template of the instance
482 @param disks: the list of (size, mode) pairs
484 @param bep: the backend parameters for the instance
486 @param hvp: the hypervisor parameters for the instance
487 @type hypervisor: string
488 @param hypervisor: the hypervisor for the instance
490 @return: the hook environment for this instance
499 "INSTANCE_NAME": name,
500 "INSTANCE_PRIMARY": primary_node,
501 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
502 "INSTANCE_OS_TYPE": os_type,
503 "INSTANCE_STATUS": str_status,
504 "INSTANCE_MEMORY": memory,
505 "INSTANCE_VCPUS": vcpus,
506 "INSTANCE_DISK_TEMPLATE": disk_template,
507 "INSTANCE_HYPERVISOR": hypervisor,
511 nic_count = len(nics)
512 for idx, (ip, bridge, mac) in enumerate(nics):
515 env["INSTANCE_NIC%d_IP" % idx] = ip
516 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
517 env["INSTANCE_NIC%d_MAC" % idx] = mac
521 env["INSTANCE_NIC_COUNT"] = nic_count
524 disk_count = len(disks)
525 for idx, (size, mode) in enumerate(disks):
526 env["INSTANCE_DISK%d_SIZE" % idx] = size
527 env["INSTANCE_DISK%d_MODE" % idx] = mode
531 env["INSTANCE_DISK_COUNT"] = disk_count
533 for source, kind in [(bep, "BE"), (hvp, "HV")]:
534 for key, value in source.items():
535 env["INSTANCE_%s_%s" % (kind, key)] = value
540 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
541 """Builds instance related env variables for hooks from an object.
543 @type lu: L{LogicalUnit}
544 @param lu: the logical unit on whose behalf we execute
545 @type instance: L{objects.Instance}
546 @param instance: the instance for which we should build the
549 @param override: dictionary with key/values that will override
552 @return: the hook environment dictionary
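  A hypothetical caller's BuildHooksEnv might use it like this (sketch; the
  extra key and self.op.new_name are illustrative only)::

    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
    return env, nl, nl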
555 cluster = lu.cfg.GetClusterInfo()
556 bep = cluster.FillBE(instance)
557 hvp = cluster.FillHV(instance)
559 'name': instance.name,
560 'primary_node': instance.primary_node,
561 'secondary_nodes': instance.secondary_nodes,
562 'os_type': instance.os,
563 'status': instance.admin_up,
564 'memory': bep[constants.BE_MEMORY],
565 'vcpus': bep[constants.BE_VCPUS],
566 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
567 'disk_template': instance.disk_template,
568 'disks': [(disk.size, disk.mode) for disk in instance.disks],
571 'hypervisor': instance.hypervisor,
574 args.update(override)
575 return _BuildInstanceHookEnv(**args)
578 def _AdjustCandidatePool(lu):
579 """Adjust the candidate pool after node operations.
582 mod_list = lu.cfg.MaintainCandidatePool()
584 lu.LogInfo("Promoted nodes to master candidate role: %s",
585 ", ".join(node.name for node in mod_list))
586 for name in mod_list:
587 lu.context.ReaddNode(name)
588 mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
590 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
594 def _CheckInstanceBridgesExist(lu, instance):
"""Check that the bridges needed by an instance exist.
# check bridges existence
599 brlist = [nic.bridge for nic in instance.nics]
600 result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
raise errors.OpPrereqError("One or more target bridges %s do not"
604 " exist on destination node '%s'" %
605 (brlist, instance.primary_node))
608 class LUDestroyCluster(NoHooksLU):
609 """Logical unit for destroying the cluster.
614 def CheckPrereq(self):
615 """Check prerequisites.
617 This checks whether the cluster is empty.
619 Any errors are signalled by raising errors.OpPrereqError.
622 master = self.cfg.GetMasterNode()
624 nodelist = self.cfg.GetNodeList()
625 if len(nodelist) != 1 or nodelist[0] != master:
626 raise errors.OpPrereqError("There are still %d node(s) in"
627 " this cluster." % (len(nodelist) - 1))
628 instancelist = self.cfg.GetInstanceList()
630 raise errors.OpPrereqError("There are still %d instance(s) in"
631 " this cluster." % len(instancelist))
633 def Exec(self, feedback_fn):
634 """Destroys the cluster.
637 master = self.cfg.GetMasterNode()
638 result = self.rpc.call_node_stop_master(master, False)
641 raise errors.OpExecError("Could not disable the master role")
642 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
643 utils.CreateBackup(priv_key)
644 utils.CreateBackup(pub_key)
648 class LUVerifyCluster(LogicalUnit):
649 """Verifies the cluster status.
652 HPATH = "cluster-verify"
653 HTYPE = constants.HTYPE_CLUSTER
654 _OP_REQP = ["skip_checks"]
657 def ExpandNames(self):
658 self.needed_locks = {
659 locking.LEVEL_NODE: locking.ALL_SET,
660 locking.LEVEL_INSTANCE: locking.ALL_SET,
662 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
664 def _VerifyNode(self, nodeinfo, file_list, local_cksum,
665 node_result, feedback_fn, master_files,
667 """Run multiple tests against a node.
671 - compares ganeti version
- checks vg existence and size > 20G
673 - checks config file checksum
674 - checks ssh to other nodes
676 @type nodeinfo: L{objects.Node}
677 @param nodeinfo: the node to check
678 @param file_list: required list of files
679 @param local_cksum: dictionary of local files and their checksums
680 @param node_result: the results from the node
681 @param feedback_fn: function used to accumulate results
682 @param master_files: list of files that only masters should have
@param drbd_map: the used DRBD minors for this node, in the
form of minor: (instance, must_exist), which correspond to instances
and their running status
686 @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
691 # main result, node_result should be a non-empty dict
692 if not node_result or not isinstance(node_result, dict):
693 feedback_fn(" - ERROR: unable to verify node %s." % (node,))
696 # compares ganeti version
697 local_version = constants.PROTOCOL_VERSION
698 remote_version = node_result.get('version', None)
699 if not (remote_version and isinstance(remote_version, (list, tuple)) and
700 len(remote_version) == 2):
701 feedback_fn(" - ERROR: connection to %s failed" % (node))
704 if local_version != remote_version[0]:
705 feedback_fn(" - ERROR: incompatible protocol versions: master %s,"
706 " node %s %s" % (local_version, node, remote_version[0]))
709 # node seems compatible, we can actually try to look into its results
713 # full package version
714 if constants.RELEASE_VERSION != remote_version[1]:
715 feedback_fn(" - WARNING: software version mismatch: master %s,"
717 (constants.RELEASE_VERSION, node, remote_version[1]))
719 # checks vg existence and size > 20G
720 if vg_name is not None:
721 vglist = node_result.get(constants.NV_VGLIST, None)
723 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
727 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
728 constants.MIN_VG_SIZE)
730 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
733 # checks config file checksum
735 remote_cksum = node_result.get(constants.NV_FILELIST, None)
736 if not isinstance(remote_cksum, dict):
738 feedback_fn(" - ERROR: node hasn't returned file checksum data")
740 for file_name in file_list:
741 node_is_mc = nodeinfo.master_candidate
742 must_have_file = file_name not in master_files
743 if file_name not in remote_cksum:
744 if node_is_mc or must_have_file:
746 feedback_fn(" - ERROR: file '%s' missing" % file_name)
747 elif remote_cksum[file_name] != local_cksum[file_name]:
748 if node_is_mc or must_have_file:
750 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
752 # not candidate and this is not a must-have file
754 feedback_fn(" - ERROR: file '%s' should not exist on non master"
755 " candidates (and the file is outdated)" % file_name)
757 # all good, except non-master/non-must have combination
758 if not node_is_mc and not must_have_file:
759 feedback_fn(" - ERROR: file '%s' should not exist on non master"
760 " candidates" % file_name)
764 if constants.NV_NODELIST not in node_result:
766 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
768 if node_result[constants.NV_NODELIST]:
770 for node in node_result[constants.NV_NODELIST]:
771 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
772 (node, node_result[constants.NV_NODELIST][node]))
774 if constants.NV_NODENETTEST not in node_result:
776 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
778 if node_result[constants.NV_NODENETTEST]:
780 nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
782 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
783 (node, node_result[constants.NV_NODENETTEST][node]))
785 hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
786 if isinstance(hyp_result, dict):
787 for hv_name, hv_result in hyp_result.iteritems():
788 if hv_result is not None:
789 feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
790 (hv_name, hv_result))
792 # check used drbd list
793 if vg_name is not None:
794 used_minors = node_result.get(constants.NV_DRBDLIST, [])
795 if not isinstance(used_minors, (tuple, list)):
796 feedback_fn(" - ERROR: cannot parse drbd status file: %s" %
799 for minor, (iname, must_exist) in drbd_map.items():
800 if minor not in used_minors and must_exist:
801 feedback_fn(" - ERROR: drbd minor %d of instance %s is"
802 " not active" % (minor, iname))
804 for minor in used_minors:
805 if minor not in drbd_map:
806 feedback_fn(" - ERROR: unallocated drbd minor %d is in use" %
812 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
813 node_instance, feedback_fn, n_offline):
814 """Verify an instance.
816 This function checks to see if the required block devices are
817 available on the instance's node.
822 node_current = instanceconfig.primary_node
825 instanceconfig.MapLVsByNode(node_vol_should)
827 for node in node_vol_should:
828 if node in n_offline:
829 # ignore missing volumes on offline nodes
831 for volume in node_vol_should[node]:
832 if node not in node_vol_is or volume not in node_vol_is[node]:
833 feedback_fn(" - ERROR: volume %s missing on node %s" %
837 if instanceconfig.admin_up:
838 if ((node_current not in node_instance or
839 not instance in node_instance[node_current]) and
840 node_current not in n_offline):
841 feedback_fn(" - ERROR: instance %s not running on node %s" %
842 (instance, node_current))
845 for node in node_instance:
846 if (not node == node_current):
847 if instance in node_instance[node]:
848 feedback_fn(" - ERROR: instance %s should not run on node %s" %
854 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
855 """Verify if there are any unknown volumes in the cluster.
857 The .os, .swap and backup volumes are ignored. All other volumes are
863 for node in node_vol_is:
864 for volume in node_vol_is[node]:
865 if node not in node_vol_should or volume not in node_vol_should[node]:
866 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
871 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
872 """Verify the list of running instances.
874 This checks what instances are running but unknown to the cluster.
878 for node in node_instance:
879 for runninginstance in node_instance[node]:
880 if runninginstance not in instancelist:
881 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
882 (runninginstance, node))
886 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
887 """Verify N+1 Memory Resilience.
889 Check that if one single node dies we can still start all the instances it
895 for node, nodeinfo in node_info.iteritems():
896 # This code checks that every node which is now listed as secondary has
# enough memory to host all instances it is supposed to, should a single
898 # other node in the cluster fail.
899 # FIXME: not ready for failover to an arbitrary node
900 # FIXME: does not support file-backed instances
901 # WARNING: we currently take into account down instances as well as up
902 # ones, considering that even if they're down someone might want to start
903 # them even in the event of a node failure.
904 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
906 for instance in instances:
907 bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
908 if bep[constants.BE_AUTO_BALANCE]:
909 needed_mem += bep[constants.BE_MEMORY]
910 if nodeinfo['mfree'] < needed_mem:
feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
912 " failovers should node %s fail" % (node, prinode))
916 def CheckPrereq(self):
917 """Check prerequisites.
919 Transform the list of checks we're going to skip into a set and check that
920 all its members are valid.
923 self.skip_set = frozenset(self.op.skip_checks)
924 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
925 raise errors.OpPrereqError("Invalid checks to be skipped specified")
927 def BuildHooksEnv(self):
Cluster-Verify hooks are run only in the post phase; their failure is
logged in the verify output and makes the verification fail.
934 all_nodes = self.cfg.GetNodeList()
936 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
938 for node in self.cfg.GetAllNodesInfo().values():
939 env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
941 return env, [], all_nodes
943 def Exec(self, feedback_fn):
"""Verify integrity of cluster, performing various tests on nodes.
948 feedback_fn("* Verifying global settings")
949 for msg in self.cfg.VerifyConfig():
950 feedback_fn(" - ERROR: %s" % msg)
952 vg_name = self.cfg.GetVGName()
953 hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
954 nodelist = utils.NiceSort(self.cfg.GetNodeList())
955 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
956 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
957 instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
958 for iname in instancelist)
959 i_non_redundant = [] # Non redundant instances
960 i_non_a_balanced = [] # Non auto-balanced instances
961 n_offline = [] # List of offline nodes
962 n_drained = [] # List of nodes being drained
968 # FIXME: verify OS list
970 master_files = [constants.CLUSTER_CONF_FILE]
972 file_names = ssconf.SimpleStore().GetFileList()
973 file_names.append(constants.SSL_CERT_FILE)
974 file_names.append(constants.RAPI_CERT_FILE)
975 file_names.extend(master_files)
977 local_checksums = utils.FingerprintFiles(file_names)
979 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
980 node_verify_param = {
981 constants.NV_FILELIST: file_names,
982 constants.NV_NODELIST: [node.name for node in nodeinfo
983 if not node.offline],
984 constants.NV_HYPERVISOR: hypervisors,
985 constants.NV_NODENETTEST: [(node.name, node.primary_ip,
986 node.secondary_ip) for node in nodeinfo
987 if not node.offline],
988 constants.NV_INSTANCELIST: hypervisors,
989 constants.NV_VERSION: None,
990 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
992 if vg_name is not None:
993 node_verify_param[constants.NV_VGLIST] = None
994 node_verify_param[constants.NV_LVLIST] = vg_name
995 node_verify_param[constants.NV_DRBDLIST] = None
996 all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
997 self.cfg.GetClusterName())
999 cluster = self.cfg.GetClusterInfo()
1000 master_node = self.cfg.GetMasterNode()
1001 all_drbd_map = self.cfg.ComputeDRBDMap()
1003 for node_i in nodeinfo:
1005 nresult = all_nvinfo[node].data
1008 feedback_fn("* Skipping offline node %s" % (node,))
1009 n_offline.append(node)
1012 if node == master_node:
1014 elif node_i.master_candidate:
1015 ntype = "master candidate"
1016 elif node_i.drained:
1018 n_drained.append(node)
1021 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1023 if all_nvinfo[node].failed or not isinstance(nresult, dict):
1024 feedback_fn(" - ERROR: connection to %s failed" % (node,))
1029 for minor, instance in all_drbd_map[node].items():
1030 if instance not in instanceinfo:
1031 feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" %
1033 # ghost instance should not be running, but otherwise we
1034 # don't give double warnings (both ghost instance and
1035 # unallocated minor in use)
1036 node_drbd[minor] = (instance, False)
1038 instance = instanceinfo[instance]
1039 node_drbd[minor] = (instance.name, instance.admin_up)
1040 result = self._VerifyNode(node_i, file_names, local_checksums,
1041 nresult, feedback_fn, master_files,
1045 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1047 node_volume[node] = {}
1048 elif isinstance(lvdata, basestring):
1049 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
1050 (node, utils.SafeEncode(lvdata)))
1052 node_volume[node] = {}
1053 elif not isinstance(lvdata, dict):
1054 feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,))
1058 node_volume[node] = lvdata
1061 idata = nresult.get(constants.NV_INSTANCELIST, None)
1062 if not isinstance(idata, list):
1063 feedback_fn(" - ERROR: connection to %s failed (instancelist)" %
1068 node_instance[node] = idata
1071 nodeinfo = nresult.get(constants.NV_HVINFO, None)
1072 if not isinstance(nodeinfo, dict):
1073 feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,))
1079 "mfree": int(nodeinfo['memory_free']),
1082 # dictionary holding all instances this node is secondary for,
1083 # grouped by their primary node. Each key is a cluster node, and each
1084 # value is a list of instances which have the key as primary and the
1085 # current node as secondary. this is handy to calculate N+1 memory
1086 # availability if you can only failover from a primary to its
1088 "sinst-by-pnode": {},
1090 # FIXME: devise a free space model for file based instances as well
1091 if vg_name is not None:
1092 if (constants.NV_VGLIST not in nresult or
1093 vg_name not in nresult[constants.NV_VGLIST]):
1094 feedback_fn(" - ERROR: node %s didn't return data for the"
1095 " volume group '%s' - it is either missing or broken" %
1099 node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1100 except (ValueError, KeyError):
1101 feedback_fn(" - ERROR: invalid nodeinfo value returned"
1102 " from node %s" % (node,))
1106 node_vol_should = {}
1108 for instance in instancelist:
1109 feedback_fn("* Verifying instance %s" % instance)
1110 inst_config = instanceinfo[instance]
1111 result = self._VerifyInstance(instance, inst_config, node_volume,
1112 node_instance, feedback_fn, n_offline)
1114 inst_nodes_offline = []
1116 inst_config.MapLVsByNode(node_vol_should)
1118 instance_cfg[instance] = inst_config
1120 pnode = inst_config.primary_node
1121 if pnode in node_info:
1122 node_info[pnode]['pinst'].append(instance)
1123 elif pnode not in n_offline:
1124 feedback_fn(" - ERROR: instance %s, connection to primary node"
1125 " %s failed" % (instance, pnode))
1128 if pnode in n_offline:
1129 inst_nodes_offline.append(pnode)
1131 # If the instance is non-redundant we cannot survive losing its primary
1132 # node, so we are not N+1 compliant. On the other hand we have no disk
1133 # templates with more than one secondary so that situation is not well
1135 # FIXME: does not support file-backed instances
1136 if len(inst_config.secondary_nodes) == 0:
1137 i_non_redundant.append(instance)
1138 elif len(inst_config.secondary_nodes) > 1:
1139 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1142 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1143 i_non_a_balanced.append(instance)
1145 for snode in inst_config.secondary_nodes:
1146 if snode in node_info:
1147 node_info[snode]['sinst'].append(instance)
1148 if pnode not in node_info[snode]['sinst-by-pnode']:
1149 node_info[snode]['sinst-by-pnode'][pnode] = []
1150 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1151 elif snode not in n_offline:
1152 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1153 " %s failed" % (instance, snode))
1155 if snode in n_offline:
1156 inst_nodes_offline.append(snode)
1158 if inst_nodes_offline:
1159 # warn that the instance lives on offline nodes, and set bad=True
1160 feedback_fn(" - ERROR: instance lives on offline node(s) %s" %
1161 ", ".join(inst_nodes_offline))
1164 feedback_fn("* Verifying orphan volumes")
1165 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1169 feedback_fn("* Verifying remaining instances")
1170 result = self._VerifyOrphanInstances(instancelist, node_instance,
1174 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1175 feedback_fn("* Verifying N+1 Memory redundancy")
1176 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1179 feedback_fn("* Other Notes")
1181 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1182 % len(i_non_redundant))
1184 if i_non_a_balanced:
1185 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1186 % len(i_non_a_balanced))
1189 feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline))
1192 feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained))
1196 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
"""Analyze the post-hooks' result
This method analyzes the hook result, handles it, and sends some
1200 nicely-formatted feedback back to the user.
1202 @param phase: one of L{constants.HOOKS_PHASE_POST} or
1203 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1204 @param hooks_results: the results of the multi-node hooks rpc call
@param feedback_fn: function used to send feedback back to the caller
1206 @param lu_result: previous Exec result
1207 @return: the new Exec result, based on the previous result
1211 # We only really run POST phase hooks, and are only interested in
1213 if phase == constants.HOOKS_PHASE_POST:
1214 # Used to change hooks' output to proper indentation
1215 indent_re = re.compile('^', re.M)
1216 feedback_fn("* Hooks Results")
1217 if not hooks_results:
1218 feedback_fn(" - ERROR: general communication failure")
1221 for node_name in hooks_results:
1222 show_node_header = True
1223 res = hooks_results[node_name]
1224 if res.failed or res.data is False or not isinstance(res.data, list):
1226 # no need to warn or set fail return value
1228 feedback_fn(" Communication failure in hooks execution")
1231 for script, hkr, output in res.data:
1232 if hkr == constants.HKR_FAIL:
1233 # The node header is only shown once, if there are
1234 # failing hooks on that node
1235 if show_node_header:
1236 feedback_fn(" Node %s:" % node_name)
1237 show_node_header = False
1238 feedback_fn(" ERROR: Script %s failed, output:" % script)
1239 output = indent_re.sub(' ', output)
1240 feedback_fn("%s" % output)
1246 class LUVerifyDisks(NoHooksLU):
1247 """Verifies the cluster disks status.
1253 def ExpandNames(self):
1254 self.needed_locks = {
1255 locking.LEVEL_NODE: locking.ALL_SET,
1256 locking.LEVEL_INSTANCE: locking.ALL_SET,
1258 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1260 def CheckPrereq(self):
1261 """Check prerequisites.
1263 This has no prerequisites.
1268 def Exec(self, feedback_fn):
1269 """Verify integrity of cluster disks.
1272 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1274 vg_name = self.cfg.GetVGName()
1275 nodes = utils.NiceSort(self.cfg.GetNodeList())
1276 instances = [self.cfg.GetInstanceInfo(name)
1277 for name in self.cfg.GetInstanceList()]
1280 for inst in instances:
1282 if (not inst.admin_up or
1283 inst.disk_template not in constants.DTS_NET_MIRROR):
1285 inst.MapLVsByNode(inst_lvs)
1286 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1287 for node, vol_list in inst_lvs.iteritems():
1288 for vol in vol_list:
1289 nv_dict[(node, vol)] = inst
1294 node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1299 lvs = node_lvs[node]
1302 self.LogWarning("Connection to node %s failed: %s" %
1306 if isinstance(lvs, basestring):
1307 logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1308 res_nlvm[node] = lvs
1310 elif not isinstance(lvs, dict):
1311 logging.warning("Connection to node %s failed or invalid data"
1313 res_nodes.append(node)
1316 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1317 inst = nv_dict.pop((node, lv_name), None)
1318 if (not lv_online and inst is not None
1319 and inst.name not in res_instances):
1320 res_instances.append(inst.name)
1322 # any leftover items in nv_dict are missing LVs, let's arrange the
1324 for key, inst in nv_dict.iteritems():
1325 if inst.name not in res_missing:
1326 res_missing[inst.name] = []
1327 res_missing[inst.name].append(key)
1332 class LURenameCluster(LogicalUnit):
1333 """Rename the cluster.
1336 HPATH = "cluster-rename"
1337 HTYPE = constants.HTYPE_CLUSTER
1340 def BuildHooksEnv(self):
1345 "OP_TARGET": self.cfg.GetClusterName(),
1346 "NEW_NAME": self.op.name,
1348 mn = self.cfg.GetMasterNode()
1349 return env, [mn], [mn]
1351 def CheckPrereq(self):
1352 """Verify that the passed name is a valid one.
1355 hostname = utils.HostInfo(self.op.name)
1357 new_name = hostname.name
1358 self.ip = new_ip = hostname.ip
1359 old_name = self.cfg.GetClusterName()
1360 old_ip = self.cfg.GetMasterIP()
1361 if new_name == old_name and new_ip == old_ip:
1362 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1363 " cluster has changed")
1364 if new_ip != old_ip:
1365 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1366 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1367 " reachable on the network. Aborting." %
1370 self.op.name = new_name
1372 def Exec(self, feedback_fn):
1373 """Rename the cluster.
1376 clustername = self.op.name
1379 # shutdown the master IP
1380 master = self.cfg.GetMasterNode()
1381 result = self.rpc.call_node_stop_master(master, False)
1382 if result.failed or not result.data:
1383 raise errors.OpExecError("Could not disable the master role")
1386 cluster = self.cfg.GetClusterInfo()
1387 cluster.cluster_name = clustername
1388 cluster.master_ip = ip
1389 self.cfg.Update(cluster)
1391 # update the known hosts file
1392 ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1393 node_list = self.cfg.GetNodeList()
1395 node_list.remove(master)
1398 result = self.rpc.call_upload_file(node_list,
1399 constants.SSH_KNOWN_HOSTS_FILE)
1400 for to_node, to_result in result.iteritems():
1401 if to_result.failed or not to_result.data:
1402 logging.error("Copy of file %s to node %s failed",
1403 constants.SSH_KNOWN_HOSTS_FILE, to_node)
1406 result = self.rpc.call_node_start_master(master, False)
1407 if result.failed or not result.data:
1408 self.LogWarning("Could not re-enable the master role on"
1409 " the master, please restart manually.")
1412 def _RecursiveCheckIfLVMBased(disk):
1413 """Check if the given disk or its children are lvm-based.
1415 @type disk: L{objects.Disk}
1416 @param disk: the disk to check
1418 @return: boolean indicating whether a LD_LV dev_type was found or not
1422 for chdisk in disk.children:
1423 if _RecursiveCheckIfLVMBased(chdisk):
1425 return disk.dev_type == constants.LD_LV
1428 class LUSetClusterParams(LogicalUnit):
1429 """Change the parameters of the cluster.
1432 HPATH = "cluster-modify"
1433 HTYPE = constants.HTYPE_CLUSTER
1437 def CheckArguments(self):
1441 if not hasattr(self.op, "candidate_pool_size"):
1442 self.op.candidate_pool_size = None
1443 if self.op.candidate_pool_size is not None:
1445 self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1446 except (ValueError, TypeError), err:
1447 raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1449 if self.op.candidate_pool_size < 1:
1450 raise errors.OpPrereqError("At least one master candidate needed")
1452 def ExpandNames(self):
1453 # FIXME: in the future maybe other cluster params won't require checking on
1454 # all nodes to be modified.
1455 self.needed_locks = {
1456 locking.LEVEL_NODE: locking.ALL_SET,
1458 self.share_locks[locking.LEVEL_NODE] = 1
1460 def BuildHooksEnv(self):
1465 "OP_TARGET": self.cfg.GetClusterName(),
1466 "NEW_VG_NAME": self.op.vg_name,
1468 mn = self.cfg.GetMasterNode()
1469 return env, [mn], [mn]
1471 def CheckPrereq(self):
1472 """Check prerequisites.
1474 This checks whether the given params don't conflict and
1475 if the given volume group is valid.
1478 if self.op.vg_name is not None and not self.op.vg_name:
1479 instances = self.cfg.GetAllInstancesInfo().values()
1480 for inst in instances:
1481 for disk in inst.disks:
1482 if _RecursiveCheckIfLVMBased(disk):
1483 raise errors.OpPrereqError("Cannot disable lvm storage while"
1484 " lvm-based instances exist")
1486 node_list = self.acquired_locks[locking.LEVEL_NODE]
1488 # if vg_name not None, checks given volume group on all nodes
1490 vglist = self.rpc.call_vg_list(node_list)
1491 for node in node_list:
1492 if vglist[node].failed:
1493 # ignoring down node
1494 self.LogWarning("Node %s unreachable/error, ignoring" % node)
1496 vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1498 constants.MIN_VG_SIZE)
1500 raise errors.OpPrereqError("Error on node '%s': %s" %
1503 self.cluster = cluster = self.cfg.GetClusterInfo()
1504 # validate beparams changes
1505 if self.op.beparams:
1506 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1507 self.new_beparams = cluster.FillDict(
1508 cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1510 # hypervisor list/parameters
1511 self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1512 if self.op.hvparams:
1513 if not isinstance(self.op.hvparams, dict):
1514 raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1515 for hv_name, hv_dict in self.op.hvparams.items():
1516 if hv_name not in self.new_hvparams:
1517 self.new_hvparams[hv_name] = hv_dict
1519 self.new_hvparams[hv_name].update(hv_dict)
1521 if self.op.enabled_hypervisors is not None:
1522 self.hv_list = self.op.enabled_hypervisors
1524 self.hv_list = cluster.enabled_hypervisors
1526 if self.op.hvparams or self.op.enabled_hypervisors is not None:
1527 # either the enabled list has changed, or the parameters have, validate
1528 for hv_name, hv_params in self.new_hvparams.items():
1529 if ((self.op.hvparams and hv_name in self.op.hvparams) or
1530 (self.op.enabled_hypervisors and
1531 hv_name in self.op.enabled_hypervisors)):
1532 # either this is a new hypervisor, or its parameters have changed
1533 hv_class = hypervisor.GetHypervisor(hv_name)
1534 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1535 hv_class.CheckParameterSyntax(hv_params)
1536 _CheckHVParams(self, node_list, hv_name, hv_params)
1538 def Exec(self, feedback_fn):
1539 """Change the parameters of the cluster.
1542 if self.op.vg_name is not None:
1543 new_volume = self.op.vg_name
1546 if new_volume != self.cfg.GetVGName():
1547 self.cfg.SetVGName(new_volume)
1549 feedback_fn("Cluster LVM configuration already in desired"
1550 " state, not changing")
1551 if self.op.hvparams:
1552 self.cluster.hvparams = self.new_hvparams
1553 if self.op.enabled_hypervisors is not None:
1554 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1555 if self.op.beparams:
1556 self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1557 if self.op.candidate_pool_size is not None:
1558 self.cluster.candidate_pool_size = self.op.candidate_pool_size
1559 # we need to update the pool size here, otherwise the save will fail
1560 _AdjustCandidatePool(self)
1562 self.cfg.Update(self.cluster)
1565 class LURedistributeConfig(NoHooksLU):
1566 """Force the redistribution of cluster configuration.
1568 This is a very simple LU.
1574 def ExpandNames(self):
1575 self.needed_locks = {
1576 locking.LEVEL_NODE: locking.ALL_SET,
1578 self.share_locks[locking.LEVEL_NODE] = 1
1580 def CheckPrereq(self):
1581 """Check prerequisites.
1585 def Exec(self, feedback_fn):
1586 """Redistribute the configuration.
1589 self.cfg.Update(self.cfg.GetClusterInfo())
1592 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1593 """Sleep and poll for an instance's disk to sync.
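  A sketch of typical usage (assuming self.instance has been looked up)::

    disk_ok = _WaitForSync(self, self.instance)
    if not disk_ok:
      raise errors.OpExecError("Disk sync did not complete cleanly")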
1596 if not instance.disks:
1600 lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1602 node = instance.primary_node
1604 for dev in instance.disks:
1605 lu.cfg.SetDiskID(dev, node)
1608 degr_retries = 10 # in seconds, as we sleep 1 second each time
1612 cumul_degraded = False
1613 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1614 if rstats.failed or not rstats.data:
1615 lu.LogWarning("Can't get any data from node %s", node)
1618 raise errors.RemoteError("Can't contact node %s for mirror data,"
1619 " aborting." % node)
1622 rstats = rstats.data
1624 for i, mstat in enumerate(rstats):
1626 lu.LogWarning("Can't compute data for node %s/%s",
1627 node, instance.disks[i].iv_name)
1629 # we ignore the ldisk parameter
1630 perc_done, est_time, is_degraded, _ = mstat
1631 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1632 if perc_done is not None:
1634 if est_time is not None:
1635 rem_time = "%d estimated seconds remaining" % est_time
1638 rem_time = "no time estimate"
1639 lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1640 (instance.disks[i].iv_name, perc_done, rem_time))
1642 # if we're done but degraded, let's do a few small retries, to
1643 # make sure we see a stable and not transient situation; therefore
1644 # we force restart of the loop
1645 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1646 logging.info("Degraded disks found, %d retries left", degr_retries)
1654 time.sleep(min(60, max_time))
1657 lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1658 return not cumul_degraded
1661 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1662 """Check that mirrors are not degraded.
1664 The ldisk parameter, if True, will change the test from the
1665 is_degraded attribute (which represents overall non-ok status for
1666 the device(s)) to the ldisk (representing the local storage status).
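  Sketch of checking all disks of an instance on its primary node::

    for dev in instance.disks:
      if not _CheckDiskConsistency(self, dev, instance.primary_node, True):
        raise errors.OpExecError("Disk %s is degraded" % dev.iv_name)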
1669 lu.cfg.SetDiskID(dev, node)
1676 if on_primary or dev.AssembleOnSecondary():
1677 rstats = lu.rpc.call_blockdev_find(node, dev)
1678 msg = rstats.RemoteFailMsg()
1680 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1682 elif not rstats.payload:
1683 lu.LogWarning("Can't find disk on node %s", node)
1686 result = result and (not rstats.payload[idx])
1688 for child in dev.children:
1689 result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1694 class LUDiagnoseOS(NoHooksLU):
1695 """Logical unit for OS diagnose/query.
1698 _OP_REQP = ["output_fields", "names"]
1700 _FIELDS_STATIC = utils.FieldSet()
1701 _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1703 def ExpandNames(self):
1705 raise errors.OpPrereqError("Selective OS query not supported")
1707 _CheckOutputFields(static=self._FIELDS_STATIC,
1708 dynamic=self._FIELDS_DYNAMIC,
1709 selected=self.op.output_fields)
1711 # Lock all nodes, in shared mode
1712 # Temporary removal of locks, should be reverted later
1713 # TODO: reintroduce locks when they are lighter-weight
1714 self.needed_locks = {}
1715 #self.share_locks[locking.LEVEL_NODE] = 1
1716 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1718 def CheckPrereq(self):
1719 """Check prerequisites.
1724 def _DiagnoseByOS(node_list, rlist):
"""Remaps a per-node return list into a per-os per-node dictionary
1727 @param node_list: a list with the names of all nodes
1728 @param rlist: a map with node names as keys and OS objects as values
1731 @return: a dictionary with osnames as keys and as value another map, with
1732 nodes as keys and list of OS objects as values, eg::
1734 {"debian-etch": {"node1": [<object>,...],
1735 "node2": [<object>,]}
1740 # we build here the list of nodes that didn't fail the RPC (at RPC
1741 # level), so that nodes with a non-responding node daemon don't
1742 # make all OSes invalid
1743 good_nodes = [node_name for node_name in rlist
1744 if not rlist[node_name].failed]
1745 for node_name, nr in rlist.iteritems():
1746 if nr.failed or not nr.data:
1748 for os_obj in nr.data:
1749 if os_obj.name not in all_os:
1750 # build a list of nodes for this os containing empty lists
1751 # for each node in node_list
1752 all_os[os_obj.name] = {}
1753 for nname in good_nodes:
1754 all_os[os_obj.name][nname] = []
1755 all_os[os_obj.name][node_name].append(os_obj)
1758 def Exec(self, feedback_fn):
1759 """Compute the list of OSes.
1762 valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1763 node_data = self.rpc.call_os_diagnose(valid_nodes)
1764 if node_data == False:
1765 raise errors.OpExecError("Can't gather the list of OSes")
1766 pol = self._DiagnoseByOS(valid_nodes, node_data)
1768 for os_name, os_data in pol.iteritems():
1770 for field in self.op.output_fields:
1773 elif field == "valid":
1774 val = utils.all([osl and osl[0] for osl in os_data.values()])
1775 elif field == "node_status":
1777 for node_name, nos_list in os_data.iteritems():
1778 val[node_name] = [(v.status, v.path) for v in nos_list]
1780 raise errors.ParameterError(field)
1787 class LURemoveNode(LogicalUnit):
1788 """Logical unit for removing a node.
1791 HPATH = "node-remove"
1792 HTYPE = constants.HTYPE_NODE
1793 _OP_REQP = ["node_name"]
1795 def BuildHooksEnv(self):
1798 This doesn't run on the target node in the pre phase as a failed
1799 node would then be impossible to remove.
1803 "OP_TARGET": self.op.node_name,
1804 "NODE_NAME": self.op.node_name,
1806 all_nodes = self.cfg.GetNodeList()
1807 all_nodes.remove(self.op.node_name)
1808 return env, all_nodes, all_nodes
1810 def CheckPrereq(self):
1811 """Check prerequisites.
1814 - the node exists in the configuration
1815 - it does not have primary or secondary instances
1816 - it's not the master
1818 Any errors are signalled by raising errors.OpPrereqError.
1821 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1825 instance_list = self.cfg.GetInstanceList()
1827 masternode = self.cfg.GetMasterNode()
1828 if node.name == masternode:
1829 raise errors.OpPrereqError("Node is the master node,"
1830 " you need to failover first.")
1832 for instance_name in instance_list:
1833 instance = self.cfg.GetInstanceInfo(instance_name)
1834 if node.name in instance.all_nodes:
1835 raise errors.OpPrereqError("Instance %s is still running on the node,"
1836 " please remove first." % instance_name)
1837 self.op.node_name = node.name
1840 def Exec(self, feedback_fn):
1841 """Removes the node from the cluster.
1845 logging.info("Stopping the node daemon and removing configs from node %s",
1848 self.context.RemoveNode(node.name)
1850 self.rpc.call_node_leave_cluster(node.name)
1852 # Promote nodes to master candidate as needed
1853 _AdjustCandidatePool(self)
1856 class LUQueryNodes(NoHooksLU):
1857 """Logical unit for querying nodes.
1860 _OP_REQP = ["output_fields", "names", "use_locking"]
1862 _FIELDS_DYNAMIC = utils.FieldSet(
1864 "mtotal", "mnode", "mfree",
1866 "ctotal", "cnodes", "csockets",
1869 _FIELDS_STATIC = utils.FieldSet(
1870 "name", "pinst_cnt", "sinst_cnt",
1871 "pinst_list", "sinst_list",
1872 "pip", "sip", "tags",
1881 def ExpandNames(self):
1882 _CheckOutputFields(static=self._FIELDS_STATIC,
1883 dynamic=self._FIELDS_DYNAMIC,
1884 selected=self.op.output_fields)
1886 self.needed_locks = {}
1887 self.share_locks[locking.LEVEL_NODE] = 1
1890 self.wanted = _GetWantedNodes(self, self.op.names)
1892 self.wanted = locking.ALL_SET
1894 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1895 self.do_locking = self.do_node_query and self.op.use_locking
1897 # if we don't request only static fields, we need to lock the nodes
1898 self.needed_locks[locking.LEVEL_NODE] = self.wanted
1901 def CheckPrereq(self):
1902 """Check prerequisites.
# The validation of the node list is done in _GetWantedNodes,
# if non-empty; if empty, there's no validation to do
1909 def Exec(self, feedback_fn):
1910 """Computes the list of nodes and their attributes.
1913 all_info = self.cfg.GetAllNodesInfo()
1915 nodenames = self.acquired_locks[locking.LEVEL_NODE]
1916 elif self.wanted != locking.ALL_SET:
1917 nodenames = self.wanted
1918 missing = set(nodenames).difference(all_info.keys())
1920 raise errors.OpExecError(
1921 "Some nodes were removed before retrieving their data: %s" % missing)
1923 nodenames = all_info.keys()
1925 nodenames = utils.NiceSort(nodenames)
1926 nodelist = [all_info[name] for name in nodenames]
1928 # begin data gathering
1930 if self.do_node_query:
1932 node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1933 self.cfg.GetHypervisorType())
1934 for name in nodenames:
1935 nodeinfo = node_data[name]
1936 if not nodeinfo.failed and nodeinfo.data:
1937 nodeinfo = nodeinfo.data
1938 fn = utils.TryConvert
1940 "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1941 "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1942 "mfree": fn(int, nodeinfo.get('memory_free', None)),
1943 "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1944 "dfree": fn(int, nodeinfo.get('vg_free', None)),
1945 "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1946 "bootid": nodeinfo.get('bootid', None),
1947 "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1948 "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1951 live_data[name] = {}
1953 live_data = dict.fromkeys(nodenames, {})
1955 node_to_primary = dict([(name, set()) for name in nodenames])
1956 node_to_secondary = dict([(name, set()) for name in nodenames])
1958 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1959 "sinst_cnt", "sinst_list"))
1960 if inst_fields & frozenset(self.op.output_fields):
1961 instancelist = self.cfg.GetInstanceList()
1963 for instance_name in instancelist:
1964 inst = self.cfg.GetInstanceInfo(instance_name)
1965 if inst.primary_node in node_to_primary:
1966 node_to_primary[inst.primary_node].add(inst.name)
1967 for secnode in inst.secondary_nodes:
1968 if secnode in node_to_secondary:
1969 node_to_secondary[secnode].add(inst.name)
1971 master_node = self.cfg.GetMasterNode()
1973 # end data gathering
1976 for node in nodelist:
1978 for field in self.op.output_fields:
1981 elif field == "pinst_list":
1982 val = list(node_to_primary[node.name])
1983 elif field == "sinst_list":
1984 val = list(node_to_secondary[node.name])
1985 elif field == "pinst_cnt":
1986 val = len(node_to_primary[node.name])
1987 elif field == "sinst_cnt":
1988 val = len(node_to_secondary[node.name])
1989 elif field == "pip":
1990 val = node.primary_ip
1991 elif field == "sip":
1992 val = node.secondary_ip
1993 elif field == "tags":
1994 val = list(node.GetTags())
1995 elif field == "serial_no":
1996 val = node.serial_no
1997 elif field == "master_candidate":
1998 val = node.master_candidate
1999 elif field == "master":
2000 val = node.name == master_node
2001 elif field == "offline":
2003 elif field == "drained":
2005 elif self._FIELDS_DYNAMIC.Matches(field):
2006 val = live_data[node.name].get(field, None)
2007 elif field == "role":
2008 if node.name == master_node:
2010 elif node.master_candidate:
2019 raise errors.ParameterError(field)
2020 node_output.append(val)
2021 output.append(node_output)
2026 class LUQueryNodeVolumes(NoHooksLU):
2027 """Logical unit for getting volumes on node(s).
2030 _OP_REQP = ["nodes", "output_fields"]
2032 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2033 _FIELDS_STATIC = utils.FieldSet("node")
2035 def ExpandNames(self):
2036 _CheckOutputFields(static=self._FIELDS_STATIC,
2037 dynamic=self._FIELDS_DYNAMIC,
2038 selected=self.op.output_fields)
2040 self.needed_locks = {}
2041 self.share_locks[locking.LEVEL_NODE] = 1
2042 if not self.op.nodes:
2043 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2045 self.needed_locks[locking.LEVEL_NODE] = \
2046 _GetWantedNodes(self, self.op.nodes)
2048 def CheckPrereq(self):
2049 """Check prerequisites.
2051 This checks that the fields required are valid output fields.
2054 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2056 def Exec(self, feedback_fn):
2057 """Computes the list of nodes and their attributes.
2060 nodenames = self.nodes
2061 volumes = self.rpc.call_node_volumes(nodenames)
2063 ilist = [self.cfg.GetInstanceInfo(iname) for iname
2064 in self.cfg.GetInstanceList()]
2066 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2069 for node in nodenames:
2070 if node not in volumes or volumes[node].failed or not volumes[node].data:
2073 node_vols = volumes[node].data[:]
2074 node_vols.sort(key=lambda vol: vol['dev'])
2076 for vol in node_vols:
2078 for field in self.op.output_fields:
2081 elif field == "phys":
2085 elif field == "name":
2087 elif field == "size":
2088 val = int(float(vol['size']))
2089 elif field == "instance":
2091 if node not in lv_by_node[inst]:
2093 if vol['name'] in lv_by_node[inst][node]:
2099 raise errors.ParameterError(field)
2100 node_output.append(str(val))
2102 output.append(node_output)
2107 class LUAddNode(LogicalUnit):
2108 """Logical unit for adding node to the cluster.
2112 HTYPE = constants.HTYPE_NODE
2113 _OP_REQP = ["node_name"]
2115 def BuildHooksEnv(self):
2118 This will run on all nodes before, and on all nodes + the new node after.
2122 "OP_TARGET": self.op.node_name,
2123 "NODE_NAME": self.op.node_name,
2124 "NODE_PIP": self.op.primary_ip,
2125 "NODE_SIP": self.op.secondary_ip,
2127 nodes_0 = self.cfg.GetNodeList()
2128 nodes_1 = nodes_0 + [self.op.node_name, ]
2129 return env, nodes_0, nodes_1
2131 def CheckPrereq(self):
2132 """Check prerequisites.
2135 - the new node is not already in the config
2137 - its parameters (single/dual homed) match the cluster
2139 Any errors are signalled by raising errors.OpPrereqError.
2142 node_name = self.op.node_name
2145 dns_data = utils.HostInfo(node_name)
2147 node = dns_data.name
2148 primary_ip = self.op.primary_ip = dns_data.ip
2149 secondary_ip = getattr(self.op, "secondary_ip", None)
2150 if secondary_ip is None:
2151 secondary_ip = primary_ip
2152 if not utils.IsValidIP(secondary_ip):
2153 raise errors.OpPrereqError("Invalid secondary IP given")
2154 self.op.secondary_ip = secondary_ip
2156 node_list = cfg.GetNodeList()
2157 if not self.op.readd and node in node_list:
2158 raise errors.OpPrereqError("Node %s is already in the configuration" %
2160 elif self.op.readd and node not in node_list:
2161 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2163 for existing_node_name in node_list:
2164 existing_node = cfg.GetNodeInfo(existing_node_name)
2166 if self.op.readd and node == existing_node_name:
2167 if (existing_node.primary_ip != primary_ip or
2168 existing_node.secondary_ip != secondary_ip):
2169 raise errors.OpPrereqError("Readded node doesn't have the same IP"
2170 " address configuration as before")
2173 if (existing_node.primary_ip == primary_ip or
2174 existing_node.secondary_ip == primary_ip or
2175 existing_node.primary_ip == secondary_ip or
2176 existing_node.secondary_ip == secondary_ip):
2177 raise errors.OpPrereqError("New node ip address(es) conflict with"
2178 " existing node %s" % existing_node.name)
2180 # check that the type of the node (single versus dual homed) is the
2181 # same as for the master
2182 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2183 master_singlehomed = myself.secondary_ip == myself.primary_ip
2184 newbie_singlehomed = secondary_ip == primary_ip
2185 if master_singlehomed != newbie_singlehomed:
2186 if master_singlehomed:
2187 raise errors.OpPrereqError("The master has no private ip but the"
2188 " new node has one")
2190 raise errors.OpPrereqError("The master has a private ip but the"
2191 " new node doesn't have one")
2193 # check reachability
2194 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2195 raise errors.OpPrereqError("Node not reachable by ping")
2197 if not newbie_singlehomed:
2198 # check reachability from my secondary ip to newbie's secondary ip
2199 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2200 source=myself.secondary_ip):
2201 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2202 " based ping to noded port")
2204 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2209 mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2210 # the new node will increase mc_max by one, so:
2211 mc_max = min(mc_max + 1, cp_size)
2212 self.master_candidate = mc_now < mc_max
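# Worked example (hypothetical numbers): with candidate_pool_size=10 and
# GetMasterCandidateStats reporting mc_now=3, mc_max=3, the adjustment gives
# mc_max = min(3 + 1, 10) = 4; since 3 < 4, the new node will join the
# cluster as a master candidate.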
2215 self.new_node = self.cfg.GetNodeInfo(node)
2216 assert self.new_node is not None, "Can't retrieve locked node %s" % node
2218 self.new_node = objects.Node(name=node,
2219 primary_ip=primary_ip,
2220 secondary_ip=secondary_ip,
2221 master_candidate=self.master_candidate,
2222 offline=False, drained=False)
2224 def Exec(self, feedback_fn):
2225 """Adds the new node to the cluster.
2228 new_node = self.new_node
2229 node = new_node.name
2231 # for re-adds, reset the offline/drained/master-candidate flags;
2232 # we need to reset here, otherwise offline would prevent RPC calls
2233 # later in the procedure; this also means that if the re-add
2234 # fails, we are left with a non-offlined, broken node
2236 new_node.drained = new_node.offline = False
2237 self.LogInfo("Readding a node, the offline/drained flags were reset")
2238 # if we demote the node, we do cleanup later in the procedure
2239 new_node.master_candidate = self.master_candidate
2241 # notify the user about any possible mc promotion
2242 if new_node.master_candidate:
2243 self.LogInfo("Node will be a master candidate")
2245 # check connectivity
2246 result = self.rpc.call_version([node])[node]
2249 if constants.PROTOCOL_VERSION == result.data:
2250 logging.info("Communication to node %s fine, sw version %s match",
2253 raise errors.OpExecError("Version mismatch master version %s,"
2254 " node version %s" %
2255 (constants.PROTOCOL_VERSION, result.data))
2257 raise errors.OpExecError("Cannot get version from the new node")
2260 logging.info("Copy ssh key to node %s", node)
2261 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2263 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2264 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2270 keyarray.append(f.read())
2274 result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2276 keyarray[3], keyarray[4], keyarray[5])
2278 msg = result.RemoteFailMsg()
2280 raise errors.OpExecError("Cannot transfer ssh keys to the"
2281 " new node: %s" % msg)
2283 # Add node to our /etc/hosts, and add key to known_hosts
2284 utils.AddHostToEtcHosts(new_node.name)
2286 if new_node.secondary_ip != new_node.primary_ip:
2287 result = self.rpc.call_node_has_ip_address(new_node.name,
2288 new_node.secondary_ip)
2289 if result.failed or not result.data:
2290 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2291 " you gave (%s). Please fix and re-run this"
2292 " command." % new_node.secondary_ip)
2294 node_verify_list = [self.cfg.GetMasterNode()]
2295 node_verify_param = {
2297 # TODO: do a node-net-test as well?
2300 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2301 self.cfg.GetClusterName())
2302 for verifier in node_verify_list:
2303 if result[verifier].failed or not result[verifier].data:
2304 raise errors.OpExecError("Cannot communicate with %s's node daemon"
2305 " for remote verification" % verifier)
2306 if result[verifier].data['nodelist']:
2307 for failed in result[verifier].data['nodelist']:
2308 feedback_fn("ssh/hostname verification failed %s -> %s" %
2309 (verifier, result[verifier].data['nodelist'][failed]))
2310 raise errors.OpExecError("ssh/hostname verification failed.")
2312 # Distribute updated /etc/hosts and known_hosts to all nodes,
2313 # including the node just added
2314 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2315 dist_nodes = self.cfg.GetNodeList()
2316 if not self.op.readd:
2317 dist_nodes.append(node)
2318 if myself.name in dist_nodes:
2319 dist_nodes.remove(myself.name)
2321 logging.debug("Copying hosts and known_hosts to all nodes")
2322 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2323 result = self.rpc.call_upload_file(dist_nodes, fname)
2324 for to_node, to_result in result.iteritems():
2325 if to_result.failed or not to_result.data:
2326 logging.error("Copy of file %s to node %s failed", fname, to_node)
2329 enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2330 if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2331 to_copy.append(constants.VNC_PASSWORD_FILE)
2333 for fname in to_copy:
2334 result = self.rpc.call_upload_file([node], fname)
2335 if result[node].failed or not result[node]:
2336 logging.error("Could not copy file %s to node %s", fname, node)
2339 self.context.ReaddNode(new_node)
2340 # make sure we redistribute the config
2341 self.cfg.Update(new_node)
2342 # and make sure the new node will not have old files around
2343 if not new_node.master_candidate:
2344 result = self.rpc.call_node_demote_from_mc(new_node.name)
2345 msg = result.RemoteFailMsg()
2347 self.LogWarning("Node failed to demote itself from master"
2348 " candidate status: %s" % msg)
2350 self.context.AddNode(new_node)
2353 class LUSetNodeParams(LogicalUnit):
2354 """Modifies the parameters of a node.
2357 HPATH = "node-modify"
2358 HTYPE = constants.HTYPE_NODE
2359 _OP_REQP = ["node_name"]
2362 def CheckArguments(self):
2363 node_name = self.cfg.ExpandNodeName(self.op.node_name)
2364 if node_name is None:
2365 raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2366 self.op.node_name = node_name
2367 _CheckBooleanOpField(self.op, 'master_candidate')
2368 _CheckBooleanOpField(self.op, 'offline')
2369 _CheckBooleanOpField(self.op, 'drained')
2370 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2371 if all_mods.count(None) == 3:
2372 raise errors.OpPrereqError("Please pass at least one modification")
2373 if all_mods.count(True) > 1:
2374 raise errors.OpPrereqError("Can't set the node into more than one"
2375 " state at the same time")
2377 def ExpandNames(self):
2378 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2380 def BuildHooksEnv(self):
2383 This runs on the master node.
2387 "OP_TARGET": self.op.node_name,
2388 "MASTER_CANDIDATE": str(self.op.master_candidate),
2389 "OFFLINE": str(self.op.offline),
2390 "DRAINED": str(self.op.drained),
2392 nl = [self.cfg.GetMasterNode(),
2396 def CheckPrereq(self):
2397 """Check prerequisites.
2399 This checks the requested state changes against the node and cluster configuration.
2402 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2404 if ((self.op.master_candidate == False or self.op.offline == True or
2405 self.op.drained == True) and node.master_candidate):
2406 # we will demote the node from master_candidate
2407 if self.op.node_name == self.cfg.GetMasterNode():
2408 raise errors.OpPrereqError("The master node has to be a"
2409 " master candidate, online and not drained")
2410 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2411 num_candidates, _ = self.cfg.GetMasterCandidateStats()
2412 if num_candidates <= cp_size:
2413 msg = ("Not enough master candidates (desired"
2414 " %d, new value will be %d)" % (cp_size, num_candidates-1))
2416 self.LogWarning(msg)
2418 raise errors.OpPrereqError(msg)
2420 if (self.op.master_candidate == True and
2421 ((node.offline and not self.op.offline == False) or
2422 (node.drained and not self.op.drained == False))):
2423 raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2424 " to master_candidate" % node.name)
2428 def Exec(self, feedback_fn):
2437 if self.op.offline is not None:
2438 node.offline = self.op.offline
2439 result.append(("offline", str(self.op.offline)))
2440 if self.op.offline == True:
2441 if node.master_candidate:
2442 node.master_candidate = False
2444 result.append(("master_candidate", "auto-demotion due to offline"))
2446 node.drained = False
2447 result.append(("drained", "clear drained status due to offline"))
2449 if self.op.master_candidate is not None:
2450 node.master_candidate = self.op.master_candidate
2452 result.append(("master_candidate", str(self.op.master_candidate)))
2453 if self.op.master_candidate == False:
2454 rrc = self.rpc.call_node_demote_from_mc(node.name)
2455 msg = rrc.RemoteFailMsg()
2457 self.LogWarning("Node failed to demote itself: %s" % msg)
2459 if self.op.drained is not None:
2460 node.drained = self.op.drained
2461 result.append(("drained", str(self.op.drained)))
2462 if self.op.drained == True:
2463 if node.master_candidate:
2464 node.master_candidate = False
2466 result.append(("master_candidate", "auto-demotion due to drain"))
2468 node.offline = False
2469 result.append(("offline", "clear offline status due to drain"))
2471 # this will trigger configuration file update, if needed
2472 self.cfg.Update(node)
2473 # this will trigger job queue propagation or cleanup
2475 self.context.ReaddNode(node)
2480 class LUQueryClusterInfo(NoHooksLU):
2481 """Query cluster configuration.
2487 def ExpandNames(self):
2488 self.needed_locks = {}
2490 def CheckPrereq(self):
2491 """No prerequsites needed for this LU.
2496 def Exec(self, feedback_fn):
2497 """Return cluster config.
2500 cluster = self.cfg.GetClusterInfo()
2502 "software_version": constants.RELEASE_VERSION,
2503 "protocol_version": constants.PROTOCOL_VERSION,
2504 "config_version": constants.CONFIG_VERSION,
2505 "os_api_version": constants.OS_API_VERSION,
2506 "export_version": constants.EXPORT_VERSION,
2507 "architecture": (platform.architecture()[0], platform.machine()),
2508 "name": cluster.cluster_name,
2509 "master": cluster.master_node,
2510 "default_hypervisor": cluster.default_hypervisor,
2511 "enabled_hypervisors": cluster.enabled_hypervisors,
2512 "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2513 for hypervisor in cluster.enabled_hypervisors]),
2514 "beparams": cluster.beparams,
2515 "candidate_pool_size": cluster.candidate_pool_size,
2516 "default_bridge": cluster.default_bridge,
2517 "master_netdev": cluster.master_netdev,
2518 "volume_group_name": cluster.volume_group_name,
2519 "file_storage_dir": cluster.file_storage_dir,
2525 class LUQueryConfigValues(NoHooksLU):
2526 """Return configuration values.
2531 _FIELDS_DYNAMIC = utils.FieldSet()
2532 _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2534 def ExpandNames(self):
2535 self.needed_locks = {}
2537 _CheckOutputFields(static=self._FIELDS_STATIC,
2538 dynamic=self._FIELDS_DYNAMIC,
2539 selected=self.op.output_fields)
2541 def CheckPrereq(self):
2542 """No prerequisites.
2547 def Exec(self, feedback_fn):
2548 """Dump a representation of the cluster config to the standard output.
2552 for field in self.op.output_fields:
2553 if field == "cluster_name":
2554 entry = self.cfg.GetClusterName()
2555 elif field == "master_node":
2556 entry = self.cfg.GetMasterNode()
2557 elif field == "drain_flag":
2558 entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2560 raise errors.ParameterError(field)
2561 values.append(entry)
2565 class LUActivateInstanceDisks(NoHooksLU):
2566 """Bring up an instance's disks.
2569 _OP_REQP = ["instance_name"]
2572 def ExpandNames(self):
2573 self._ExpandAndLockInstance()
2574 self.needed_locks[locking.LEVEL_NODE] = []
2575 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
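# the node locks themselves are filled in later, in DeclareLocks, once the
# instance lock is held and its nodes are known (via _LockInstancesNodes)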
2577 def DeclareLocks(self, level):
2578 if level == locking.LEVEL_NODE:
2579 self._LockInstancesNodes()
2581 def CheckPrereq(self):
2582 """Check prerequisites.
2584 This checks that the instance is in the cluster.
2587 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2588 assert self.instance is not None, \
2589 "Cannot retrieve locked instance %s" % self.op.instance_name
2590 _CheckNodeOnline(self, self.instance.primary_node)
2592 def Exec(self, feedback_fn):
2593 """Activate the disks.
2596 disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2598 raise errors.OpExecError("Cannot activate block devices")
2603 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2604 """Prepare the block devices for an instance.
2606 This sets up the block devices on all nodes.
2608 @type lu: L{LogicalUnit}
2609 @param lu: the logical unit on whose behalf we execute
2610 @type instance: L{objects.Instance}
2611 @param instance: the instance for whose disks we assemble
2612 @type ignore_secondaries: boolean
2613 @param ignore_secondaries: if true, errors on secondary nodes
2614 won't result in an error return from the function
2615 @return: a tuple (disks_ok, device_info), where disks_ok is False if
2616 the operation failed, and device_info is a list of tuples
2617 (host, instance_visible_name, node_visible_name) mapping node devices to instance devices
2622 iname = instance.name
2623 # With the two-pass mechanism we try to reduce the window of
2624 # opportunity for the race condition of switching DRBD to primary
2625 # before handshaking occurred, but we do not eliminate it
2627 # The proper fix would be to wait (with some limits) until the
2628 # connection has been made and drbd transitions from WFConnection
2629 # into any other network-connected state (Connected, SyncTarget,
2632 # 1st pass, assemble on all nodes in secondary mode
2633 for inst_disk in instance.disks:
2634 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2635 lu.cfg.SetDiskID(node_disk, node)
2636 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2637 msg = result.RemoteFailMsg()
2639 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2640 " (is_primary=False, pass=1): %s",
2641 inst_disk.iv_name, node, msg)
2642 if not ignore_secondaries:
2645 # FIXME: race condition on drbd migration to primary
2647 # 2nd pass, do only the primary node
2648 for inst_disk in instance.disks:
2649 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2650 if node != instance.primary_node:
2652 lu.cfg.SetDiskID(node_disk, node)
2653 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2654 msg = result.RemoteFailMsg()
2656 lu.proc.LogWarning("Could not prepare block device %s on node %s"
2657 " (is_primary=True, pass=2): %s",
2658 inst_disk.iv_name, node, msg)
2660 device_info.append((instance.primary_node, inst_disk.iv_name,
2663 # leave the disks configured for the primary node
2664 # this is a workaround that would be fixed better by
2665 # improving the logical/physical id handling
2666 for disk in instance.disks:
2667 lu.cfg.SetDiskID(disk, instance.primary_node)
2669 return disks_ok, device_info
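# Minimal usage sketch (hypothetical caller, mirroring LUActivateInstanceDisks
# above):
#
#   disks_ok, device_info = _AssembleInstanceDisks(self, self.instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   for host, iv_name, node_dev in device_info:
#     feedback_fn("%s: disk %s visible as %s" % (host, iv_name, node_dev))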
2672 def _StartInstanceDisks(lu, instance, force):
2673 """Start the disks of an instance.
2676 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2677 ignore_secondaries=force)
2679 _ShutdownInstanceDisks(lu, instance)
2680 if force is not None and not force:
2681 lu.proc.LogWarning("", hint="If the message above refers to a"
2683 " you can retry the operation using '--force'.")
2684 raise errors.OpExecError("Disk consistency error")
2687 class LUDeactivateInstanceDisks(NoHooksLU):
2688 """Shutdown an instance's disks.
2691 _OP_REQP = ["instance_name"]
2694 def ExpandNames(self):
2695 self._ExpandAndLockInstance()
2696 self.needed_locks[locking.LEVEL_NODE] = []
2697 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2699 def DeclareLocks(self, level):
2700 if level == locking.LEVEL_NODE:
2701 self._LockInstancesNodes()
2703 def CheckPrereq(self):
2704 """Check prerequisites.
2706 This checks that the instance is in the cluster.
2709 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2710 assert self.instance is not None, \
2711 "Cannot retrieve locked instance %s" % self.op.instance_name
2713 def Exec(self, feedback_fn):
2714 """Deactivate the disks
2717 instance = self.instance
2718 _SafeShutdownInstanceDisks(self, instance)
2721 def _SafeShutdownInstanceDisks(lu, instance):
2722 """Shutdown block devices of an instance.
2724 This function checks if an instance is running, before calling
2725 _ShutdownInstanceDisks.
2728 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2729 [instance.hypervisor])
2730 ins_l = ins_l[instance.primary_node]
2731 if ins_l.failed or not isinstance(ins_l.data, list):
2732 raise errors.OpExecError("Can't contact node '%s'" %
2733 instance.primary_node)
2735 if instance.name in ins_l.data:
2736 raise errors.OpExecError("Instance is running, can't shutdown"
2739 _ShutdownInstanceDisks(lu, instance)
2742 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2743 """Shutdown block devices of an instance.
2745 This does the shutdown on all nodes of the instance.
2747 If ignore_primary is false, errors on the primary node make the shutdown fail.
2752 for disk in instance.disks:
2753 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2754 lu.cfg.SetDiskID(top_disk, node)
2755 result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2756 msg = result.RemoteFailMsg()
2758 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2759 disk.iv_name, node, msg)
2760 if not ignore_primary or node != instance.primary_node:
2765 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2766 """Checks if a node has enough free memory.
2768 This function checks if a given node has the needed amount of free
2769 memory. In case the node has less memory or we cannot get the
2770 information from the node, this function raises an OpPrereqError
2773 @type lu: C{LogicalUnit}
2774 @param lu: a logical unit from which we get configuration data
2776 @param node: the node to check
2777 @type reason: C{str}
2778 @param reason: string to use in the error message
2779 @type requested: C{int}
2780 @param requested: the amount of memory in MiB to check for
2781 @type hypervisor_name: C{str}
2782 @param hypervisor_name: the hypervisor to ask for memory stats
2783 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2784 we cannot check the node
2787 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2788 nodeinfo[node].Raise()
2789 free_mem = nodeinfo[node].data.get('memory_free')
2790 if not isinstance(free_mem, int):
2791 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2792 " was '%s'" % (node, free_mem))
2793 if requested > free_mem:
2794 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2795 " needed %s MiB, available %s MiB" %
2796 (node, reason, requested, free_mem))
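# Typical call, as done by the startup/failover/migration LUs further down:
#
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)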
2799 class LUStartupInstance(LogicalUnit):
2800 """Starts an instance.
2803 HPATH = "instance-start"
2804 HTYPE = constants.HTYPE_INSTANCE
2805 _OP_REQP = ["instance_name", "force"]
2808 def ExpandNames(self):
2809 self._ExpandAndLockInstance()
2811 def BuildHooksEnv(self):
2814 This runs on master, primary and secondary nodes of the instance.
2818 "FORCE": self.op.force,
2820 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2821 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2824 def CheckPrereq(self):
2825 """Check prerequisites.
2827 This checks that the instance is in the cluster.
2830 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2831 assert self.instance is not None, \
2832 "Cannot retrieve locked instance %s" % self.op.instance_name
2835 self.beparams = getattr(self.op, "beparams", {})
2837 if not isinstance(self.beparams, dict):
2838 raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2839 " dict" % (type(self.beparams), ))
2840 # fill the beparams dict
2841 utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2842 self.op.beparams = self.beparams
2845 self.hvparams = getattr(self.op, "hvparams", {})
2847 if not isinstance(self.hvparams, dict):
2848 raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2849 " dict" % (type(self.hvparams), ))
2851 # check hypervisor parameter syntax (locally)
2852 cluster = self.cfg.GetClusterInfo()
2853 utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2854 filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2856 filled_hvp.update(self.hvparams)
2857 hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2858 hv_type.CheckParameterSyntax(filled_hvp)
2859 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2860 self.op.hvparams = self.hvparams
2862 _CheckNodeOnline(self, instance.primary_node)
2864 bep = self.cfg.GetClusterInfo().FillBE(instance)
2865 # check bridge existence
2866 _CheckInstanceBridgesExist(self, instance)
2868 remote_info = self.rpc.call_instance_info(instance.primary_node,
2870 instance.hypervisor)
2872 if not remote_info.data:
2873 _CheckNodeFreeMemory(self, instance.primary_node,
2874 "starting instance %s" % instance.name,
2875 bep[constants.BE_MEMORY], instance.hypervisor)
2877 def Exec(self, feedback_fn):
2878 """Start the instance.
2881 instance = self.instance
2882 force = self.op.force
2884 self.cfg.MarkInstanceUp(instance.name)
2886 node_current = instance.primary_node
2888 _StartInstanceDisks(self, instance, force)
2890 result = self.rpc.call_instance_start(node_current, instance,
2891 self.hvparams, self.beparams)
2892 msg = result.RemoteFailMsg()
2894 _ShutdownInstanceDisks(self, instance)
2895 raise errors.OpExecError("Could not start instance: %s" % msg)
2898 class LURebootInstance(LogicalUnit):
2899 """Reboot an instance.
2902 HPATH = "instance-reboot"
2903 HTYPE = constants.HTYPE_INSTANCE
2904 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2907 def ExpandNames(self):
2908 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2909 constants.INSTANCE_REBOOT_HARD,
2910 constants.INSTANCE_REBOOT_FULL]:
2911 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2912 (constants.INSTANCE_REBOOT_SOFT,
2913 constants.INSTANCE_REBOOT_HARD,
2914 constants.INSTANCE_REBOOT_FULL))
2915 self._ExpandAndLockInstance()
2917 def BuildHooksEnv(self):
2920 This runs on master, primary and secondary nodes of the instance.
2924 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2925 "REBOOT_TYPE": self.op.reboot_type,
2927 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2928 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2931 def CheckPrereq(self):
2932 """Check prerequisites.
2934 This checks that the instance is in the cluster.
2937 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2938 assert self.instance is not None, \
2939 "Cannot retrieve locked instance %s" % self.op.instance_name
2941 _CheckNodeOnline(self, instance.primary_node)
2943 # check bridge existence
2944 _CheckInstanceBridgesExist(self, instance)
2946 def Exec(self, feedback_fn):
2947 """Reboot the instance.
2950 instance = self.instance
2951 ignore_secondaries = self.op.ignore_secondaries
2952 reboot_type = self.op.reboot_type
2954 node_current = instance.primary_node
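# soft/hard reboots go through the instance reboot RPC, while a full reboot
# is implemented below as instance shutdown + disk cycle + start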
2956 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2957 constants.INSTANCE_REBOOT_HARD]:
2958 for disk in instance.disks:
2959 self.cfg.SetDiskID(disk, node_current)
2960 result = self.rpc.call_instance_reboot(node_current, instance,
2962 msg = result.RemoteFailMsg()
2964 raise errors.OpExecError("Could not reboot instance: %s" % msg)
2966 result = self.rpc.call_instance_shutdown(node_current, instance)
2967 msg = result.RemoteFailMsg()
2969 raise errors.OpExecError("Could not shutdown instance for"
2970 " full reboot: %s" % msg)
2971 _ShutdownInstanceDisks(self, instance)
2972 _StartInstanceDisks(self, instance, ignore_secondaries)
2973 result = self.rpc.call_instance_start(node_current, instance, None, None)
2974 msg = result.RemoteFailMsg()
2976 _ShutdownInstanceDisks(self, instance)
2977 raise errors.OpExecError("Could not start instance for"
2978 " full reboot: %s" % msg)
2980 self.cfg.MarkInstanceUp(instance.name)
2983 class LUShutdownInstance(LogicalUnit):
2984 """Shutdown an instance.
2987 HPATH = "instance-stop"
2988 HTYPE = constants.HTYPE_INSTANCE
2989 _OP_REQP = ["instance_name"]
2992 def ExpandNames(self):
2993 self._ExpandAndLockInstance()
2995 def BuildHooksEnv(self):
2998 This runs on master, primary and secondary nodes of the instance.
3001 env = _BuildInstanceHookEnvByObject(self, self.instance)
3002 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3005 def CheckPrereq(self):
3006 """Check prerequisites.
3008 This checks that the instance is in the cluster.
3011 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3012 assert self.instance is not None, \
3013 "Cannot retrieve locked instance %s" % self.op.instance_name
3014 _CheckNodeOnline(self, self.instance.primary_node)
3016 def Exec(self, feedback_fn):
3017 """Shutdown the instance.
3020 instance = self.instance
3021 node_current = instance.primary_node
3022 self.cfg.MarkInstanceDown(instance.name)
3023 result = self.rpc.call_instance_shutdown(node_current, instance)
3024 msg = result.RemoteFailMsg()
3026 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3028 _ShutdownInstanceDisks(self, instance)
3031 class LUReinstallInstance(LogicalUnit):
3032 """Reinstall an instance.
3035 HPATH = "instance-reinstall"
3036 HTYPE = constants.HTYPE_INSTANCE
3037 _OP_REQP = ["instance_name"]
3040 def ExpandNames(self):
3041 self._ExpandAndLockInstance()
3043 def BuildHooksEnv(self):
3046 This runs on master, primary and secondary nodes of the instance.
3049 env = _BuildInstanceHookEnvByObject(self, self.instance)
3050 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3053 def CheckPrereq(self):
3054 """Check prerequisites.
3056 This checks that the instance is in the cluster and is not running.
3059 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3060 assert instance is not None, \
3061 "Cannot retrieve locked instance %s" % self.op.instance_name
3062 _CheckNodeOnline(self, instance.primary_node)
3064 if instance.disk_template == constants.DT_DISKLESS:
3065 raise errors.OpPrereqError("Instance '%s' has no disks" %
3066 self.op.instance_name)
3067 if instance.admin_up:
3068 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3069 self.op.instance_name)
3070 remote_info = self.rpc.call_instance_info(instance.primary_node,
3072 instance.hypervisor)
3074 if remote_info.data:
3075 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3076 (self.op.instance_name,
3077 instance.primary_node))
3079 self.op.os_type = getattr(self.op, "os_type", None)
3080 if self.op.os_type is not None:
3082 pnode = self.cfg.GetNodeInfo(
3083 self.cfg.ExpandNodeName(instance.primary_node))
3085 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3087 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3089 if not isinstance(result.data, objects.OS):
3090 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3091 " primary node" % self.op.os_type)
3093 self.instance = instance
3095 def Exec(self, feedback_fn):
3096 """Reinstall the instance.
3099 inst = self.instance
3101 if self.op.os_type is not None:
3102 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3103 inst.os = self.op.os_type
3104 self.cfg.Update(inst)
3106 _StartInstanceDisks(self, inst, None)
3108 feedback_fn("Running the instance OS create scripts...")
3109 result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3110 msg = result.RemoteFailMsg()
3112 raise errors.OpExecError("Could not install OS for instance %s"
3114 (inst.name, inst.primary_node, msg))
3116 _ShutdownInstanceDisks(self, inst)
3119 class LURenameInstance(LogicalUnit):
3120 """Rename an instance.
3123 HPATH = "instance-rename"
3124 HTYPE = constants.HTYPE_INSTANCE
3125 _OP_REQP = ["instance_name", "new_name"]
3127 def BuildHooksEnv(self):
3130 This runs on master, primary and secondary nodes of the instance.
3133 env = _BuildInstanceHookEnvByObject(self, self.instance)
3134 env["INSTANCE_NEW_NAME"] = self.op.new_name
3135 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3138 def CheckPrereq(self):
3139 """Check prerequisites.
3141 This checks that the instance is in the cluster and is not running.
3144 instance = self.cfg.GetInstanceInfo(
3145 self.cfg.ExpandInstanceName(self.op.instance_name))
3146 if instance is None:
3147 raise errors.OpPrereqError("Instance '%s' not known" %
3148 self.op.instance_name)
3149 _CheckNodeOnline(self, instance.primary_node)
3151 if instance.admin_up:
3152 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3153 self.op.instance_name)
3154 remote_info = self.rpc.call_instance_info(instance.primary_node,
3156 instance.hypervisor)
3158 if remote_info.data:
3159 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3160 (self.op.instance_name,
3161 instance.primary_node))
3162 self.instance = instance
3164 # new name verification
3165 name_info = utils.HostInfo(self.op.new_name)
3167 self.op.new_name = new_name = name_info.name
3168 instance_list = self.cfg.GetInstanceList()
3169 if new_name in instance_list:
3170 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3173 if not getattr(self.op, "ignore_ip", False):
3174 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3175 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3176 (name_info.ip, new_name))
3179 def Exec(self, feedback_fn):
3180 """Reinstall the instance.
3183 inst = self.instance
3184 old_name = inst.name
3186 if inst.disk_template == constants.DT_FILE:
3187 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3189 self.cfg.RenameInstance(inst.name, self.op.new_name)
3190 # Change the instance lock. This is definitely safe while we hold the BGL
3191 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3192 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3194 # re-read the instance from the configuration after rename
3195 inst = self.cfg.GetInstanceInfo(self.op.new_name)
3197 if inst.disk_template == constants.DT_FILE:
3198 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3199 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3200 old_file_storage_dir,
3201 new_file_storage_dir)
3204 raise errors.OpExecError("Could not connect to node '%s' to rename"
3205 " directory '%s' to '%s' (but the instance"
3206 " has been renamed in Ganeti)" % (
3207 inst.primary_node, old_file_storage_dir,
3208 new_file_storage_dir))
3210 if not result.data[0]:
3211 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3212 " (but the instance has been renamed in"
3213 " Ganeti)" % (old_file_storage_dir,
3214 new_file_storage_dir))
3216 _StartInstanceDisks(self, inst, None)
3218 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3220 msg = result.RemoteFailMsg()
3222 msg = ("Could not run OS rename script for instance %s on node %s"
3223 " (but the instance has been renamed in Ganeti): %s" %
3224 (inst.name, inst.primary_node, msg))
3225 self.proc.LogWarning(msg)
3227 _ShutdownInstanceDisks(self, inst)
3230 class LURemoveInstance(LogicalUnit):
3231 """Remove an instance.
3234 HPATH = "instance-remove"
3235 HTYPE = constants.HTYPE_INSTANCE
3236 _OP_REQP = ["instance_name", "ignore_failures"]
3239 def ExpandNames(self):
3240 self._ExpandAndLockInstance()
3241 self.needed_locks[locking.LEVEL_NODE] = []
3242 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3244 def DeclareLocks(self, level):
3245 if level == locking.LEVEL_NODE:
3246 self._LockInstancesNodes()
3248 def BuildHooksEnv(self):
3251 This runs on master, primary and secondary nodes of the instance.
3254 env = _BuildInstanceHookEnvByObject(self, self.instance)
3255 nl = [self.cfg.GetMasterNode()]
3258 def CheckPrereq(self):
3259 """Check prerequisites.
3261 This checks that the instance is in the cluster.
3264 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3265 assert self.instance is not None, \
3266 "Cannot retrieve locked instance %s" % self.op.instance_name
3268 def Exec(self, feedback_fn):
3269 """Remove the instance.
3272 instance = self.instance
3273 logging.info("Shutting down instance %s on node %s",
3274 instance.name, instance.primary_node)
3276 result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3277 msg = result.RemoteFailMsg()
3279 if self.op.ignore_failures:
3280 feedback_fn("Warning: can't shutdown instance: %s" % msg)
3282 raise errors.OpExecError("Could not shutdown instance %s on"
3284 (instance.name, instance.primary_node, msg))
3286 logging.info("Removing block devices for instance %s", instance.name)
3288 if not _RemoveDisks(self, instance):
3289 if self.op.ignore_failures:
3290 feedback_fn("Warning: can't remove instance's disks")
3292 raise errors.OpExecError("Can't remove instance's disks")
3294 logging.info("Removing instance %s out of cluster config", instance.name)
3296 self.cfg.RemoveInstance(instance.name)
3297 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3300 class LUQueryInstances(NoHooksLU):
3301 """Logical unit for querying instances.
3304 _OP_REQP = ["output_fields", "names", "use_locking"]
3306 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3308 "disk_template", "ip", "mac", "bridge",
3309 "sda_size", "sdb_size", "vcpus", "tags",
3310 "network_port", "beparams",
3311 r"(disk)\.(size)/([0-9]+)",
3312 r"(disk)\.(sizes)", "disk_usage",
3313 r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3314 r"(nic)\.(macs|ips|bridges)",
3315 r"(disk|nic)\.(count)",
3316 "serial_no", "hypervisor", "hvparams",] +
3318 for name in constants.HVS_PARAMETERS] +
3320 for name in constants.BES_PARAMETERS])
3321 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
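# Examples of the parameterised fields declared above: "disk.count" and
# "nic.count" return the number of disks/NICs, "disk.size/0" the size of the
# first disk, and "nic.mac/1" the MAC address of the second NIC (indices are
# zero-based; see the dispatch in Exec below).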
3324 def ExpandNames(self):
3325 _CheckOutputFields(static=self._FIELDS_STATIC,
3326 dynamic=self._FIELDS_DYNAMIC,
3327 selected=self.op.output_fields)
3329 self.needed_locks = {}
3330 self.share_locks[locking.LEVEL_INSTANCE] = 1
3331 self.share_locks[locking.LEVEL_NODE] = 1
3334 self.wanted = _GetWantedInstances(self, self.op.names)
3336 self.wanted = locking.ALL_SET
3338 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3339 self.do_locking = self.do_node_query and self.op.use_locking
3341 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3342 self.needed_locks[locking.LEVEL_NODE] = []
3343 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3345 def DeclareLocks(self, level):
3346 if level == locking.LEVEL_NODE and self.do_locking:
3347 self._LockInstancesNodes()
3349 def CheckPrereq(self):
3350 """Check prerequisites.
3355 def Exec(self, feedback_fn):
3356 """Computes the list of nodes and their attributes.
3359 all_info = self.cfg.GetAllInstancesInfo()
3360 if self.wanted == locking.ALL_SET:
3361 # caller didn't specify instance names, so ordering is not important
3363 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3365 instance_names = all_info.keys()
3366 instance_names = utils.NiceSort(instance_names)
3368 # caller did specify names, so we must keep the ordering
3370 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3372 tgt_set = all_info.keys()
3373 missing = set(self.wanted).difference(tgt_set)
3375 raise errors.OpExecError("Some instances were removed before"
3376 " retrieving their data: %s" % missing)
3377 instance_names = self.wanted
3379 instance_list = [all_info[iname] for iname in instance_names]
3381 # begin data gathering
3383 nodes = frozenset([inst.primary_node for inst in instance_list])
3384 hv_list = list(set([inst.hypervisor for inst in instance_list]))
3388 if self.do_node_query:
3390 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3392 result = node_data[name]
3394 # offline nodes will be in both lists
3395 off_nodes.append(name)
3397 bad_nodes.append(name)
3400 live_data.update(result.data)
3401 # else no instance is alive
3403 live_data = dict([(name, {}) for name in instance_names])
3405 # end data gathering
3410 for instance in instance_list:
3412 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3413 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3414 for field in self.op.output_fields:
3415 st_match = self._FIELDS_STATIC.Matches(field)
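# Matches() returns a match-like object for parameterised fields; its
# groups() are used further down to dispatch on e.g. ("disk", "size", "0")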
3420 elif field == "pnode":
3421 val = instance.primary_node
3422 elif field == "snodes":
3423 val = list(instance.secondary_nodes)
3424 elif field == "admin_state":
3425 val = instance.admin_up
3426 elif field == "oper_state":
3427 if instance.primary_node in bad_nodes:
3430 val = bool(live_data.get(instance.name))
3431 elif field == "status":
3432 if instance.primary_node in off_nodes:
3433 val = "ERROR_nodeoffline"
3434 elif instance.primary_node in bad_nodes:
3435 val = "ERROR_nodedown"
3437 running = bool(live_data.get(instance.name))
3439 if instance.admin_up:
3444 if instance.admin_up:
3448 elif field == "oper_ram":
3449 if instance.primary_node in bad_nodes:
3451 elif instance.name in live_data:
3452 val = live_data[instance.name].get("memory", "?")
3455 elif field == "vcpus":
3456 val = i_be[constants.BE_VCPUS]
3457 elif field == "disk_template":
3458 val = instance.disk_template
3461 val = instance.nics[0].ip
3464 elif field == "bridge":
3466 val = instance.nics[0].bridge
3469 elif field == "mac":
3471 val = instance.nics[0].mac
3474 elif field == "sda_size" or field == "sdb_size":
3475 idx = ord(field[2]) - ord('a')
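# e.g. "sda_size" maps to disk index 0 and "sdb_size" to index 1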
3477 val = instance.FindDisk(idx).size
3478 except errors.OpPrereqError:
3480 elif field == "disk_usage": # total disk usage per node
3481 disk_sizes = [{'size': disk.size} for disk in instance.disks]
3482 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3483 elif field == "tags":
3484 val = list(instance.GetTags())
3485 elif field == "serial_no":
3486 val = instance.serial_no
3487 elif field == "network_port":
3488 val = instance.network_port
3489 elif field == "hypervisor":
3490 val = instance.hypervisor
3491 elif field == "hvparams":
3493 elif (field.startswith(HVPREFIX) and
3494 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3495 val = i_hv.get(field[len(HVPREFIX):], None)
3496 elif field == "beparams":
3498 elif (field.startswith(BEPREFIX) and
3499 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3500 val = i_be.get(field[len(BEPREFIX):], None)
3501 elif st_match and st_match.groups():
3502 # matches a variable list
3503 st_groups = st_match.groups()
3504 if st_groups and st_groups[0] == "disk":
3505 if st_groups[1] == "count":
3506 val = len(instance.disks)
3507 elif st_groups[1] == "sizes":
3508 val = [disk.size for disk in instance.disks]
3509 elif st_groups[1] == "size":
3511 val = instance.FindDisk(st_groups[2]).size
3512 except errors.OpPrereqError:
3515 assert False, "Unhandled disk parameter"
3516 elif st_groups[0] == "nic":
3517 if st_groups[1] == "count":
3518 val = len(instance.nics)
3519 elif st_groups[1] == "macs":
3520 val = [nic.mac for nic in instance.nics]
3521 elif st_groups[1] == "ips":
3522 val = [nic.ip for nic in instance.nics]
3523 elif st_groups[1] == "bridges":
3524 val = [nic.bridge for nic in instance.nics]
3527 nic_idx = int(st_groups[2])
3528 if nic_idx >= len(instance.nics):
3531 if st_groups[1] == "mac":
3532 val = instance.nics[nic_idx].mac
3533 elif st_groups[1] == "ip":
3534 val = instance.nics[nic_idx].ip
3535 elif st_groups[1] == "bridge":
3536 val = instance.nics[nic_idx].bridge
3538 assert False, "Unhandled NIC parameter"
3540 assert False, ("Declared but unhandled variable parameter '%s'" %
3543 assert False, "Declared but unhandled parameter '%s'" % field
3550 class LUFailoverInstance(LogicalUnit):
3551 """Failover an instance.
3554 HPATH = "instance-failover"
3555 HTYPE = constants.HTYPE_INSTANCE
3556 _OP_REQP = ["instance_name", "ignore_consistency"]
3559 def ExpandNames(self):
3560 self._ExpandAndLockInstance()
3561 self.needed_locks[locking.LEVEL_NODE] = []
3562 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3564 def DeclareLocks(self, level):
3565 if level == locking.LEVEL_NODE:
3566 self._LockInstancesNodes()
3568 def BuildHooksEnv(self):
3571 This runs on master, primary and secondary nodes of the instance.
3575 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3577 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3578 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3581 def CheckPrereq(self):
3582 """Check prerequisites.
3584 This checks that the instance is in the cluster.
3587 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3588 assert self.instance is not None, \
3589 "Cannot retrieve locked instance %s" % self.op.instance_name
3591 bep = self.cfg.GetClusterInfo().FillBE(instance)
3592 if instance.disk_template not in constants.DTS_NET_MIRROR:
3593 raise errors.OpPrereqError("Instance's disk layout is not"
3594 " network mirrored, cannot failover.")
3596 secondary_nodes = instance.secondary_nodes
3597 if not secondary_nodes:
3598 raise errors.ProgrammerError("no secondary node but using "
3599 "a mirrored disk template")
3601 target_node = secondary_nodes[0]
3602 _CheckNodeOnline(self, target_node)
3603 _CheckNodeNotDrained(self, target_node)
3605 if instance.admin_up:
3606 # check memory requirements on the secondary node
3607 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3608 instance.name, bep[constants.BE_MEMORY],
3609 instance.hypervisor)
3611 self.LogInfo("Not checking memory on the secondary node as"
3612 " instance will not be started")
3614 # check bridge existence
3615 brlist = [nic.bridge for nic in instance.nics]
3616 result = self.rpc.call_bridges_exist(target_node, brlist)
3619 raise errors.OpPrereqError("One or more target bridges %s do not"
3620 " exist on destination node '%s'" %
3621 (brlist, target_node))
3623 def Exec(self, feedback_fn):
3624 """Failover an instance.
3626 The failover is done by shutting it down on its present node and
3627 starting it on the secondary.
3630 instance = self.instance
3632 source_node = instance.primary_node
3633 target_node = instance.secondary_nodes[0]
3635 feedback_fn("* checking disk consistency between source and target")
3636 for dev in instance.disks:
3637 # for drbd, these are drbd over lvm
3638 if not _CheckDiskConsistency(self, dev, target_node, False):
3639 if instance.admin_up and not self.op.ignore_consistency:
3640 raise errors.OpExecError("Disk %s is degraded on target node,"
3641 " aborting failover." % dev.iv_name)
3643 feedback_fn("* shutting down instance on source node")
3644 logging.info("Shutting down instance %s on node %s",
3645 instance.name, source_node)
3647 result = self.rpc.call_instance_shutdown(source_node, instance)
3648 msg = result.RemoteFailMsg()
3650 if self.op.ignore_consistency:
3651 self.proc.LogWarning("Could not shutdown instance %s on node %s."
3652 " Proceeding anyway. Please make sure node"
3653 " %s is down. Error details: %s",
3654 instance.name, source_node, source_node, msg)
3656 raise errors.OpExecError("Could not shutdown instance %s on"
3658 (instance.name, source_node, msg))
3660 feedback_fn("* deactivating the instance's disks on source node")
3661 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3662 raise errors.OpExecError("Can't shut down the instance's disks.")
3664 instance.primary_node = target_node
3665 # distribute new instance config to the other nodes
3666 self.cfg.Update(instance)
3668 # Only start the instance if it's marked as up
3669 if instance.admin_up:
3670 feedback_fn("* activating the instance's disks on target node")
3671 logging.info("Starting instance %s on node %s",
3672 instance.name, target_node)
3674 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3675 ignore_secondaries=True)
3677 _ShutdownInstanceDisks(self, instance)
3678 raise errors.OpExecError("Can't activate the instance's disks")
3680 feedback_fn("* starting the instance on the target node")
3681 result = self.rpc.call_instance_start(target_node, instance, None, None)
3682 msg = result.RemoteFailMsg()
3684 _ShutdownInstanceDisks(self, instance)
3685 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3686 (instance.name, target_node, msg))
3689 class LUMigrateInstance(LogicalUnit):
3690 """Migrate an instance.
3692 This is migration without shutting down the instance, as opposed to
3693 failover, which is done with a shutdown.
3696 HPATH = "instance-migrate"
3697 HTYPE = constants.HTYPE_INSTANCE
3698 _OP_REQP = ["instance_name", "live", "cleanup"]
3702 def ExpandNames(self):
3703 self._ExpandAndLockInstance()
3704 self.needed_locks[locking.LEVEL_NODE] = []
3705 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3707 def DeclareLocks(self, level):
3708 if level == locking.LEVEL_NODE:
3709 self._LockInstancesNodes()
3711 def BuildHooksEnv(self):
3714 This runs on master, primary and secondary nodes of the instance.
3717 env = _BuildInstanceHookEnvByObject(self, self.instance)
3718 env["MIGRATE_LIVE"] = self.op.live
3719 env["MIGRATE_CLEANUP"] = self.op.cleanup
3720 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3723 def CheckPrereq(self):
3724 """Check prerequisites.
3726 This checks that the instance is in the cluster.
3729 instance = self.cfg.GetInstanceInfo(
3730 self.cfg.ExpandInstanceName(self.op.instance_name))
3731 if instance is None:
3732 raise errors.OpPrereqError("Instance '%s' not known" %
3733 self.op.instance_name)
3735 if instance.disk_template != constants.DT_DRBD8:
3736 raise errors.OpPrereqError("Instance's disk layout is not"
3737 " drbd8, cannot migrate.")
3739 secondary_nodes = instance.secondary_nodes
3740 if not secondary_nodes:
3741 raise errors.ConfigurationError("No secondary node but using"
3742 " drbd8 disk template")
3744 i_be = self.cfg.GetClusterInfo().FillBE(instance)
3746 target_node = secondary_nodes[0]
3747 # check memory requirements on the secondary node
3748 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3749 instance.name, i_be[constants.BE_MEMORY],
3750 instance.hypervisor)
3752 # check bridge existence
3753 brlist = [nic.bridge for nic in instance.nics]
3754 result = self.rpc.call_bridges_exist(target_node, brlist)
3755 if result.failed or not result.data:
3756 raise errors.OpPrereqError("One or more target bridges %s do not"
3757 " exist on destination node '%s'" %
3758 (brlist, target_node))
3760 if not self.op.cleanup:
3761 _CheckNodeNotDrained(self, target_node)
3762 result = self.rpc.call_instance_migratable(instance.primary_node,
3764 msg = result.RemoteFailMsg()
3766 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3769 self.instance = instance
3771 def _WaitUntilSync(self):
3772 """Poll with custom rpc for disk sync.
3774 This uses our own step-based rpc call.
3777 self.feedback_fn("* wait until resync is done")
3781 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3783 self.instance.disks)
3785 for node, nres in result.items():
3786 msg = nres.RemoteFailMsg()
3788 raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3790 node_done, node_percent = nres.payload
3791 all_done = all_done and node_done
3792 if node_percent is not None:
3793 min_percent = min(min_percent, node_percent)
3795 if min_percent < 100:
3796 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3799 def _EnsureSecondary(self, node):
3800 """Demote a node to secondary.
3803 self.feedback_fn("* switching node %s to secondary mode" % node)
3805 for dev in self.instance.disks:
3806 self.cfg.SetDiskID(dev, node)
3808 result = self.rpc.call_blockdev_close(node, self.instance.name,
3809 self.instance.disks)
3810 msg = result.RemoteFailMsg()
3812 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3813 " error %s" % (node, msg))
3815 def _GoStandalone(self):
3816 """Disconnect from the network.
3819 self.feedback_fn("* changing into standalone mode")
3820 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3821 self.instance.disks)
3822 for node, nres in result.items():
3823 msg = nres.RemoteFailMsg()
3825 raise errors.OpExecError("Cannot disconnect disks node %s,"
3826 " error %s" % (node, msg))
3828 def _GoReconnect(self, multimaster):
3829 """Reconnect to the network.
3835 msg = "single-master"
3836 self.feedback_fn("* changing disks into %s mode" % msg)
3837 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3838 self.instance.disks,
3839 self.instance.name, multimaster)
3840 for node, nres in result.items():
3841 msg = nres.RemoteFailMsg()
3843 raise errors.OpExecError("Cannot change disks config on node %s,"
3844 " error: %s" % (node, msg))
3846 def _ExecCleanup(self):
3847 """Try to cleanup after a failed migration.
3849 The cleanup is done by:
3850 - check that the instance is running only on one node
3851 (and update the config if needed)
3852 - change disks on its secondary node to secondary
3853 - wait until disks are fully synchronized
3854 - disconnect from the network
3855 - change disks into single-master mode
3856 - wait again until disks are fully synchronized
3859 instance = self.instance
3860 target_node = self.target_node
3861 source_node = self.source_node
3863 # check running on only one node
3864 self.feedback_fn("* checking where the instance actually runs"
3865 " (if this hangs, the hypervisor might be in"
3867 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3868 for node, result in ins_l.items():
3870 if not isinstance(result.data, list):
3871 raise errors.OpExecError("Can't contact node '%s'" % node)
3873 runningon_source = instance.name in ins_l[source_node].data
3874 runningon_target = instance.name in ins_l[target_node].data
3876 if runningon_source and runningon_target:
3877 raise errors.OpExecError("Instance seems to be running on two nodes,"
3878 " or the hypervisor is confused. You will have"
3879 " to ensure manually that it runs only on one"
3880 " and restart this operation.")
3882 if not (runningon_source or runningon_target):
3883 raise errors.OpExecError("Instance does not seem to be running at all."
3884 " In this case, it's safer to repair by"
3885 " running 'gnt-instance stop' to ensure disk"
3886 " shutdown, and then restarting it.")
3888 if runningon_target:
3889 # the migration has actually succeeded, we need to update the config
3890 self.feedback_fn("* instance running on secondary node (%s),"
3891 " updating config" % target_node)
3892 instance.primary_node = target_node
3893 self.cfg.Update(instance)
3894 demoted_node = source_node
3896 self.feedback_fn("* instance confirmed to be running on its"
3897 " primary node (%s)" % source_node)
3898 demoted_node = target_node
3900 self._EnsureSecondary(demoted_node)
3902 self._WaitUntilSync()
3903 except errors.OpExecError:
3904 # we ignore errors here, since if the device is standalone, it
3905 # won't be able to sync
3907 self._GoStandalone()
3908 self._GoReconnect(False)
3909 self._WaitUntilSync()
3911 self.feedback_fn("* done")
3913 def _RevertDiskStatus(self):
3914 """Try to revert the disk status after a failed migration.
3917 target_node = self.target_node
3919 self._EnsureSecondary(target_node)
3920 self._GoStandalone()
3921 self._GoReconnect(False)
3922 self._WaitUntilSync()
3923 except errors.OpExecError, err:
3924 self.LogWarning("Migration failed and I can't reconnect the"
3925 " drives: error '%s'\n"
3926 "Please look and recover the instance status" %
3929 def _AbortMigration(self):
3930 """Call the hypervisor code to abort a started migration.
3933 instance = self.instance
3934 target_node = self.target_node
3935 migration_info = self.migration_info
3937 abort_result = self.rpc.call_finalize_migration(target_node,
3941 abort_msg = abort_result.RemoteFailMsg()
3943 logging.error("Aborting migration failed on target node %s: %s",
3944 target_node, abort_msg)
3945 # Don't raise an exception here, as we still have to try to revert the
3946 # disk status, even if this step failed.
3948 def _ExecMigration(self):
3949 """Migrate an instance.
3951 The migration is done by:
3952 - change the disks into dual-master mode
3953 - wait until disks are fully synchronized again
3954 - migrate the instance
3955 - change disks on the new secondary node (the old primary) to secondary
3956 - wait until disks are fully synchronized
3957 - change disks into single-master mode
3960 instance = self.instance
3961 target_node = self.target_node
3962 source_node = self.source_node
3964 self.feedback_fn("* checking disk consistency between source and target")
3965 for dev in instance.disks:
3966 if not _CheckDiskConsistency(self, dev, target_node, False):
3967 raise errors.OpExecError("Disk %s is degraded or not fully"
3968 " synchronized on target node,"
3969 " aborting migrate." % dev.iv_name)
3971 # First get the migration information from the remote node
3972 result = self.rpc.call_migration_info(source_node, instance)
3973 msg = result.RemoteFailMsg()
3975 log_err = ("Failed fetching source migration information from %s: %s" %
3977 logging.error(log_err)
3978 raise errors.OpExecError(log_err)
3980 self.migration_info = migration_info = result.payload
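# the payload returned by the source node carries hypervisor-specific
# migration data; it is handed back to the target node below when
# preparing it to accept the instance and when finalizing the migration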
3982 # Then switch the disks to master/master mode
3983 self._EnsureSecondary(target_node)
3984 self._GoStandalone()
3985 self._GoReconnect(True)
3986 self._WaitUntilSync()
3988 self.feedback_fn("* preparing %s to accept the instance" % target_node)
3989 result = self.rpc.call_accept_instance(target_node,
3992 self.nodes_ip[target_node])
3994 msg = result.RemoteFailMsg()
3996 logging.error("Instance pre-migration failed, trying to revert"
3997 " disk status: %s", msg)
3998 self._AbortMigration()
3999 self._RevertDiskStatus()
4000 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4001 (instance.name, msg))
4003 self.feedback_fn("* migrating instance to %s" % target_node)
4005 result = self.rpc.call_instance_migrate(source_node, instance,
4006 self.nodes_ip[target_node],
4008 msg = result.RemoteFailMsg()
4010 logging.error("Instance migration failed, trying to revert"
4011 " disk status: %s", msg)
4012 self._AbortMigration()
4013 self._RevertDiskStatus()
4014 raise errors.OpExecError("Could not migrate instance %s: %s" %
4015 (instance.name, msg))
4018 instance.primary_node = target_node
4019 # distribute new instance config to the other nodes
4020 self.cfg.Update(instance)
4022 result = self.rpc.call_finalize_migration(target_node,
4026 msg = result.RemoteFailMsg()
4028 logging.error("Instance migration succeeded, but finalization failed:"
4030 raise errors.OpExecError("Could not finalize instance migration: %s" %
4033 self._EnsureSecondary(source_node)
4034 self._WaitUntilSync()
4035 self._GoStandalone()
4036 self._GoReconnect(False)
4037 self._WaitUntilSync()
4039 self.feedback_fn("* done")
4041 def Exec(self, feedback_fn):
4042 """Perform the migration.
4045 self.feedback_fn = feedback_fn
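# cache the migration endpoints: the primary node is the source, the
# (single) secondary is the target, and their secondary IPs are used by
# the DRBD disconnect/attach helpers above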
4047 self.source_node = self.instance.primary_node
4048 self.target_node = self.instance.secondary_nodes[0]
4049 self.all_nodes = [self.source_node, self.target_node]
4051 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4052 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4055 return self._ExecCleanup()
4057 return self._ExecMigration()
4060 def _CreateBlockDev(lu, node, instance, device, force_create,
4062 """Create a tree of block devices on a given node.
4064 If this device type has to be created on secondaries, create it and
4067 If not, just recurse to children keeping the same 'force' value.
4069 @param lu: the lu on whose behalf we execute
4070 @param node: the node on which to create the device
4071 @type instance: L{objects.Instance}
4072 @param instance: the instance which owns the device
4073 @type device: L{objects.Disk}
4074 @param device: the device to create
4075 @type force_create: boolean
4076 @param force_create: whether to force creation of this device; this
4077 will be changed to True whenever we find a device which has
4078 CreateOnSecondary() attribute
4079 @param info: the extra 'metadata' we should attach to the device
4080 (this will be represented as a LVM tag)
4081 @type force_open: boolean
4082 @param force_open: this parameter will be passed to the
4083 L{backend.BlockdevCreate} function where it specifies
4084 whether we run on primary or not, and it affects both
4085 the child assembly and the device's own Open() execution
4088 if device.CreateOnSecondary():
4092 for child in device.children:
4093 _CreateBlockDev(lu, node, instance, child, force_create,
4096 if not force_create:
4099 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4102 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4103 """Create a single block device on a given node.
4105 This will not recurse over children of the device, so they must be
4108 @param lu: the lu on whose behalf we execute
4109 @param node: the node on which to create the device
4110 @type instance: L{objects.Instance}
4111 @param instance: the instance which owns the device
4112 @type device: L{objects.Disk}
4113 @param device: the device to create
4114 @param info: the extra 'metadata' we should attach to the device
4115 (this will be represented as a LVM tag)
4116 @type force_open: boolean
4117 @param force_open: this parameter will be passed to the
4118 L{backend.BlockdevCreate} function where it specifies
4119 whether we run on primary or not, and it affects both
4120 the child assembly and the device's own Open() execution
4123 lu.cfg.SetDiskID(device, node)
4124 result = lu.rpc.call_blockdev_create(node, device, device.size,
4125 instance.name, force_open, info)
4126 msg = result.RemoteFailMsg()
4128 raise errors.OpExecError("Can't create block device %s on"
4129 " node %s for instance %s: %s" %
4130 (device, node, instance.name, msg))
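# if the device had no physical_id yet, record the one returned by the
# node at creation time so later calls can address the device directly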
4131 if device.physical_id is None:
4132 device.physical_id = result.payload
4135 def _GenerateUniqueNames(lu, exts):
4136 """Generate a suitable LV name.
4138 This will generate unique logical volume names, one for each given extension, for the instance.
4143 new_id = lu.cfg.GenerateUniqueID()
4144 results.append("%s%s" % (new_id, val))
4148 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4150 """Generate a drbd8 device complete with its children.
4153 port = lu.cfg.AllocatePort()
4154 vgname = lu.cfg.GetVGName()
4155 shared_secret = lu.cfg.GenerateDRBDSecret()
4156 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4157 logical_id=(vgname, names[0]))
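# DRBD8 needs a small metadata volume next to the data volume; the
# 128 MB used here matches the per-disk overhead added in _ComputeDiskSize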
4158 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4159 logical_id=(vgname, names[1]))
4160 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4161 logical_id=(primary, secondary, port,
4164 children=[dev_data, dev_meta],
4169 def _GenerateDiskTemplate(lu, template_name,
4170 instance_name, primary_node,
4171 secondary_nodes, disk_info,
4172 file_storage_dir, file_driver,
4174 """Generate the entire disk layout for a given template type.
4177 #TODO: compute space requirements
4179 vgname = lu.cfg.GetVGName()
4180 disk_count = len(disk_info)
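# disk_info is a list of dicts of the form {"size": <MB>, "mode": constants.DISK_RDWR},
# as built by the disk checks in LUCreateInstance.ExpandNames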
4182 if template_name == constants.DT_DISKLESS:
4184 elif template_name == constants.DT_PLAIN:
4185 if len(secondary_nodes) != 0:
4186 raise errors.ProgrammerError("Wrong template configuration")
4188 names = _GenerateUniqueNames(lu, [".disk%d" % i
4189 for i in range(disk_count)])
4190 for idx, disk in enumerate(disk_info):
4191 disk_index = idx + base_index
4192 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4193 logical_id=(vgname, names[idx]),
4194 iv_name="disk/%d" % disk_index,
4196 disks.append(disk_dev)
4197 elif template_name == constants.DT_DRBD8:
4198 if len(secondary_nodes) != 1:
4199 raise errors.ProgrammerError("Wrong template configuration")
4200 remote_node = secondary_nodes[0]
4201 minors = lu.cfg.AllocateDRBDMinor(
4202 [primary_node, remote_node] * len(disk_info), instance_name)
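# two minors are allocated per disk, alternating primary/remote, so
# minors[idx*2] belongs to the primary node and minors[idx*2+1] to the
# remote (secondary) node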
4205 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4206 for i in range(disk_count)]):
4207 names.append(lv_prefix + "_data")
4208 names.append(lv_prefix + "_meta")
4209 for idx, disk in enumerate(disk_info):
4210 disk_index = idx + base_index
4211 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4212 disk["size"], names[idx*2:idx*2+2],
4213 "disk/%d" % disk_index,
4214 minors[idx*2], minors[idx*2+1])
4215 disk_dev.mode = disk["mode"]
4216 disks.append(disk_dev)
4217 elif template_name == constants.DT_FILE:
4218 if len(secondary_nodes) != 0:
4219 raise errors.ProgrammerError("Wrong template configuration")
4221 for idx, disk in enumerate(disk_info):
4222 disk_index = idx + base_index
4223 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4224 iv_name="disk/%d" % disk_index,
4225 logical_id=(file_driver,
4226 "%s/disk%d" % (file_storage_dir,
4229 disks.append(disk_dev)
4231 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4235 def _GetInstanceInfoText(instance):
4236 """Compute that text that should be added to the disk's metadata.
4239 return "originstname+%s" % instance.name
4242 def _CreateDisks(lu, instance):
4243 """Create all disks for an instance.
4245 This abstracts away some work from AddInstance.
4247 @type lu: L{LogicalUnit}
4248 @param lu: the logical unit on whose behalf we execute
4249 @type instance: L{objects.Instance}
4250 @param instance: the instance whose disks we should create
4252 @return: the success of the creation
4255 info = _GetInstanceInfoText(instance)
4256 pnode = instance.primary_node
4258 if instance.disk_template == constants.DT_FILE:
4259 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4260 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4262 if result.failed or not result.data:
4263 raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4265 if not result.data[0]:
4266 raise errors.OpExecError("Failed to create directory '%s'" %
4269 # Note: this needs to be kept in sync with adding of disks in
4270 # LUSetInstanceParams
4271 for device in instance.disks:
4272 logging.info("Creating volume %s for instance %s",
4273 device.iv_name, instance.name)
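# only the primary node forces creation of the full device tree; on the
# other nodes devices are created only where CreateOnSecondary() requires
# it (see _CreateBlockDev)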
4275 for node in instance.all_nodes:
4276 f_create = node == pnode
4277 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4280 def _RemoveDisks(lu, instance):
4281 """Remove all disks for an instance.
4283 This abstracts away some work from `AddInstance()` and
4284 `RemoveInstance()`. Note that in case some of the devices couldn't
4285 be removed, the removal will continue with the other ones (compare
4286 with `_CreateDisks()`).
4288 @type lu: L{LogicalUnit}
4289 @param lu: the logical unit on whose behalf we execute
4290 @type instance: L{objects.Instance}
4291 @param instance: the instance whose disks we should remove
4293 @return: the success of the removal
4296 logging.info("Removing block devices for instance %s", instance.name)
4299 for device in instance.disks:
4300 for node, disk in device.ComputeNodeTree(instance.primary_node):
4301 lu.cfg.SetDiskID(disk, node)
4302 msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4304 lu.LogWarning("Could not remove block device %s on node %s,"
4305 " continuing anyway: %s", device.iv_name, node, msg)
4308 if instance.disk_template == constants.DT_FILE:
4309 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4310 result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4312 if result.failed or not result.data:
4313 logging.error("Could not remove directory '%s'", file_storage_dir)
4319 def _ComputeDiskSize(disk_template, disks):
4320 """Compute disk size requirements in the volume group
4323 # Required free disk space as a function of the disk template and disk sizes
4325 constants.DT_DISKLESS: None,
4326 constants.DT_PLAIN: sum(d["size"] for d in disks),
4327 # 128 MB are added for drbd metadata for each disk
4328 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4329 constants.DT_FILE: None,
4332 if disk_template not in req_size_dict:
4333 raise errors.ProgrammerError("Disk template '%s' size requirement"
4334 " is unknown" % disk_template)
4336 return req_size_dict[disk_template]
4339 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4340 """Hypervisor parameter validation.
4342 This function abstracts the hypervisor parameter validation to be
4343 used in both instance create and instance modify.
4345 @type lu: L{LogicalUnit}
4346 @param lu: the logical unit for which we check
4347 @type nodenames: list
4348 @param nodenames: the list of nodes on which we should check
4349 @type hvname: string
4350 @param hvname: the name of the hypervisor we should use
4351 @type hvparams: dict
4352 @param hvparams: the parameters which we need to check
4353 @raise errors.OpPrereqError: if the parameters are not valid
4356 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4359 for node in nodenames:
4363 msg = info.RemoteFailMsg()
4365 raise errors.OpPrereqError("Hypervisor parameter validation"
4366 " failed on node %s: %s" % (node, msg))
4369 class LUCreateInstance(LogicalUnit):
4370 """Create an instance.
4373 HPATH = "instance-add"
4374 HTYPE = constants.HTYPE_INSTANCE
4375 _OP_REQP = ["instance_name", "disks", "disk_template",
4377 "wait_for_sync", "ip_check", "nics",
4378 "hvparams", "beparams"]
4381 def _ExpandNode(self, node):
4382 """Expands and checks one node name.
4385 node_full = self.cfg.ExpandNodeName(node)
4386 if node_full is None:
4387 raise errors.OpPrereqError("Unknown node %s" % node)
4390 def ExpandNames(self):
4391 """ExpandNames for CreateInstance.
4393 Figure out the right locks for instance creation.
4396 self.needed_locks = {}
4398 # set optional parameters to none if they don't exist
4399 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4400 if not hasattr(self.op, attr):
4401 setattr(self.op, attr, None)
4403 # cheap checks, mostly valid constants given
4405 # verify creation mode
4406 if self.op.mode not in (constants.INSTANCE_CREATE,
4407 constants.INSTANCE_IMPORT):
4408 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4411 # disk template and mirror node verification
4412 if self.op.disk_template not in constants.DISK_TEMPLATES:
4413 raise errors.OpPrereqError("Invalid disk template name")
4415 if self.op.hypervisor is None:
4416 self.op.hypervisor = self.cfg.GetHypervisorType()
4418 cluster = self.cfg.GetClusterInfo()
4419 enabled_hvs = cluster.enabled_hypervisors
4420 if self.op.hypervisor not in enabled_hvs:
4421 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4422 " cluster (%s)" % (self.op.hypervisor,
4423 ",".join(enabled_hvs)))
4425 # check hypervisor parameter syntax (locally)
4426 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4427 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4429 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4430 hv_type.CheckParameterSyntax(filled_hvp)
4431 self.hv_full = filled_hvp
4433 # fill and remember the beparams dict
4434 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4435 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4438 #### instance parameters check
4440 # instance name verification
4441 hostname1 = utils.HostInfo(self.op.instance_name)
4442 self.op.instance_name = instance_name = hostname1.name
4444 # this is just a preventive check, but someone might still add this
4445 # instance in the meantime, and creation will fail at lock-add time
4446 if instance_name in self.cfg.GetInstanceList():
4447 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4450 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4454 for nic in self.op.nics:
4455 # ip validity checks
4456 ip = nic.get("ip", None)
4457 if ip is None or ip.lower() == "none":
4459 elif ip.lower() == constants.VALUE_AUTO:
4460 nic_ip = hostname1.ip
4462 if not utils.IsValidIP(ip):
4463 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4464 " like a valid IP" % ip)
4467 # MAC address verification
4468 mac = nic.get("mac", constants.VALUE_AUTO)
4469 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4470 if not utils.IsValidMac(mac.lower()):
4471 raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4473 # bridge verification
4474 bridge = nic.get("bridge", None)
4476 bridge = self.cfg.GetDefBridge()
4477 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4479 # disk checks/pre-build
4481 for disk in self.op.disks:
4482 mode = disk.get("mode", constants.DISK_RDWR)
4483 if mode not in constants.DISK_ACCESS_SET:
4484 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4486 size = disk.get("size", None)
4488 raise errors.OpPrereqError("Missing disk size")
4492 raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4493 self.disks.append({"size": size, "mode": mode})
4495 # used in CheckPrereq for ip ping check
4496 self.check_ip = hostname1.ip
4498 # file storage checks
4499 if (self.op.file_driver and
4500 not self.op.file_driver in constants.FILE_DRIVER):
4501 raise errors.OpPrereqError("Invalid file driver name '%s'" %
4502 self.op.file_driver)
4504 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4505 raise errors.OpPrereqError("File storage directory path must not be absolute")
4507 ### Node/iallocator related checks
4508 if [self.op.iallocator, self.op.pnode].count(None) != 1:
4509 raise errors.OpPrereqError("One and only one of iallocator and primary"
4510 " node must be given")
4512 if self.op.iallocator:
4513 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4515 self.op.pnode = self._ExpandNode(self.op.pnode)
4516 nodelist = [self.op.pnode]
4517 if self.op.snode is not None:
4518 self.op.snode = self._ExpandNode(self.op.snode)
4519 nodelist.append(self.op.snode)
4520 self.needed_locks[locking.LEVEL_NODE] = nodelist
4522 # in case of import lock the source node too
4523 if self.op.mode == constants.INSTANCE_IMPORT:
4524 src_node = getattr(self.op, "src_node", None)
4525 src_path = getattr(self.op, "src_path", None)
4527 if src_path is None:
4528 self.op.src_path = src_path = self.op.instance_name
4530 if src_node is None:
4531 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4532 self.op.src_node = None
4533 if os.path.isabs(src_path):
4534 raise errors.OpPrereqError("Importing an instance from an absolute"
4535 " path requires a source node option.")
4537 self.op.src_node = src_node = self._ExpandNode(src_node)
4538 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4539 self.needed_locks[locking.LEVEL_NODE].append(src_node)
4540 if not os.path.isabs(src_path):
4541 self.op.src_path = src_path = \
4542 os.path.join(constants.EXPORT_DIR, src_path)
4544 else: # INSTANCE_CREATE
4545 if getattr(self.op, "os_type", None) is None:
4546 raise errors.OpPrereqError("No guest OS specified")
4548 def _RunAllocator(self):
4549 """Run the allocator based on input opcode.
4552 nics = [n.ToDict() for n in self.nics]
4553 ial = IAllocator(self,
4554 mode=constants.IALLOCATOR_MODE_ALLOC,
4555 name=self.op.instance_name,
4556 disk_template=self.op.disk_template,
4559 vcpus=self.be_full[constants.BE_VCPUS],
4560 mem_size=self.be_full[constants.BE_MEMORY],
4563 hypervisor=self.op.hypervisor,
4566 ial.Run(self.op.iallocator)
4569 raise errors.OpPrereqError("Can't compute nodes using"
4570 " iallocator '%s': %s" % (self.op.iallocator,
4572 if len(ial.nodes) != ial.required_nodes:
4573 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4574 " of nodes (%s), required %s" %
4575 (self.op.iallocator, len(ial.nodes),
4576 ial.required_nodes))
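# the first node returned by the allocator becomes the primary; when two
# nodes were requested (mirrored templates) the second one becomes the
# secondary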
4577 self.op.pnode = ial.nodes[0]
4578 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4579 self.op.instance_name, self.op.iallocator,
4580 ", ".join(ial.nodes))
4581 if ial.required_nodes == 2:
4582 self.op.snode = ial.nodes[1]
4584 def BuildHooksEnv(self):
4587 This runs on the master, primary and secondary nodes of the instance.
4591 "ADD_MODE": self.op.mode,
4593 if self.op.mode == constants.INSTANCE_IMPORT:
4594 env["SRC_NODE"] = self.op.src_node
4595 env["SRC_PATH"] = self.op.src_path
4596 env["SRC_IMAGES"] = self.src_images
4598 env.update(_BuildInstanceHookEnv(
4599 name=self.op.instance_name,
4600 primary_node=self.op.pnode,
4601 secondary_nodes=self.secondaries,
4602 status=self.op.start,
4603 os_type=self.op.os_type,
4604 memory=self.be_full[constants.BE_MEMORY],
4605 vcpus=self.be_full[constants.BE_VCPUS],
4606 nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4607 disk_template=self.op.disk_template,
4608 disks=[(d["size"], d["mode"]) for d in self.disks],
4611 hypervisor=self.op.hypervisor,
4614 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4619 def CheckPrereq(self):
4620 """Check prerequisites.
4623 if (not self.cfg.GetVGName() and
4624 self.op.disk_template not in constants.DTS_NOT_LVM):
4625 raise errors.OpPrereqError("Cluster does not support lvm-based"
4628 if self.op.mode == constants.INSTANCE_IMPORT:
4629 src_node = self.op.src_node
4630 src_path = self.op.src_path
4632 if src_node is None:
4633 exp_list = self.rpc.call_export_list(
4634 self.acquired_locks[locking.LEVEL_NODE])
4636 for node in exp_list:
4637 if not exp_list[node].failed and src_path in exp_list[node].data:
4639 self.op.src_node = src_node = node
4640 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4644 raise errors.OpPrereqError("No export found for relative path %s" %
4647 _CheckNodeOnline(self, src_node)
4648 result = self.rpc.call_export_info(src_node, src_path)
4651 raise errors.OpPrereqError("No export found in dir %s" % src_path)
4653 export_info = result.data
4654 if not export_info.has_section(constants.INISECT_EXP):
4655 raise errors.ProgrammerError("Corrupted export config")
4657 ei_version = export_info.get(constants.INISECT_EXP, 'version')
4658 if (int(ei_version) != constants.EXPORT_VERSION):
4659 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4660 (ei_version, constants.EXPORT_VERSION))
4662 # Check that the new instance doesn't have fewer disks than the export
4663 instance_disks = len(self.disks)
4664 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4665 if instance_disks < export_disks:
4666 raise errors.OpPrereqError("Not enough disks to import."
4667 " (instance: %d, export: %d)" %
4668 (instance_disks, export_disks))
4670 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4672 for idx in range(export_disks):
4673 option = 'disk%d_dump' % idx
4674 if export_info.has_option(constants.INISECT_INS, option):
4675 # FIXME: are the old os-es, disk sizes, etc. useful?
4676 export_name = export_info.get(constants.INISECT_INS, option)
4677 image = os.path.join(src_path, export_name)
4678 disk_images.append(image)
4680 disk_images.append(False)
4682 self.src_images = disk_images
4684 old_name = export_info.get(constants.INISECT_INS, 'name')
4685 # FIXME: int() here could throw a ValueError on broken exports
4686 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4687 if self.op.instance_name == old_name:
4688 for idx, nic in enumerate(self.nics):
4689 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4690 nic_mac_ini = 'nic%d_mac' % idx
4691 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4693 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4694 # ip ping checks (we use the same ip that was resolved in ExpandNames)
4695 if self.op.start and not self.op.ip_check:
4696 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4697 " adding an instance in start mode")
4699 if self.op.ip_check:
4700 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4701 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4702 (self.check_ip, self.op.instance_name))
4704 #### mac address generation
4705 # By generating the mac address here, both the allocator and the hooks get
4706 # the real final mac address rather than the 'auto' or 'generate' value.
4707 # There is a race condition between the generation and the instance object
4708 # creation, which means that we know the mac is valid now, but we're not
4709 # sure it will be when we actually add the instance. If things go bad
4710 # adding the instance will abort because of a duplicate mac, and the
4711 # creation job will fail.
4712 for nic in self.nics:
4713 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4714 nic.mac = self.cfg.GenerateMAC()
4718 if self.op.iallocator is not None:
4719 self._RunAllocator()
4721 #### node related checks
4723 # check primary node
4724 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4725 assert self.pnode is not None, \
4726 "Cannot retrieve locked node %s" % self.op.pnode
4728 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4731 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4734 self.secondaries = []
4736 # mirror node verification
4737 if self.op.disk_template in constants.DTS_NET_MIRROR:
4738 if self.op.snode is None:
4739 raise errors.OpPrereqError("The networked disk templates need"
4741 if self.op.snode == pnode.name:
4742 raise errors.OpPrereqError("The secondary node cannot be"
4743 " the primary node.")
4744 _CheckNodeOnline(self, self.op.snode)
4745 _CheckNodeNotDrained(self, self.op.snode)
4746 self.secondaries.append(self.op.snode)
4748 nodenames = [pnode.name] + self.secondaries
4750 req_size = _ComputeDiskSize(self.op.disk_template,
4753 # Check lv size requirements
4754 if req_size is not None:
4755 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4757 for node in nodenames:
4758 info = nodeinfo[node]
4762 raise errors.OpPrereqError("Cannot get current information"
4763 " from node '%s'" % node)
4764 vg_free = info.get('vg_free', None)
4765 if not isinstance(vg_free, int):
4766 raise errors.OpPrereqError("Can't compute free disk space on"
4768 if req_size > info['vg_free']:
4769 raise errors.OpPrereqError("Not enough disk space on target node %s."
4770 " %d MB available, %d MB required" %
4771 (node, info['vg_free'], req_size))
4773 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4776 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4778 if not isinstance(result.data, objects.OS) or not result.data:
4779 raise errors.OpPrereqError("OS '%s' not in supported os list for"
4780 " primary node" % self.op.os_type)
4782 # bridge check on primary node
4783 bridges = [n.bridge for n in self.nics]
4784 result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4787 raise errors.OpPrereqError("One of the target bridges '%s' does not"
4788 " exist on destination node '%s'" %
4789 (",".join(bridges), pnode.name))
4791 # memory check on primary node
4793 _CheckNodeFreeMemory(self, self.pnode.name,
4794 "creating instance %s" % self.op.instance_name,
4795 self.be_full[constants.BE_MEMORY],
4798 def Exec(self, feedback_fn):
4799 """Create and add the instance to the cluster.
4802 instance = self.op.instance_name
4803 pnode_name = self.pnode.name
4805 ht_kind = self.op.hypervisor
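# hypervisor types listed in HTS_REQ_PORT need a cluster-wide unique
# network port (e.g. for remote console access), so allocate one now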
4806 if ht_kind in constants.HTS_REQ_PORT:
4807 network_port = self.cfg.AllocatePort()
4811 ##if self.op.vnc_bind_address is None:
4812 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4814 # this is needed because os.path.join does not accept None arguments
4815 if self.op.file_storage_dir is None:
4816 string_file_storage_dir = ""
4818 string_file_storage_dir = self.op.file_storage_dir
4820 # build the full file storage dir path
4821 file_storage_dir = os.path.normpath(os.path.join(
4822 self.cfg.GetFileStorageDir(),
4823 string_file_storage_dir, instance))
4826 disks = _GenerateDiskTemplate(self,
4827 self.op.disk_template,
4828 instance, pnode_name,
4832 self.op.file_driver,
4835 iobj = objects.Instance(name=instance, os=self.op.os_type,
4836 primary_node=pnode_name,
4837 nics=self.nics, disks=disks,
4838 disk_template=self.op.disk_template,
4840 network_port=network_port,
4841 beparams=self.op.beparams,
4842 hvparams=self.op.hvparams,
4843 hypervisor=self.op.hypervisor,
4846 feedback_fn("* creating instance disks...")
4848 _CreateDisks(self, iobj)
4849 except errors.OpExecError:
4850 self.LogWarning("Device creation failed, reverting...")
4852 _RemoveDisks(self, iobj)
4854 self.cfg.ReleaseDRBDMinors(instance)
4857 feedback_fn("adding instance %s to cluster config" % instance)
4859 self.cfg.AddInstance(iobj)
4860 # Declare that we don't want to remove the instance lock anymore, as we've
4861 # added the instance to the config
4862 del self.remove_locks[locking.LEVEL_INSTANCE]
4863 # Unlock all the nodes
4864 if self.op.mode == constants.INSTANCE_IMPORT:
4865 nodes_keep = [self.op.src_node]
4866 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4867 if node != self.op.src_node]
4868 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4869 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4871 self.context.glm.release(locking.LEVEL_NODE)
4872 del self.acquired_locks[locking.LEVEL_NODE]
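# before declaring success, make sure the newly created disks are in sync
# (or at least not degraded when we are not waiting for the full sync)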
4874 if self.op.wait_for_sync:
4875 disk_abort = not _WaitForSync(self, iobj)
4876 elif iobj.disk_template in constants.DTS_NET_MIRROR:
4877 # make sure the disks are not degraded (still sync-ing is ok)
4879 feedback_fn("* checking mirrors status")
4880 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4885 _RemoveDisks(self, iobj)
4886 self.cfg.RemoveInstance(iobj.name)
4887 # Make sure the instance lock gets removed
4888 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4889 raise errors.OpExecError("There are some degraded disks for"
4892 feedback_fn("creating os for instance %s on node %s" %
4893 (instance, pnode_name))
4895 if iobj.disk_template != constants.DT_DISKLESS:
4896 if self.op.mode == constants.INSTANCE_CREATE:
4897 feedback_fn("* running the instance OS create scripts...")
4898 result = self.rpc.call_instance_os_add(pnode_name, iobj)
4899 msg = result.RemoteFailMsg()
4901 raise errors.OpExecError("Could not add os for instance %s"
4903 (instance, pnode_name, msg))
4905 elif self.op.mode == constants.INSTANCE_IMPORT:
4906 feedback_fn("* running the instance OS import scripts...")
4907 src_node = self.op.src_node
4908 src_images = self.src_images
4909 cluster_name = self.cfg.GetClusterName()
4910 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4911 src_node, src_images,
4913 import_result.Raise()
4914 for idx, result in enumerate(import_result.data):
4916 self.LogWarning("Could not import the image %s for instance"
4917 " %s, disk %d, on node %s" %
4918 (src_images[idx], instance, idx, pnode_name))
4920 # also checked in the prereq part
4921 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4925 iobj.admin_up = True
4926 self.cfg.Update(iobj)
4927 logging.info("Starting instance %s on node %s", instance, pnode_name)
4928 feedback_fn("* starting instance...")
4929 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4930 msg = result.RemoteFailMsg()
4932 raise errors.OpExecError("Could not start instance: %s" % msg)
4935 class LUConnectConsole(NoHooksLU):
4936 """Connect to an instance's console.
4938 This is somewhat special in that it returns the command line that
4939 you need to run on the master node in order to connect to the
4943 _OP_REQP = ["instance_name"]
4946 def ExpandNames(self):
4947 self._ExpandAndLockInstance()
4949 def CheckPrereq(self):
4950 """Check prerequisites.
4952 This checks that the instance is in the cluster.
4955 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4956 assert self.instance is not None, \
4957 "Cannot retrieve locked instance %s" % self.op.instance_name
4958 _CheckNodeOnline(self, self.instance.primary_node)
4960 def Exec(self, feedback_fn):
4961 """Connect to the console of an instance
4964 instance = self.instance
4965 node = instance.primary_node
4967 node_insts = self.rpc.call_instance_list([node],
4968 [instance.hypervisor])[node]
4971 if instance.name not in node_insts.data:
4972 raise errors.OpExecError("Instance %s is not running." % instance.name)
4974 logging.debug("Connecting to console of %s on %s", instance.name, node)
4976 hyper = hypervisor.GetHypervisor(instance.hypervisor)
4977 cluster = self.cfg.GetClusterInfo()
4978 # beparams and hvparams are passed separately, to avoid editing the
4979 # instance and then saving the defaults in the instance itself.
4980 hvparams = cluster.FillHV(instance)
4981 beparams = cluster.FillBE(instance)
4982 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4985 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4988 class LUReplaceDisks(LogicalUnit):
4989 """Replace the disks of an instance.
4992 HPATH = "mirrors-replace"
4993 HTYPE = constants.HTYPE_INSTANCE
4994 _OP_REQP = ["instance_name", "mode", "disks"]
4997 def CheckArguments(self):
4998 if not hasattr(self.op, "remote_node"):
4999 self.op.remote_node = None
5000 if not hasattr(self.op, "iallocator"):
5001 self.op.iallocator = None
5003 # check for valid parameter combination
5004 cnt = [self.op.remote_node, self.op.iallocator].count(None)
5005 if self.op.mode == constants.REPLACE_DISK_CHG:
5007 raise errors.OpPrereqError("When changing the secondary either an"
5008 " iallocator script must be used or the"
5011 raise errors.OpPrereqError("Give either the iallocator or the new"
5012 " secondary, not both")
5013 else: # not replacing the secondary
5015 raise errors.OpPrereqError("The iallocator and new node options can"
5016 " be used only when changing the"
5019 def ExpandNames(self):
5020 self._ExpandAndLockInstance()
5022 if self.op.iallocator is not None:
5023 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5024 elif self.op.remote_node is not None:
5025 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5026 if remote_node is None:
5027 raise errors.OpPrereqError("Node '%s' not known" %
5028 self.op.remote_node)
5029 self.op.remote_node = remote_node
5030 # Warning: do not remove the locking of the new secondary here
5031 # unless DRBD8.AddChildren is changed to work in parallel;
5032 # currently it doesn't since parallel invocations of
5033 # FindUnusedMinor will conflict
5034 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5035 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5037 self.needed_locks[locking.LEVEL_NODE] = []
5038 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5040 def DeclareLocks(self, level):
5041 # If we're not already locking all nodes in the set we have to declare the
5042 # instance's primary/secondary nodes.
5043 if (level == locking.LEVEL_NODE and
5044 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5045 self._LockInstancesNodes()
5047 def _RunAllocator(self):
5048 """Compute a new secondary node using an IAllocator.
5051 ial = IAllocator(self,
5052 mode=constants.IALLOCATOR_MODE_RELOC,
5053 name=self.op.instance_name,
5054 relocate_from=[self.sec_node])
5056 ial.Run(self.op.iallocator)
5059 raise errors.OpPrereqError("Can't compute nodes using"
5060 " iallocator '%s': %s" % (self.op.iallocator,
5062 if len(ial.nodes) != ial.required_nodes:
5063 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5064 " of nodes (%s), required %s" %
5065 (len(ial.nodes), ial.required_nodes))
5066 self.op.remote_node = ial.nodes[0]
5067 self.LogInfo("Selected new secondary for the instance: %s",
5068 self.op.remote_node)
5070 def BuildHooksEnv(self):
5073 This runs on the master, the primary and all the secondaries.
5077 "MODE": self.op.mode,
5078 "NEW_SECONDARY": self.op.remote_node,
5079 "OLD_SECONDARY": self.instance.secondary_nodes[0],
5081 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5083 self.cfg.GetMasterNode(),
5084 self.instance.primary_node,
5086 if self.op.remote_node is not None:
5087 nl.append(self.op.remote_node)
5090 def CheckPrereq(self):
5091 """Check prerequisites.
5093 This checks that the instance is in the cluster.
5096 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5097 assert instance is not None, \
5098 "Cannot retrieve locked instance %s" % self.op.instance_name
5099 self.instance = instance
5101 if instance.disk_template != constants.DT_DRBD8:
5102 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5105 if len(instance.secondary_nodes) != 1:
5106 raise errors.OpPrereqError("The instance has a strange layout,"
5107 " expected one secondary but found %d" %
5108 len(instance.secondary_nodes))
5110 self.sec_node = instance.secondary_nodes[0]
5112 if self.op.iallocator is not None:
5113 self._RunAllocator()
5115 remote_node = self.op.remote_node
5116 if remote_node is not None:
5117 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5118 assert self.remote_node_info is not None, \
5119 "Cannot retrieve locked node %s" % remote_node
5121 self.remote_node_info = None
5122 if remote_node == instance.primary_node:
5123 raise errors.OpPrereqError("The specified node is the primary node of"
5125 elif remote_node == self.sec_node:
5126 raise errors.OpPrereqError("The specified node is already the"
5127 " secondary node of the instance.")
5129 if self.op.mode == constants.REPLACE_DISK_PRI:
5130 n1 = self.tgt_node = instance.primary_node
5131 n2 = self.oth_node = self.sec_node
5132 elif self.op.mode == constants.REPLACE_DISK_SEC:
5133 n1 = self.tgt_node = self.sec_node
5134 n2 = self.oth_node = instance.primary_node
5135 elif self.op.mode == constants.REPLACE_DISK_CHG:
5136 n1 = self.new_node = remote_node
5137 n2 = self.oth_node = instance.primary_node
5138 self.tgt_node = self.sec_node
5139 _CheckNodeNotDrained(self, remote_node)
5141 raise errors.ProgrammerError("Unhandled disk replace mode")
5143 _CheckNodeOnline(self, n1)
5144 _CheckNodeOnline(self, n2)
5146 if not self.op.disks:
5147 self.op.disks = range(len(instance.disks))
5149 for disk_idx in self.op.disks:
5150 instance.FindDisk(disk_idx)
5152 def _ExecD8DiskOnly(self, feedback_fn):
5153 """Replace a disk on the primary or secondary for dbrd8.
5155 The algorithm for replace is quite complicated:
5157 1. for each disk to be replaced:
5159 1. create new LVs on the target node with unique names
5160 1. detach old LVs from the drbd device
5161 1. rename old LVs to name_replaced.<time_t>
5162 1. rename new LVs to old LVs
5163 1. attach the new LVs (with the old names now) to the drbd device
5165 1. wait for sync across all devices
5167 1. for each modified disk:
5169 1. remove old LVs (which have the name name_replaced.<time_t>)
5171 Failures are not very well handled.
5175 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5176 instance = self.instance
5178 vgname = self.cfg.GetVGName()
5181 tgt_node = self.tgt_node
5182 oth_node = self.oth_node
5184 # Step: check device activation
5185 self.proc.LogStep(1, steps_total, "check device existence")
5186 info("checking volume groups")
5187 my_vg = cfg.GetVGName()
5188 results = self.rpc.call_vg_list([oth_node, tgt_node])
5190 raise errors.OpExecError("Can't list volume groups on the nodes")
5191 for node in oth_node, tgt_node:
5193 if res.failed or not res.data or my_vg not in res.data:
5194 raise errors.OpExecError("Volume group '%s' not found on %s" %
5196 for idx, dev in enumerate(instance.disks):
5197 if idx not in self.op.disks:
5199 for node in tgt_node, oth_node:
5200 info("checking disk/%d on %s" % (idx, node))
5201 cfg.SetDiskID(dev, node)
5202 result = self.rpc.call_blockdev_find(node, dev)
5203 msg = result.RemoteFailMsg()
5204 if not msg and not result.payload:
5205 msg = "disk not found"
5207 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5210 # Step: check other node consistency
5211 self.proc.LogStep(2, steps_total, "check peer consistency")
5212 for idx, dev in enumerate(instance.disks):
5213 if idx not in self.op.disks:
5215 info("checking disk/%d consistency on %s" % (idx, oth_node))
5216 if not _CheckDiskConsistency(self, dev, oth_node,
5217 oth_node==instance.primary_node):
5218 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5219 " to replace disks on this node (%s)" %
5220 (oth_node, tgt_node))
5222 # Step: create new storage
5223 self.proc.LogStep(3, steps_total, "allocate new storage")
5224 for idx, dev in enumerate(instance.disks):
5225 if idx not in self.op.disks:
5228 cfg.SetDiskID(dev, tgt_node)
5229 lv_names = [".disk%d_%s" % (idx, suf)
5230 for suf in ["data", "meta"]]
5231 names = _GenerateUniqueNames(self, lv_names)
5232 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5233 logical_id=(vgname, names[0]))
5234 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5235 logical_id=(vgname, names[1]))
5236 new_lvs = [lv_data, lv_meta]
5237 old_lvs = dev.children
5238 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
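# remember the old and new LVs per device so the later steps (detach,
# rename, attach and final cleanup) can find them again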
5239 info("creating new local storage on %s for %s" %
5240 (tgt_node, dev.iv_name))
5241 # we pass force_create=True to force the LVM creation
5242 for new_lv in new_lvs:
5243 _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5244 _GetInstanceInfoText(instance), False)
5246 # Step: for each lv, detach+rename*2+attach
5247 self.proc.LogStep(4, steps_total, "change drbd configuration")
5248 for dev, old_lvs, new_lvs in iv_names.itervalues():
5249 info("detaching %s drbd from local storage" % dev.iv_name)
5250 result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5253 raise errors.OpExecError("Can't detach drbd from local storage on node"
5254 " %s for device %s" % (tgt_node, dev.iv_name))
5256 #cfg.Update(instance)
5258 # ok, we created the new LVs, so now we know we have the needed
5259 # storage; as such, we proceed on the target node to rename
5260 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5261 # using the assumption that logical_id == physical_id (which in
5262 # turn is the unique_id on that node)
5264 # FIXME(iustin): use a better name for the replaced LVs
5265 temp_suffix = int(time.time())
5266 ren_fn = lambda d, suff: (d.physical_id[0],
5267 d.physical_id[1] + "_replaced-%s" % suff)
5268 # build the rename list based on what LVs exist on the node
5270 for to_ren in old_lvs:
5271 result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5272 if not result.RemoteFailMsg() and result.payload:
5274 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5276 info("renaming the old LVs on the target node")
5277 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5280 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5281 # now we rename the new LVs to the old LVs
5282 info("renaming the new LVs on the target node")
5283 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5284 result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5287 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5289 for old, new in zip(old_lvs, new_lvs):
5290 new.logical_id = old.logical_id
5291 cfg.SetDiskID(new, tgt_node)
5293 for disk in old_lvs:
5294 disk.logical_id = ren_fn(disk, temp_suffix)
5295 cfg.SetDiskID(disk, tgt_node)
5297 # now that the new lvs have the old name, we can add them to the device
5298 info("adding new mirror component on %s" % tgt_node)
5299 result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5300 if result.failed or not result.data:
5301 for new_lv in new_lvs:
5302 msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5304 warning("Can't rollback device %s: %s", dev, msg,
5305 hint="cleanup manually the unused logical volumes")
5306 raise errors.OpExecError("Can't add local storage to drbd")
5308 dev.children = new_lvs
5309 cfg.Update(instance)
5311 # Step: wait for sync
5313 # this can fail as the old devices are degraded and _WaitForSync
5314 # does a combined result over all disks, so we don't check its return value
5316 self.proc.LogStep(5, steps_total, "sync devices")
5317 _WaitForSync(self, instance, unlock=True)
5319 # so check all the devices manually
5320 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5321 cfg.SetDiskID(dev, instance.primary_node)
5322 result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5323 msg = result.RemoteFailMsg()
5324 if not msg and not result.payload:
5325 msg = "disk not found"
5327 raise errors.OpExecError("Can't find DRBD device %s: %s" %
5329 if result.payload[5]:
5330 raise errors.OpExecError("DRBD device %s is degraded!" % name)
5332 # Step: remove old storage
5333 self.proc.LogStep(6, steps_total, "removing old storage")
5334 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5335 info("remove logical volumes for %s" % name)
5337 cfg.SetDiskID(lv, tgt_node)
5338 msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5340 warning("Can't remove old LV: %s" % msg,
5341 hint="manually remove unused LVs")
5344 def _ExecD8Secondary(self, feedback_fn):
5345 """Replace the secondary node for drbd8.
5347 The algorithm for replace is quite complicated:
5348 - for all disks of the instance:
5349 - create new LVs on the new node with same names
5350 - shutdown the drbd device on the old secondary
5351 - disconnect the drbd network on the primary
5352 - create the drbd device on the new secondary
5353 - network attach the drbd on the primary, using an artifice:
5354 the drbd code for Attach() will connect to the network if it
5355 finds a device which is connected to the good local disks but
5357 - wait for sync across all devices
5358 - remove all disks from the old secondary
5360 Failures are not very well handled.
5364 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5365 instance = self.instance
5369 old_node = self.tgt_node
5370 new_node = self.new_node
5371 pri_node = instance.primary_node
5373 old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5374 new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5375 pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5378 # Step: check device activation
5379 self.proc.LogStep(1, steps_total, "check device existence")
5380 info("checking volume groups")
5381 my_vg = cfg.GetVGName()
5382 results = self.rpc.call_vg_list([pri_node, new_node])
5383 for node in pri_node, new_node:
5385 if res.failed or not res.data or my_vg not in res.data:
5386 raise errors.OpExecError("Volume group '%s' not found on %s" %
5388 for idx, dev in enumerate(instance.disks):
5389 if idx not in self.op.disks:
5391 info("checking disk/%d on %s" % (idx, pri_node))
5392 cfg.SetDiskID(dev, pri_node)
5393 result = self.rpc.call_blockdev_find(pri_node, dev)
5394 msg = result.RemoteFailMsg()
5395 if not msg and not result.payload:
5396 msg = "disk not found"
5398 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5399 (idx, pri_node, msg))
5401 # Step: check other node consistency
5402 self.proc.LogStep(2, steps_total, "check peer consistency")
5403 for idx, dev in enumerate(instance.disks):
5404 if idx not in self.op.disks:
5406 info("checking disk/%d consistency on %s" % (idx, pri_node))
5407 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5408 raise errors.OpExecError("Primary node (%s) has degraded storage,"
5409 " unsafe to replace the secondary" %
5412 # Step: create new storage
5413 self.proc.LogStep(3, steps_total, "allocate new storage")
5414 for idx, dev in enumerate(instance.disks):
5415 info("adding new local storage on %s for disk/%d" %
5417 # we pass force_create=True to force LVM creation
5418 for new_lv in dev.children:
5419 _CreateBlockDev(self, new_node, instance, new_lv, True,
5420 _GetInstanceInfoText(instance), False)
5422 # Step 4: drbd minors and drbd setup changes
5423 # after this, we must manually remove the drbd minors on both the
5424 # error and the success paths
5425 minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5427 logging.debug("Allocated minors %s", minors)
5428 self.proc.LogStep(4, steps_total, "changing drbd configuration")
5429 for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5431 info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5432 # create new devices on new_node; note that we create two IDs:
5433 # one without port, so the drbd will be activated without
5434 # networking information on the new node at this stage, and one
5435 # with network, for the latter activation in step 4
5436 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5437 if pri_node == o_node1:
5442 new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5443 new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5445 iv_names[idx] = (dev, dev.children, new_net_id)
5446 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5448 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5449 logical_id=new_alone_id,
5450 children=dev.children,
5453 _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5454 _GetInstanceInfoText(instance), False)
5455 except errors.GenericError:
5456 self.cfg.ReleaseDRBDMinors(instance.name)
5459 for idx, dev in enumerate(instance.disks):
5460 # we have new devices, shutdown the drbd on the old secondary
5461 info("shutting down drbd for disk/%d on old node" % idx)
5462 cfg.SetDiskID(dev, old_node)
5463 msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5465 warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5467 hint="Please cleanup this device manually as soon as possible")
5469 info("detaching primary drbds from the network (=> standalone)")
5470 result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5471 instance.disks)[pri_node]
5473 msg = result.RemoteFailMsg()
5475 # detaches didn't succeed (unlikely)
5476 self.cfg.ReleaseDRBDMinors(instance.name)
5477 raise errors.OpExecError("Can't detach the disks from the network on"
5478 " old node: %s" % (msg,))
5480 # if we managed to detach at least one, we update all the disks of
5481 # the instance to point to the new secondary
5482 info("updating instance configuration")
5483 for dev, _, new_logical_id in iv_names.itervalues():
5484 dev.logical_id = new_logical_id
5485 cfg.SetDiskID(dev, pri_node)
5486 cfg.Update(instance)
5488 # and now perform the drbd attach
5489 info("attaching primary drbds to new secondary (standalone => connected)")
5490 result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5491 instance.disks, instance.name,
5493 for to_node, to_result in result.items():
5494 msg = to_result.RemoteFailMsg()
5496 warning("can't attach drbd disks on node %s: %s", to_node, msg,
5497 hint="please do a gnt-instance info to see the"
5500 # this can fail as the old devices are degraded and _WaitForSync
5501 # does a combined result over all disks, so we don't check its return value
5503 self.proc.LogStep(5, steps_total, "sync devices")
5504 _WaitForSync(self, instance, unlock=True)
5506 # so check all the devices manually
5507 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5508 cfg.SetDiskID(dev, pri_node)
5509 result = self.rpc.call_blockdev_find(pri_node, dev)
5510 msg = result.RemoteFailMsg()
5511 if not msg and not result.payload:
5512 msg = "disk not found"
5514 raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5516 if result.payload[5]:
5517 raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5519 self.proc.LogStep(6, steps_total, "removing old storage")
5520 for idx, (dev, old_lvs, _) in iv_names.iteritems():
5521 info("remove logical volumes for disk/%d" % idx)
5523 cfg.SetDiskID(lv, old_node)
5524 msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5526 warning("Can't remove LV on old secondary: %s", msg,
5527 hint="Cleanup stale volumes by hand")
5529 def Exec(self, feedback_fn):
5530 """Execute disk replacement.
5532 This dispatches the disk replacement to the appropriate handler.
5535 instance = self.instance
5537 # Activate the instance disks if we're replacing them on a down instance
5538 if not instance.admin_up:
5539 _StartInstanceDisks(self, instance, True)
5541 if self.op.mode == constants.REPLACE_DISK_CHG:
5542 fn = self._ExecD8Secondary
5544 fn = self._ExecD8DiskOnly
5546 ret = fn(feedback_fn)
5548 # Deactivate the instance disks if we're replacing them on a down instance
5549 if not instance.admin_up:
5550 _SafeShutdownInstanceDisks(self, instance)
5555 class LUGrowDisk(LogicalUnit):
5556 """Grow a disk of an instance.
5560 HTYPE = constants.HTYPE_INSTANCE
5561 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5564 def ExpandNames(self):
5565 self._ExpandAndLockInstance()
5566 self.needed_locks[locking.LEVEL_NODE] = []
5567 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5569 def DeclareLocks(self, level):
5570 if level == locking.LEVEL_NODE:
5571 self._LockInstancesNodes()
5573 def BuildHooksEnv(self):
5576 This runs on the master, the primary and all the secondaries.
5580 "DISK": self.op.disk,
5581 "AMOUNT": self.op.amount,
5583 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5585 self.cfg.GetMasterNode(),
5586 self.instance.primary_node,
5590 def CheckPrereq(self):
5591 """Check prerequisites.
5593 This checks that the instance is in the cluster.
5596 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5597 assert instance is not None, \
5598 "Cannot retrieve locked instance %s" % self.op.instance_name
5599 nodenames = list(instance.all_nodes)
5600 for node in nodenames:
5601 _CheckNodeOnline(self, node)
5604 self.instance = instance
5606 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5607 raise errors.OpPrereqError("Instance's disk layout does not support"
5610 self.disk = instance.FindDisk(self.op.disk)
5612 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5613 instance.hypervisor)
5614 for node in nodenames:
5615 info = nodeinfo[node]
5616 if info.failed or not info.data:
5617 raise errors.OpPrereqError("Cannot get current information"
5618 " from node '%s'" % node)
5619 vg_free = info.data.get('vg_free', None)
5620 if not isinstance(vg_free, int):
5621 raise errors.OpPrereqError("Can't compute free disk space on"
5623 if self.op.amount > vg_free:
5624 raise errors.OpPrereqError("Not enough disk space on target node %s:"
5625 " %d MiB available, %d MiB required" %
5626 (node, vg_free, self.op.amount))
5628 def Exec(self, feedback_fn):
5629 """Execute disk grow.
5632 instance = self.instance
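# grow the device on all of the instance's nodes so mirrored (DRBD) disks
# stay consistent, then record the new size in the configuration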
5634 for node in instance.all_nodes:
5635 self.cfg.SetDiskID(disk, node)
5636 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5637 msg = result.RemoteFailMsg()
5639 raise errors.OpExecError("Grow request failed to node %s: %s" %
5641 disk.RecordGrow(self.op.amount)
5642 self.cfg.Update(instance)
5643 if self.op.wait_for_sync:
5644 disk_abort = not _WaitForSync(self, instance)
5646 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5647 " status.\nPlease check the instance.")
5650 class LUQueryInstanceData(NoHooksLU):
5651 """Query runtime instance data.
5654 _OP_REQP = ["instances", "static"]
5657 def ExpandNames(self):
5658 self.needed_locks = {}
5659 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5661 if not isinstance(self.op.instances, list):
5662 raise errors.OpPrereqError("Invalid argument type 'instances'")
5664 if self.op.instances:
5665 self.wanted_names = []
5666 for name in self.op.instances:
5667 full_name = self.cfg.ExpandInstanceName(name)
5668 if full_name is None:
5669 raise errors.OpPrereqError("Instance '%s' not known" % name)
5670 self.wanted_names.append(full_name)
5671 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5673 self.wanted_names = None
5674 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5676 self.needed_locks[locking.LEVEL_NODE] = []
5677 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5679 def DeclareLocks(self, level):
5680 if level == locking.LEVEL_NODE:
5681 self._LockInstancesNodes()
5683 def CheckPrereq(self):
5684 """Check prerequisites.
5686 This only checks the optional instance list against the existing names.
5689 if self.wanted_names is None:
5690 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5692 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5693 in self.wanted_names]
5696 def _ComputeDiskStatus(self, instance, snode, dev):
5697 """Compute block device status.
5700 static = self.op.static
5702 self.cfg.SetDiskID(dev, instance.primary_node)
5703 dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5704 if dev_pstatus.offline:
5707 msg = dev_pstatus.RemoteFailMsg()
5709 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5710 (instance.name, msg))
5711 dev_pstatus = dev_pstatus.payload
5715 if dev.dev_type in constants.LDS_DRBD:
5716 # we change the snode then (otherwise we use the one passed in)
5717 if dev.logical_id[0] == instance.primary_node:
5718 snode = dev.logical_id[1]
5720 snode = dev.logical_id[0]
5722 if snode and not static:
5723 self.cfg.SetDiskID(dev, snode)
5724 dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5725 if dev_sstatus.offline:
5728 msg = dev_sstatus.RemoteFailMsg()
5730 raise errors.OpExecError("Can't compute disk status for %s: %s" %
5731 (instance.name, msg))
5732 dev_sstatus = dev_sstatus.payload
5737 dev_children = [self._ComputeDiskStatus(instance, snode, child)
5738 for child in dev.children]
5743 "iv_name": dev.iv_name,
5744 "dev_type": dev.dev_type,
5745 "logical_id": dev.logical_id,
5746 "physical_id": dev.physical_id,
5747 "pstatus": dev_pstatus,
5748 "sstatus": dev_sstatus,
5749 "children": dev_children,
5756 def Exec(self, feedback_fn):
5757 """Gather and return data"""
5760 cluster = self.cfg.GetClusterInfo()
5762 for instance in self.wanted_instances:
5763 if not self.op.static:
5764 remote_info = self.rpc.call_instance_info(instance.primary_node,
5766 instance.hypervisor)
5768 remote_info = remote_info.data
5769 if remote_info and "state" in remote_info:
5772 remote_state = "down"
5775 if instance.admin_up:
5778 config_state = "down"
5780 disks = [self._ComputeDiskStatus(instance, None, device)
5781 for device in instance.disks]
5784 "name": instance.name,
5785 "config_state": config_state,
5786 "run_state": remote_state,
5787 "pnode": instance.primary_node,
5788 "snodes": instance.secondary_nodes,
5790 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5792 "hypervisor": instance.hypervisor,
5793 "network_port": instance.network_port,
5794 "hv_instance": instance.hvparams,
5795 "hv_actual": cluster.FillHV(instance),
5796 "be_instance": instance.beparams,
5797 "be_actual": cluster.FillBE(instance),
5800 result[instance.name] = idict
5805 class LUSetInstanceParams(LogicalUnit):
5806 """Modifies an instances's parameters.
5809 HPATH = "instance-modify"
5810 HTYPE = constants.HTYPE_INSTANCE
5811 _OP_REQP = ["instance_name"]
5814 def CheckArguments(self):
5815 if not hasattr(self.op, 'nics'):
5817 if not hasattr(self.op, 'disks'):
5819 if not hasattr(self.op, 'beparams'):
5820 self.op.beparams = {}
5821 if not hasattr(self.op, 'hvparams'):
5822 self.op.hvparams = {}
5823 self.op.force = getattr(self.op, "force", False)
5824 if not (self.op.nics or self.op.disks or
5825 self.op.hvparams or self.op.beparams):
5826 raise errors.OpPrereqError("No changes submitted")
5830 for disk_op, disk_dict in self.op.disks:
5831 if disk_op == constants.DDM_REMOVE:
5834 elif disk_op == constants.DDM_ADD:
5837 if not isinstance(disk_op, int):
5838 raise errors.OpPrereqError("Invalid disk index")
5839 if disk_op == constants.DDM_ADD:
5840 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5841 if mode not in constants.DISK_ACCESS_SET:
5842 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5843 size = disk_dict.get('size', None)
5845 raise errors.OpPrereqError("Required disk parameter size missing")
5848 except ValueError, err:
5849 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5851 disk_dict['size'] = size
5853 # modification of disk
5854 if 'size' in disk_dict:
5855 raise errors.OpPrereqError("Disk size change not possible, use"
5858 if disk_addremove > 1:
5859 raise errors.OpPrereqError("Only one disk add or remove operation"
5860 " supported at a time")
5864 for nic_op, nic_dict in self.op.nics:
5865 if nic_op == constants.DDM_REMOVE:
5868 elif nic_op == constants.DDM_ADD:
5871 if not isinstance(nic_op, int):
5872 raise errors.OpPrereqError("Invalid nic index")
5874 # nic_dict should be a dict
5875 nic_ip = nic_dict.get('ip', None)
5876 if nic_ip is not None:
5877 if nic_ip.lower() == constants.VALUE_NONE:
5878 nic_dict['ip'] = None
5880 if not utils.IsValidIP(nic_ip):
5881 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5883 if nic_op == constants.DDM_ADD:
5884 nic_bridge = nic_dict.get('bridge', None)
5885 if nic_bridge is None:
5886 nic_dict['bridge'] = self.cfg.GetDefBridge()
5887 nic_mac = nic_dict.get('mac', None)
5889 nic_dict['mac'] = constants.VALUE_AUTO
5891 if 'mac' in nic_dict:
5892 nic_mac = nic_dict['mac']
5893 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5894 if not utils.IsValidMac(nic_mac):
5895 raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5896 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5897 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5898 " modifying an existing nic")
5900 if nic_addremove > 1:
5901 raise errors.OpPrereqError("Only one NIC add or remove operation"
5902 " supported at a time")
5904 def ExpandNames(self):
5905 self._ExpandAndLockInstance()
5906 self.needed_locks[locking.LEVEL_NODE] = []
5907 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5909 def DeclareLocks(self, level):
5910 if level == locking.LEVEL_NODE:
5911 self._LockInstancesNodes()
5913 def BuildHooksEnv(self):
5916 This runs on the master, primary and secondaries.
5920 if constants.BE_MEMORY in self.be_new:
5921 args['memory'] = self.be_new[constants.BE_MEMORY]
5922 if constants.BE_VCPUS in self.be_new:
5923 args['vcpus'] = self.be_new[constants.BE_VCPUS]
5924 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5925 # information at all.
5928 nic_override = dict(self.op.nics)
5929 for idx, nic in enumerate(self.instance.nics):
5930 if idx in nic_override:
5931 this_nic_override = nic_override[idx]
5933 this_nic_override = {}
5934 if 'ip' in this_nic_override:
5935 ip = this_nic_override['ip']
5938 if 'bridge' in this_nic_override:
5939 bridge = this_nic_override['bridge']
5942 if 'mac' in this_nic_override:
5943 mac = this_nic_override['mac']
5946 args['nics'].append((ip, bridge, mac))
5947 if constants.DDM_ADD in nic_override:
5948 ip = nic_override[constants.DDM_ADD].get('ip', None)
5949 bridge = nic_override[constants.DDM_ADD]['bridge']
5950 mac = nic_override[constants.DDM_ADD]['mac']
5951 args['nics'].append((ip, bridge, mac))
5952 elif constants.DDM_REMOVE in nic_override:
5953 del args['nics'][-1]
5955 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5956 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5959 def CheckPrereq(self):
5960 """Check prerequisites.
5962 This checks the instance and validates the requested parameter changes against the cluster.
5965 force = self.force = self.op.force
5967 # checking the new params on the primary/secondary nodes
5969 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5970 assert self.instance is not None, \
5971 "Cannot retrieve locked instance %s" % self.op.instance_name
5972 pnode = instance.primary_node
5973 nodelist = list(instance.all_nodes)
5975 # hvparams processing
5976 if self.op.hvparams:
5977 i_hvdict = copy.deepcopy(instance.hvparams)
5978 for key, val in self.op.hvparams.iteritems():
5979 if val == constants.VALUE_DEFAULT:
5986 cluster = self.cfg.GetClusterInfo()
5987 utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5988 hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5991 hypervisor.GetHypervisor(
5992 instance.hypervisor).CheckParameterSyntax(hv_new)
5993 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5994 self.hv_new = hv_new # the new actual values
5995 self.hv_inst = i_hvdict # the new dict (without defaults)
5997 self.hv_new = self.hv_inst = {}
5999 # beparams processing
6000 if self.op.beparams:
6001 i_bedict = copy.deepcopy(instance.beparams)
6002 for key, val in self.op.beparams.iteritems():
6003 if val == constants.VALUE_DEFAULT:
6010 cluster = self.cfg.GetClusterInfo()
6011 utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
6012 be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
6014 self.be_new = be_new # the new actual values
6015 self.be_inst = i_bedict # the new dict (without defaults)
6017 self.be_new = self.be_inst = {}
6021 if constants.BE_MEMORY in self.op.beparams and not self.force:
6022 mem_check_list = [pnode]
6023 if be_new[constants.BE_AUTO_BALANCE]:
6024 # either we changed auto_balance to yes or it was already yes before
6025 mem_check_list.extend(instance.secondary_nodes)
6026 instance_info = self.rpc.call_instance_info(pnode, instance.name,
6027 instance.hypervisor)
6028 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6029 instance.hypervisor)
6030 if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6031 # Assume the primary node is unreachable and go ahead
6032 self.warn.append("Can't get info from primary node %s" % pnode)
6034 if not instance_info.failed and instance_info.data:
6035 current_mem = int(instance_info.data['memory'])
6037 # Assume instance not running
6038 # (there is a slight race condition here, but it's not very probable,
6039 # and we have no other way to check)
6041 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6042 nodeinfo[pnode].data['memory_free'])
6044 raise errors.OpPrereqError("This change will prevent the instance"
6045 " from starting, due to %d MB of memory"
6046 " missing on its primary node" % miss_mem)
6048 if be_new[constants.BE_AUTO_BALANCE]:
6049 for node, nres in nodeinfo.iteritems():
6050 if node not in instance.secondary_nodes:
6052 if nres.failed or not isinstance(nres.data, dict):
6053 self.warn.append("Can't get info from secondary node %s" % node)
6054 elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6055 self.warn.append("Not enough memory to failover instance to"
6056 " secondary node %s" % node)
6059 for nic_op, nic_dict in self.op.nics:
6060 if nic_op == constants.DDM_REMOVE:
6061 if not instance.nics:
6062 raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6064 if nic_op != constants.DDM_ADD:
6066 if nic_op < 0 or nic_op >= len(instance.nics):
6067 raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6069 (nic_op, len(instance.nics)))
6070 if 'bridge' in nic_dict:
6071 nic_bridge = nic_dict['bridge']
6072 if nic_bridge is None:
6073 raise errors.OpPrereqError('Cannot set the nic bridge to None')
6074 if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6075 msg = ("Bridge '%s' doesn't exist on one of"
6076 " the instance nodes" % nic_bridge)
6078 self.warn.append(msg)
6080 raise errors.OpPrereqError(msg)
6081 if 'mac' in nic_dict:
6082 nic_mac = nic_dict['mac']
6084 raise errors.OpPrereqError('Cannot set the nic mac to None')
6085 elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6086 # otherwise generate the mac
6087 nic_dict['mac'] = self.cfg.GenerateMAC()
6089 # or validate/reserve the current one
6090 if self.cfg.IsMacInUse(nic_mac):
6091 raise errors.OpPrereqError("MAC address %s already in use"
6092 " in cluster" % nic_mac)
6095 if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6096 raise errors.OpPrereqError("Disk operations not supported for"
6097 " diskless instances")
6098 for disk_op, disk_dict in self.op.disks:
6099 if disk_op == constants.DDM_REMOVE:
6100 if len(instance.disks) == 1:
6101 raise errors.OpPrereqError("Cannot remove the last disk of"
6103 ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6104 ins_l = ins_l[pnode]
6105 if ins_l.failed or not isinstance(ins_l.data, list):
6106 raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6107 if instance.name in ins_l.data:
6108 raise errors.OpPrereqError("Instance is running, can't remove"
6111 if (disk_op == constants.DDM_ADD and
6112 len(instance.disks) >= constants.MAX_DISKS):
6113 raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6114 " add more" % constants.MAX_DISKS)
6115 if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6117 if disk_op < 0 or disk_op >= len(instance.disks):
6118 raise errors.OpPrereqError("Invalid disk index %s, valid values"
6120 (disk_op, len(instance.disks)))
6124 def Exec(self, feedback_fn):
6125 """Modifies an instance.
6127 All parameters take effect only at the next restart of the instance.
6130 # Process here the warnings from CheckPrereq, as we don't have a
6131 # feedback_fn there.
6132 for warn in self.warn:
6133 feedback_fn("WARNING: %s" % warn)
6136 instance = self.instance
6138 for disk_op, disk_dict in self.op.disks:
6139 if disk_op == constants.DDM_REMOVE:
6140 # remove the last disk
6141 device = instance.disks.pop()
6142 device_idx = len(instance.disks)
6143 for node, disk in device.ComputeNodeTree(instance.primary_node):
6144 self.cfg.SetDiskID(disk, node)
6145 msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6147 self.LogWarning("Could not remove disk/%d on node %s: %s,"
6148 " continuing anyway", device_idx, node, msg)
6149 result.append(("disk/%d" % device_idx, "remove"))
6150 elif disk_op == constants.DDM_ADD:
6152 if instance.disk_template == constants.DT_FILE:
6153 file_driver, file_path = instance.disks[0].logical_id
6154 file_path = os.path.dirname(file_path)
6156 file_driver = file_path = None
6157 disk_idx_base = len(instance.disks)
6158 new_disk = _GenerateDiskTemplate(self,
6159 instance.disk_template,
6160 instance.name, instance.primary_node,
6161 instance.secondary_nodes,
6166 instance.disks.append(new_disk)
6167 info = _GetInstanceInfoText(instance)
6169 logging.info("Creating volume %s for instance %s",
6170 new_disk.iv_name, instance.name)
6171 # Note: this needs to be kept in sync with _CreateDisks
6173 for node in instance.all_nodes:
6174 f_create = node == instance.primary_node
6176 _CreateBlockDev(self, node, instance, new_disk,
6177 f_create, info, f_create)
6178 except errors.OpExecError, err:
6179 self.LogWarning("Failed to create volume %s (%s) on"
6181 new_disk.iv_name, new_disk, node, err)
6182 result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6183 (new_disk.size, new_disk.mode)))
6185 # change a given disk
6186 instance.disks[disk_op].mode = disk_dict['mode']
6187 result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6189 for nic_op, nic_dict in self.op.nics:
6190 if nic_op == constants.DDM_REMOVE:
6191 # remove the last nic
6192 del instance.nics[-1]
6193 result.append(("nic.%d" % len(instance.nics), "remove"))
6194 elif nic_op == constants.DDM_ADD:
6195 # mac and bridge should be set by now
6196 mac = nic_dict['mac']
6197 bridge = nic_dict['bridge']
6198 new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6200 instance.nics.append(new_nic)
6201 result.append(("nic.%d" % (len(instance.nics) - 1),
6202 "add:mac=%s,ip=%s,bridge=%s" %
6203 (new_nic.mac, new_nic.ip, new_nic.bridge)))
6205 # change a given nic
6206 for key in 'mac', 'ip', 'bridge':
6208 setattr(instance.nics[nic_op], key, nic_dict[key])
6209 result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6212 if self.op.hvparams:
6213 instance.hvparams = self.hv_inst
6214 for key, val in self.op.hvparams.iteritems():
6215 result.append(("hv/%s" % key, val))
6218 if self.op.beparams:
6219 instance.beparams = self.be_inst
6220 for key, val in self.op.beparams.iteritems():
6221 result.append(("be/%s" % key, val))
6223 self.cfg.Update(instance)
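# The result list assembled above is what this Exec reports back; each entry
# is a (parameter, new value) pair, e.g. ("disk/1", "add:size=1024,mode=rw")
# or ("be/memory", 512), with the values here being purely illustrative.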
6228 class LUQueryExports(NoHooksLU):
6229 """Query the exports list
6232 _OP_REQP = ['nodes']
6235 def ExpandNames(self):
6236 self.needed_locks = {}
6237 self.share_locks[locking.LEVEL_NODE] = 1
6238 if not self.op.nodes:
6239 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6241 self.needed_locks[locking.LEVEL_NODE] = \
6242 _GetWantedNodes(self, self.op.nodes)
6244 def CheckPrereq(self):
6245 """Check prerequisites.
6248 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6250 def Exec(self, feedback_fn):
6251 """Compute the list of all the exported system images.
6254 @return: a dictionary with the structure node->(export-list)
6255 where export-list is a list of the instances exported on
6259 rpcresult = self.rpc.call_export_list(self.nodes)
6261 for node in rpcresult:
6262 if rpcresult[node].failed:
6263 result[node] = False
6265 result[node] = rpcresult[node].data
6270 class LUExportInstance(LogicalUnit):
6271 """Export an instance to an image in the cluster.
6274 HPATH = "instance-export"
6275 HTYPE = constants.HTYPE_INSTANCE
6276 _OP_REQP = ["instance_name", "target_node", "shutdown"]
6279 def ExpandNames(self):
6280 self._ExpandAndLockInstance()
6281 # FIXME: lock only instance primary and destination node
6283 # Sad but true, for now we have to lock all nodes, as we don't know where
6284 # the previous export might be, and in this LU we search for it and
6285 # remove it from its current node. In the future we could fix this by:
6286 # - making a tasklet to search (share-lock all), then create the new one,
6287 # then another one to remove it afterwards
6288 # - removing the removal operation altogether
6289 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6291 def DeclareLocks(self, level):
6292 """Last minute lock declaration."""
6293 # All nodes are locked anyway, so nothing to do here.
6295 def BuildHooksEnv(self):
6298 This will run on the master, primary node and target node.
6302 "EXPORT_NODE": self.op.target_node,
6303 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6305 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6306 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6307 self.op.target_node]
6310 def CheckPrereq(self):
6311 """Check prerequisites.
6313 This checks that the instance and node names are valid.
6316 instance_name = self.op.instance_name
6317 self.instance = self.cfg.GetInstanceInfo(instance_name)
6318 assert self.instance is not None, \
6319 "Cannot retrieve locked instance %s" % self.op.instance_name
6320 _CheckNodeOnline(self, self.instance.primary_node)
6322 self.dst_node = self.cfg.GetNodeInfo(
6323 self.cfg.ExpandNodeName(self.op.target_node))
6325 if self.dst_node is None:
6326 # This is a wrong node name, not a non-locked node
6327 raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6328 _CheckNodeOnline(self, self.dst_node.name)
6329 _CheckNodeNotDrained(self, self.dst_node.name)
6331 # instance disk type verification
6332 for disk in self.instance.disks:
6333 if disk.dev_type == constants.LD_FILE:
6334 raise errors.OpPrereqError("Export not supported for instances with"
6335 " file-based disks")
6337 def Exec(self, feedback_fn):
6338 """Export an instance to an image in the cluster.
6341 instance = self.instance
6342 dst_node = self.dst_node
6343 src_node = instance.primary_node
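# Rough outline of the steps below: optionally shut the instance down,
# snapshot each disk into a temporary LV on the source node, restart the
# instance if it was running, copy every snapshot to the target node,
# finalize the export there, and finally drop any older export of the same
# instance from the other nodes.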
6344 if self.op.shutdown:
6345 # shutdown the instance, but not the disks
6346 result = self.rpc.call_instance_shutdown(src_node, instance)
6347 msg = result.RemoteFailMsg()
6349 raise errors.OpExecError("Could not shutdown instance %s on"
6351 (instance.name, src_node, msg))
6353 vgname = self.cfg.GetVGName()
6357 # set the disks ID correctly since call_instance_start needs the
6358 # correct drbd minor to create the symlinks
6359 for disk in instance.disks:
6360 self.cfg.SetDiskID(disk, src_node)
6363 for idx, disk in enumerate(instance.disks):
6364 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6365 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6366 if new_dev_name.failed or not new_dev_name.data:
6367 self.LogWarning("Could not snapshot disk/%d on node %s",
6369 snap_disks.append(False)
6371 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6372 logical_id=(vgname, new_dev_name.data),
6373 physical_id=(vgname, new_dev_name.data),
6374 iv_name=disk.iv_name)
6375 snap_disks.append(new_dev)
6378 if self.op.shutdown and instance.admin_up:
6379 result = self.rpc.call_instance_start(src_node, instance, None, None)
6380 msg = result.RemoteFailMsg()
6382 _ShutdownInstanceDisks(self, instance)
6383 raise errors.OpExecError("Could not start instance: %s" % msg)
6385 # TODO: check for size
6387 cluster_name = self.cfg.GetClusterName()
6388 for idx, dev in enumerate(snap_disks):
6390 result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6391 instance, cluster_name, idx)
6392 if result.failed or not result.data:
6393 self.LogWarning("Could not export disk/%d from node %s to"
6394 " node %s", idx, src_node, dst_node.name)
6395 msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6397 self.LogWarning("Could not remove snapshot for disk/%d from node"
6398 " %s: %s", idx, src_node, msg)
6400 result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6401 if result.failed or not result.data:
6402 self.LogWarning("Could not finalize export for instance %s on node %s",
6403 instance.name, dst_node.name)
6405 nodelist = self.cfg.GetNodeList()
6406 nodelist.remove(dst_node.name)
6408 # on one-node clusters nodelist will be empty after the removal
6409 # if we proceed the backup would be removed because OpQueryExports
6410 # substitutes an empty list with the full cluster node list.
6412 exportlist = self.rpc.call_export_list(nodelist)
6413 for node in exportlist:
6414 if exportlist[node].failed:
6416 if instance.name in exportlist[node].data:
6417 if not self.rpc.call_export_remove(node, instance.name):
6418 self.LogWarning("Could not remove older export for instance %s"
6419 " on node %s", instance.name, node)
6422 class LURemoveExport(NoHooksLU):
6423 """Remove exports related to the named instance.
6426 _OP_REQP = ["instance_name"]
6429 def ExpandNames(self):
6430 self.needed_locks = {}
6431 # We need all nodes to be locked in order for RemoveExport to work, but we
6432 # don't need to lock the instance itself, as nothing will happen to it (and
6433 # we can also remove exports for an already-removed instance)
6434 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6436 def CheckPrereq(self):
6437 """Check prerequisites.
6441 def Exec(self, feedback_fn):
6442 """Remove any export.
6445 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6446 # If the instance was not found we'll try with the name that was passed in.
6447 # This will only work if it was an FQDN, though.
6449 if not instance_name:
6451 instance_name = self.op.instance_name
6453 exportlist = self.rpc.call_export_list(self.acquired_locks[
6454 locking.LEVEL_NODE])
6456 for node in exportlist:
6457 if exportlist[node].failed:
6458 self.LogWarning("Failed to query node %s, continuing" % node)
6460 if instance_name in exportlist[node].data:
6462 result = self.rpc.call_export_remove(node, instance_name)
6463 if result.failed or not result.data:
6464 logging.error("Could not remove export for instance %s"
6465 " on node %s", instance_name, node)
6467 if fqdn_warn and not found:
6468 feedback_fn("Export not found. If trying to remove an export belonging"
6469 " to a deleted instance please use its Fully Qualified"
6473 class TagsLU(NoHooksLU):
6476 This is an abstract class which is the parent of all the other tags LUs.
6480 def ExpandNames(self):
6481 self.needed_locks = {}
6482 if self.op.kind == constants.TAG_NODE:
6483 name = self.cfg.ExpandNodeName(self.op.name)
6485 raise errors.OpPrereqError("Invalid node name (%s)" %
6488 self.needed_locks[locking.LEVEL_NODE] = name
6489 elif self.op.kind == constants.TAG_INSTANCE:
6490 name = self.cfg.ExpandInstanceName(self.op.name)
6492 raise errors.OpPrereqError("Invalid instance name (%s)" %
6495 self.needed_locks[locking.LEVEL_INSTANCE] = name
6497 def CheckPrereq(self):
6498 """Check prerequisites.
6501 if self.op.kind == constants.TAG_CLUSTER:
6502 self.target = self.cfg.GetClusterInfo()
6503 elif self.op.kind == constants.TAG_NODE:
6504 self.target = self.cfg.GetNodeInfo(self.op.name)
6505 elif self.op.kind == constants.TAG_INSTANCE:
6506 self.target = self.cfg.GetInstanceInfo(self.op.name)
6508 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6512 class LUGetTags(TagsLU):
6513 """Returns the tags of a given object.
6516 _OP_REQP = ["kind", "name"]
6519 def Exec(self, feedback_fn):
6520 """Returns the tag list.
6523 return list(self.target.GetTags())
6526 class LUSearchTags(NoHooksLU):
6527 """Searches the tags for a given pattern.
6530 _OP_REQP = ["pattern"]
6533 def ExpandNames(self):
6534 self.needed_locks = {}
6536 def CheckPrereq(self):
6537 """Check prerequisites.
6539 This checks the pattern passed for validity by compiling it.
6543 self.re = re.compile(self.op.pattern)
6544 except re.error, err:
6545 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6546 (self.op.pattern, err))
6548 def Exec(self, feedback_fn):
6549 """Returns the tag list.
6553 tgts = [("/cluster", cfg.GetClusterInfo())]
6554 ilist = cfg.GetAllInstancesInfo().values()
6555 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6556 nlist = cfg.GetAllNodesInfo().values()
6557 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6559 for path, target in tgts:
6560 for tag in target.GetTags():
6561 if self.re.search(tag):
6562 results.append((path, tag))
6566 class LUAddTags(TagsLU):
6567 """Sets a tag on a given object.
6570 _OP_REQP = ["kind", "name", "tags"]
6573 def CheckPrereq(self):
6574 """Check prerequisites.
6576 This checks the type and length of the tag name and value.
6579 TagsLU.CheckPrereq(self)
6580 for tag in self.op.tags:
6581 objects.TaggableObject.ValidateTag(tag)
6583 def Exec(self, feedback_fn):
6588 for tag in self.op.tags:
6589 self.target.AddTag(tag)
6590 except errors.TagError, err:
6591 raise errors.OpExecError("Error while setting tag: %s" % str(err))
6593 self.cfg.Update(self.target)
6594 except errors.ConfigurationError:
6595 raise errors.OpRetryError("There has been a modification to the"
6596 " config file and the operation has been"
6597 " aborted. Please retry.")
6600 class LUDelTags(TagsLU):
6601 """Delete a list of tags from a given object.
6604 _OP_REQP = ["kind", "name", "tags"]
6607 def CheckPrereq(self):
6608 """Check prerequisites.
6610 This checks that we have the given tag.
6613 TagsLU.CheckPrereq(self)
6614 for tag in self.op.tags:
6615 objects.TaggableObject.ValidateTag(tag)
6616 del_tags = frozenset(self.op.tags)
6617 cur_tags = self.target.GetTags()
6618 if not del_tags <= cur_tags:
6619 diff_tags = del_tags - cur_tags
6620 diff_names = ["'%s'" % tag for tag in diff_tags]
6622 raise errors.OpPrereqError("Tag(s) %s not found" %
6623 (",".join(diff_names)))
6625 def Exec(self, feedback_fn):
6626 """Remove the tag from the object.
6629 for tag in self.op.tags:
6630 self.target.RemoveTag(tag)
6632 self.cfg.Update(self.target)
6633 except errors.ConfigurationError:
6634 raise errors.OpRetryError("There has been a modification to the"
6635 " config file and the operation has been"
6636 " aborted. Please retry.")
6639 class LUTestDelay(NoHooksLU):
6640 """Sleep for a specified amount of time.
6642 This LU sleeps on the master and/or nodes for a specified amount of
6646 _OP_REQP = ["duration", "on_master", "on_nodes"]
6649 def ExpandNames(self):
6650 """Expand names and set required locks.
6652 This expands the node list, if any.
6655 self.needed_locks = {}
6656 if self.op.on_nodes:
6657 # _GetWantedNodes can be used here, but is not always appropriate to use
6658 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6660 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6661 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6663 def CheckPrereq(self):
6664 """Check prerequisites.
6668 def Exec(self, feedback_fn):
6669 """Do the actual sleep.
6672 if self.op.on_master:
6673 if not utils.TestDelay(self.op.duration):
6674 raise errors.OpExecError("Error during master delay test")
6675 if self.op.on_nodes:
6676 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6678 raise errors.OpExecError("Complete failure from rpc call")
6679 for node, node_result in result.items():
6681 if not node_result.data:
6682 raise errors.OpExecError("Failure during rpc call to node %s,"
6683 " result: %s" % (node, node_result.data))
6686 class IAllocator(object):
6687 """IAllocator framework.
6689 An IAllocator instance has four sets of attributes:
6690 - cfg that is needed to query the cluster
6691 - input data (all members of the _KEYS class attribute are required)
6692 - four buffer attributes (in|out_data|text), that represent the
6693 input (to the external script) in text and data structure format,
6694 and the output from it, again in two formats
6695 - the result variables from the script (success, info, nodes) for
6700 "mem_size", "disks", "disk_template",
6701 "os", "tags", "nics", "vcpus", "hypervisor",
6707 def __init__(self, lu, mode, name, **kwargs):
6709 # init buffer variables
6710 self.in_text = self.out_text = self.in_data = self.out_data = None
6711 # init all input fields so that pylint is happy
6714 self.mem_size = self.disks = self.disk_template = None
6715 self.os = self.tags = self.nics = self.vcpus = None
6716 self.hypervisor = None
6717 self.relocate_from = None
6719 self.required_nodes = None
6720 # init result fields
6721 self.success = self.info = self.nodes = None
6722 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6723 keyset = self._ALLO_KEYS
6724 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6725 keyset = self._RELO_KEYS
6727 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6728 " IAllocator" % self.mode)
6730 if key not in keyset:
6731 raise errors.ProgrammerError("Invalid input parameter '%s' to"
6732 " IAllocator" % key)
6733 setattr(self, key, kwargs[key])
6735 if key not in kwargs:
6736 raise errors.ProgrammerError("Missing input parameter '%s' to"
6737 " IAllocator" % key)
6738 self._BuildInputData()
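# Typical usage from another LU, sketched with placeholder values (the
# allocator script name and the variables below are examples only):
#   ial = IAllocator(self, mode=constants.IALLOCATOR_MODE_RELOC,
#                    name=instance.name, relocate_from=[old_secondary])
#   ial.Run("my-allocator")
#   if not ial.success:
#     raise errors.OpPrereqError("Can't compute nodes: %s" % ial.info)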
6740 def _ComputeClusterData(self):
6741 """Compute the generic allocator input data.
6743 This is the data that is independent of the actual operation.
6747 cluster_info = cfg.GetClusterInfo()
6750 "version": constants.IALLOCATOR_VERSION,
6751 "cluster_name": cfg.GetClusterName(),
6752 "cluster_tags": list(cluster_info.GetTags()),
6753 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6754 # we don't have job IDs
6756 iinfo = cfg.GetAllInstancesInfo().values()
6757 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6761 node_list = cfg.GetNodeList()
6763 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6764 hypervisor_name = self.hypervisor
6765 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6766 hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6768 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6770 node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6771 cluster_info.enabled_hypervisors)
6772 for nname, nresult in node_data.items():
6773 # first fill in static (config-based) values
6774 ninfo = cfg.GetNodeInfo(nname)
6776 "tags": list(ninfo.GetTags()),
6777 "primary_ip": ninfo.primary_ip,
6778 "secondary_ip": ninfo.secondary_ip,
6779 "offline": ninfo.offline,
6780 "drained": ninfo.drained,
6781 "master_candidate": ninfo.master_candidate,
6784 if not ninfo.offline:
6786 if not isinstance(nresult.data, dict):
6787 raise errors.OpExecError("Can't get data for node %s" % nname)
6788 remote_info = nresult.data
6789 for attr in ['memory_total', 'memory_free', 'memory_dom0',
6790 'vg_size', 'vg_free', 'cpu_total']:
6791 if attr not in remote_info:
6792 raise errors.OpExecError("Node '%s' didn't return attribute"
6793 " '%s'" % (nname, attr))
6795 remote_info[attr] = int(remote_info[attr])
6796 except ValueError, err:
6797 raise errors.OpExecError("Node '%s' returned invalid value"
6798 " for '%s': %s" % (nname, attr, err))
6799 # compute memory used by primary instances
6800 i_p_mem = i_p_up_mem = 0
6801 for iinfo, beinfo in i_list:
6802 if iinfo.primary_node == nname:
6803 i_p_mem += beinfo[constants.BE_MEMORY]
6804 if iinfo.name not in node_iinfo[nname].data:
6807 i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6808 i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6809 remote_info['memory_free'] -= max(0, i_mem_diff)
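# The adjustment above reserves, for each primary instance currently using
# less memory than its configured maximum, the difference between the two,
# so the reported free memory does not overcommit what the instance may
# still grow back into.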
6812 i_p_up_mem += beinfo[constants.BE_MEMORY]
6814 # compute memory used by instances
6816 "total_memory": remote_info['memory_total'],
6817 "reserved_memory": remote_info['memory_dom0'],
6818 "free_memory": remote_info['memory_free'],
6819 "total_disk": remote_info['vg_size'],
6820 "free_disk": remote_info['vg_free'],
6821 "total_cpus": remote_info['cpu_total'],
6822 "i_pri_memory": i_p_mem,
6823 "i_pri_up_memory": i_p_up_mem,
6827 node_results[nname] = pnr
6828 data["nodes"] = node_results
6832 for iinfo, beinfo in i_list:
6833 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6834 for n in iinfo.nics]
6836 "tags": list(iinfo.GetTags()),
6837 "admin_up": iinfo.admin_up,
6838 "vcpus": beinfo[constants.BE_VCPUS],
6839 "memory": beinfo[constants.BE_MEMORY],
6841 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6843 "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6844 "disk_template": iinfo.disk_template,
6845 "hypervisor": iinfo.hypervisor,
6847 pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6849 instance_data[iinfo.name] = pir
6851 data["instances"] = instance_data
6855 def _AddNewInstance(self):
6856 """Add new instance data to allocator structure.
6858 This in combination with _ComputeClusterData will create the
6859 correct structure needed as input for the allocator.
6861 The checks for the completeness of the opcode must have already been
6867 disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6869 if self.disk_template in constants.DTS_NET_MIRROR:
6870 self.required_nodes = 2
6872 self.required_nodes = 1
6876 "disk_template": self.disk_template,
6879 "vcpus": self.vcpus,
6880 "memory": self.mem_size,
6881 "disks": self.disks,
6882 "disk_space_total": disk_space,
6884 "required_nodes": self.required_nodes,
6886 data["request"] = request
6888 def _AddRelocateInstance(self):
6889 """Add relocate instance data to allocator structure.
6891 This in combination with _ComputeClusterData will create the
6892 correct structure needed as input for the allocator.
6894 The checks for the completeness of the opcode must have already been
6898 instance = self.lu.cfg.GetInstanceInfo(self.name)
6899 if instance is None:
6900 raise errors.ProgrammerError("Unknown instance '%s' passed to"
6901 " IAllocator" % self.name)
6903 if instance.disk_template not in constants.DTS_NET_MIRROR:
6904 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6906 if len(instance.secondary_nodes) != 1:
6907 raise errors.OpPrereqError("Instance has not exactly one secondary node")
6909 self.required_nodes = 1
6910 disk_sizes = [{'size': disk.size} for disk in instance.disks]
6911 disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6916 "disk_space_total": disk_space,
6917 "required_nodes": self.required_nodes,
6918 "relocate_from": self.relocate_from,
6920 self.in_data["request"] = request
6922 def _BuildInputData(self):
6923 """Build input data structures.
6926 self._ComputeClusterData()
6928 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6929 self._AddNewInstance()
6931 self._AddRelocateInstance()
6933 self.in_text = serializer.Dump(self.in_data)
6935 def Run(self, name, validate=True, call_fn=None):
6936 """Run an instance allocator and return the results.
6940 call_fn = self.lu.rpc.call_iallocator_runner
6943 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6946 if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6947 raise errors.OpExecError("Invalid result from master iallocator runner")
6949 rcode, stdout, stderr, fail = result.data
6951 if rcode == constants.IARUN_NOTFOUND:
6952 raise errors.OpExecError("Can't find allocator '%s'" % name)
6953 elif rcode == constants.IARUN_FAILURE:
6954 raise errors.OpExecError("Instance allocator call failed: %s,"
6955 " output: %s" % (fail, stdout+stderr))
6956 self.out_text = stdout
6958 self._ValidateResult()
6960 def _ValidateResult(self):
6961 """Process the allocator results.
6963 This will process and if successful save the result in
6964 self.out_data and the other parameters.
6968 rdict = serializer.Load(self.out_text)
6969 except Exception, err:
6970 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6972 if not isinstance(rdict, dict):
6973 raise errors.OpExecError("Can't parse iallocator results: not a dict")
6975 for key in "success", "info", "nodes":
6976 if key not in rdict:
6977 raise errors.OpExecError("Can't parse iallocator results:"
6978 " missing key '%s'" % key)
6979 setattr(self, key, rdict[key])
6981 if not isinstance(rdict["nodes"], list):
6982 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6984 self.out_data = rdict
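# For reference, a minimal well-formed reply from an allocator script might
# look like (values invented for illustration):
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node2.example.com", "node3.example.com"]}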
6987 class LUTestAllocator(NoHooksLU):
6988 """Run allocator tests.
6990 This LU runs the allocator tests
6993 _OP_REQP = ["direction", "mode", "name"]
6995 def CheckPrereq(self):
6996 """Check prerequisites.
6998 This checks the opcode parameters depending on the direction and mode test.
7001 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7002 for attr in ["name", "mem_size", "disks", "disk_template",
7003 "os", "tags", "nics", "vcpus"]:
7004 if not hasattr(self.op, attr):
7005 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7007 iname = self.cfg.ExpandInstanceName(self.op.name)
7008 if iname is not None:
7009 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7011 if not isinstance(self.op.nics, list):
7012 raise errors.OpPrereqError("Invalid parameter 'nics'")
7013 for row in self.op.nics:
7014 if (not isinstance(row, dict) or
7017 "bridge" not in row):
7018 raise errors.OpPrereqError("Invalid contents of the"
7019 " 'nics' parameter")
7020 if not isinstance(self.op.disks, list):
7021 raise errors.OpPrereqError("Invalid parameter 'disks'")
7022 for row in self.op.disks:
7023 if (not isinstance(row, dict) or
7024 "size" not in row or
7025 not isinstance(row["size"], int) or
7026 "mode" not in row or
7027 row["mode"] not in ['r', 'w']):
7028 raise errors.OpPrereqError("Invalid contents of the"
7029 " 'disks' parameter")
7030 if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7031 self.op.hypervisor = self.cfg.GetHypervisorType()
7032 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7033 if not hasattr(self.op, "name"):
7034 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7035 fname = self.cfg.ExpandInstanceName(self.op.name)
7037 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7039 self.op.name = fname
7040 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7042 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7045 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7046 if not hasattr(self.op, "allocator") or self.op.allocator is None:
7047 raise errors.OpPrereqError("Missing allocator name")
7048 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7049 raise errors.OpPrereqError("Wrong allocator test '%s'" %
7052 def Exec(self, feedback_fn):
7053 """Run the allocator test.
7056 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7057 ial = IAllocator(self,
7060 mem_size=self.op.mem_size,
7061 disks=self.op.disks,
7062 disk_template=self.op.disk_template,
7066 vcpus=self.op.vcpus,
7067 hypervisor=self.op.hypervisor,
7070 ial = IAllocator(self,
7073 relocate_from=list(self.relocate_from),
7076 if self.op.direction == constants.IALLOCATOR_DIR_IN:
7077 result = ial.in_text
7079 ial.Run(self.op.allocator, validate=False)
7080 result = ial.out_text